This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 967140e6c3 Add location support to
BigQueryDataTransferServiceTransferRunSensor.
967140e6c3 is described below
commit 967140e6c3bd0f359393e018bf27b7f2310a2fd9
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Fri Apr 22 15:25:31 2022 +0530
Add location support to BigQueryDataTransferServiceTransferRunSensor.
---
airflow/providers/google/cloud/sensors/bigquery_dts.py | 3 +++
tests/providers/google/cloud/sensors/test_bigquery_dts.py | 9 +++++++++
2 files changed, 12 insertions(+)
diff --git a/airflow/providers/google/cloud/sensors/bigquery_dts.py
b/airflow/providers/google/cloud/sensors/bigquery_dts.py
index 9e14845197..ef92601c0a 100644
--- a/airflow/providers/google/cloud/sensors/bigquery_dts.py
+++ b/airflow/providers/google/cloud/sensors/bigquery_dts.py
@@ -84,6 +84,7 @@ class
BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
retry: Union[Retry, _MethodDefault] = DEFAULT,
request_timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
+ location: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:
@@ -97,6 +98,7 @@ class
BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
self.project_id = project_id
self.gcp_cloud_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def _normalize_state_list(self, states) -> Set[TransferState]:
states = {states} if isinstance(states, (str, TransferState, int))
else states
@@ -122,6 +124,7 @@ class
BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
hook = BiqQueryDataTransferServiceHook(
gcp_conn_id=self.gcp_cloud_conn_id,
impersonation_chain=self.impersonation_chain,
+ location=self.location,
)
run = hook.get_transfer_run(
run_id=self.run_id,
diff --git a/tests/providers/google/cloud/sensors/test_bigquery_dts.py
b/tests/providers/google/cloud/sensors/test_bigquery_dts.py
index f75d47714d..f55d338e2f 100644
--- a/tests/providers/google/cloud/sensors/test_bigquery_dts.py
+++ b/tests/providers/google/cloud/sensors/test_bigquery_dts.py
@@ -30,6 +30,8 @@ from airflow.providers.google.cloud.sensors.bigquery_dts
import BigQueryDataTran
TRANSFER_CONFIG_ID = "config_id"
RUN_ID = "run_id"
PROJECT_ID = "project_id"
+LOCATION = "europe"
+GCP_CONN_ID = "google_cloud_default"
class TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
@@ -48,6 +50,8 @@ class
TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
with pytest.raises(AirflowException, match="Transfer"):
op.poke({})
+
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, location=None)
mock_hook.return_value.get_transfer_run.assert_called_once_with(
transfer_config_id=TRANSFER_CONFIG_ID,
run_id=RUN_ID,
@@ -68,10 +72,15 @@ class
TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
task_id="id",
project_id=PROJECT_ID,
expected_statuses={"SUCCEEDED"},
+ location=LOCATION,
)
result = op.poke({})
assert result is True
+
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID, impersonation_chain=None,
location=LOCATION
+ )
mock_hook.return_value.get_transfer_run.assert_called_once_with(
transfer_config_id=TRANSFER_CONFIG_ID,
run_id=RUN_ID,