This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 79a2fa7db9 Merge GCSObjectExistenceAsyncSensor logic to GCSObjectExistenceSensor (#30014)
79a2fa7db9 is described below
commit 79a2fa7db9d1689c5fe8a0afaa3883c4e0ccc00d
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 21 21:28:30 2023 +0900
Merge GCSObjectExistenceAsyncSensor logic to GCSObjectExistenceSensor (#30014)
---
airflow/providers/google/cloud/sensors/gcs.py | 72 +++++++++++++---------
.../operators/cloud/gcs.rst | 10 ++-
tests/providers/google/cloud/sensors/test_gcs.py | 69 ++++++++++++++++++---
.../google/cloud/gcs/example_gcs_sensor.py | 13 +++-
4 files changed, 123 insertions(+), 41 deletions(-)
diff --git a/airflow/providers/google/cloud/sensors/gcs.py b/airflow/providers/google/cloud/sensors/gcs.py
index 83be9034ee..ee78ae463c 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -22,7 +22,7 @@ import os
import textwrap
import warnings
from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Callable, Sequence
+from typing import TYPE_CHECKING, Any, Callable, Sequence
from google.api_core.retry import Retry
from google.cloud.storage.retry import DEFAULT_RETRY
@@ -75,6 +75,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
retry: Retry = DEFAULT_RETRY,
+ deferrable: bool = False,
**kwargs,
) -> None:
@@ -90,6 +91,8 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
self.impersonation_chain = impersonation_chain
self.retry = retry
+ self.deferrable = deferrable
+
def poke(self, context: Context) -> bool:
self.log.info("Sensor checks existence of : %s, %s", self.bucket,
self.object)
hook = GCSHook(
@@ -99,10 +102,43 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
)
return hook.exists(self.bucket, self.object, self.retry)
+ def execute(self, context: Context) -> None:
+ """Airflow runs this method on the worker and defers using the
trigger."""
+ if not self.deferrable:
+ super().execute(context)
+ else:
+ self.defer(
+ timeout=timedelta(seconds=self.timeout),
+ trigger=GCSBlobTrigger(
+ bucket=self.bucket,
+ object_name=self.object,
+ poke_interval=self.poke_interval,
+ google_cloud_conn_id=self.google_cloud_conn_id,
+ hook_params={
+ "delegate_to": self.delegate_to,
+ "impersonation_chain": self.impersonation_chain,
+ },
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, str]) -> str:
+ """
+ Callback for when the trigger fires - returns immediately.
+            Relies on trigger to throw an exception, otherwise it assumes execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ self.log.info("File %s was found in bucket %s.", self.object,
self.bucket)
+ return event["message"]
+
class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
"""
- Checks for the existence of a file in Google Cloud Storage .
+ Checks for the existence of a file in Google Cloud Storage.
+    Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release.
+    Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead
:param bucket: The Google Cloud Storage bucket where the object is.
:param object: The name of the object to check in the Google cloud storage bucket.
@@ -120,33 +156,13 @@ class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
account from the list granting this role to the originating account (templated).
"""
- def execute(self, context: Context) -> None:
- """Airflow runs this method on the worker and defers using the
trigger."""
- self.defer(
- timeout=timedelta(seconds=self.timeout),
- trigger=GCSBlobTrigger(
- bucket=self.bucket,
- object_name=self.object,
- poke_interval=self.poke_interval,
- google_cloud_conn_id=self.google_cloud_conn_id,
- hook_params={
- "delegate_to": self.delegate_to,
- "impersonation_chain": self.impersonation_chain,
- },
- ),
- method_name="execute_complete",
+ def __init__(self, **kwargs: Any) -> None:
+ warnings.warn(
+ "Class `GCSObjectExistenceAsyncSensor` is deprecated and will be
removed in a future release. "
+ "Please use `GCSObjectExistenceSensor` and set `deferrable`
attribute to `True` instead",
+ DeprecationWarning,
)
-
- def execute_complete(self, context: Context, event: dict[str, str]) -> str:
- """
- Callback for when the trigger fires - returns immediately.
-        Relies on trigger to throw an exception, otherwise it assumes execution was
- successful.
- """
- if event["status"] == "error":
- raise AirflowException(event["message"])
- self.log.info("File %s was found in bucket %s.", self.object,
self.bucket)
- return event["message"]
+ super().__init__(deferrable=True, **kwargs)
def ts_function(context):
diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
index 2a037f6050..af32b74627 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -164,14 +164,20 @@ Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSe
:start-after: [START howto_sensor_object_exists_task]
:end-before: [END howto_sensor_object_exists_task]
+You can also use deferrable mode in this operator if you would like to free up the worker slots while the sensor is running.
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_object_exists_task_defered]
+ :end-before: [END howto_sensor_object_exists_task_defered]
.. _howto/sensor:GCSObjectExistenceAsyncSensor:
GCSObjectExistenceAsyncSensor
-----------------------------
-Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor`
-(deferrable version) if you would like to free up the worker slots while the sensor is running.
+:class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. Please use :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` and use the deferrable mode in that operator.
.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
:language: python
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 642461f63a..2f4c6a40c7 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -98,47 +98,96 @@ class TestGoogleCloudStorageObjectSensor:
)
mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
-
-class TestGoogleCloudStorageObjectSensorAsync:
- def test_gcs_object_existence_sensor_async(self):
+ def test_gcs_object_existence_sensor_deferred(self):
"""
Asserts that a task is deferred and a GCSBlobTrigger will be fired
- when the GCSObjectExistenceAsyncSensor is executed.
+        when the GCSObjectExistenceSensor is executed and deferrable is set to True.
"""
- task = GCSObjectExistenceAsyncSensor(
+ task = GCSObjectExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
+ deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
task.execute(context)
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
- def test_gcs_object_existence_sensor_async_execute_failure(self):
- """Tests that an AirflowException is raised in case of error event"""
- task = GCSObjectExistenceAsyncSensor(
+ def test_gcs_object_existence_sensor_deferred_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event
when deferrable is set to True"""
+ task = GCSObjectExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
+ deferrable=True,
)
with pytest.raises(AirflowException):
task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
def test_gcs_object_existence_sensor_async_execute_complete(self):
- """Asserts that logging occurs as expected"""
- task = GCSObjectExistenceAsyncSensor(
+ """Asserts that logging occurs as expected when deferrable is set to
True"""
+ task = GCSObjectExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
+ deferrable=True,
)
with mock.patch.object(task.log, "info") as mock_log_info:
task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)
+class TestGoogleCloudStorageObjectSensorAsync:
+ depcrecation_message = (
+ "Class `GCSObjectExistenceAsyncSensor` is deprecated and will be
removed in a future release. "
+ "Please use `GCSObjectExistenceSensor` and set `deferrable` attribute
to `True` instead"
+ )
+
+ def test_gcs_object_existence_sensor_async(self):
+ """
+ Asserts that a task is deferred and a GCSBlobTrigger will be fired
+ when the GCSObjectExistenceAsyncSensor is executed.
+ """
+ with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
+ task = GCSObjectExistenceAsyncSensor(
+ task_id="task-id",
+ bucket=TEST_BUCKET,
+ object=TEST_OBJECT,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ )
+ with pytest.raises(TaskDeferred) as exc:
+ task.execute(context)
+        assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"
+
+ def test_gcs_object_existence_sensor_async_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event"""
+ with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
+ task = GCSObjectExistenceAsyncSensor(
+ task_id="task-id",
+ bucket=TEST_BUCKET,
+ object=TEST_OBJECT,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ )
+ with pytest.raises(AirflowException):
+ task.execute_complete(context=None, event={"status": "error",
"message": "test failure message"})
+
+ def test_gcs_object_existence_sensor_async_execute_complete(self):
+ """Asserts that logging occurs as expected"""
+ with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
+ task = GCSObjectExistenceAsyncSensor(
+ task_id="task-id",
+ bucket=TEST_BUCKET,
+ object=TEST_OBJECT,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ )
+ with mock.patch.object(task.log, "info") as mock_log_info:
+            task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
+            mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)
+
+
class TestTsFunction:
def test_should_support_datetime(self):
context = {
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py b/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
index ef48fa28db..bc7a5dda5d 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
@@ -124,6 +124,12 @@ with models.DAG(
)
# [END howto_sensor_object_exists_task_async]
+ # [START howto_sensor_object_exists_task_defered]
+ gcs_object_exists_defered = GCSObjectExistenceSensor(
+        bucket=BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
+ )
+ # [END howto_sensor_object_exists_task_defered]
+
# [START howto_sensor_object_with_prefix_exists_task]
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=BUCKET_NAME,
@@ -144,7 +150,12 @@ with models.DAG(
sleep,
upload_file,
# TEST BODY
-    [gcs_object_exists, gcs_object_exists_async, gcs_object_with_prefix_exists],
+ [
+ gcs_object_exists,
+ gcs_object_exists_defered,
+ gcs_object_exists_async,
+ gcs_object_with_prefix_exists,
+ ],
# TEST TEARDOWN
delete_bucket,
)
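
For reference, a minimal sketch (not part of the patch) of the usage pattern this change enables, mirroring the example DAG touched above: setting deferrable=True on GCSObjectExistenceSensor hands the wait over to GCSBlobTrigger on the triggerer instead of occupying a worker slot. The DAG id, bucket, and object names below are placeholders, not values from the commit.

from __future__ import annotations

from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

with models.DAG(
    dag_id="gcs_object_exists_deferrable_demo",  # hypothetical DAG id
    start_date=datetime(2023, 3, 21),
    schedule=None,
    catchup=False,
):
    # With deferrable=True the sensor defers to GCSBlobTrigger on the triggerer
    # instead of polling synchronously in a worker slot.
    wait_for_object = GCSObjectExistenceSensor(
        task_id="gcs_object_exists_deferred",
        bucket="my-bucket",  # placeholder bucket name
        object="data/my-file.txt",  # placeholder object name
        deferrable=True,
    )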