This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 10ce311 Deprecate using global as the default region in Google Dataproc operators and hooks (#10772)
10ce311 is described below
commit 10ce31127f1ff87176158935925afce46a989917
Author: Dmitri Kuksik <[email protected]>
AuthorDate: Tue Sep 8 08:46:29 2020 +0200
Deprecate using global as the default region in Google Dataproc operators and hooks (#10772)
The region parameter is required by some Google Dataproc operators and
should be provided explicitly by users, to avoid running data-intensive
tasks in an unintended default location.
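For illustration, a minimal sketch of the post-deprecation usage, assuming
placeholder project, cluster, and region values (not part of this commit):

    # Hypothetical DAG snippet: pass `region` explicitly so the operator
    # does not fall back to the deprecated 'global' default.
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocCreateClusterOperator,
    )

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id="my-project",        # placeholder project ID
        cluster_name="my-cluster",      # placeholder cluster name
        cluster_config=CLUSTER_CONFIG,  # placeholder; assumed defined elsewhere
        region="europe-west1",          # explicit region avoids the warning
    )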
---
airflow/providers/google/cloud/hooks/dataproc.py | 10 +++++++-
.../providers/google/cloud/operators/dataproc.py | 29 +++++++++++++++++-----
.../providers/google/cloud/hooks/test_dataproc.py | 9 +++++++
.../google/cloud/operators/test_dataproc.py | 10 ++++++++
4 files changed, 51 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index 0f9685b..0d068d0 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -851,7 +851,7 @@ class DataprocHook(GoogleBaseHook):
self,
job_id: str,
project_id: str,
- location: str = 'global',
+ location: Optional[str] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
@@ -874,7 +874,15 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
"""
+ if location is None:
+ warnings.warn(
+ "Default location value `global` will be deprecated. Please,
provide location value.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ location = 'global'
client = self.get_job_client(location=location)
+
job = client.cancel_job(
project_id=project_id,
region=location,
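For context, a minimal sketch of calling the changed hook method with an
explicit location, using placeholder IDs (not part of this commit):

    # Hypothetical hook usage: pass `location` explicitly to avoid the new
    # DeprecationWarning and the 'global' fallback.
    from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

    hook = DataprocHook(gcp_conn_id="google_cloud_default")
    hook.cancel_job(
        job_id="my-job-id",        # placeholder job ID
        project_id="my-project",   # placeholder project ID
        location="europe-west1",   # explicit region avoids the warning
    )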
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index a8b3780..6c59b56 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -130,7 +130,7 @@ class ClusterGenerator:
:type internal_ip_only: bool
:param tags: The GCE tags to add to all instances
:type tags: list[str]
- :param region: leave as 'global', might become relevant in the future. (templated)
+ :param region: The region in which the Dataproc cluster is created.
:type region: str
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:type gcp_conn_id: str
@@ -420,7 +420,7 @@ class DataprocCreateClusterOperator(BaseOperator):
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.dataproc_v1.types.ClusterConfig`
:type cluster_config: Union[Dict, google.cloud.dataproc_v1.types.ClusterConfig]
- :param region: leave as 'global', might become relevant in the future. (templated)
+ :param region: The region in which the Dataproc cluster is created.
:type region: str
:param delete_on_error: If true, the cluster will be deleted if created with ERROR state. Default value is true.
@@ -466,7 +466,7 @@ class DataprocCreateClusterOperator(BaseOperator):
self,
*,
cluster_name: str,
- region: str = 'global',
+ region: Optional[str] = None,
project_id: Optional[str] = None,
cluster_config: Optional[Dict] = None,
labels: Optional[Dict] = None,
@@ -480,6 +480,14 @@ class DataprocCreateClusterOperator(BaseOperator):
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:
+ if region is None:
+ warnings.warn(
+ "Default region value `global` will be deprecated. Please,
provide region value.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ region = 'global'
+
# TODO: remove one day
if cluster_config is None:
warnings.warn(
@@ -916,7 +924,7 @@ class DataprocJobBaseOperator(BaseOperator):
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
labels: Optional[Dict] = None,
- region: str = 'global',
+ region: Optional[str] = None,
job_error_states: Optional[Set[str]] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
asynchronous: bool = False,
@@ -930,7 +938,16 @@ class DataprocJobBaseOperator(BaseOperator):
self.cluster_name = cluster_name
self.dataproc_properties = dataproc_properties
self.dataproc_jars = dataproc_jars
+
+ if region is None:
+ warnings.warn(
+ "Default region value `global` will be deprecated. Please,
provide region value.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ region = 'global'
self.region = region
+
self.job_error_states = job_error_states if job_error_states is not None else {'ERROR'}
self.impersonation_chain = impersonation_chain
@@ -1549,7 +1566,7 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
:param project_id: The ID of the google cloud project in which
the template runs
:type project_id: str
- :param region: leave as 'global', might become relevant in the future
+ :param region: The region in which the Dataproc cluster is created.
:type region: str
:param parameters: a map of parameters for Dataproc Template in key-value
format: map (key: string, value: string)
@@ -1651,7 +1668,7 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
:param project_id: The ID of the google cloud project in which
the template runs
:type project_id: str
- :param region: leave as 'global', might become relevant in the future
+ :param region: The region in which the Dataproc cluster is created.
:type region: str
:param parameters: a map of parameters for Dataproc Template in key-value
format: map (key: string, value: string)
diff --git a/tests/providers/google/cloud/hooks/test_dataproc.py b/tests/providers/google/cloud/hooks/test_dataproc.py
index a2c1c4d..844fd36 100644
--- a/tests/providers/google/cloud/hooks/test_dataproc.py
+++ b/tests/providers/google/cloud/hooks/test_dataproc.py
@@ -289,6 +289,15 @@ class TestDataprocHook(unittest.TestCase):
metadata=None,
)
+ @mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
+ def test_cancel_job_deprecation_warning(self, mock_client):
+ with self.assertWarns(DeprecationWarning):
+ self.hook.cancel_job(job_id=JOB_ID, project_id=GCP_PROJECT)
+ mock_client.assert_called_once_with(location='global')
+ mock_client.return_value.cancel_job.assert_called_once_with(
+ region='global', job_id=JOB_ID, project_id=GCP_PROJECT, retry=None, timeout=None, metadata=None,
+ )
+
class TestDataProcJobBuilder(unittest.TestCase):
def setUp(self) -> None:
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index aa4a1ec..12eee49 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -194,6 +194,16 @@ class TestDataprocClusterCreateOperator(unittest.TestCase):
self.assertEqual(op.cluster_config['worker_config']['num_instances'], 2)
self.assertIn("zones/zone", op.cluster_config['master_config']["machine_type_uri"])
+ with self.assertWarns(DeprecationWarning) as warning:
+ op_default_region = DataprocCreateClusterOperator(
+ task_id=TASK_ID,
+ project_id=GCP_PROJECT,
+ cluster_name="cluster_name",
+ cluster_config=op.cluster_config,
+ )
+ assert_warning("Default region value", warning)
+ self.assertEqual(op_default_region.region, 'global')
+
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute(self, mock_hook):
op = DataprocCreateClusterOperator(