This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 61710518ab fix: OpenLineage in FileTransferOperator for Airflow 2.8 (#39755)
61710518ab is described below

commit 61710518abf5a59e45201f8ff8850c104f12c903
Author: Kacper Muda <[email protected]>
AuthorDate: Wed May 22 15:47:26 2024 +0200

    fix: OpenLineage in FileTransferOperator for Airflow 2.8 (#39755)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 .../providers/common/io/operators/file_transfer.py | 14 +++++++++--
 .../common/io/operators/test_file_transfer.py      | 27 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/common/io/operators/file_transfer.py b/airflow/providers/common/io/operators/file_transfer.py
index e79212882d..9a396c86e4 100644
--- a/airflow/providers/common/io/operators/file_transfer.py
+++ b/airflow/providers/common/io/operators/file_transfer.py
@@ -79,11 +79,21 @@ class FileTransferOperator(BaseOperator):
 
         from airflow.providers.openlineage.extractors import OperatorLineage
 
+        def _prepare_ol_dataset(path: ObjectStoragePath) -> Dataset:
+            if hasattr(path, "namespace"):
+                # namespace has been added in Airflow 2.9.0; #36410
+                return Dataset(namespace=path.namespace, name=path.key)
+            # manually recreating namespace
+            return Dataset(
+                namespace=f"{path.protocol}://{path.bucket}" if path.bucket else path.protocol,
+                name=path.key.lstrip(path.sep),
+            )
+
         src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
         dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)
 
-        input_dataset = Dataset(namespace=src.namespace, name=src.key)
-        output_dataset = Dataset(namespace=dst.namespace, name=dst.key)
+        input_dataset = _prepare_ol_dataset(src)
+        output_dataset = _prepare_ol_dataset(dst)
 
         return OperatorLineage(
             inputs=[input_dataset],
diff --git a/tests/providers/common/io/operators/test_file_transfer.py b/tests/providers/common/io/operators/test_file_transfer.py
index c48a61c314..2dc204eb3d 100644
--- a/tests/providers/common/io/operators/test_file_transfer.py
+++ b/tests/providers/common/io/operators/test_file_transfer.py
@@ -69,3 +69,30 @@ def test_get_openlineage_facets_on_start():
     assert len(lineage.outputs) == 1
     assert lineage.inputs[0] == expected_input
     assert lineage.outputs[0] == expected_output
+
+
+def test_get_openlineage_facets_on_start_without_namespace():
+    mock_src = mock.MagicMock(key="/src_key", protocol="s3", bucket="src_bucket", sep="/")
+    mock_dst = mock.MagicMock(key="dst_key", protocol="gcs", bucket="", sep="/")
+
+    # Ensure the `namespace` attribute does not exist
+    if hasattr(mock_src, "namespace"):
+        delattr(mock_src, "namespace")
+    if hasattr(mock_dst, "namespace"):
+        delattr(mock_dst, "namespace")
+
+    operator = FileTransferOperator(
+        task_id="task",
+        src=mock_src,
+        dst=mock_dst,
+        source_conn_id="source_conn_id",
+        dest_conn_id="dest_conn_id",
+    )
+    # Make sure the _get_path method returns the mock objects
+    operator._get_path = mock.Mock(side_effect=[mock_src, mock_dst])
+
+    lineage = operator.get_openlineage_facets_on_start()
+    assert len(lineage.inputs) == 1
+    assert len(lineage.outputs) == 1
+    assert lineage.inputs[0] == Dataset(namespace="s3://src_bucket", name="src_key")
+    assert lineage.outputs[0] == Dataset(namespace="gcs", name="dst_key")

Reply via email to