This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8d0c5d9008 Change retry type for Google Dataflow Client to async one
(#36141)
8d0c5d9008 is described below
commit 8d0c5d900875ce3b9dda1a86f1de534759e9d7f6
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Dec 9 22:14:27 2023 +0100
Change retry type for Google Dataflow Client to async one (#36141)
Google Dataflow Client 0.8.6 implemented a bugfix in which the retry type
was changed to an async one. This caused our canary builds to fail.
We now change the client code to use AsyncRetry and bump the minimum
version of the client to 0.8.6.
---
.../providers/google/cloud/hooks/bigquery_dts.py | 3 +-
.../providers/google/cloud/hooks/cloud_build.py | 3 +-
.../providers/google/cloud/hooks/cloud_composer.py | 7 ++--
airflow/providers/google/cloud/hooks/dataplex.py | 3 +-
airflow/providers/google/cloud/hooks/dataproc.py | 37 +++++++++++-----------
.../providers/google/cloud/operators/dataproc.py | 11 ++++---
airflow/providers/google/provider.yaml | 14 ++++----
generated/provider_dependencies.json | 14 ++++----
.../cloud/dataproc/example_dataproc_batch.py | 4 +--
9 files changed, 51 insertions(+), 45 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery_dts.py
b/airflow/providers/google/cloud/hooks/bigquery_dts.py
index c3f5780ea8..c7ef15fb44 100644
--- a/airflow/providers/google/cloud/hooks/bigquery_dts.py
+++ b/airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -38,6 +38,7 @@ from airflow.providers.google.common.hooks.base_google import
(
if TYPE_CHECKING:
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from googleapiclient.discovery import Resource
@@ -321,7 +322,7 @@ class
AsyncBiqQueryDataTransferServiceHook(GoogleBaseAsyncHook):
run_id: str,
project_id: str | None,
location: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py
b/airflow/providers/google/cloud/hooks/cloud_build.py
index 5cd9b798ea..189303a9ce 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -33,6 +33,7 @@ from airflow.providers.google.common.hooks.base_google import
PROVIDE_PROJECT_ID
if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger,
RepoSource
# Time to sleep between active checks of the operation results
@@ -645,7 +646,7 @@ class CloudBuildAsyncHook(GoogleBaseHook):
self,
id_: str,
project_id: str = PROVIDE_PROJECT_ID,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str = "global",
diff --git a/airflow/providers/google/cloud/hooks/cloud_composer.py
b/airflow/providers/google/cloud/hooks/cloud_composer.py
index 63170a4513..01e88df8a3 100644
--- a/airflow/providers/google/cloud/hooks/cloud_composer.py
+++ b/airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.operation_async import AsyncOperation
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from
google.cloud.orchestration.airflow.service_v1.services.environments.pagers
import (
ListEnvironmentsPager,
)
@@ -332,7 +333,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
environment: Environment | dict,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -361,7 +362,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
environment_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -389,7 +390,7 @@ class CloudComposerAsyncHook(GoogleBaseHook):
environment_id: str,
environment: Environment | dict,
update_mask: dict | FieldMask,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py
b/airflow/providers/google/cloud/hooks/dataplex.py
index 4eeefa2a3d..3832026886 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -40,6 +40,7 @@ from airflow.providers.google.common.hooks.base_google import
GoogleBaseAsyncHoo
if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from googleapiclient.discovery import Resource
PATH_DATA_SCAN =
"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -896,7 +897,7 @@ class DataplexAsyncHook(GoogleBaseAsyncHook):
region: str,
data_scan_id: str | None = None,
job_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Any:
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py
b/airflow/providers/google/cloud/hooks/dataproc.py
index 349a8488bc..10459118c0 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -51,6 +51,7 @@ if TYPE_CHECKING:
from google.api_core.operation_async import AsyncOperation
from google.api_core.operations_v1.operations_client import
OperationsClient
from google.api_core.retry import Retry
+ from google.api_core.retry_async import AsyncRetry
from google.protobuf.duration_pb2 import Duration
from google.protobuf.field_mask_pb2 import FieldMask
@@ -256,7 +257,7 @@ class DataprocHook(GoogleBaseHook):
self,
operation: Operation,
timeout: float | None = None,
- result_retry: Retry | _MethodDefault = DEFAULT,
+ result_retry: AsyncRetry | _MethodDefault = DEFAULT,
) -> Any:
"""Wait for a long-lasting operation to complete."""
try:
@@ -997,7 +998,7 @@ class DataprocHook(GoogleBaseHook):
region: str,
project_id: str,
wait_check_interval: int = 10,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Batch:
@@ -1132,7 +1133,7 @@ class DataprocAsyncHook(GoogleBaseHook):
virtual_cluster_config: dict | None = None,
labels: dict[str, str] | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1199,7 +1200,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
cluster_uuid: str | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1242,7 +1243,7 @@ class DataprocAsyncHook(GoogleBaseHook):
region: str,
cluster_name: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> str:
@@ -1277,7 +1278,7 @@ class DataprocAsyncHook(GoogleBaseHook):
region: str,
cluster_name: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Cluster:
@@ -1309,7 +1310,7 @@ class DataprocAsyncHook(GoogleBaseHook):
filter_: str,
project_id: str,
page_size: int | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
):
@@ -1349,7 +1350,7 @@ class DataprocAsyncHook(GoogleBaseHook):
region: str,
graceful_decommission_timeout: dict | Duration | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1429,7 +1430,7 @@ class DataprocAsyncHook(GoogleBaseHook):
template: dict | WorkflowTemplate,
project_id: str,
region: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> WorkflowTemplate:
@@ -1465,7 +1466,7 @@ class DataprocAsyncHook(GoogleBaseHook):
version: int | None = None,
request_id: str | None = None,
parameters: dict[str, str] | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1511,7 +1512,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1554,7 +1555,7 @@ class DataprocAsyncHook(GoogleBaseHook):
job_id: str,
project_id: str,
region: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Job:
@@ -1588,7 +1589,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
region: str,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Job:
@@ -1624,7 +1625,7 @@ class DataprocAsyncHook(GoogleBaseHook):
job_id: str,
project_id: str,
region: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Job:
@@ -1658,7 +1659,7 @@ class DataprocAsyncHook(GoogleBaseHook):
batch: dict | Batch,
batch_id: str | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> AsyncOperation:
@@ -1703,7 +1704,7 @@ class DataprocAsyncHook(GoogleBaseHook):
batch_id: str,
region: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> None:
@@ -1737,7 +1738,7 @@ class DataprocAsyncHook(GoogleBaseHook):
batch_id: str,
region: str,
project_id: str,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Batch:
@@ -1773,7 +1774,7 @@ class DataprocAsyncHook(GoogleBaseHook):
project_id: str,
page_size: int | None = None,
page_token: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
filter: str | None = None,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py
b/airflow/providers/google/cloud/operators/dataproc.py
index 477ddbf7d0..0e9784dfc2 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -63,6 +63,7 @@ from airflow.utils import timezone
if TYPE_CHECKING:
from google.api_core import operation
+ from google.api_core.retry_async import AsyncRetry
from google.protobuf.duration_pb2 import Duration
from google.protobuf.field_mask_pb2 import FieldMask
@@ -592,7 +593,7 @@ class
DataprocCreateClusterOperator(GoogleCloudBaseOperator):
request_id: str | None = None,
delete_on_error: bool = True,
use_if_exists: bool = True,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float = 1 * 60 * 60,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -985,7 +986,7 @@ class
DataprocDeleteClusterOperator(GoogleCloudBaseOperator):
project_id: str | None = None,
cluster_uuid: str | None = None,
request_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float = 1 * 60 * 60,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -1891,7 +1892,7 @@ class
DataprocInstantiateWorkflowTemplateOperator(GoogleCloudBaseOperator):
version: int | None = None,
request_id: str | None = None,
parameters: dict[str, str] | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -2341,7 +2342,7 @@ class
DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
region: str,
request_id: str | None = None,
project_id: str | None = None,
- retry: Retry | _MethodDefault = DEFAULT,
+ retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
@@ -2481,7 +2482,7 @@ class
DataprocCreateBatchOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
- result_retry: Retry | _MethodDefault = DEFAULT,
+ result_retry: AsyncRetry | _MethodDefault = DEFAULT,
asynchronous: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
polling_interval_seconds: int = 5,
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index 36003fdd98..4dac866e67 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -95,17 +95,17 @@ dependencies:
- google-auth>=1.0.0
- google-auth-httplib2>=0.0.1
- google-cloud-aiplatform>=1.22.1
- - google-cloud-automl>=2.11.0
- - google-cloud-bigquery-datatransfer>=3.11.0
+ - google-cloud-automl>=2.12.0
+ - google-cloud-bigquery-datatransfer>=3.13.0
- google-cloud-bigtable>=2.17.0
- - google-cloud-build>=3.13.0
+ - google-cloud-build>=3.22.0
- google-cloud-compute>=1.10.0
- google-cloud-container>=2.17.4
- google-cloud-datacatalog>=3.11.1
- - google-cloud-dataflow-client>=0.8.2
+ - google-cloud-dataflow-client>=0.8.6
- google-cloud-dataform>=0.5.0
- - google-cloud-dataplex>=1.4.2
- - google-cloud-dataproc>=5.5.0
+ - google-cloud-dataplex>=1.10.0
+ - google-cloud-dataproc>=5.8.0
- google-cloud-dataproc-metastore>=1.12.0
- google-cloud-dlp>=3.12.0
- google-cloud-kms>=2.15.0
@@ -113,7 +113,7 @@ dependencies:
- google-cloud-logging>=3.5.0
- google-cloud-memcache>=1.7.0
- google-cloud-monitoring>=2.14.1
- - google-cloud-orchestration-airflow>=1.7.0
+ - google-cloud-orchestration-airflow>=1.10.0
- google-cloud-os-login>=2.9.1
- google-cloud-pubsub>=2.15.0
- google-cloud-redis>=2.12.0
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index 24906c94cf..cdbca9d6ad 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -422,26 +422,26 @@
"google-auth-httplib2>=0.0.1",
"google-auth>=1.0.0",
"google-cloud-aiplatform>=1.22.1",
- "google-cloud-automl>=2.11.0",
+ "google-cloud-automl>=2.12.0",
"google-cloud-batch>=0.13.0",
- "google-cloud-bigquery-datatransfer>=3.11.0",
+ "google-cloud-bigquery-datatransfer>=3.13.0",
"google-cloud-bigtable>=2.17.0",
- "google-cloud-build>=3.13.0",
+ "google-cloud-build>=3.22.0",
"google-cloud-compute>=1.10.0",
"google-cloud-container>=2.17.4",
"google-cloud-datacatalog>=3.11.1",
- "google-cloud-dataflow-client>=0.8.2",
+ "google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
- "google-cloud-dataplex>=1.4.2",
+ "google-cloud-dataplex>=1.10.0",
"google-cloud-dataproc-metastore>=1.12.0",
- "google-cloud-dataproc>=5.5.0",
+ "google-cloud-dataproc>=5.8.0",
"google-cloud-dlp>=3.12.0",
"google-cloud-kms>=2.15.0",
"google-cloud-language>=2.9.0",
"google-cloud-logging>=3.5.0",
"google-cloud-memcache>=1.7.0",
"google-cloud-monitoring>=2.14.1",
- "google-cloud-orchestration-airflow>=1.7.0",
+ "google-cloud-orchestration-airflow>=1.10.0",
"google-cloud-os-login>=2.9.1",
"google-cloud-pubsub>=2.15.0",
"google-cloud-redis>=2.12.0",
diff --git
a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 7dd5eff73a..7d126fc28b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -22,7 +22,7 @@ from __future__ import annotations
import os
from datetime import datetime
-from google.api_core.retry import Retry
+from google.api_core.retry_async import AsyncRetry
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataproc import (
@@ -75,7 +75,7 @@ with DAG(
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
- result_retry=Retry(maximum=10.0, initial=10.0, multiplier=1.0),
+ result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
)
create_batch_3 = DataprocCreateBatchOperator(