This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d3e6283c643da5bf2d134a1174e52c5ef753a80d Author: Jarek Potiuk <[email protected]> AuthorDate: Sat Dec 9 22:14:27 2023 +0100 Change retry type for Google Dataflow Client to async one (#36141) Google Dataflow Client 0.8.6 implemented a bugfix in which the retry type was changed to async. This caused our canary builds to fail. We now change the retry type used with the async clients to AsyncRetry and bump the minimum version of the client to 0.8.6. (cherry picked from commit 8d0c5d900875ce3b9dda1a86f1de534759e9d7f6) --- .../providers/google/cloud/hooks/bigquery_dts.py | 3 +- .../providers/google/cloud/hooks/cloud_build.py | 3 +- .../providers/google/cloud/hooks/cloud_composer.py | 7 ++-- airflow/providers/google/cloud/hooks/dataplex.py | 3 +- airflow/providers/google/cloud/hooks/dataproc.py | 37 +++++++++++----------- .../providers/google/cloud/operators/dataproc.py | 11 ++++--- airflow/providers/google/provider.yaml | 14 ++++---- generated/provider_dependencies.json | 14 ++++---- .../cloud/dataproc/example_dataproc_batch.py | 4 +-- 9 files changed, 51 insertions(+), 45 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py b/airflow/providers/google/cloud/hooks/bigquery_dts.py index c3f5780ea8..c7ef15fb44 100644 --- a/airflow/providers/google/cloud/hooks/bigquery_dts.py +++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py @@ -38,6 +38,7 @@ from airflow.providers.google.common.hooks.base_google import ( if TYPE_CHECKING: from google.api_core.retry import Retry + from google.api_core.retry_async import AsyncRetry from googleapiclient.discovery import Resource @@ -321,7 +322,7 @@ class AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook): run_id: str, project_id: str | None, location: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ): diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py 
b/airflow/providers/google/cloud/hooks/cloud_build.py index 5cd9b798ea..189303a9ce 100644 --- a/airflow/providers/google/cloud/hooks/cloud_build.py +++ b/airflow/providers/google/cloud/hooks/cloud_build.py @@ -33,6 +33,7 @@ from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID if TYPE_CHECKING: from google.api_core.operation import Operation from google.api_core.retry import Retry + from google.api_core.retry_async import AsyncRetry from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource # Time to sleep between active checks of the operation results @@ -645,7 +646,7 @@ class CloudBuildAsyncHook(GoogleBaseHook): self, id_: str, project_id: str = PROVIDE_PROJECT_ID, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), location: str = "global", diff --git a/airflow/providers/google/cloud/hooks/cloud_composer.py b/airflow/providers/google/cloud/hooks/cloud_composer.py index 63170a4513..01e88df8a3 100644 --- a/airflow/providers/google/cloud/hooks/cloud_composer.py +++ b/airflow/providers/google/cloud/hooks/cloud_composer.py @@ -35,6 +35,7 @@ if TYPE_CHECKING: from google.api_core.operation import Operation from google.api_core.operation_async import AsyncOperation from google.api_core.retry import Retry + from google.api_core.retry_async import AsyncRetry from google.cloud.orchestration.airflow.service_v1.services.environments.pagers import ( ListEnvironmentsPager, ) @@ -332,7 +333,7 @@ class CloudComposerAsyncHook(GoogleBaseHook): project_id: str, region: str, environment: Environment | dict, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -361,7 +362,7 @@ class CloudComposerAsyncHook(GoogleBaseHook): project_id: str, region: str, environment_id: str, - retry: Retry | 
_MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -389,7 +390,7 @@ class CloudComposerAsyncHook(GoogleBaseHook): environment_id: str, environment: Environment | dict, update_mask: dict | FieldMask, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py index 4eeefa2a3d..3832026886 100644 --- a/airflow/providers/google/cloud/hooks/dataplex.py +++ b/airflow/providers/google/cloud/hooks/dataplex.py @@ -40,6 +40,7 @@ from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHoo if TYPE_CHECKING: from google.api_core.operation import Operation from google.api_core.retry import Retry + from google.api_core.retry_async import AsyncRetry from googleapiclient.discovery import Resource PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}" @@ -896,7 +897,7 @@ class DataplexAsyncHook(GoogleBaseAsyncHook): region: str, data_scan_id: str | None = None, job_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Any: diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py index 349a8488bc..10459118c0 100644 --- a/airflow/providers/google/cloud/hooks/dataproc.py +++ b/airflow/providers/google/cloud/hooks/dataproc.py @@ -51,6 +51,7 @@ if TYPE_CHECKING: from google.api_core.operation_async import AsyncOperation from google.api_core.operations_v1.operations_client import OperationsClient from google.api_core.retry import Retry + from google.api_core.retry_async import AsyncRetry from 
google.protobuf.duration_pb2 import Duration from google.protobuf.field_mask_pb2 import FieldMask @@ -256,7 +257,7 @@ class DataprocHook(GoogleBaseHook): self, operation: Operation, timeout: float | None = None, - result_retry: Retry | _MethodDefault = DEFAULT, + result_retry: AsyncRetry | _MethodDefault = DEFAULT, ) -> Any: """Wait for a long-lasting operation to complete.""" try: @@ -997,7 +998,7 @@ class DataprocHook(GoogleBaseHook): region: str, project_id: str, wait_check_interval: int = 10, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Batch: @@ -1132,7 +1133,7 @@ class DataprocAsyncHook(GoogleBaseHook): virtual_cluster_config: dict | None = None, labels: dict[str, str] | None = None, request_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -1199,7 +1200,7 @@ class DataprocAsyncHook(GoogleBaseHook): project_id: str, cluster_uuid: str | None = None, request_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -1242,7 +1243,7 @@ class DataprocAsyncHook(GoogleBaseHook): region: str, cluster_name: str, project_id: str, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> str: @@ -1277,7 +1278,7 @@ class DataprocAsyncHook(GoogleBaseHook): region: str, cluster_name: str, project_id: str, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Cluster: @@ -1309,7 +1310,7 @@ class 
DataprocAsyncHook(GoogleBaseHook): filter_: str, project_id: str, page_size: int | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ): @@ -1349,7 +1350,7 @@ class DataprocAsyncHook(GoogleBaseHook): region: str, graceful_decommission_timeout: dict | Duration | None = None, request_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -1429,7 +1430,7 @@ class DataprocAsyncHook(GoogleBaseHook): template: dict | WorkflowTemplate, project_id: str, region: str, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> WorkflowTemplate: @@ -1465,7 +1466,7 @@ class DataprocAsyncHook(GoogleBaseHook): version: int | None = None, request_id: str | None = None, parameters: dict[str, str] | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -1511,7 +1512,7 @@ class DataprocAsyncHook(GoogleBaseHook): project_id: str, region: str, request_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -1554,7 +1555,7 @@ class DataprocAsyncHook(GoogleBaseHook): job_id: str, project_id: str, region: str, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Job: @@ -1588,7 +1589,7 @@ class DataprocAsyncHook(GoogleBaseHook): project_id: str, region: str, request_id: str | None = None, - retry: 
Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Job: @@ -1624,7 +1625,7 @@ class DataprocAsyncHook(GoogleBaseHook): job_id: str, project_id: str, region: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Job: @@ -1658,7 +1659,7 @@ class DataprocAsyncHook(GoogleBaseHook): batch: dict | Batch, batch_id: str | None = None, request_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> AsyncOperation: @@ -1703,7 +1704,7 @@ class DataprocAsyncHook(GoogleBaseHook): batch_id: str, region: str, project_id: str, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> None: @@ -1737,7 +1738,7 @@ class DataprocAsyncHook(GoogleBaseHook): batch_id: str, region: str, project_id: str, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), ) -> Batch: @@ -1773,7 +1774,7 @@ class DataprocAsyncHook(GoogleBaseHook): project_id: str, page_size: int | None = None, page_token: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), filter: str | None = None, diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index b489a79dc8..2d8e5f3d49 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -63,6 +63,7 @@ from airflow.utils import 
timezone if TYPE_CHECKING: from google.api_core import operation + from google.api_core.retry_async import AsyncRetry from google.protobuf.duration_pb2 import Duration from google.protobuf.field_mask_pb2 import FieldMask @@ -592,7 +593,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator): request_id: str | None = None, delete_on_error: bool = True, use_if_exists: bool = True, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float = 1 * 60 * 60, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -985,7 +986,7 @@ class DataprocDeleteClusterOperator(GoogleCloudBaseOperator): project_id: str | None = None, cluster_uuid: str | None = None, request_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float = 1 * 60 * 60, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -1891,7 +1892,7 @@ class DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator): version: int | None = None, request_id: str | None = None, parameters: dict[str, str] | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -2340,7 +2341,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator): region: str, request_id: str | None = None, project_id: str | None = None, - retry: Retry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -2480,7 +2481,7 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator): metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - result_retry: Retry | 
_MethodDefault = DEFAULT, + result_retry: AsyncRetry | _MethodDefault = DEFAULT, asynchronous: bool = False, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), polling_interval_seconds: int = 5, diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 1e6bcbe69b..4a7f9262f1 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -94,17 +94,17 @@ dependencies: - google-auth>=1.0.0 - google-auth-httplib2>=0.0.1 - google-cloud-aiplatform>=1.22.1 - - google-cloud-automl>=2.11.0 - - google-cloud-bigquery-datatransfer>=3.11.0 + - google-cloud-automl>=2.12.0 + - google-cloud-bigquery-datatransfer>=3.13.0 - google-cloud-bigtable>=2.17.0 - - google-cloud-build>=3.13.0 + - google-cloud-build>=3.22.0 - google-cloud-compute>=1.10.0 - google-cloud-container>=2.17.4 - google-cloud-datacatalog>=3.11.1 - - google-cloud-dataflow-client>=0.8.2 + - google-cloud-dataflow-client>=0.8.6 - google-cloud-dataform>=0.5.0 - - google-cloud-dataplex>=1.4.2 - - google-cloud-dataproc>=5.5.0 + - google-cloud-dataplex>=1.10.0 + - google-cloud-dataproc>=5.8.0 - google-cloud-dataproc-metastore>=1.12.0 - google-cloud-dlp>=3.12.0 - google-cloud-kms>=2.15.0 @@ -112,7 +112,7 @@ dependencies: - google-cloud-logging>=3.5.0 - google-cloud-memcache>=1.7.0 - google-cloud-monitoring>=2.14.1 - - google-cloud-orchestration-airflow>=1.7.0 + - google-cloud-orchestration-airflow>=1.10.0 - google-cloud-os-login>=2.9.1 - google-cloud-pubsub>=2.15.0 - google-cloud-redis>=2.12.0 diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index c47736d9ac..60ffee762d 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -422,26 +422,26 @@ "google-auth-httplib2>=0.0.1", "google-auth>=1.0.0", "google-cloud-aiplatform>=1.22.1", - "google-cloud-automl>=2.11.0", + "google-cloud-automl>=2.12.0", "google-cloud-batch>=0.13.0", - 
"google-cloud-bigquery-datatransfer>=3.11.0", + "google-cloud-bigquery-datatransfer>=3.13.0", "google-cloud-bigtable>=2.17.0", - "google-cloud-build>=3.13.0", + "google-cloud-build>=3.22.0", "google-cloud-compute>=1.10.0", "google-cloud-container>=2.17.4", "google-cloud-datacatalog>=3.11.1", - "google-cloud-dataflow-client>=0.8.2", + "google-cloud-dataflow-client>=0.8.6", "google-cloud-dataform>=0.5.0", - "google-cloud-dataplex>=1.4.2", + "google-cloud-dataplex>=1.10.0", "google-cloud-dataproc-metastore>=1.12.0", - "google-cloud-dataproc>=5.5.0", + "google-cloud-dataproc>=5.8.0", "google-cloud-dlp>=3.12.0", "google-cloud-kms>=2.15.0", "google-cloud-language>=2.9.0", "google-cloud-logging>=3.5.0", "google-cloud-memcache>=1.7.0", "google-cloud-monitoring>=2.14.1", - "google-cloud-orchestration-airflow>=1.7.0", + "google-cloud-orchestration-airflow>=1.10.0", "google-cloud-os-login>=2.9.1", "google-cloud-pubsub>=2.15.0", "google-cloud-redis>=2.12.0", diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py index 7dd5eff73a..7d126fc28b 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py @@ -22,7 +22,7 @@ from __future__ import annotations import os from datetime import datetime -from google.api_core.retry import Retry +from google.api_core.retry_async import AsyncRetry from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( @@ -75,7 +75,7 @@ with DAG( region=REGION, batch=BATCH_CONFIG, batch_id=BATCH_ID_2, - result_retry=Retry(maximum=10.0, initial=10.0, multiplier=1.0), + result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0), ) create_batch_3 = DataprocCreateBatchOperator(
