This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3a5b583c91 Optimize deferrable mode in `GCSObjectExistenceSensor` (#30901)
3a5b583c91 is described below
commit 3a5b583c916fff4603cdb2f2be815ccc5c281750
Author: Phani Kumar <[email protected]>
AuthorDate: Thu Apr 27 22:03:48 2023 +0530
Optimize deferrable mode in `GCSObjectExistenceSensor` (#30901)
* optimize deferrable mode in GCSObjectExistenceSensor
---
airflow/providers/google/cloud/sensors/gcs.py | 27 ++++++++++++------------
tests/providers/google/cloud/sensors/test_gcs.py | 22 +++++++++++++++++--
2 files changed, 34 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/google/cloud/sensors/gcs.py b/airflow/providers/google/cloud/sensors/gcs.py
index c0365bbee0..ad96fc9184 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -101,19 +101,20 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
if not self.deferrable:
super().execute(context)
else:
- self.defer(
- timeout=timedelta(seconds=self.timeout),
- trigger=GCSBlobTrigger(
- bucket=self.bucket,
- object_name=self.object,
- poke_interval=self.poke_interval,
- google_cloud_conn_id=self.google_cloud_conn_id,
- hook_params={
- "impersonation_chain": self.impersonation_chain,
- },
- ),
- method_name="execute_complete",
- )
+ if not self.poke(context=context):
+ self.defer(
+ timeout=timedelta(seconds=self.timeout),
+ trigger=GCSBlobTrigger(
+ bucket=self.bucket,
+ object_name=self.object,
+ poke_interval=self.poke_interval,
+ google_cloud_conn_id=self.google_cloud_conn_id,
+ hook_params={
+ "impersonation_chain": self.impersonation_chain,
+ },
+ ),
+ method_name="execute_complete",
+ )
def execute_complete(self, context: Context, event: dict[str, str]) -> str:
"""
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 5ed2a0fef5..e4632097cf 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -99,7 +99,22 @@ class TestGoogleCloudStorageObjectSensor:
)
mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
- def test_gcs_object_existence_sensor_deferred(self):
+ @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+ @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor.defer")
+ def test_gcs_object_existence_sensor_finish_before_deferred(self, mock_defer, mock_hook):
+ task = GCSObjectExistenceSensor(
+ task_id="task-id",
+ bucket=TEST_BUCKET,
+ object=TEST_OBJECT,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ deferrable=True,
+ )
+ mock_hook.return_value.exists.return_value = True
+ task.execute(mock.MagicMock())
+ assert not mock_defer.called
+
+ @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+ def test_gcs_object_existence_sensor_deferred(self, mock_hook):
"""
Asserts that a task is deferred and a GCSBlobTrigger will be fired
when the GCSObjectExistenceSensor is executed and deferrable is set to
True.
@@ -111,6 +126,7 @@ class TestGoogleCloudStorageObjectSensor:
google_cloud_conn_id=TEST_GCP_CONN_ID,
deferrable=True,
)
+ mock_hook.return_value.exists.return_value = False
with pytest.raises(TaskDeferred) as exc:
task.execute(context)
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
@@ -147,7 +163,8 @@ class TestGoogleCloudStorageObjectSensorAsync:
"Please use `GCSObjectExistenceSensor` and set `deferrable` attribute
to `True` instead"
)
- def test_gcs_object_existence_sensor_async(self):
+ @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+ def test_gcs_object_existence_sensor_async(self, mock_hook):
"""
Asserts that a task is deferred and a GCSBlobTrigger will be fired
when the GCSObjectExistenceAsyncSensor is executed.
@@ -159,6 +176,7 @@ class TestGoogleCloudStorageObjectSensorAsync:
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
)
+ mock_hook.return_value.exists.return_value = False
with pytest.raises(TaskDeferred) as exc:
task.execute(context)
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"