This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a5b583c91 Optimize deferrable mode in `GCSObjectExistenceSensor` (#30901)
3a5b583c91 is described below

commit 3a5b583c916fff4603cdb2f2be815ccc5c281750
Author: Phani Kumar <[email protected]>
AuthorDate: Thu Apr 27 22:03:48 2023 +0530

    Optimize deferrable mode in `GCSObjectExistenceSensor` (#30901)
    
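    * Optimize deferrable mode in `GCSObjectExistenceSensor`: poke once before deferring, so the task finishes immediately (without deferring to the triggerer) when the object already exists.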
---
 airflow/providers/google/cloud/sensors/gcs.py    | 27 ++++++++++++------------
 tests/providers/google/cloud/sensors/test_gcs.py | 22 +++++++++++++++++--
 2 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/google/cloud/sensors/gcs.py b/airflow/providers/google/cloud/sensors/gcs.py
index c0365bbee0..ad96fc9184 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -101,19 +101,20 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         if not self.deferrable:
             super().execute(context)
         else:
-            self.defer(
-                timeout=timedelta(seconds=self.timeout),
-                trigger=GCSBlobTrigger(
-                    bucket=self.bucket,
-                    object_name=self.object,
-                    poke_interval=self.poke_interval,
-                    google_cloud_conn_id=self.google_cloud_conn_id,
-                    hook_params={
-                        "impersonation_chain": self.impersonation_chain,
-                    },
-                ),
-                method_name="execute_complete",
-            )
+            if not self.poke(context=context):
+                self.defer(
+                    timeout=timedelta(seconds=self.timeout),
+                    trigger=GCSBlobTrigger(
+                        bucket=self.bucket,
+                        object_name=self.object,
+                        poke_interval=self.poke_interval,
+                        google_cloud_conn_id=self.google_cloud_conn_id,
+                        hook_params={
+                            "impersonation_chain": self.impersonation_chain,
+                        },
+                    ),
+                    method_name="execute_complete",
+                )
 
     def execute_complete(self, context: Context, event: dict[str, str]) -> str:
         """
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 5ed2a0fef5..e4632097cf 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -99,7 +99,22 @@ class TestGoogleCloudStorageObjectSensor:
         )
         mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, 
TEST_OBJECT, DEFAULT_RETRY)
 
-    def test_gcs_object_existence_sensor_deferred(self):
+    @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+    
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor.defer")
+    def test_gcs_object_existence_sensor_finish_before_deferred(self, 
mock_defer, mock_hook):
+        task = GCSObjectExistenceSensor(
+            task_id="task-id",
+            bucket=TEST_BUCKET,
+            object=TEST_OBJECT,
+            google_cloud_conn_id=TEST_GCP_CONN_ID,
+            deferrable=True,
+        )
+        mock_hook.return_value.exists.return_value = True
+        task.execute(mock.MagicMock())
+        assert not mock_defer.called
+
+    @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+    def test_gcs_object_existence_sensor_deferred(self, mock_hook):
         """
         Asserts that a task is deferred and a GCSBlobTrigger will be fired
         when the GCSObjectExistenceSensor is executed and deferrable is set to 
True.
@@ -111,6 +126,7 @@ class TestGoogleCloudStorageObjectSensor:
             google_cloud_conn_id=TEST_GCP_CONN_ID,
             deferrable=True,
         )
+        mock_hook.return_value.exists.return_value = False
         with pytest.raises(TaskDeferred) as exc:
             task.execute(context)
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not 
a GCSBlobTrigger"
@@ -147,7 +163,8 @@ class TestGoogleCloudStorageObjectSensorAsync:
         "Please use `GCSObjectExistenceSensor` and set `deferrable` attribute 
to `True` instead"
     )
 
-    def test_gcs_object_existence_sensor_async(self):
+    @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+    def test_gcs_object_existence_sensor_async(self, mock_hook):
         """
         Asserts that a task is deferred and a GCSBlobTrigger will be fired
         when the GCSObjectExistenceAsyncSensor is executed.
@@ -159,6 +176,7 @@ class TestGoogleCloudStorageObjectSensorAsync:
                 object=TEST_OBJECT,
                 google_cloud_conn_id=TEST_GCP_CONN_ID,
             )
+        mock_hook.return_value.exists.return_value = False
         with pytest.raises(TaskDeferred) as exc:
             task.execute(context)
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not 
a GCSBlobTrigger"

Reply via email to