This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 57fb80cff0 Support CloudDataTransferServiceJobStatusSensor without
specifying a project_id (#30035)
57fb80cff0 is described below
commit 57fb80cff020a3e405c7d6a72037a7757ccdd5f5
Author: J.C. Zhang <[email protected]>
AuthorDate: Wed Mar 15 06:10:09 2023 +0900
Support CloudDataTransferServiceJobStatusSensor without specifying a
project_id (#30035)
---
.../sensors/cloud_storage_transfer_service.py | 2 +-
.../sensors/test_cloud_storage_transfer_service.py | 35 +++++++++++++++++++++-
2 files changed, 35 insertions(+), 2 deletions(-)
diff --git
a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
index 67f64e2497..cccc628d3c 100644
--- a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
@@ -93,7 +93,7 @@ class
CloudDataTransferServiceJobStatusSensor(BaseSensorOperator):
impersonation_chain=self.impersonation_chain,
)
operations = hook.list_transfer_operations(
- request_filter={"project_id": self.project_id, "job_names":
[self.job_name]}
+ request_filter={"project_id": self.project_id or hook.project_id,
"job_names": [self.job_name]}
)
for operation in operations:
diff --git
a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
index ff8dad7a1b..68517e17ea 100644
---
a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
+++
b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
@@ -72,8 +72,41 @@ class TestGcpStorageTransferOperationWaitForJobStatusSensor:
@mock.patch(
"airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook"
)
- def test_wait_for_status_success_default_expected_status(self, mock_tool):
+ def test_wait_for_status_success_without_project_id(self, mock_tool):
+ operations = [
+ {
+ "name": TEST_NAME,
+ "metadata": {
+ "status": GcpTransferOperationStatus.SUCCESS,
+ "counters": TEST_COUNTERS,
+ },
+ }
+ ]
+ mock_tool.return_value.list_transfer_operations.return_value =
operations
+ mock_tool.operations_contain_expected_statuses.return_value = True
+ mock_tool.return_value.project_id = "project-id"
+ op = CloudDataTransferServiceJobStatusSensor(
+ task_id="task-id",
+ job_name=JOB_NAME,
+ expected_statuses=GcpTransferOperationStatus.SUCCESS,
+ )
+
+ context = {"ti": (mock.Mock(**{"xcom_push.return_value": None}))}
+ result = op.poke(context)
+
+
mock_tool.return_value.list_transfer_operations.assert_called_once_with(
+ request_filter={"project_id": "project-id", "job_names":
[JOB_NAME]}
+ )
+ mock_tool.operations_contain_expected_statuses.assert_called_once_with(
+ operations=operations,
expected_statuses={GcpTransferOperationStatus.SUCCESS}
+ )
+ assert result
+
+ @mock.patch(
+
"airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook"
+ )
+ def test_wait_for_status_success_default_expected_status(self, mock_tool):
op = CloudDataTransferServiceJobStatusSensor(
task_id="task-id",
job_name=JOB_NAME,