This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 10ce311  Deprecate using global as the default region in Google 
Dataproc operators and hooks (#10772)
10ce311 is described below

commit 10ce31127f1ff87176158935925afce46a989917
Author: Dmitri Kuksik <[email protected]>
AuthorDate: Tue Sep 8 08:46:29 2020 +0200

    Deprecate using global as the default region in Google Dataproc operators 
and hooks (#10772)
    
    The region parameter is required for some of the Google Dataproc operators
    and should be provided explicitly by users, to avoid creating data-intensive
    tasks in an unintended default location.
---
 airflow/providers/google/cloud/hooks/dataproc.py   | 10 +++++++-
 .../providers/google/cloud/operators/dataproc.py   | 29 +++++++++++++++++-----
 .../providers/google/cloud/hooks/test_dataproc.py  |  9 +++++++
 .../google/cloud/operators/test_dataproc.py        | 10 ++++++++
 4 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataproc.py 
b/airflow/providers/google/cloud/hooks/dataproc.py
index 0f9685b..0d068d0 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -851,7 +851,7 @@ class DataprocHook(GoogleBaseHook):
         self,
         job_id: str,
         project_id: str,
-        location: str = 'global',
+        location: Optional[str] = None,
         retry: Optional[Retry] = None,
         timeout: Optional[float] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = None,
@@ -874,7 +874,15 @@ class DataprocHook(GoogleBaseHook):
         :param metadata: Additional metadata that is provided to the method.
         :type metadata: Sequence[Tuple[str, str]]
         """
+        if location is None:
+            warnings.warn(
+                "Default location value `global` will be deprecated. Please, 
provide location value.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            location = 'global'
         client = self.get_job_client(location=location)
+
         job = client.cancel_job(
             project_id=project_id,
             region=location,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py 
b/airflow/providers/google/cloud/operators/dataproc.py
index a8b3780..6c59b56 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -130,7 +130,7 @@ class ClusterGenerator:
     :type internal_ip_only: bool
     :param tags: The GCE tags to add to all instances
     :type tags: list[str]
-    :param region: leave as 'global', might become relevant in the future. 
(templated)
+    :param region: The specified region where the dataproc cluster is created.
     :type region: str
     :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
     :type gcp_conn_id: str
@@ -420,7 +420,7 @@ class DataprocCreateClusterOperator(BaseOperator):
         If a dict is provided, it must be of the same form as the protobuf 
message
         :class:`~google.cloud.dataproc_v1.types.ClusterConfig`
     :type cluster_config: Union[Dict, 
google.cloud.dataproc_v1.types.ClusterConfig]
-    :param region: leave as 'global', might become relevant in the future. 
(templated)
+    :param region: The specified region where the dataproc cluster is created.
     :type region: str
     :parm delete_on_error: If true the cluster will be deleted if created with 
ERROR state. Default
         value is true.
@@ -466,7 +466,7 @@ class DataprocCreateClusterOperator(BaseOperator):
         self,
         *,
         cluster_name: str,
-        region: str = 'global',
+        region: Optional[str] = None,
         project_id: Optional[str] = None,
         cluster_config: Optional[Dict] = None,
         labels: Optional[Dict] = None,
@@ -480,6 +480,14 @@ class DataprocCreateClusterOperator(BaseOperator):
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
     ) -> None:
+        if region is None:
+            warnings.warn(
+                "Default region value `global` will be deprecated. Please, 
provide region value.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            region = 'global'
+
         # TODO: remove one day
         if cluster_config is None:
             warnings.warn(
@@ -916,7 +924,7 @@ class DataprocJobBaseOperator(BaseOperator):
         gcp_conn_id: str = 'google_cloud_default',
         delegate_to: Optional[str] = None,
         labels: Optional[Dict] = None,
-        region: str = 'global',
+        region: Optional[str] = None,
         job_error_states: Optional[Set[str]] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         asynchronous: bool = False,
@@ -930,7 +938,16 @@ class DataprocJobBaseOperator(BaseOperator):
         self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_properties
         self.dataproc_jars = dataproc_jars
+
+        if region is None:
+            warnings.warn(
+                "Default region value `global` will be deprecated. Please, 
provide region value.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            region = 'global'
         self.region = region
+
         self.job_error_states = job_error_states if job_error_states is not 
None else {'ERROR'}
         self.impersonation_chain = impersonation_chain
 
@@ -1549,7 +1566,7 @@ class 
DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
     :param project_id: The ID of the google cloud project in which
         the template runs
     :type project_id: str
-    :param region: leave as 'global', might become relevant in the future
+    :param region: The specified region where the dataproc cluster is created.
     :type region: str
     :param parameters: a map of parameters for Dataproc Template in key-value 
format:
         map (key: string, value: string)
@@ -1651,7 +1668,7 @@ class 
DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
     :param project_id: The ID of the google cloud project in which
         the template runs
     :type project_id: str
-    :param region: leave as 'global', might become relevant in the future
+    :param region: The specified region where the dataproc cluster is created.
     :type region: str
     :param parameters: a map of parameters for Dataproc Template in key-value 
format:
         map (key: string, value: string)
diff --git a/tests/providers/google/cloud/hooks/test_dataproc.py 
b/tests/providers/google/cloud/hooks/test_dataproc.py
index a2c1c4d..844fd36 100644
--- a/tests/providers/google/cloud/hooks/test_dataproc.py
+++ b/tests/providers/google/cloud/hooks/test_dataproc.py
@@ -289,6 +289,15 @@ class TestDataprocHook(unittest.TestCase):
             metadata=None,
         )
 
+    @mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
+    def test_cancel_job_deprecation_warning(self, mock_client):
+        with self.assertWarns(DeprecationWarning):
+            self.hook.cancel_job(job_id=JOB_ID, project_id=GCP_PROJECT)
+        mock_client.assert_called_once_with(location='global')
+        mock_client.return_value.cancel_job.assert_called_once_with(
+            region='global', job_id=JOB_ID, project_id=GCP_PROJECT, 
retry=None, timeout=None, metadata=None,
+        )
+
 
 class TestDataProcJobBuilder(unittest.TestCase):
     def setUp(self) -> None:
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py 
b/tests/providers/google/cloud/operators/test_dataproc.py
index aa4a1ec..12eee49 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -194,6 +194,16 @@ class TestDataprocClusterCreateOperator(unittest.TestCase):
         self.assertEqual(op.cluster_config['worker_config']['num_instances'], 
2)
         self.assertIn("zones/zone", 
op.cluster_config['master_config']["machine_type_uri"])
 
+        with self.assertWarns(DeprecationWarning) as warning:
+            op_default_region = DataprocCreateClusterOperator(
+                task_id=TASK_ID,
+                project_id=GCP_PROJECT,
+                cluster_name="cluster_name",
+                cluster_config=op.cluster_config,
+            )
+        assert_warning("Default region value", warning)
+        self.assertEqual(op_default_region.region, 'global')
+
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute(self, mock_hook):
         op = DataprocCreateClusterOperator(

Reply via email to