This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 61710518ab fix: OpenLineage in FileTransferOperator for Airflow 2.8 (#39755)
61710518ab is described below
commit 61710518abf5a59e45201f8ff8850c104f12c903
Author: Kacper Muda <[email protected]>
AuthorDate: Wed May 22 15:47:26 2024 +0200
fix: OpenLineage in FileTransferOperator for Airflow 2.8 (#39755)
Signed-off-by: Kacper Muda <[email protected]>
---
.../providers/common/io/operators/file_transfer.py | 14 +++++++++--
.../common/io/operators/test_file_transfer.py | 27 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/common/io/operators/file_transfer.py b/airflow/providers/common/io/operators/file_transfer.py
index e79212882d..9a396c86e4 100644
--- a/airflow/providers/common/io/operators/file_transfer.py
+++ b/airflow/providers/common/io/operators/file_transfer.py
@@ -79,11 +79,21 @@ class FileTransferOperator(BaseOperator):
from airflow.providers.openlineage.extractors import OperatorLineage
+ def _prepare_ol_dataset(path: ObjectStoragePath) -> Dataset:
+ if hasattr(path, "namespace"):
+ # namespace has been added in Airflow 2.9.0; #36410
+ return Dataset(namespace=path.namespace, name=path.key)
+ # manually recreating namespace
+ return Dataset(
+ namespace=f"{path.protocol}://{path.bucket}" if path.bucket else path.protocol,
+ name=path.key.lstrip(path.sep),
+ )
+
src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)
- input_dataset = Dataset(namespace=src.namespace, name=src.key)
- output_dataset = Dataset(namespace=dst.namespace, name=dst.key)
+ input_dataset = _prepare_ol_dataset(src)
+ output_dataset = _prepare_ol_dataset(dst)
return OperatorLineage(
inputs=[input_dataset],
diff --git a/tests/providers/common/io/operators/test_file_transfer.py b/tests/providers/common/io/operators/test_file_transfer.py
index c48a61c314..2dc204eb3d 100644
--- a/tests/providers/common/io/operators/test_file_transfer.py
+++ b/tests/providers/common/io/operators/test_file_transfer.py
@@ -69,3 +69,30 @@ def test_get_openlineage_facets_on_start():
assert len(lineage.outputs) == 1
assert lineage.inputs[0] == expected_input
assert lineage.outputs[0] == expected_output
+
+
+def test_get_openlineage_facets_on_start_without_namespace():
+ mock_src = mock.MagicMock(key="/src_key", protocol="s3", bucket="src_bucket", sep="/")
+ mock_dst = mock.MagicMock(key="dst_key", protocol="gcs", bucket="", sep="/")
+
+ # Ensure the `namespace` attribute does not exist
+ if hasattr(mock_src, "namespace"):
+ delattr(mock_src, "namespace")
+ if hasattr(mock_dst, "namespace"):
+ delattr(mock_dst, "namespace")
+
+ operator = FileTransferOperator(
+ task_id="task",
+ src=mock_src,
+ dst=mock_dst,
+ source_conn_id="source_conn_id",
+ dest_conn_id="dest_conn_id",
+ )
+ # Make sure the _get_path method returns the mock objects
+ operator._get_path = mock.Mock(side_effect=[mock_src, mock_dst])
+
+ lineage = operator.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == Dataset(namespace="s3://src_bucket", name="src_key")
+ assert lineage.outputs[0] == Dataset(namespace="gcs", name="dst_key")