This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33996a49f1 Add support for openlineage to AFS and common.io (#36410)
33996a49f1 is described below
commit 33996a49f15cff35b6c23f245768243167944db6
Author: Bolke de Bruin <[email protected]>
AuthorDate: Thu Jan 4 18:45:25 2024 +0100
Add support for openlineage to AFS and common.io (#36410)
This adds low level support for open lineage to ObjectStorage
and integrates it into common.io.
---
airflow/io/path.py | 11 ++++++-
.../providers/common/io/operators/file_transfer.py | 37 +++++++++++++++-------
dev/breeze/tests/test_selective_checks.py | 4 +--
.../apache-airflow/core-concepts/objectstorage.rst | 8 ++++-
generated/provider_dependencies.json | 4 ++-
tests/io/test_path.py | 11 ++++---
.../common/io/operators/test_file_transfer.py | 24 ++++++++++++++
7 files changed, 78 insertions(+), 21 deletions(-)
diff --git a/airflow/io/path.py b/airflow/io/path.py
index bd7c320653..d65d837e7e 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -52,6 +52,9 @@ class _AirflowCloudAccessor(_CloudAccessor):
conn_id: str | None = None,
**kwargs: typing.Any,
) -> None:
+ # warning: we are not calling super().__init__ here
+ # as it will try to create a new fs from a different
+        # set of registered filesystems
if parsed_url and parsed_url.scheme:
self._store = attach(parsed_url.scheme, conn_id)
else:
@@ -173,10 +176,16 @@ class ObjectStoragePath(CloudPath):
@property
def key(self) -> str:
if self._url:
- return self._url.path
+ # per convention, we strip the leading slashes to ensure a
relative key is returned
+ # we keep the trailing slash to allow for directory-like semantics
+ return self._url.path.lstrip(self.sep)
else:
return ""
+ @property
+ def namespace(self) -> str:
+ return f"{self.protocol}://{self.bucket}" if self.bucket else
self.protocol
+
def stat(self) -> stat_result: # type: ignore[override]
"""Call ``stat`` and return the result."""
return stat_result(
diff --git a/airflow/providers/common/io/operators/file_transfer.py
b/airflow/providers/common/io/operators/file_transfer.py
index e720f78666..e79212882d 100644
--- a/airflow/providers/common/io/operators/file_transfer.py
+++ b/airflow/providers/common/io/operators/file_transfer.py
@@ -23,6 +23,7 @@ from airflow.io.path import ObjectStoragePath
from airflow.models import BaseOperator
if TYPE_CHECKING:
+ from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context
@@ -64,21 +65,33 @@ class FileTransferOperator(BaseOperator):
self.overwrite = overwrite
def execute(self, context: Context) -> None:
- src: ObjectStoragePath
- dst: ObjectStoragePath
-
- if isinstance(self.src, str):
- src = ObjectStoragePath(self.src, conn_id=self.source_conn_id)
- else:
- src = self.src
-
- if isinstance(self.dst, str):
- dst = ObjectStoragePath(self.dst, conn_id=self.dst_conn_id)
- else:
- dst = self.dst
+ src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
+ dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)
if not self.overwrite:
if dst.exists() and dst.is_file():
raise ValueError(f"Destination {dst} already exists")
src.copy(dst)
+
+ def get_openlineage_facets_on_start(self) -> OperatorLineage:
+ from openlineage.client.run import Dataset
+
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
+ dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)
+
+ input_dataset = Dataset(namespace=src.namespace, name=src.key)
+ output_dataset = Dataset(namespace=dst.namespace, name=dst.key)
+
+ return OperatorLineage(
+ inputs=[input_dataset],
+ outputs=[output_dataset],
+ )
+
+ @staticmethod
+ def _get_path(path: str | ObjectStoragePath, conn_id: str | None) ->
ObjectStoragePath:
+ if isinstance(path, str):
+ return ObjectStoragePath(path, conn_id=conn_id)
+ return path
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 495c98fd87..8c11e2628e 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -525,7 +525,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"tests/providers/common/io/operators/test_file_transfer.py",
),
{
- "affected-providers-list-as-string": "common.io",
+ "affected-providers-list-as-string": "common.io openlineage",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"python-versions": "['3.8']",
@@ -538,7 +538,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"docs-build": "false",
"run-kubernetes-tests": "false",
"upgrade-to-newer-dependencies": "false",
- "parallel-test-types-list-as-string": "Always
Providers[common.io]",
+ "parallel-test-types-list-as-string": "Always
Providers[common.io,openlineage]",
},
id="Only Always and Common.IO tests should run when only common.io
and tests/always changed",
),
diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst
b/docs/apache-airflow/core-concepts/objectstorage.rst
index f5b113a861..5a3919e433 100644
--- a/docs/apache-airflow/core-concepts/objectstorage.rst
+++ b/docs/apache-airflow/core-concepts/objectstorage.rst
@@ -76,7 +76,8 @@ object you want to interact with. For example, to point to a
bucket in s3, you w
base = ObjectStoragePath("s3://aws_default@my-bucket/")
-The username part of the URI is optional. It can alternatively be passed in as
a separate keyword argument:
+The username part of the URI represents the Airflow connection id and is
optional. It can alternatively be passed
+in as a separate keyword argument:
.. code-block:: python
@@ -242,6 +243,11 @@ key
Returns the object key.
+namespace
+^^^^^^^^^
+
+Returns the namespace of the object. Typically this is the protocol, such as
+``s3://``, combined with the bucket name.
path
^^^^
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index fe07b77f44..0366b3ebab 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -300,7 +300,9 @@
"deps": [
"apache-airflow>=2.8.0"
],
- "cross-providers-deps": [],
+ "cross-providers-deps": [
+ "openlineage"
+ ],
"excluded-python-versions": [],
"state": "ready"
},
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index af6044c150..ab143b038e 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -72,18 +72,21 @@ class TestFs:
def test_init_objectstoragepath(self):
path = ObjectStoragePath("file://bucket/key/part1/part2")
assert path.bucket == "bucket"
- assert path.key == "/key/part1/part2"
+ assert path.key == "key/part1/part2"
assert path.protocol == "file"
+ assert path.path == "bucket/key/part1/part2"
path2 = ObjectStoragePath(path / "part3")
assert path2.bucket == "bucket"
- assert path2.key == "/key/part1/part2/part3"
+ assert path2.key == "key/part1/part2/part3"
assert path2.protocol == "file"
+ assert path2.path == "bucket/key/part1/part2/part3"
path3 = ObjectStoragePath(path2 / "2023")
assert path3.bucket == "bucket"
- assert path3.key == "/key/part1/part2/part3/2023"
+ assert path3.key == "key/part1/part2/part3/2023"
assert path3.protocol == "file"
+ assert path3.path == "bucket/key/part1/part2/part3/2023"
def test_read_write(self):
o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
@@ -171,7 +174,7 @@ class TestFs:
o = ObjectStoragePath(f"{protocol}://{bucket}/{key}")
assert o.bucket == bucket
assert o.container == bucket
- assert o.key == f"/{key}"
+ assert o.key == f"{key}"
assert o.protocol == protocol
def test_cwd_home(self):
diff --git a/tests/providers/common/io/operators/test_file_transfer.py
b/tests/providers/common/io/operators/test_file_transfer.py
index e2cb68ca43..c48a61c314 100644
--- a/tests/providers/common/io/operators/test_file_transfer.py
+++ b/tests/providers/common/io/operators/test_file_transfer.py
@@ -19,6 +19,8 @@ from __future__ import annotations
from unittest import mock
+from openlineage.client.run import Dataset
+
from airflow.providers.common.io.operators.file_transfer import
FileTransferOperator
@@ -45,3 +47,25 @@ def test_file_transfer_copy():
)
source_path.copy.assert_called_once_with(target_path)
target_path.copy.assert_not_called()
+
+
+def test_get_openlineage_facets_on_start():
+ src_bucket = "src-bucket"
+ src_key = "src-key"
+ dst_bucket = "dst-bucket"
+ dst_key = "dst-key"
+
+ expected_input = Dataset(namespace=f"s3://{src_bucket}", name=src_key)
+ expected_output = Dataset(namespace=f"s3://{dst_bucket}", name=dst_key)
+
+ op = FileTransferOperator(
+ task_id="test",
+ src=f"s3://{src_bucket}/{src_key}",
+ dst=f"s3://{dst_bucket}/{dst_key}",
+ )
+
+ lineage = op.get_openlineage_facets_on_start()
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == expected_input
+ assert lineage.outputs[0] == expected_output