This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 92335417d8 Add use_glob to GCSObjectExistenceSensor (#34137)
92335417d8 is described below
commit 92335417d881c01b0d2ef77ad254f3f3b491df4c
Author: Alberto Costa <[email protected]>
AuthorDate: Sun Dec 17 23:32:27 2023 +0100
Add use_glob to GCSObjectExistenceSensor (#34137)
---------
Co-authored-by: Alberto Costa <[email protected]>
---
airflow/providers/google/cloud/sensors/gcs.py | 11 ++++++-
airflow/providers/google/cloud/triggers/gcs.py | 16 ++++++++--
airflow/providers/google/provider.yaml | 2 +-
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
docs/apache-airflow/img/airflow_erd.svg | 4 +--
tests/providers/google/cloud/sensors/test_gcs.py | 22 +++++++++++++
tests/providers/google/cloud/triggers/test_gcs.py | 38 +++++++++++++++++++++++
7 files changed, 87 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/google/cloud/sensors/gcs.py
b/airflow/providers/google/cloud/sensors/gcs.py
index 453bb3bf44..c5a80e2d55 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -50,6 +50,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
:param bucket: The Google Cloud Storage bucket where the object is.
:param object: The name of the object to check in the Google cloud
storage bucket.
+ :param use_glob: When set to True the object parameter is interpreted as
glob
:param google_cloud_conn_id: The connection ID to use when
connecting to Google Cloud Storage.
:param impersonation_chain: Optional service account to impersonate using
short-term
@@ -75,6 +76,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
*,
bucket: str,
object: str,
+ use_glob: bool = False,
google_cloud_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
retry: Retry = DEFAULT_RETRY,
@@ -84,7 +86,9 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
super().__init__(**kwargs)
self.bucket = bucket
self.object = object
+ self.use_glob = use_glob
self.google_cloud_conn_id = google_cloud_conn_id
+ self._matches: list[str] = []
self.impersonation_chain = impersonation_chain
self.retry = retry
@@ -96,7 +100,11 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
gcp_conn_id=self.google_cloud_conn_id,
impersonation_chain=self.impersonation_chain,
)
- return hook.exists(self.bucket, self.object, self.retry)
+ if self.use_glob:
+ self._matches = hook.list(self.bucket, match_glob=self.object)
+ return bool(self._matches)
+ else:
+ return hook.exists(self.bucket, self.object, self.retry)
def execute(self, context: Context) -> None:
"""Airflow runs this method on the worker and defers using the
trigger."""
@@ -109,6 +117,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
trigger=GCSBlobTrigger(
bucket=self.bucket,
object_name=self.object,
+ use_glob=self.use_glob,
poke_interval=self.poke_interval,
google_cloud_conn_id=self.google_cloud_conn_id,
hook_params={
diff --git a/airflow/providers/google/cloud/triggers/gcs.py
b/airflow/providers/google/cloud/triggers/gcs.py
index a5d181cd05..f801e5ae9a 100644
--- a/airflow/providers/google/cloud/triggers/gcs.py
+++ b/airflow/providers/google/cloud/triggers/gcs.py
@@ -35,6 +35,7 @@ class GCSBlobTrigger(BaseTrigger):
:param bucket: the bucket in the google cloud storage where the objects
are residing.
:param object_name: the file or folder present in the bucket
+ :param use_glob: if true object_name is interpreted as glob
:param google_cloud_conn_id: reference to the Google Connection
:param poke_interval: polling period in seconds to check for file/folder
:param hook_params: Extra config params to be passed to the underlying
hook.
@@ -45,6 +46,7 @@ class GCSBlobTrigger(BaseTrigger):
self,
bucket: str,
object_name: str,
+ use_glob: bool,
poke_interval: float,
google_cloud_conn_id: str,
hook_params: dict[str, Any],
@@ -52,6 +54,7 @@ class GCSBlobTrigger(BaseTrigger):
super().__init__()
self.bucket = bucket
self.object_name = object_name
+ self.use_glob = use_glob
self.poke_interval = poke_interval
self.google_cloud_conn_id: str = google_cloud_conn_id
self.hook_params = hook_params
@@ -63,6 +66,7 @@ class GCSBlobTrigger(BaseTrigger):
{
"bucket": self.bucket,
"object_name": self.object_name,
+ "use_glob": self.use_glob,
"poke_interval": self.poke_interval,
"google_cloud_conn_id": self.google_cloud_conn_id,
"hook_params": self.hook_params,
@@ -98,9 +102,14 @@ class GCSBlobTrigger(BaseTrigger):
async with ClientSession() as s:
client = await hook.get_storage_client(s)
bucket = client.get_bucket(bucket_name)
- object_response = await bucket.blob_exists(blob_name=object_name)
- if object_response:
- return "success"
+ if self.use_glob:
+ list_blobs_response = await
bucket.list_blobs(match_glob=object_name)
+ if len(list_blobs_response) > 0:
+ return "success"
+ else:
+ blob_exists_response = await
bucket.blob_exists(blob_name=object_name)
+ if blob_exists_response:
+ return "success"
return "pending"
@@ -234,6 +243,7 @@ class GCSPrefixBlobTrigger(GCSBlobTrigger):
poke_interval=poke_interval,
google_cloud_conn_id=google_cloud_conn_id,
hook_params=hook_params,
+ use_glob=False,
)
self.prefix = prefix
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index 1c6859696e..1d7bfd317e 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -87,7 +87,7 @@ dependencies:
- asgiref>=3.5.2
- gcloud-aio-auth>=4.0.0,<5.0.0
- gcloud-aio-bigquery>=6.1.2
- - gcloud-aio-storage
+ - gcloud-aio-storage>=9.0.0
- gcsfs>=2023.10.0
- google-ads>=22.1.0
- google-api-core>=2.11.0
diff --git a/docs/apache-airflow/img/airflow_erd.sha256
b/docs/apache-airflow/img/airflow_erd.sha256
index ded2722d37..eb2a21ae34 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-a5677b0b603e8835f92da4b8b061ec268ce7257ef6b446f12593743ecf90710a
\ No newline at end of file
+194706fc390025f473f73ce934bfe4b394b50ce76748e5df33ae643e38538357
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg
b/docs/apache-airflow/img/airflow_erd.svg
index 497ef76975..8e85b5fa0c 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1342,14 +1342,14 @@
<g id="edge41" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1166.1,-776.37C1196.72,-770.7 1228.55,-765.52 1258.36,-761.38"/>
-<text text-anchor="start" x="1227.36" y="-750.18" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1248.36" y="-750.18" font-family="Times,serif"
font-size="14.00">1</text>
<text text-anchor="start" x="1166.1" y="-765.17" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
<g id="edge42" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1166.1,-789.67C1196.72,-784.35 1228.55,-779.06 1258.36,-774.33"/>
-<text text-anchor="start" x="1248.36" y="-778.13" font-family="Times,serif"
font-size="14.00">1</text>
+<text text-anchor="start" x="1227.36" y="-778.13" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="1166.1" y="-793.47" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py
b/tests/providers/google/cloud/sensors/test_gcs.py
index 1d4bbcec87..37697ff58d 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -94,6 +94,7 @@ class TestGoogleCloudStorageObjectSensor:
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
+ use_glob=False,
google_cloud_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
@@ -108,6 +109,27 @@ class TestGoogleCloudStorageObjectSensor:
)
mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET,
TEST_OBJECT, DEFAULT_RETRY)
+ @mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
+ def test_should_pass_argument_to_hook_using_glob(self, mock_hook):
+ task = GCSObjectExistenceSensor(
+ task_id="task-id",
+ bucket=TEST_BUCKET,
+ object=TEST_OBJECT,
+ use_glob=True,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ impersonation_chain=TEST_IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.list.return_value = [mock.MagicMock()]
+
+ result = task.poke(mock.MagicMock())
+
+ assert result is True
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=TEST_GCP_CONN_ID,
+ impersonation_chain=TEST_IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.list.assert_called_once_with(TEST_BUCKET,
match_glob=TEST_OBJECT)
+
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSHook")
@mock.patch("airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor.defer")
def test_gcs_object_existence_sensor_finish_before_deferred(self,
mock_defer, mock_hook):
diff --git a/tests/providers/google/cloud/triggers/test_gcs.py
b/tests/providers/google/cloud/triggers/test_gcs.py
index 4fcde67711..3c4bc9031a 100644
--- a/tests/providers/google/cloud/triggers/test_gcs.py
+++ b/tests/providers/google/cloud/triggers/test_gcs.py
@@ -55,6 +55,19 @@ def trigger():
return GCSBlobTrigger(
bucket=TEST_BUCKET,
object_name=TEST_OBJECT,
+ use_glob=False,
+ poke_interval=TEST_POLLING_INTERVAL,
+ google_cloud_conn_id=TEST_GCP_CONN_ID,
+ hook_params=TEST_HOOK_PARAMS,
+ )
+
+
+@pytest.fixture
+def trigger_using_glob():
+ return GCSBlobTrigger(
+ bucket=TEST_BUCKET,
+ object_name=TEST_OBJECT,
+ use_glob=True,
poke_interval=TEST_POLLING_INTERVAL,
google_cloud_conn_id=TEST_GCP_CONN_ID,
hook_params=TEST_HOOK_PARAMS,
@@ -73,6 +86,7 @@ class TestGCSBlobTrigger:
assert kwargs == {
"bucket": TEST_BUCKET,
"object_name": TEST_OBJECT,
+ "use_glob": False,
"poke_interval": TEST_POLLING_INTERVAL,
"google_cloud_conn_id": TEST_GCP_CONN_ID,
"hook_params": TEST_HOOK_PARAMS,
@@ -141,6 +155,30 @@ class TestGCSBlobTrigger:
assert res == response
bucket.blob_exists.assert_called_once_with(blob_name=TEST_OBJECT)
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "blob_list,response",
+ [
+ ([TEST_OBJECT], "success"),
+ ([], "pending"),
+ ],
+ )
+ async def test_object_exists_using_glob(self, blob_list, response,
trigger_using_glob):
+ """
+ Tests to check if a particular object in Google Cloud Storage
+ is found or not
+ """
+ hook = AsyncMock(GCSAsyncHook)
+ storage = AsyncMock(Storage)
+ hook.get_storage_client.return_value = storage
+ bucket = AsyncMock(Bucket)
+ storage.get_bucket.return_value = bucket
+ bucket.list_blobs.return_value = blob_list
+
+ res = await trigger_using_glob._object_exists(hook, TEST_BUCKET,
TEST_OBJECT)
+ assert res == response
+ bucket.list_blobs.assert_called_once_with(match_glob=TEST_OBJECT)
+
class TestGCSPrefixBlobTrigger:
TRIGGER = GCSPrefixBlobTrigger(