This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 967140e6c3 Add location support to BigQueryDataTransferServiceTransferRunSensor.
967140e6c3 is described below

commit 967140e6c3bd0f359393e018bf27b7f2310a2fd9
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Fri Apr 22 15:25:31 2022 +0530

    Add location support to BigQueryDataTransferServiceTransferRunSensor.
---
 airflow/providers/google/cloud/sensors/bigquery_dts.py    | 3 +++
 tests/providers/google/cloud/sensors/test_bigquery_dts.py | 9 +++++++++
 2 files changed, 12 insertions(+)

diff --git a/airflow/providers/google/cloud/sensors/bigquery_dts.py b/airflow/providers/google/cloud/sensors/bigquery_dts.py
index 9e14845197..ef92601c0a 100644
--- a/airflow/providers/google/cloud/sensors/bigquery_dts.py
+++ b/airflow/providers/google/cloud/sensors/bigquery_dts.py
@@ -84,6 +84,7 @@ class BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
         retry: Union[Retry, _MethodDefault] = DEFAULT,
         request_timeout: Optional[float] = None,
         metadata: Sequence[Tuple[str, str]] = (),
+        location: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
     ) -> None:
@@ -97,6 +98,7 @@ class BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
         self.project_id = project_id
         self.gcp_cloud_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
+        self.location = location
 
     def _normalize_state_list(self, states) -> Set[TransferState]:
         states = {states} if isinstance(states, (str, TransferState, int)) else states
@@ -122,6 +124,7 @@ class BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
         hook = BiqQueryDataTransferServiceHook(
             gcp_conn_id=self.gcp_cloud_conn_id,
             impersonation_chain=self.impersonation_chain,
+            location=self.location,
         )
         run = hook.get_transfer_run(
             run_id=self.run_id,
diff --git a/tests/providers/google/cloud/sensors/test_bigquery_dts.py b/tests/providers/google/cloud/sensors/test_bigquery_dts.py
index f75d47714d..f55d338e2f 100644
--- a/tests/providers/google/cloud/sensors/test_bigquery_dts.py
+++ b/tests/providers/google/cloud/sensors/test_bigquery_dts.py
@@ -30,6 +30,8 @@ from airflow.providers.google.cloud.sensors.bigquery_dts import BigQueryDataTran
 TRANSFER_CONFIG_ID = "config_id"
 RUN_ID = "run_id"
 PROJECT_ID = "project_id"
+LOCATION = "europe"
+GCP_CONN_ID = "google_cloud_default"
 
 
 class TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
@@ -48,6 +50,8 @@ class TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
 
         with pytest.raises(AirflowException, match="Transfer"):
             op.poke({})
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None, location=None)
         mock_hook.return_value.get_transfer_run.assert_called_once_with(
             transfer_config_id=TRANSFER_CONFIG_ID,
             run_id=RUN_ID,
@@ -68,10 +72,15 @@ class TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
             task_id="id",
             project_id=PROJECT_ID,
             expected_statuses={"SUCCEEDED"},
+            location=LOCATION,
         )
         result = op.poke({})
 
         assert result is True
+
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, impersonation_chain=None, location=LOCATION
+        )
         mock_hook.return_value.get_transfer_run.assert_called_once_with(
             transfer_config_id=TRANSFER_CONFIG_ID,
             run_id=RUN_ID,

Reply via email to