This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 950b08f42505dbcce898415107777392bc71e054
Author: Bolke de Bruin <[email protected]>
AuthorDate: Thu Jan 4 18:45:25 2024 +0100

    Add support for openlineage to AFS and common.io (#36410)
    
    This adds low level support for open lineage to ObjectStorage
    and integrates it into common.io.
    
    (cherry picked from commit 33996a49f15cff35b6c23f245768243167944db6)
---
 airflow/io/path.py                                 | 11 ++++++-
 .../providers/common/io/operators/file_transfer.py | 37 +++++++++++++++-------
 dev/breeze/tests/test_selective_checks.py          |  4 +--
 .../apache-airflow/core-concepts/objectstorage.rst |  8 ++++-
 generated/provider_dependencies.json               |  4 ++-
 tests/io/test_path.py                              | 11 ++++---
 .../common/io/operators/test_file_transfer.py      | 24 ++++++++++++++
 7 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/airflow/io/path.py b/airflow/io/path.py
index bd7c320653..d65d837e7e 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -52,6 +52,9 @@ class _AirflowCloudAccessor(_CloudAccessor):
         conn_id: str | None = None,
         **kwargs: typing.Any,
     ) -> None:
+        # warning: we are not calling super().__init__ here
+        # as it will try to create a new fs from a different
+        # set of registered filesystems
         if parsed_url and parsed_url.scheme:
             self._store = attach(parsed_url.scheme, conn_id)
         else:
@@ -173,10 +176,16 @@ class ObjectStoragePath(CloudPath):
     @property
     def key(self) -> str:
         if self._url:
-            return self._url.path
+            # per convention, we strip the leading slashes to ensure a relative key is returned
+            # we keep the trailing slash to allow for directory-like semantics
+            return self._url.path.lstrip(self.sep)
         else:
             return ""
 
+    @property
+    def namespace(self) -> str:
+        return f"{self.protocol}://{self.bucket}" if self.bucket else self.protocol
+
     def stat(self) -> stat_result:  # type: ignore[override]
         """Call ``stat`` and return the result."""
         return stat_result(
diff --git a/airflow/providers/common/io/operators/file_transfer.py 
b/airflow/providers/common/io/operators/file_transfer.py
index e720f78666..e79212882d 100644
--- a/airflow/providers/common/io/operators/file_transfer.py
+++ b/airflow/providers/common/io/operators/file_transfer.py
@@ -23,6 +23,7 @@ from airflow.io.path import ObjectStoragePath
 from airflow.models import BaseOperator
 
 if TYPE_CHECKING:
+    from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.context import Context
 
 
@@ -64,21 +65,33 @@ class FileTransferOperator(BaseOperator):
         self.overwrite = overwrite
 
     def execute(self, context: Context) -> None:
-        src: ObjectStoragePath
-        dst: ObjectStoragePath
-
-        if isinstance(self.src, str):
-            src = ObjectStoragePath(self.src, conn_id=self.source_conn_id)
-        else:
-            src = self.src
-
-        if isinstance(self.dst, str):
-            dst = ObjectStoragePath(self.dst, conn_id=self.dst_conn_id)
-        else:
-            dst = self.dst
+        src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
+        dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)
 
         if not self.overwrite:
             if dst.exists() and dst.is_file():
                 raise ValueError(f"Destination {dst} already exists")
 
         src.copy(dst)
+
+    def get_openlineage_facets_on_start(self) -> OperatorLineage:
+        from openlineage.client.run import Dataset
+
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
+        dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)
+
+        input_dataset = Dataset(namespace=src.namespace, name=src.key)
+        output_dataset = Dataset(namespace=dst.namespace, name=dst.key)
+
+        return OperatorLineage(
+            inputs=[input_dataset],
+            outputs=[output_dataset],
+        )
+
+    @staticmethod
+    def _get_path(path: str | ObjectStoragePath, conn_id: str | None) -> ObjectStoragePath:
+        if isinstance(path, str):
+            return ObjectStoragePath(path, conn_id=conn_id)
+        return path
diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 495c98fd87..8c11e2628e 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -525,7 +525,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                 "tests/providers/common/io/operators/test_file_transfer.py",
             ),
             {
-                "affected-providers-list-as-string": "common.io",
+                "affected-providers-list-as-string": "common.io openlineage",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "python-versions": "['3.8']",
@@ -538,7 +538,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                 "docs-build": "false",
                 "run-kubernetes-tests": "false",
                 "upgrade-to-newer-dependencies": "false",
-                "parallel-test-types-list-as-string": "Always Providers[common.io]",
+                "parallel-test-types-list-as-string": "Always Providers[common.io,openlineage]",
             },
             id="Only Always and Common.IO tests should run when only common.io 
