This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 79a2fa7db9 Merge GCSObjectExistenceAsyncSensor logic to GCSObjectExistenceSensor (#30014)
79a2fa7db9 is described below

commit 79a2fa7db9d1689c5fe8a0afaa3883c4e0ccc00d
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 21 21:28:30 2023 +0900

    Merge GCSObjectExistenceAsyncSensor logic to GCSObjectExistenceSensor (#30014)
---
 airflow/providers/google/cloud/sensors/gcs.py      | 72 +++++++++++++---------
 .../operators/cloud/gcs.rst                        | 10 ++-
 tests/providers/google/cloud/sensors/test_gcs.py   | 69 ++++++++++++++++++---
 .../google/cloud/gcs/example_gcs_sensor.py         | 13 +++-
 4 files changed, 123 insertions(+), 41 deletions(-)
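
In short, the deferrable behaviour that previously required GCSObjectExistenceAsyncSensor can now be enabled on GCSObjectExistenceSensor itself through the new `deferrable` flag. A minimal sketch, modelled on the example DAG touched by this commit (the bucket, object, and task_id values below are illustrative placeholders, not part of the commit):

    from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

    # Waits for the object to appear; with deferrable=True the task defers to a
    # GCSBlobTrigger and frees its worker slot (a triggerer process must be running).
    wait_for_file = GCSObjectExistenceSensor(
        task_id="gcs_object_exists_deferred",   # illustrative task_id
        bucket="my-example-bucket",              # illustrative bucket name
        object="example_upload.txt",             # illustrative object name
        deferrable=True,
    )

When deferrable is left at its default of False, the sensor keeps the existing poke-based behaviour shown in the diff below.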

diff --git a/airflow/providers/google/cloud/sensors/gcs.py b/airflow/providers/google/cloud/sensors/gcs.py
index 83be9034ee..ee78ae463c 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -22,7 +22,7 @@ import os
 import textwrap
 import warnings
 from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Callable, Sequence
+from typing import TYPE_CHECKING, Any, Callable, Sequence
 
 from google.api_core.retry import Retry
 from google.cloud.storage.retry import DEFAULT_RETRY
@@ -75,6 +75,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         retry: Retry = DEFAULT_RETRY,
+        deferrable: bool = False,
         **kwargs,
     ) -> None:
 
@@ -90,6 +91,8 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         self.impersonation_chain = impersonation_chain
         self.retry = retry
 
+        self.deferrable = deferrable
+
     def poke(self, context: Context) -> bool:
         self.log.info("Sensor checks existence of : %s, %s", self.bucket, 
self.object)
         hook = GCSHook(
@@ -99,10 +102,43 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         )
         return hook.exists(self.bucket, self.object, self.retry)
 
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the 
trigger."""
+        if not self.deferrable:
+            super().execute(context)
+        else:
+            self.defer(
+                timeout=timedelta(seconds=self.timeout),
+                trigger=GCSBlobTrigger(
+                    bucket=self.bucket,
+                    object_name=self.object,
+                    poke_interval=self.poke_interval,
+                    google_cloud_conn_id=self.google_cloud_conn_id,
+                    hook_params={
+                        "delegate_to": self.delegate_to,
+                        "impersonation_chain": self.impersonation_chain,
+                    },
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context: Context, event: dict[str, str]) -> str:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] == "error":
+            raise AirflowException(event["message"])
+        self.log.info("File %s was found in bucket %s.", self.object, self.bucket)
+        return event["message"]
+
 
 class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
     """
-    Checks for the existence of a file in Google Cloud Storage .
+    Checks for the existence of a file in Google Cloud Storage.
+    Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release.
+    Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead
 
     :param bucket: The Google Cloud Storage bucket where the object is.
     :param object: The name of the object to check in the Google cloud storage bucket.
@@ -120,33 +156,13 @@ class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
         account from the list granting this role to the originating account (templated).
     """
 
-    def execute(self, context: Context) -> None:
-        """Airflow runs this method on the worker and defers using the 
trigger."""
-        self.defer(
-            timeout=timedelta(seconds=self.timeout),
-            trigger=GCSBlobTrigger(
-                bucket=self.bucket,
-                object_name=self.object,
-                poke_interval=self.poke_interval,
-                google_cloud_conn_id=self.google_cloud_conn_id,
-                hook_params={
-                    "delegate_to": self.delegate_to,
-                    "impersonation_chain": self.impersonation_chain,
-                },
-            ),
-            method_name="execute_complete",
+    def __init__(self, **kwargs: Any) -> None:
+        warnings.warn(
+            "Class `GCSObjectExistenceAsyncSensor` is deprecated and will be 
removed in a future release. "
+            "Please use `GCSObjectExistenceSensor` and set `deferrable` 
attribute to `True` instead",
+            DeprecationWarning,
         )
-
-    def execute_complete(self, context: Context, event: dict[str, str]) -> str:
-        """
-        Callback for when the trigger fires - returns immediately.
-        Relies on trigger to throw an exception, otherwise it assumes execution was
-        successful.
-        """
-        if event["status"] == "error":
-            raise AirflowException(event["message"])
-        self.log.info("File %s was found in bucket %s.", self.object, self.bucket)
-        return event["message"]
+        super().__init__(deferrable=True, **kwargs)
 
 
 def ts_function(context):
diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
index 2a037f6050..af32b74627 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -164,14 +164,20 @@ Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSe
     :start-after: [START howto_sensor_object_exists_task]
     :end-before: [END howto_sensor_object_exists_task]
 
+Also you can use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_object_exists_task_defered]
+    :end-before: [END howto_sensor_object_exists_task_defered]
 
 .. _howto/sensor:GCSObjectExistenceAsyncSensor:
 
 GCSObjectExistenceAsyncSensor
 -----------------------------
 
-Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor`
-(deferrable version) if you would like to free up the worker slots while the sensor is running.
+:class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. Please use :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` and use the deferrable mode in that operator.
 
 .. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
     :language: python
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 642461f63a..2f4c6a40c7 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -98,47 +98,96 @@ class TestGoogleCloudStorageObjectSensor:
         )
         mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
 
-
-class TestGoogleCloudStorageObjectSensorAsync:
-    def test_gcs_object_existence_sensor_async(self):
+    def test_gcs_object_existence_sensor_deferred(self):
         """
         Asserts that a task is deferred and a GCSBlobTrigger will be fired
-        when the GCSObjectExistenceAsyncSensor is executed.
+        when the GCSObjectExistenceSensor is executed and deferrable is set to True.
         """
-        task = GCSObjectExistenceAsyncSensor(
+        task = GCSObjectExistenceSensor(
             task_id="task-id",
             bucket=TEST_BUCKET,
             object=TEST_OBJECT,
             google_cloud_conn_id=TEST_GCP_CONN_ID,
+            deferrable=True,
         )
         with pytest.raises(TaskDeferred) as exc:
             task.execute(context)
         assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
 
-    def test_gcs_object_existence_sensor_async_execute_failure(self):
-        """Tests that an AirflowException is raised in case of error event"""
-        task = GCSObjectExistenceAsyncSensor(
+    def test_gcs_object_existence_sensor_deferred_execute_failure(self):
+        """Tests that an AirflowException is raised in case of error event 
when deferrable is set to True"""
+        task = GCSObjectExistenceSensor(
             task_id="task-id",
             bucket=TEST_BUCKET,
             object=TEST_OBJECT,
             google_cloud_conn_id=TEST_GCP_CONN_ID,
+            deferrable=True,
         )
         with pytest.raises(AirflowException):
             task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
 
     def test_gcs_object_existence_sensor_async_execute_complete(self):
-        """Asserts that logging occurs as expected"""
-        task = GCSObjectExistenceAsyncSensor(
+        """Asserts that logging occurs as expected when deferrable is set to 
True"""
+        task = GCSObjectExistenceSensor(
             task_id="task-id",
             bucket=TEST_BUCKET,
             object=TEST_OBJECT,
             google_cloud_conn_id=TEST_GCP_CONN_ID,
+            deferrable=True,
         )
         with mock.patch.object(task.log, "info") as mock_log_info:
             task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
         mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)
 
 
+class TestGoogleCloudStorageObjectSensorAsync:
+    depcrecation_message = (
+        "Class `GCSObjectExistenceAsyncSensor` is deprecated and will be 
removed in a future release. "
+        "Please use `GCSObjectExistenceSensor` and set `deferrable` attribute 
to `True` instead"
+    )
+
+    def test_gcs_object_existence_sensor_async(self):
+        """
+        Asserts that a task is deferred and a GCSBlobTrigger will be fired
+        when the GCSObjectExistenceAsyncSensor is executed.
+        """
+        with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
+            task = GCSObjectExistenceAsyncSensor(
+                task_id="task-id",
+                bucket=TEST_BUCKET,
+                object=TEST_OBJECT,
+                google_cloud_conn_id=TEST_GCP_CONN_ID,
+            )
+        with pytest.raises(TaskDeferred) as exc:
+            task.execute(context)
+        assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
+
+    def test_gcs_object_existence_sensor_async_execute_failure(self):
+        """Tests that an AirflowException is raised in case of error event"""
+        with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
+            task = GCSObjectExistenceAsyncSensor(
+                task_id="task-id",
+                bucket=TEST_BUCKET,
+                object=TEST_OBJECT,
+                google_cloud_conn_id=TEST_GCP_CONN_ID,
+            )
+        with pytest.raises(AirflowException):
+            task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
+
+    def test_gcs_object_existence_sensor_async_execute_complete(self):
+        """Asserts that logging occurs as expected"""
+        with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
+            task = GCSObjectExistenceAsyncSensor(
+                task_id="task-id",
+                bucket=TEST_BUCKET,
+                object=TEST_OBJECT,
+                google_cloud_conn_id=TEST_GCP_CONN_ID,
+            )
+        with mock.patch.object(task.log, "info") as mock_log_info:
+            task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
+        mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)
+
+
 class TestTsFunction:
     def test_should_support_datetime(self):
         context = {
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py b/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
index ef48fa28db..bc7a5dda5d 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
@@ -124,6 +124,12 @@ with models.DAG(
     )
     # [END howto_sensor_object_exists_task_async]
 
+    # [START howto_sensor_object_exists_task_defered]
+    gcs_object_exists_defered = GCSObjectExistenceSensor(
+        bucket=BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
+    )
+    # [END howto_sensor_object_exists_task_defered]
+
     # [START howto_sensor_object_with_prefix_exists_task]
     gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
         bucket=BUCKET_NAME,
@@ -144,7 +150,12 @@ with models.DAG(
         sleep,
         upload_file,
         # TEST BODY
-        [gcs_object_exists, gcs_object_exists_async, gcs_object_with_prefix_exists],
+        [
+            gcs_object_exists,
+            gcs_object_exists_defered,
+            gcs_object_exists_async,
+            gcs_object_with_prefix_exists,
+        ],
         # TEST TEARDOWN
         delete_bucket,
     )
