This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57fb80cff0 Support CloudDataTransferServiceJobStatusSensor without specifying a project_id (#30035)
57fb80cff0 is described below

commit 57fb80cff020a3e405c7d6a72037a7757ccdd5f5
Author: J.C. Zhang <[email protected]>
AuthorDate: Wed Mar 15 06:10:09 2023 +0900

    Support CloudDataTransferServiceJobStatusSensor without specifying a project_id (#30035)
---
 .../sensors/cloud_storage_transfer_service.py      |  2 +-
 .../sensors/test_cloud_storage_transfer_service.py | 35 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
index 67f64e2497..cccc628d3c 100644
--- a/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py
@@ -93,7 +93,7 @@ class CloudDataTransferServiceJobStatusSensor(BaseSensorOperator):
             impersonation_chain=self.impersonation_chain,
         )
         operations = hook.list_transfer_operations(
-            request_filter={"project_id": self.project_id, "job_names": [self.job_name]}
+            request_filter={"project_id": self.project_id or hook.project_id, "job_names": [self.job_name]}
         )
 
         for operation in operations:
diff --git a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
index ff8dad7a1b..68517e17ea 100644
--- a/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
+++ b/tests/providers/google/cloud/sensors/test_cloud_storage_transfer_service.py
@@ -72,8 +72,41 @@ class TestGcpStorageTransferOperationWaitForJobStatusSensor:
     @mock.patch(
         "airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook"
     )
-    def test_wait_for_status_success_default_expected_status(self, mock_tool):
+    def test_wait_for_status_success_without_project_id(self, mock_tool):
+        operations = [
+            {
+                "name": TEST_NAME,
+                "metadata": {
+                    "status": GcpTransferOperationStatus.SUCCESS,
+                    "counters": TEST_COUNTERS,
+                },
+            }
+        ]
+        mock_tool.return_value.list_transfer_operations.return_value = operations
+        mock_tool.operations_contain_expected_statuses.return_value = True
+        mock_tool.return_value.project_id = "project-id"
 
+        op = CloudDataTransferServiceJobStatusSensor(
+            task_id="task-id",
+            job_name=JOB_NAME,
+            expected_statuses=GcpTransferOperationStatus.SUCCESS,
+        )
+
+        context = {"ti": (mock.Mock(**{"xcom_push.return_value": None}))}
+        result = op.poke(context)
+
+        mock_tool.return_value.list_transfer_operations.assert_called_once_with(
+            request_filter={"project_id": "project-id", "job_names": [JOB_NAME]}
+        )
+        mock_tool.operations_contain_expected_statuses.assert_called_once_with(
+            operations=operations, expected_statuses={GcpTransferOperationStatus.SUCCESS}
+        )
+        assert result
+
+    @mock.patch(
+        "airflow.providers.google.cloud.sensors.cloud_storage_transfer_service.CloudDataTransferServiceHook"
+    )
+    def test_wait_for_status_success_default_expected_status(self, mock_tool):
         op = CloudDataTransferServiceJobStatusSensor(
             task_id="task-id",
             job_name=JOB_NAME,

Reply via email to