This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 17e60b0a2b fix: Use prefixes instead of all file paths for OpenLineage datasets in GCSDeleteObjectsOperator (#39059)
17e60b0a2b is described below

commit 17e60b0a2b640a6974eeecca0765e600817cd097
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Apr 18 17:12:48 2024 +0200

    fix: Use prefixes instead of all file paths for OpenLineage datasets in GCSDeleteObjectsOperator (#39059)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 airflow/providers/google/cloud/operators/gcs.py    | 34 +++++++-----
 tests/providers/google/cloud/operators/test_gcs.py | 61 +++++++++++++---------
 2 files changed, 58 insertions(+), 37 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index 6c72378a43..f4c8ce932e 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -297,7 +297,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
         *,
         bucket_name: str,
         objects: list[str] | None = None,
-        prefix: str | None = None,
+        prefix: str | list[str] | None = None,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
@@ -309,12 +309,14 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
         self.impersonation_chain = impersonation_chain
 
         if objects is None and prefix is None:
-            err_message = "(Task {task_id}) Either object or prefix should be 
set. Both are None.".format(
+            err_message = "(Task {task_id}) Either objects or prefix should be 
set. Both are None.".format(
                 **kwargs
             )
             raise ValueError(err_message)
+        if objects is not None and prefix is not None:
+            err_message = "(Task {task_id}) Objects or prefix should be set. 
Both provided.".format(**kwargs)
+            raise ValueError(err_message)
 
-        self._objects: list[str] = []
         super().__init__(**kwargs)
 
     def execute(self, context: Context) -> None:
@@ -324,15 +326,14 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
         )
 
         if self.objects is not None:
-            self._objects = self.objects
+            objects = self.objects
         else:
-            self._objects = hook.list(bucket_name=self.bucket_name, 
prefix=self.prefix)
-        self.log.info("Deleting %s objects from %s", len(self._objects), 
self.bucket_name)
-        for object_name in self._objects:
+            objects = hook.list(bucket_name=self.bucket_name, 
prefix=self.prefix)
+        self.log.info("Deleting %s objects from %s", len(objects), 
self.bucket_name)
+        for object_name in objects:
             hook.delete(bucket_name=self.bucket_name, object_name=object_name)
 
-    def get_openlineage_facets_on_complete(self, task_instance):
-        """Implement on_complete as execute() resolves object names."""
+    def get_openlineage_facets_on_start(self):
         from openlineage.client.facet import (
             LifecycleStateChange,
             LifecycleStateChangeDatasetFacet,
@@ -342,8 +343,17 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
 
         from airflow.providers.openlineage.extractors import OperatorLineage
 
-        if not self._objects:
-            return OperatorLineage()
+        objects = []
+        if self.objects is not None:
+            objects = self.objects
+        elif self.prefix is not None:
+            prefixes = [self.prefix] if isinstance(self.prefix, str) else 
self.prefix
+            for pref in prefixes:
+                # Use parent if not a file (dot not in name) and not a dir 
(ends with slash)
+                if "." not in pref.split("/")[-1] and not pref.endswith("/"):
+                    pref = Path(pref).parent.as_posix()
+                pref = "/" if pref in (".", "", "/") else pref.rstrip("/")
+                objects.append(pref)
 
         bucket_url = f"gs://{self.bucket_name}"
         input_datasets = [
@@ -360,7 +370,7 @@ class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
                     )
                 },
             )
-            for object_name in self._objects
+            for object_name in objects
         ]
 
         return OperatorLineage(inputs=input_datasets)
diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py
index 6236aa5f23..0024ad6407 100644
--- a/tests/providers/google/cloud/operators/test_gcs.py
+++ b/tests/providers/google/cloud/operators/test_gcs.py
@@ -172,48 +172,59 @@ class TestGCSDeleteObjectsOperator:
             any_order=True,
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
-    def test_get_openlineage_facets_on_complete(self, mock_hook):
+    @pytest.mark.parametrize(
+        ("objects", "prefix", "inputs"),
+        (
+            (["folder/a.txt", "b.json"], None, ["folder/a.txt", "b.json"]),
+            (["folder/a.txt", "folder/b.json"], None, ["folder/a.txt", 
"folder/b.json"]),
+            (None, ["folder/a.txt", "b.json"], ["folder/a.txt", "b.json"]),
+            (None, "dir/pre", ["dir"]),
+            (None, ["dir/"], ["dir"]),
+            (None, "", ["/"]),
+            (None, "/", ["/"]),
+            (None, "pre", ["/"]),
+            (None, "dir/pre*", ["dir"]),
+            (None, "*", ["/"]),
+        ),
+        ids=(
+            "objects",
+            "multiple objects in the same dir",
+            "objects as prefixes",
+            "directory with prefix",
+            "directory",
+            "empty prefix",
+            "slash as prefix",
+            "prefix with no ending slash",
+            "directory with prefix with wildcard",
+            "just wildcard",
+        ),
+    )
+    def test_get_openlineage_facets_on_start(self, objects, prefix, inputs):
         bucket_url = f"gs://{TEST_BUCKET}"
         expected_inputs = [
             Dataset(
                 namespace=bucket_url,
-                name="folder/a.txt",
+                name=name,
                 facets={
                     "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
                         lifecycleStateChange=LifecycleStateChange.DROP.value,
                         
previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
                             namespace=bucket_url,
-                            name="folder/a.txt",
+                            name=name,
                         ),
                     )
                 },
-            ),
-            Dataset(
-                namespace=bucket_url,
-                name="b.txt",
-                facets={
-                    "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
-                        lifecycleStateChange=LifecycleStateChange.DROP.value,
-                        
previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
-                            namespace=bucket_url,
-                            name="b.txt",
-                        ),
-                    )
-                },
-            ),
+            )
+            for name in inputs
         ]
 
         operator = GCSDeleteObjectsOperator(
-            task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=["folder/a.txt", 
"b.txt"]
+            task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=objects, 
prefix=prefix
         )
-
-        operator.execute(None)
-
-        lineage = operator.get_openlineage_facets_on_complete(None)
-        assert len(lineage.inputs) == 2
+        lineage = operator.get_openlineage_facets_on_start()
+        assert len(lineage.inputs) == len(inputs)
         assert len(lineage.outputs) == 0
-        assert lineage.inputs == expected_inputs
+        assert sorted(lineage.inputs) == sorted(expected_inputs)
 
 
 class TestGoogleCloudStorageListOperator:

Reply via email to