This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 927e3643c2 fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)
927e3643c2 is described below

commit 927e3643c2f901a3ac85f8dc94541ba83b3c6755
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Apr 18 16:11:14 2024 +0200

    fix: Use prefixes instead of full file paths for OpenLineage datasets in GCSToGCSOperator (#39058)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 .../google/cloud/transfers/gcs_to_bigquery.py      |   8 +-
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  34 ++--
 .../google/cloud/transfers/test_gcs_to_gcs.py      | 207 ++++++++++++++-------
 3 files changed, 165 insertions(+), 84 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 03aefcb8ad..3899048dc4 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -749,7 +749,6 @@ class GCSToBigQueryOperator(BaseOperator):
         )
         from openlineage.client.run import Dataset
 
-        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
         from airflow.providers.google.cloud.utils.openlineage import (
             get_facets_from_bq_table,
             get_identity_column_lineage_facet,
@@ -766,8 +765,7 @@ class GCSToBigQueryOperator(BaseOperator):
             "schema": output_dataset_facets["schema"],
         }
         input_datasets = []
-        for uri in sorted(self.source_uris):
-            bucket, blob = _parse_gcs_url(uri)
+        for blob in sorted(self.source_objects):
             additional_facets = {}
 
             if "*" in blob:
@@ -777,7 +775,7 @@ class GCSToBigQueryOperator(BaseOperator):
                     "symlink": SymlinksDatasetFacet(
                         identifiers=[
                             SymlinksDatasetFacetIdentifiers(
-                                namespace=f"gs://{bucket}", name=blob, type="file"
+                                namespace=f"gs://{self.bucket}", name=blob, type="file"
                             )
                         ]
                     ),
@@ -788,7 +786,7 @@ class GCSToBigQueryOperator(BaseOperator):
                     blob = "/"
 
             dataset = Dataset(
-                namespace=f"gs://{bucket}",
+                namespace=f"gs://{self.bucket}",
                 name=blob,
                 facets=merge_dicts(input_dataset_facets, additional_facets),
             )
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 8d07dca58b..0b3d330b65 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -234,8 +234,6 @@ class GCSToGCSOperator(BaseOperator):
         self.source_object_required = source_object_required
         self.exact_match = exact_match
         self.match_glob = match_glob
-        self.resolved_source_objects: set[str] = set()
-        self.resolved_target_objects: set[str] = set()
 
     def execute(self, context: Context):
         hook = GCSHook(
@@ -540,13 +538,6 @@ class GCSToGCSOperator(BaseOperator):
             self.destination_bucket,
             destination_object,
         )
-
-        self.resolved_source_objects.add(source_object)
-        if not destination_object:
-            self.resolved_target_objects.add(source_object)
-        else:
-            self.resolved_target_objects.add(destination_object)
-
         hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object)
 
         if self.move_object:
@@ -559,17 +550,36 @@ class GCSToGCSOperator(BaseOperator):
         This means we won't have to normalize self.source_object and self.source_objects,
         destination bucket and so on.
         """
+        from pathlib import Path
+
         from openlineage.client.run import Dataset
 
         from airflow.providers.openlineage.extractors import OperatorLineage
 
+        def _process_prefix(pref):
+            if WILDCARD in pref:
+                pref = pref.split(WILDCARD)[0]
+            # Use parent if not a file (dot not in name) and not a dir (ends with slash)
+            if "." not in pref.split("/")[-1] and not pref.endswith("/"):
+                pref = Path(pref).parent.as_posix()
+            return ["/" if pref in ("", "/", ".") else pref.rstrip("/")]  # Adjust root path
+
+        inputs = []
+        for prefix in self.source_objects:
+            result = _process_prefix(prefix)
+            inputs.extend(result)
+
+        if self.destination_object is None:
+            outputs = inputs.copy()
+        else:
+            outputs = _process_prefix(self.destination_object)
+
         return OperatorLineage(
             inputs=[
-                Dataset(namespace=f"gs://{self.source_bucket}", name=source)
-                for source in sorted(self.resolved_source_objects)
+                Dataset(namespace=f"gs://{self.source_bucket}", name=source) for source in sorted(set(inputs))
             ],
             outputs=[
                 Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
-                for target in sorted(self.resolved_target_objects)
+                for target in sorted(set(outputs))
             ],
         )
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index f961038f5d..97e16e7366 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -23,7 +23,7 @@ from unittest import mock
 import pytest
 from openlineage.client.run import Dataset
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import WILDCARD, GCSToGCSOperator
 
 TASK_ID = "test-gcs-to-gcs-operator"
@@ -829,74 +829,147 @@ class TestGoogleCloudStorageToCloudStorageOperator:
         ]
         mock_hook.return_value.rewrite.assert_has_calls(mock_calls)
 
+    @pytest.mark.parametrize(
+        ("source_objects", "destination_object", "inputs", "outputs"),
+        (
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                None,
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+            ),
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                "target.txt",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="target.txt")],
+            ),
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                "target_pre",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
+            ),
+            (
+                SOURCE_OBJECTS_SINGLE_FILE,
+                "dir/",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0])],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir")],
+            ),
+            (
+                SOURCE_OBJECTS_LIST,
+                "",
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
+                ],
+                [
+                    Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+                ],
+            ),
+            (
+                [*SOURCE_OBJECTS_LIST, "dir/*"],
+                "parent/pre_",
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+                ],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="parent")],
+            ),
+            (
+                SOURCE_OBJECTS_NO_FILE,
+                "no_ending_slash",
+                [Dataset(namespace=f"gs://{TEST_BUCKET}", name="/")],
+                [Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/")],
+            ),
+            (
+                [
+                    f"dir/{SOURCE_OBJECT_WILDCARD_PREFIX}",
+                    f"dir/{SOURCE_OBJECT_WILDCARD_SUFFIX}",
+                    f"dir/{SOURCE_OBJECT_WILDCARD_MIDDLE}",
+                    f"dir/{SOURCE_OBJECT_WILDCARD_FILENAME}",
+                    "dir/*",
+                    "dir/",
+                    "dir/pre_",
+                ],
+                "/",
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+                ],
+                [
+                    Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+                ],
+            ),
+            (
+                ["", "dir/pre", SOURCE_OBJECTS_SINGLE_FILE[0]],
+                DESTINATION_OBJECT,
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]),
+                ],
+                [
+                    Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT_PREFIX),
+                ],
+            ),
+            (
+                [
+                    "",
+                    "dir/",
+                ],
+                None,
+                [
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="/"),
+                    Dataset(namespace=f"gs://{TEST_BUCKET}", name="dir"),
+                ],
+                [
+                    Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="/"),
+                    Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="dir"),
+                ],
+            ),
+        ),
+        ids=(
+            "single file without output",
+            "single file with single file output",
+            "single file with prefix output",
+            "single file with dir output",
+            "multiple file with empty output",
+            "multiple file with prefix as output",
+            "empty prefix with prefix as output",
+            "directory + prefix or wildcard without output",
+            "mixed prefixes and file paths with output dir",
+            "empty prefix + directory without output",
+        ),
+    )
     @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
-    def test_execute_simple_reports_openlineage(self, mock_hook):
-        operator = GCSToGCSOperator(
-            task_id=TASK_ID,
-            source_bucket=TEST_BUCKET,
-            source_object=SOURCE_OBJECTS_SINGLE_FILE[0],
-            destination_bucket=DESTINATION_BUCKET,
-        )
-
-        operator.execute(None)
-
-        lineage = operator.get_openlineage_facets_on_complete(None)
-        assert len(lineage.inputs) == 1
-        assert len(lineage.outputs) == 1
-        assert lineage.inputs[0] == Dataset(
-            namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
-        )
-        assert lineage.outputs[0] == Dataset(
-            namespace=f"gs://{DESTINATION_BUCKET}", name=SOURCE_OBJECTS_SINGLE_FILE[0]
-        )
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
-    def test_execute_multiple_reports_openlineage(self, mock_hook):
-        operator = GCSToGCSOperator(
-            task_id=TASK_ID,
-            source_bucket=TEST_BUCKET,
-            source_objects=SOURCE_OBJECTS_LIST,
-            destination_bucket=DESTINATION_BUCKET,
-            destination_object=DESTINATION_OBJECT,
-        )
-
-        operator.execute(None)
-
-        lineage = operator.get_openlineage_facets_on_complete(None)
-        assert len(lineage.inputs) == 3
-        assert len(lineage.outputs) == 1
-        assert lineage.inputs == [
-            Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[0]),
-            Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[1]),
-            Dataset(namespace=f"gs://{TEST_BUCKET}", name=SOURCE_OBJECTS_LIST[2]),
-        ]
-        assert lineage.outputs[0] == Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name=DESTINATION_OBJECT)
-
-    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
-    def test_execute_wildcard_reports_openlineage(self, mock_hook):
-        mock_hook.return_value.list.return_value = [
-            "test_object1.txt",
-            "test_object2.txt",
-        ]
-
-        operator = GCSToGCSOperator(
-            task_id=TASK_ID,
-            source_bucket=TEST_BUCKET,
-            source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
-            destination_bucket=DESTINATION_BUCKET,
-            destination_object=DESTINATION_OBJECT,
-        )
+    def test_get_openlineage_facets_on_complete(
+        self, mock_hook, source_objects, destination_object, inputs, outputs
+    ):
+        if source_objects and any(WILDCARD in obj for obj in source_objects):
+            with pytest.warns(AirflowProviderDeprecationWarning, match="Usage of wildcard"):
+                operator = GCSToGCSOperator(
+                    task_id=TASK_ID,
+                    source_bucket=TEST_BUCKET,
+                    source_objects=source_objects,
+                    destination_bucket=DESTINATION_BUCKET,
+                    destination_object=destination_object,
+                )
+        else:
+            operator = GCSToGCSOperator(
+                task_id=TASK_ID,
+                source_bucket=TEST_BUCKET,
+                source_objects=source_objects,
+                destination_bucket=DESTINATION_BUCKET,
+                destination_object=destination_object,
+            )
 
         operator.execute(None)
 
         lineage = operator.get_openlineage_facets_on_complete(None)
-        assert len(lineage.inputs) == 2
-        assert len(lineage.outputs) == 2
-        assert lineage.inputs == [
-            Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object1.txt"),
-            Dataset(namespace=f"gs://{TEST_BUCKET}", name="test_object2.txt"),
-        ]
-        assert lineage.outputs == [
-            Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/1.txt"),
-            Dataset(namespace=f"gs://{DESTINATION_BUCKET}", name="foo/bar/2.txt"),
-        ]
+        assert len(lineage.inputs) == len(inputs)
+        assert len(lineage.outputs) == len(outputs)
+        assert sorted(lineage.inputs) == sorted(inputs)
+        assert sorted(lineage.outputs) == sorted(outputs)
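
For reviewers, the core of this patch is the prefix normalization added to
GCSToGCSOperator.get_openlineage_facets_on_complete. Here is that logic lifted
out as a standalone, illustrative sketch (WILDCARD is "*" in gcs_to_gcs.py);
the expected values in the asserts mirror the parametrized test cases above:

    from pathlib import Path

    WILDCARD = "*"

    def _process_prefix(pref: str) -> list[str]:
        # Cut everything from the first wildcard on: "dir/file*.txt" -> "dir/file".
        if WILDCARD in pref:
            pref = pref.split(WILDCARD)[0]
        # A bare prefix (no dot in its last segment, no trailing slash) is
        # neither a file nor a directory, so fall back to its parent directory.
        if "." not in pref.split("/")[-1] and not pref.endswith("/"):
            pref = Path(pref).parent.as_posix()
        # Map the bucket root to "/" and strip any trailing slash otherwise.
        return ["/" if pref in ("", "/", ".") else pref.rstrip("/")]

    assert _process_prefix("dir/file.txt") == ["dir/file.txt"]  # full path kept
    assert _process_prefix("dir/*.txt") == ["dir"]              # wildcard cut, dir kept
    assert _process_prefix("dir/pre_") == ["dir"]               # bare prefix -> parent
    assert _process_prefix("dir/") == ["dir"]                   # trailing slash stripped
    assert _process_prefix("") == ["/"]                         # bucket root
    assert _process_prefix("target_pre") == ["/"]               # parent is the root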

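The gcs_to_bigquery.py hunks follow the same simplification: assuming
GCSToBigQueryOperator.execute() builds source_uris as "gs://{bucket}/{object}"
from self.bucket and self.source_objects, iterating source_objects directly
yields the same (bucket, blob) pairs as parsing each URI did, without the
_parse_gcs_url round-trip. A rough equivalence sketch under that assumption,
with hypothetical bucket and object names and a simplified stand-in for
airflow.providers.google.cloud.hooks.gcs._parse_gcs_url:

    def _parse_gcs_url(gcs_url: str) -> tuple[str, str]:
        # Simplified stand-in: split "gs://bucket/blob" into (bucket, blob).
        bucket_name, _, blob = gcs_url.removeprefix("gs://").partition("/")
        return bucket_name, blob

    bucket = "my-bucket"                           # hypothetical
    source_objects = ["data/a.csv", "data/b.csv"]  # hypothetical
    source_uris = [f"gs://{bucket}/{obj}" for obj in source_objects]

    # The old loop (parse each URI) and the new loop (use objects directly)
    # produce identical (bucket, blob) pairs:
    assert [_parse_gcs_url(u) for u in sorted(source_uris)] == [
        (bucket, blob) for blob in sorted(source_objects)
    ]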