This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ca4c559865 Fix Async GCSObjectsWithPrefixExistenceSensor xcom push
(#37634)
ca4c559865 is described below
commit ca4c55986534a553baea80a7bb5b834f7fdf0ddd
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Feb 23 02:29:58 2024 +0530
Fix Async GCSObjectsWithPrefixExistenceSensor xcom push (#37634)
Fix GCSObjectsWithPrefixExistenceSensor found prefix in fist
poke only in async mode of sensor then it does not push matches
in xcom. This PR fix it.
---
airflow/providers/google/cloud/sensors/gcs.py | 2 ++
tests/providers/google/cloud/sensors/test_gcs.py | 14 ++++++++++++++
2 files changed, 16 insertions(+)
diff --git a/airflow/providers/google/cloud/sensors/gcs.py
b/airflow/providers/google/cloud/sensors/gcs.py
index a952ab8a25..92ec7b92ce 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -363,6 +363,8 @@ class
GCSObjectsWithPrefixExistenceSensor(BaseSensorOperator):
),
method_name="execute_complete",
)
+ else:
+ return self._matches
def execute_complete(self, context: dict[str, Any], event: dict[str, str |
list[str]]) -> str | list[str]:
"""Return immediately and rely on trigger to throw a success event.
Callback for the trigger."""
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py
b/tests/providers/google/cloud/sensors/test_gcs.py
index 641b2052fd..690cf89fae 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -429,6 +429,20 @@ class TestGoogleCloudStoragePrefixSensor:
task.execute(mock.MagicMock())
assert not mock_defer.called
+ @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+ def test_xcom_value_when_poke_success(self, mock_hook):
+ mock_hook.return_value.list.return_value = ["test.txt"]
+ task = GCSObjectsWithPrefixExistenceSensor(
+ task_id="task-id",
+ bucket=TEST_BUCKET,
+ prefix=TEST_PREFIX,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ impersonation_chain=TEST_IMPERSONATION_CHAIN,
+ deferrable=True,
+ )
+ responses = task.execute(None)
+ assert responses == ["test.txt"]
+
class TestGCSObjectsWithPrefixExistenceAsyncSensor:
OPERATOR = GCSObjectsWithPrefixExistenceSensor(