This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ca1f3bf70 Dataproc : remove `location` in favor of `region` (#23250)
5ca1f3bf70 is described below
commit 5ca1f3bf706f6ac8437bef0c7c4944cc2d7fca04
Author: eladkal <[email protected]>
AuthorDate: Tue Apr 26 15:55:41 2022 +0300
Dataproc : remove `location` in favor of `region` (#23250)
---
airflow/providers/google/CHANGELOG.rst | 19 ++
airflow/providers/google/cloud/hooks/dataproc.py | 165 ++-----------
.../providers/google/cloud/operators/dataproc.py | 45 +---
airflow/providers/google/cloud/sensors/dataproc.py | 16 +-
.../providers/google/cloud/hooks/test_dataproc.py | 263 +--------------------
.../google/cloud/operators/test_dataproc.py | 161 +------------
.../google/cloud/sensors/test_dataproc.py | 31 +--
7 files changed, 61 insertions(+), 639 deletions(-)
diff --git a/airflow/providers/google/CHANGELOG.rst b/airflow/providers/google/CHANGELOG.rst
index 9cb29093ec..4bc816d9e4 100644
--- a/airflow/providers/google/CHANGELOG.rst
+++ b/airflow/providers/google/CHANGELOG.rst
@@ -46,6 +46,25 @@ Breaking changes
* ``PubSubPullSensor``: Remove ``return_immediately``
+* ``DataprocJobSensor``: Remove ``location``. Please use ``region``.
+
+* ``DataprocCreateWorkflowTemplateOperator``: Remove ``location``. Please use ``region``.
+
+* ``DataprocCreateClusterOperator``: Remove ``location``. Please use ``region``.
+
+* ``DataprocSubmitJobOperator``: Remove ``location``. Please use ``region``.
+
+* ``DataprocHook``: Remove ``location`` parameter. Please use ``region``.
+  Affected functions are:
+  ``cancel_job``, ``create_workflow_template``, ``get_batch_client``,
+  ``get_cluster_client``, ``get_job``, ``get_job_client``, ``get_template_client``,
+  ``instantiate_inline_workflow_template``, ``instantiate_workflow_template``,
+  ``submit_job``, ``update_cluster``, ``wait_for_job``
+
+* ``DataprocHook``: The order of parameters in the ``wait_for_job`` function has changed.
+
+* ``DataprocSubmitJobOperator``: The order of parameters has changed.
+
6.8.0
.....
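For DAG authors, the upgrade is a keyword rename at each call site. A minimal before/after sketch, not taken from this commit; the project, region, cluster, and bucket names are placeholders:

    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

    pyspark_job = {
        "placement": {"cluster_name": "my-cluster"},                       # placeholder cluster
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/job.py"},  # placeholder URI
    }

    # Before: location="europe-west1" was accepted with a DeprecationWarning.
    # After this release, pass `region` instead; omitting it is an error.
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_job",
        project_id="my-project",   # placeholder project
        region="europe-west1",     # placeholder region (was `location`)
        job=pyspark_job,
    )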
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py b/airflow/providers/google/cloud/hooks/dataproc.py
index f14a0647a5..9f051184a9 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -200,18 +200,8 @@ class DataprocHook(GoogleBaseHook):
keyword arguments rather than positional.
"""
- def get_cluster_client(
- self, region: Optional[str] = None, location: Optional[str] = None
- ) -> ClusterControllerClient:
+ def get_cluster_client(self, region: Optional[str] = None) -> ClusterControllerClient:
"""Returns ClusterControllerClient."""
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
client_options = None
if region and region != 'global':
client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
@@ -220,18 +210,8 @@ class DataprocHook(GoogleBaseHook):
credentials=self._get_credentials(), client_info=CLIENT_INFO,
client_options=client_options
)
- def get_template_client(
- self, region: Optional[str] = None, location: Optional[str] = None
- ) -> WorkflowTemplateServiceClient:
+ def get_template_client(self, region: Optional[str] = None) -> WorkflowTemplateServiceClient:
"""Returns WorkflowTemplateServiceClient."""
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
client_options = None
if region and region != 'global':
client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
@@ -240,18 +220,8 @@ class DataprocHook(GoogleBaseHook):
credentials=self._get_credentials(), client_info=CLIENT_INFO,
client_options=client_options
)
- def get_job_client(
- self, region: Optional[str] = None, location: Optional[str] = None
- ) -> JobControllerClient:
+ def get_job_client(self, region: Optional[str] = None) -> JobControllerClient:
"""Returns JobControllerClient."""
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
client_options = None
if region and region != 'global':
client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
@@ -260,18 +230,8 @@ class DataprocHook(GoogleBaseHook):
credentials=self._get_credentials(), client_info=CLIENT_INFO,
client_options=client_options
)
- def get_batch_client(
- self, region: Optional[str] = None, location: Optional[str] = None
- ) -> BatchControllerClient:
+ def get_batch_client(self, region: Optional[str] = None) -> BatchControllerClient:
"""Returns BatchControllerClient"""
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
client_options = None
if region and region != 'global':
client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
@@ -506,8 +466,7 @@ class DataprocHook(GoogleBaseHook):
cluster: Union[Dict, Cluster],
update_mask: Union[Dict, FieldMask],
project_id: str,
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
graceful_decommission_timeout: Optional[Union[Dict, Duration]] = None,
request_id: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
@@ -519,7 +478,6 @@ class DataprocHook(GoogleBaseHook):
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param cluster_name: Required. The cluster name.
:param cluster: Required. The changes to the cluster.
@@ -564,16 +522,7 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
client = self.get_cluster_client(region=region)
operation = client.update_cluster(
request={
@@ -596,8 +545,7 @@ class DataprocHook(GoogleBaseHook):
self,
template: Union[Dict, WorkflowTemplate],
project_id: str,
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
@@ -607,7 +555,6 @@ class DataprocHook(GoogleBaseHook):
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param template: The Dataproc workflow template to create. If a dict is provided,
it must be of the same form as the protobuf message WorkflowTemplate.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
@@ -617,16 +564,7 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
metadata = metadata or ()
client = self.get_template_client(region)
parent = f'projects/{project_id}/regions/{region}'
@@ -639,8 +577,7 @@ class DataprocHook(GoogleBaseHook):
self,
template_name: str,
project_id: str,
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
version: Optional[int] = None,
request_id: Optional[str] = None,
parameters: Optional[Dict[str, str]] = None,
@@ -654,7 +591,6 @@ class DataprocHook(GoogleBaseHook):
:param template_name: Name of template to instantiate.
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param version: Optional. The version of workflow template to instantiate. If specified,
the workflow will be instantiated only if the current version of the workflow template has the supplied version.
@@ -672,16 +608,7 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
metadata = metadata or ()
client = self.get_template_client(region)
name = f'projects/{project_id}/regions/{region}/workflowTemplates/{template_name}'
@@ -698,8 +625,7 @@ class DataprocHook(GoogleBaseHook):
self,
template: Union[Dict, WorkflowTemplate],
project_id: str,
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
request_id: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
@@ -712,7 +638,6 @@ class DataprocHook(GoogleBaseHook):
it must be of the same form as the protobuf message WorkflowTemplate
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param request_id: Optional. A tag that prevents multiple concurrent workflow instances
with the same tag from running. This mitigates risk of concurrent instances started due to retries.
@@ -723,16 +648,7 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
metadata = metadata or ()
client = self.get_template_client(region)
parent = f'projects/{project_id}/regions/{region}'
@@ -749,9 +665,8 @@ class DataprocHook(GoogleBaseHook):
self,
job_id: str,
project_id: str,
+ region: str,
wait_time: int = 10,
- region: Optional[str] = None,
- location: Optional[str] = None,
timeout: Optional[int] = None,
) -> None:
"""
@@ -760,21 +675,11 @@ class DataprocHook(GoogleBaseHook):
:param job_id: Id of the Dataproc job
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param wait_time: Number of seconds between checks
:param timeout: How many seconds wait for job to be ready. Used only if ``asynchronous`` is False
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
state = None
start = time.monotonic()
while state not in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED):
@@ -797,8 +702,7 @@ class DataprocHook(GoogleBaseHook):
self,
job_id: str,
project_id: str,
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
@@ -809,7 +713,6 @@ class DataprocHook(GoogleBaseHook):
:param job_id: Id of the Dataproc job
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
@@ -817,16 +720,7 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
client = self.get_job_client(region=region)
job = client.get_job(
request={'project_id': project_id, 'region': region, 'job_id': job_id},
@@ -841,8 +735,7 @@ class DataprocHook(GoogleBaseHook):
self,
job: Union[dict, Job],
project_id: str,
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
request_id: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
@@ -855,7 +748,6 @@ class DataprocHook(GoogleBaseHook):
it must be of the same form as the protobuf message Job
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param request_id: Optional. A tag that prevents multiple concurrent workflow instances
with the same tag from running. This mitigates risk of concurrent instances started due to retries.
@@ -866,16 +758,7 @@ class DataprocHook(GoogleBaseHook):
:param metadata: Additional metadata that is provided to the method.
"""
if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
+ raise TypeError("missing 1 required keyword argument: 'region'")
client = self.get_job_client(region=region)
return client.submit_job(
request={'project_id': project_id, 'region': region, 'job': job, 'request_id': request_id},
@@ -911,7 +794,6 @@ class DataprocHook(GoogleBaseHook):
job_id: str,
project_id: str,
region: Optional[str] = None,
- location: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
@@ -921,7 +803,6 @@ class DataprocHook(GoogleBaseHook):
:param project_id: Required. The ID of the Google Cloud project that the job belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param job_id: Required. The job ID.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
@@ -929,16 +810,6 @@ class DataprocHook(GoogleBaseHook):
``retry`` is specified, the timeout applies to each individual attempt.
:param metadata: Additional metadata that is provided to the method.
"""
- if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
-
if region is None:
warnings.warn(
"Default region value `global` will be deprecated. Please, provide region value.",
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 8008ec53c3..0d55b8b7c5 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1512,7 +1512,6 @@ class DataprocCreateWorkflowTemplateOperator(BaseOperator):
:param project_id: Optional. The ID of the Google Cloud project the cluster belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param template: The Dataproc workflow template to create. If a dict is provided,
it must be of the same form as the protobuf message WorkflowTemplate.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
@@ -1530,9 +1529,8 @@ class DataprocCreateWorkflowTemplateOperator(BaseOperator):
self,
*,
template: Dict,
- region: Optional[str] = None,
+ region: str,
project_id: Optional[str] = None,
- location: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
@@ -1540,17 +1538,6 @@ class DataprocCreateWorkflowTemplateOperator(BaseOperator):
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
):
- if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
super().__init__(**kwargs)
self.region = region
self.template = template
@@ -1775,7 +1762,6 @@ class DataprocSubmitJobOperator(BaseOperator):
:param project_id: Optional. The ID of the Google Cloud project that the job belongs to.
:param region: Required. The Cloud Dataproc region in which to handle the request.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param job: Required. The job resource.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.dataproc_v1.types.Job`
@@ -1813,9 +1799,8 @@ class DataprocSubmitJobOperator(BaseOperator):
self,
*,
job: Dict,
+ region: str,
project_id: Optional[str] = None,
- region: Optional[str] = None,
- location: Optional[str] = None,
request_id: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
@@ -1827,17 +1812,6 @@ class DataprocSubmitJobOperator(BaseOperator):
wait_timeout: Optional[int] = None,
**kwargs,
) -> None:
- if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
@@ -1894,7 +1868,6 @@ class DataprocUpdateClusterOperator(BaseOperator):
:param region: Required. The Cloud Dataproc region in which to handle the request.
:param project_id: Optional. The ID of the Google Cloud project the cluster belongs to.
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request.
:param cluster_name: Required. The cluster name.
:param cluster: Required. The changes to the cluster.
@@ -1946,8 +1919,7 @@ class DataprocUpdateClusterOperator(BaseOperator):
cluster: Union[Dict, Cluster],
update_mask: Union[Dict, FieldMask],
graceful_decommission_timeout: Union[Dict, Duration],
- region: Optional[str] = None,
- location: Optional[str] = None,
+ region: str,
request_id: Optional[str] = None,
project_id: Optional[str] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
@@ -1957,17 +1929,6 @@ class DataprocUpdateClusterOperator(BaseOperator):
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
):
- if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
diff --git a/airflow/providers/google/cloud/sensors/dataproc.py b/airflow/providers/google/cloud/sensors/dataproc.py
index 3cc71882e9..929a5a8fe1 100644
--- a/airflow/providers/google/cloud/sensors/dataproc.py
+++ b/airflow/providers/google/cloud/sensors/dataproc.py
@@ -18,7 +18,6 @@
"""This module contains a Dataproc Job sensor."""
# pylint: disable=C0302
import time
-import warnings
from typing import TYPE_CHECKING, Optional, Sequence
from google.api_core.exceptions import ServerError
@@ -40,7 +39,6 @@ class DataprocJobSensor(BaseSensorOperator):
:param region: Required. The Cloud Dataproc region in which to handle the request. (templated)
:param project_id: The ID of the google cloud project in which to create the cluster. (templated)
- :param location: (To be deprecated). The Cloud Dataproc region in which to handle the request. (templated)
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:param wait_timeout: How many seconds wait for job to be ready.
"""
@@ -52,24 +50,12 @@ class DataprocJobSensor(BaseSensorOperator):
self,
*,
dataproc_job_id: str,
- region: Optional[str] = None,
+ region: str,
project_id: Optional[str] = None,
- location: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
wait_timeout: Optional[int] = None,
**kwargs,
) -> None:
- if region is None:
- if location is not None:
- warnings.warn(
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead.",
- DeprecationWarning,
- stacklevel=2,
- )
- region = location
- else:
- raise TypeError("missing 1 required keyword argument: 'region'")
super().__init__(**kwargs)
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
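The sensor follows the same pattern. A short usage sketch with placeholder identifiers; the templated XCom pull is only an illustration, not part of this commit:

    from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor

    wait_for_job = DataprocJobSensor(
        task_id="wait_for_job",
        project_id="my-project",   # placeholder
        region="europe-west1",     # placeholder (was `location`)
        dataproc_job_id="{{ ti.xcom_pull(task_ids='submit_job') }}",  # illustrative templated job id
        poke_interval=30,
    )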
diff --git a/tests/providers/google/cloud/hooks/test_dataproc.py b/tests/providers/google/cloud/hooks/test_dataproc.py
index 2572179401..b52039dd2d 100644
--- a/tests/providers/google/cloud/hooks/test_dataproc.py
+++ b/tests/providers/google/cloud/hooks/test_dataproc.py
@@ -83,22 +83,6 @@ class TestDataprocHook(unittest.TestCase):
client_options=ANY,
)
- @mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
- @mock.patch(DATAPROC_STRING.format("ClusterControllerClient"))
- def test_get_cluster_client_region_deprecation_warning(self, mock_client, mock_get_credentials):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- self.hook.get_cluster_client(location='region1')
- mock_client.assert_called_once_with(
- credentials=mock_get_credentials.return_value,
- client_info=CLIENT_INFO,
- client_options=ANY,
- )
- assert warning_message == str(warnings[0].message)
-
@mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
@mock.patch(DATAPROC_STRING.format("WorkflowTemplateServiceClient"))
def test_get_template_client_global(self, mock_client, mock_get_credentials):
@@ -119,22 +103,6 @@ class TestDataprocHook(unittest.TestCase):
client_options=ANY,
)
- @mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
- @mock.patch(DATAPROC_STRING.format("WorkflowTemplateServiceClient"))
- def test_get_template_client_region_deprecation_warning(self, mock_client, mock_get_credentials):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- _ = self.hook.get_template_client(location='region1')
- mock_client.assert_called_once_with(
- credentials=mock_get_credentials.return_value,
- client_info=CLIENT_INFO,
- client_options=ANY,
- )
- assert warning_message == str(warnings[0].message)
-
@mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
@mock.patch(DATAPROC_STRING.format("JobControllerClient"))
def test_get_job_client(self, mock_client, mock_get_credentials):
@@ -155,22 +123,6 @@ class TestDataprocHook(unittest.TestCase):
client_options=ANY,
)
- @mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
- @mock.patch(DATAPROC_STRING.format("JobControllerClient"))
- def test_get_job_client_region_deprecation_warning(self, mock_client, mock_get_credentials):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- self.hook.get_job_client(location='region1')
- mock_client.assert_called_once_with(
- credentials=mock_get_credentials.return_value,
- client_info=CLIENT_INFO,
- client_options=ANY,
- )
- assert warning_message == str(warnings[0].message)
-
@mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
@mock.patch(DATAPROC_STRING.format("BatchControllerClient"))
def test_get_batch_client(self, mock_client, mock_get_credentials):
@@ -189,22 +141,6 @@ class TestDataprocHook(unittest.TestCase):
credentials=mock_get_credentials.return_value,
client_info=CLIENT_INFO, client_options=ANY
)
- @mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
- @mock.patch(DATAPROC_STRING.format("BatchControllerClient"))
- def test_get_batch_client_region_deprecation_warning(self, mock_client, mock_get_credentials):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- self.hook.get_batch_client(location='region1')
- mock_client.assert_called_once_with(
- credentials=mock_get_credentials.return_value,
- client_info=CLIENT_INFO,
- client_options=ANY,
- )
- assert warning_message == str(warnings[0].message)
-
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_cluster_client"))
def test_create_cluster(self, mock_client):
self.hook.create_cluster(
@@ -320,43 +256,13 @@ class TestDataprocHook(unittest.TestCase):
)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_cluster_client"))
- def test_update_cluster_depreciation_warning(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- update_mask = "update-mask"
- self.hook.update_cluster(
- project_id=GCP_PROJECT,
- location=GCP_LOCATION,
- cluster=CLUSTER,
- cluster_name=CLUSTER_NAME,
- update_mask=update_mask,
- )
- mock_client.assert_called_once_with(region=GCP_LOCATION)
- mock_client.return_value.update_cluster.assert_called_once_with(
- request=dict(
- project_id=GCP_PROJECT,
- region=GCP_LOCATION,
- cluster=CLUSTER,
- cluster_name=CLUSTER_NAME,
- update_mask=update_mask,
- graceful_decommission_timeout=None,
- request_id=None,
- ),
- metadata=(),
- retry=DEFAULT,
- timeout=None,
- )
- assert warning_message == str(warnings[0].message)
-
+ def test_update_cluster_missing_region(self, mock_client):
with pytest.raises(TypeError):
self.hook.update_cluster(
project_id=GCP_PROJECT,
cluster=CLUSTER,
cluster_name=CLUSTER_NAME,
- update_mask=update_mask,
+ update_mask="update-mask",
)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_template_client"))
@@ -368,26 +274,6 @@ class TestDataprocHook(unittest.TestCase):
request=dict(parent=parent, template=template), retry=DEFAULT,
timeout=None, metadata=()
)
- @mock.patch(DATAPROC_STRING.format("DataprocHook.get_template_client"))
- def test_create_workflow_template_depreciation_warning(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- template = {"test": "test"}
- parent = f'projects/{GCP_PROJECT}/regions/{GCP_LOCATION}'
- self.hook.create_workflow_template(
- location=GCP_LOCATION, template=template, project_id=GCP_PROJECT
- )
- mock_client.return_value.create_workflow_template.assert_called_once_with(
- request=dict(parent=parent, template=template), retry=DEFAULT, timeout=None, metadata=()
- )
- assert warning_message == str(warnings[0].message)
-
- with pytest.raises(TypeError):
- self.hook.create_workflow_template(template=template, project_id=GCP_PROJECT)
-
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_template_client"))
def test_instantiate_workflow_template(self, mock_client):
template_name = "template_name"
@@ -403,27 +289,9 @@ class TestDataprocHook(unittest.TestCase):
)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_template_client"))
- def test_instantiate_workflow_template_depreciation_warning(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- template_name = "template_name"
- name = f'projects/{GCP_PROJECT}/regions/{GCP_LOCATION}/workflowTemplates/{template_name}'
- self.hook.instantiate_workflow_template(
- location=GCP_LOCATION, template_name=template_name, project_id=GCP_PROJECT
- )
- mock_client.return_value.instantiate_workflow_template.assert_called_once_with(
- request=dict(name=name, version=None, parameters=None, request_id=None),
- retry=DEFAULT,
- timeout=None,
- metadata=(),
- )
- assert warning_message == str(warnings[0].message)
-
+ def test_instantiate_workflow_template_missing_region(self, mock_client):
with pytest.raises(TypeError):
- self.hook.instantiate_workflow_template(template_name=template_name, project_id=GCP_PROJECT)
+ self.hook.instantiate_workflow_template(template_name="template_name", project_id=GCP_PROJECT)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_template_client"))
def test_instantiate_inline_workflow_template(self, mock_client):
@@ -440,27 +308,9 @@ class TestDataprocHook(unittest.TestCase):
)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_template_client"))
- def test_instantiate_inline_workflow_template_deprecation_warning(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- template = {"test": "test"}
- parent = f'projects/{GCP_PROJECT}/regions/{GCP_LOCATION}'
- self.hook.instantiate_inline_workflow_template(
- location=GCP_LOCATION, template=template, project_id=GCP_PROJECT
- )
- mock_client.return_value.instantiate_inline_workflow_template.assert_called_once_with(
- request=dict(parent=parent, template=template, request_id=None),
- retry=DEFAULT,
- timeout=None,
- metadata=(),
- )
- assert warning_message == str(warnings[0].message)
-
+ def test_instantiate_inline_workflow_template_missing_region(self, mock_client):
with pytest.raises(TypeError):
- self.hook.instantiate_inline_workflow_template(template=template, project_id=GCP_PROJECT)
+ self.hook.instantiate_inline_workflow_template(template={"test": "test"}, project_id=GCP_PROJECT)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_job"))
def test_wait_for_job(self, mock_get_job):
@@ -477,27 +327,7 @@ class TestDataprocHook(unittest.TestCase):
mock_get_job.assert_has_calls(calls)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_job"))
- def test_wait_for_job_deprecation_warning(self, mock_get_job):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- mock_get_job.side_effect = [
- mock.MagicMock(status=mock.MagicMock(state=JobStatus.State.RUNNING)),
- mock.MagicMock(status=mock.MagicMock(state=JobStatus.State.ERROR)),
- ]
- with pytest.raises(AirflowException):
- self.hook.wait_for_job(
- job_id=JOB_ID, location=GCP_LOCATION, project_id=GCP_PROJECT, wait_time=0
- )
- calls = [
- mock.call(region=GCP_LOCATION, job_id=JOB_ID, project_id=GCP_PROJECT),
- mock.call(region=GCP_LOCATION, job_id=JOB_ID, project_id=GCP_PROJECT),
- ]
- mock_get_job.assert_has_calls(calls)
- assert warning_message == str(warnings[0].message)
-
+ def test_wait_for_job_missing_region(self, mock_get_job):
with pytest.raises(TypeError):
self.hook.wait_for_job(job_id=JOB_ID, project_id=GCP_PROJECT, wait_time=0)
@@ -517,26 +347,7 @@ class TestDataprocHook(unittest.TestCase):
)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
- def test_get_job_deprecation_warning(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- self.hook.get_job(location=GCP_LOCATION, job_id=JOB_ID, project_id=GCP_PROJECT)
- mock_client.assert_called_once_with(region=GCP_LOCATION)
- mock_client.return_value.get_job.assert_called_once_with(
- request=dict(
- region=GCP_LOCATION,
- job_id=JOB_ID,
- project_id=GCP_PROJECT,
- ),
- retry=DEFAULT,
- timeout=None,
- metadata=(),
- )
- assert warning_message == str(warnings[0].message)
-
+ def test_get_job_missing_region(self, mock_client):
with pytest.raises(TypeError):
self.hook.get_job(job_id=JOB_ID, project_id=GCP_PROJECT)
@@ -557,26 +368,7 @@ class TestDataprocHook(unittest.TestCase):
)
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
- def test_submit_job_deprecation_warning(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- self.hook.submit_job(location=GCP_LOCATION, job=JOB, project_id=GCP_PROJECT)
- mock_client.assert_called_once_with(region=GCP_LOCATION)
- mock_client.return_value.submit_job.assert_called_once_with(
- request=dict(
- region=GCP_LOCATION,
- job=JOB,
- project_id=GCP_PROJECT,
- request_id=None,
- ),
- retry=DEFAULT,
- timeout=None,
- metadata=(),
- )
- assert warning_message == str(warnings[0].message)
+ def test_submit_job_missing_region(self, mock_client):
with pytest.raises(TypeError):
self.hook.submit_job(job=JOB, project_id=GCP_PROJECT)
@@ -604,43 +396,6 @@ class TestDataprocHook(unittest.TestCase):
metadata=(),
)
- @mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
- def test_cancel_job_deprecation_warning_default_region(self, mock_client):
- with pytest.warns(DeprecationWarning):
- self.hook.cancel_job(job_id=JOB_ID, project_id=GCP_PROJECT)
- mock_client.assert_called_once_with(region='global')
- mock_client.return_value.cancel_job.assert_called_once_with(
- request=dict(
- region='global',
- job_id=JOB_ID,
- project_id=GCP_PROJECT,
- ),
- retry=DEFAULT,
- timeout=None,
- metadata=(),
- )
-
- @mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
- def test_cancel_job_deprecation_warning_param_rename(self, mock_client):
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- with pytest.warns(DeprecationWarning) as warnings:
- self.hook.cancel_job(location=GCP_LOCATION, job_id=JOB_ID, project_id=GCP_PROJECT)
- mock_client.assert_called_once_with(region='global')
- mock_client.return_value.cancel_job.assert_called_once_with(
- request=dict(
- region='global',
- job_id=JOB_ID,
- project_id=GCP_PROJECT,
- ),
- retry=DEFAULT,
- timeout=None,
- metadata=(),
- )
- assert warning_message == str(warnings[0].message)
-
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_batch_client"))
def test_create_batch(self, mock_client):
self.hook.create_batch(
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 6c527830ec..e342950003 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -973,72 +973,12 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
)
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
- def test_location_deprecation_warning(self, mock_hook):
- xcom_push_call = call.ti.xcom_push(execution_date=None, key='conf', value=DATAPROC_JOB_CONF_EXPECTED)
- wait_for_job_call = call.hook().wait_for_job(
- job_id=TEST_JOB_ID, region=GCP_LOCATION, project_id=GCP_PROJECT, timeout=None
- )
-
- job = {}
- mock_hook.return_value.wait_for_job.return_value = None
- mock_hook.return_value.submit_job.return_value.reference.job_id = TEST_JOB_ID
- self.extra_links_manager_mock.attach_mock(mock_hook, 'hook')
-
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
-
- with pytest.warns(DeprecationWarning) as warnings:
- op = DataprocSubmitJobOperator(
- task_id=TASK_ID,
- location=GCP_LOCATION,
- project_id=GCP_PROJECT,
- job=job,
- gcp_conn_id=GCP_CONN_ID,
- retry=RETRY,
- timeout=TIMEOUT,
- metadata=METADATA,
- request_id=REQUEST_ID,
- impersonation_chain=IMPERSONATION_CHAIN,
- )
- op.execute(context=self.mock_context)
-
- mock_hook.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN
- )
-
- # Test whether xcom push occurs before polling for job
- self.assertLess(
- self.extra_links_manager_mock.mock_calls.index(xcom_push_call),
- self.extra_links_manager_mock.mock_calls.index(wait_for_job_call),
- msg='Xcom push for Job Link has to be done before polling for job status',
- )
-
- mock_hook.return_value.submit_job.assert_called_once_with(
- project_id=GCP_PROJECT,
- region=GCP_LOCATION,
- job=job,
- request_id=REQUEST_ID,
- retry=RETRY,
- timeout=TIMEOUT,
- metadata=METADATA,
- )
- mock_hook.return_value.wait_for_job.assert_called_once_with(
- job_id=TEST_JOB_ID, project_id=GCP_PROJECT, region=GCP_LOCATION, timeout=None
- )
-
- self.mock_ti.xcom_push.assert_called_once_with(
- key="conf", value=DATAPROC_JOB_CONF_EXPECTED, execution_date=None
- )
-
- assert warning_message == str(warnings[0].message)
-
- with pytest.raises(TypeError):
+ def test_missing_region_parameter(self, mock_hook):
+ with pytest.raises(AirflowException):
op = DataprocSubmitJobOperator(
task_id=TASK_ID,
project_id=GCP_PROJECT,
- job=job,
+ job={},
gcp_conn_id=GCP_CONN_ID,
retry=RETRY,
timeout=TIMEOUT,
@@ -1142,70 +1082,15 @@ class TestDataprocUpdateClusterOperator(DataprocClusterTestBase):
)
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
- def test_location_deprecation_warning(self, mock_hook):
- self.extra_links_manager_mock.attach_mock(mock_hook, 'hook')
- mock_hook.return_value.update_cluster.result.return_value = None
- cluster_decommission_timeout = {"graceful_decommission_timeout": "600s"}
- update_cluster_args = {
- 'region': GCP_LOCATION,
- 'project_id': GCP_PROJECT,
- 'cluster_name': CLUSTER_NAME,
- 'cluster': CLUSTER,
- 'update_mask': UPDATE_MASK,
- 'graceful_decommission_timeout': cluster_decommission_timeout,
- 'request_id': REQUEST_ID,
- 'retry': RETRY,
- 'timeout': TIMEOUT,
- 'metadata': METADATA,
- }
- expected_calls = self.extra_links_expected_calls_base + [
- call.hook().update_cluster(**update_cluster_args)
- ]
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
-
- with pytest.warns(DeprecationWarning) as warnings:
- op = DataprocUpdateClusterOperator(
- task_id=TASK_ID,
- location=GCP_LOCATION,
- cluster_name=CLUSTER_NAME,
- cluster=CLUSTER,
- update_mask=UPDATE_MASK,
- request_id=REQUEST_ID,
- graceful_decommission_timeout=cluster_decommission_timeout,
- project_id=GCP_PROJECT,
- gcp_conn_id=GCP_CONN_ID,
- retry=RETRY,
- timeout=TIMEOUT,
- metadata=METADATA,
- impersonation_chain=IMPERSONATION_CHAIN,
- )
- op.execute(context=self.mock_context)
- mock_hook.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN
- )
- mock_hook.return_value.update_cluster.assert_called_once_with(**update_cluster_args)
- assert warning_message == str(warnings[0].message)
-
- # Test whether the xcom push happens before updating the cluster
- self.extra_links_manager_mock.assert_has_calls(expected_calls, any_order=False)
-
- self.mock_ti.xcom_push.assert_called_once_with(
- key="conf",
- value=DATAPROC_CLUSTER_CONF_EXPECTED,
- execution_date=None,
- )
-
- with pytest.raises(TypeError):
+ def test_missing_region_parameter(self, mock_hook):
+ with pytest.raises(AirflowException):
op = DataprocUpdateClusterOperator(
task_id=TASK_ID,
cluster_name=CLUSTER_NAME,
cluster=CLUSTER,
update_mask=UPDATE_MASK,
request_id=REQUEST_ID,
- graceful_decommission_timeout=cluster_decommission_timeout,
+ graceful_decommission_timeout={"graceful_decommission_timeout": "600s"},
project_id=GCP_PROJECT,
gcp_conn_id=GCP_CONN_ID,
retry=RETRY,
@@ -1719,38 +1604,8 @@ class TestDataprocCreateWorkflowTemplateOperator:
)
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
- def test_location_deprecation_warning(self, mock_hook):
- with pytest.warns(DeprecationWarning) as warnings:
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
- op = DataprocCreateWorkflowTemplateOperator(
- task_id=TASK_ID,
- gcp_conn_id=GCP_CONN_ID,
- impersonation_chain=IMPERSONATION_CHAIN,
- location=GCP_LOCATION,
- project_id=GCP_PROJECT,
- retry=RETRY,
- timeout=TIMEOUT,
- metadata=METADATA,
- template=WORKFLOW_TEMPLATE,
- )
- op.execute(context=MagicMock())
- mock_hook.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN
- )
- mock_hook.return_value.create_workflow_template.assert_called_once_with(
- region=GCP_LOCATION,
- project_id=GCP_PROJECT,
- retry=RETRY,
- timeout=TIMEOUT,
- metadata=METADATA,
- template=WORKFLOW_TEMPLATE,
- )
- assert warning_message == str(warnings[0].message)
-
- with pytest.raises(TypeError):
+ def test_missing_region_parameter(self, mock_hook):
+ with pytest.raises(AirflowException):
op = DataprocCreateWorkflowTemplateOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
diff --git a/tests/providers/google/cloud/sensors/test_dataproc.py b/tests/providers/google/cloud/sensors/test_dataproc.py
index 35fa9697b8..f38f89ff41 100644
--- a/tests/providers/google/cloud/sensors/test_dataproc.py
+++ b/tests/providers/google/cloud/sensors/test_dataproc.py
@@ -131,37 +131,12 @@ class TestDataprocJobSensor(unittest.TestCase):
)
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
- def test_location_deprecation_warning(self, mock_hook):
- job = self.create_job(JobStatus.State.DONE)
- job_id = "job_id"
- mock_hook.return_value.get_job.return_value = job
- warning_message = (
- "Parameter `location` will be deprecated. "
- "Please provide value through `region` parameter instead."
- )
-
- with pytest.warns(DeprecationWarning) as warnings:
- sensor = DataprocJobSensor(
- task_id=TASK_ID,
- location=GCP_LOCATION,
- project_id=GCP_PROJECT,
- dataproc_job_id=job_id,
- gcp_conn_id=GCP_CONN_ID,
- timeout=TIMEOUT,
- )
- assert warning_message == str(warnings[0].message)
- ret = sensor.poke(context={})
-
- mock_hook.return_value.get_job.assert_called_once_with(
- job_id=job_id, region=GCP_LOCATION, project_id=GCP_PROJECT
- )
- assert ret
-
- with pytest.raises(TypeError):
+ def test_missing_region(self, mock_hook):
+ with pytest.raises(AirflowException):
sensor = DataprocJobSensor(
task_id=TASK_ID,
project_id=GCP_PROJECT,
- dataproc_job_id=job_id,
+ dataproc_job_id="job_id",
gcp_conn_id=GCP_CONN_ID,
timeout=TIMEOUT,
)