and tests/always changed",
         ),
diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst 
b/docs/apache-airflow/core-concepts/objectstorage.rst
index f5b113a861..5a3919e433 100644
--- a/docs/apache-airflow/core-concepts/objectstorage.rst
+++ b/docs/apache-airflow/core-concepts/objectstorage.rst
@@ -76,7 +76,8 @@ object you want to interact with. For example, to point to a 
bucket in s3, you w
 
     base = ObjectStoragePath("s3://aws_default@my-bucket/")
 
-The username part of the URI is optional. It can alternatively be passed in as 
a separate keyword argument:
+The username part of the URI represents the Airflow connection id and is 
optional. It can alternatively be passed
+in as a separate keyword argument:
 
 .. code-block:: python
 
@@ -242,6 +243,11 @@ key
 
 Returns the object key.
 
+namespace
+^^^^^^^^^
+
+Returns the namespace of the object. Typically this is the protocol, like 
``s3://`` with the
+bucket name.
 
 path
 ^^^^
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index d92a2559ea..6769f237a5 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -309,7 +309,9 @@
     "deps": [
       "apache-airflow>=2.8.0"
     ],
-    "cross-providers-deps": [],
+    "cross-providers-deps": [
+      "openlineage"
+    ],
     "excluded-python-versions": [],
     "state": "ready"
   },
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index 7c97c7b2b9..c87a5e97e0 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -72,18 +72,21 @@ class TestFs:
     def test_init_objectstoragepath(self):
         path = ObjectStoragePath("file://bucket/key/part1/part2")
         assert path.bucket == "bucket"
-        assert path.key == "/key/part1/part2"
+        assert path.key == "key/part1/part2"
         assert path.protocol == "file"
+        assert path.path == "bucket/key/part1/part2"
 
         path2 = ObjectStoragePath(path / "part3")
         assert path2.bucket == "bucket"
-        assert path2.key == "/key/part1/part2/part3"
+        assert path2.key == "key/part1/part2/part3"
         assert path2.protocol == "file"
+        assert path2.path == "bucket/key/part1/part2/part3"
 
         path3 = ObjectStoragePath(path2 / "2023")
         assert path3.bucket == "bucket"
-        assert path3.key == "/key/part1/part2/part3/2023"
+        assert path3.key == "key/part1/part2/part3/2023"
         assert path3.protocol == "file"
+        assert path3.path == "bucket/key/part1/part2/part3/2023"
 
     def test_read_write(self):
         o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
@@ -171,7 +174,7 @@ class TestFs:
         o = ObjectStoragePath(f"{protocol}://{bucket}/{key}")
         assert o.bucket == bucket
         assert o.container == bucket
-        assert o.key == f"/{key}"
+        assert o.key == f"{key}"
         assert o.protocol == protocol
 
     def test_cwd_home(self):
diff --git a/tests/providers/common/io/operators/test_file_transfer.py 
b/tests/providers/common/io/operators/test_file_transfer.py
index e2cb68ca43..c48a61c314 100644
--- a/tests/providers/common/io/operators/test_file_transfer.py
+++ b/tests/providers/common/io/operators/test_file_transfer.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 
 from unittest import mock
 
+from openlineage.client.run import Dataset
+
 from airflow.providers.common.io.operators.file_transfer import 
FileTransferOperator
 
 
@@ -45,3 +47,25 @@ def test_file_transfer_copy():
         )
         source_path.copy.assert_called_once_with(target_path)
         target_path.copy.assert_not_called()
+
+
+def test_get_openlineage_facets_on_start():
+    src_bucket = "src-bucket"
+    src_key = "src-key"
+    dst_bucket = "dst-bucket"
+    dst_key = "dst-key"
+
+    expected_input = Dataset(namespace=f"s3://{src_bucket}", name=src_key)
+    expected_output = Dataset(namespace=f"s3://{dst_bucket}", name=dst_key)
+
+    op = FileTransferOperator(
+        task_id="test",
+        src=f"s3://{src_bucket}/{src_key}",
+        dst=f"s3://{dst_bucket}/{dst_key}",
+    )
+
+    lineage = op.get_openlineage_facets_on_start()
+    assert len(lineage.inputs) == 1
+    assert len(lineage.outputs) == 1
+    assert lineage.inputs[0] == expected_input
+    assert lineage.outputs[0] == expected_output

Reply via email to