This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ca4c559865 Fix Async GCSObjectsWithPrefixExistenceSensor xcom push 
(#37634)
ca4c559865 is described below

commit ca4c55986534a553baea80a7bb5b834f7fdf0ddd
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Feb 23 02:29:58 2024 +0530

    Fix Async GCSObjectsWithPrefixExistenceSensor xcom push (#37634)
    
    When GCSObjectsWithPrefixExistenceSensor runs in async (deferrable)
    mode and finds the prefix on the first poke, it does not push the
    matches to XCom. This PR fixes it.
---
 airflow/providers/google/cloud/sensors/gcs.py    |  2 ++
 tests/providers/google/cloud/sensors/test_gcs.py | 14 ++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/airflow/providers/google/cloud/sensors/gcs.py 
b/airflow/providers/google/cloud/sensors/gcs.py
index a952ab8a25..92ec7b92ce 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -363,6 +363,8 @@ class 
GCSObjectsWithPrefixExistenceSensor(BaseSensorOperator):
                     ),
                     method_name="execute_complete",
                 )
+            else:
+                return self._matches
 
     def execute_complete(self, context: dict[str, Any], event: dict[str, str | 
list[str]]) -> str | list[str]:
         """Return immediately and rely on trigger to throw a success event. 
Callback for the trigger."""
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py 
b/tests/providers/google/cloud/sensors/test_gcs.py
index 641b2052fd..690cf89fae 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -429,6 +429,20 @@ class TestGoogleCloudStoragePrefixSensor:
         task.execute(mock.MagicMock())
         assert not mock_defer.called
 
+    @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+    def test_xcom_value_when_poke_success(self, mock_hook):
+        mock_hook.return_value.list.return_value = ["test.txt"]
+        task = GCSObjectsWithPrefixExistenceSensor(
+            task_id="task-id",
+            bucket=TEST_BUCKET,
+            prefix=TEST_PREFIX,
+            google_cloud_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+            deferrable=True,
+        )
+        responses = task.execute(None)
+        assert responses == ["test.txt"]
+
 
 class TestGCSObjectsWithPrefixExistenceAsyncSensor:
     OPERATOR = GCSObjectsWithPrefixExistenceSensor(

Reply via email to