This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4938ac04b6 feat: Add OpenLineage support for File and User Airflow's lineage entities (#37744)
4938ac04b6 is described below
commit 4938ac04b606ab00d70c3b887e08f76a2b3ea857
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Feb 29 11:40:12 2024 +0100
feat: Add OpenLineage support for File and User Airflow's lineage entities (#37744)
---
.../providers/openlineage/extractors/manager.py | 76 +++++++++-
.../guides/developer.rst | 62 ++++----
tests/always/test_project_structure.py | 1 -
.../openlineage/extractors/test_manager.py | 159 +++++++++++++++++++++
4 files changed, 265 insertions(+), 33 deletions(-)
diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py
index a5654d8bbf..cb77c796ba 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -34,6 +34,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
if TYPE_CHECKING:
+ from openlineage.client.run import Dataset
+
+ from airflow.lineage.entities import Table
from airflow.models import Operator
@@ -178,19 +181,78 @@ class ExtractorManager(LoggingMixin):
task_metadata.outputs.append(d)
@staticmethod
- def convert_to_ol_dataset(obj):
+ def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | None:
+ from urllib.parse import urlparse
+
+ from openlineage.client.run import Dataset
+
+ try:
+ scheme, netloc, path, params, _, _ = urlparse(uri)
+ except Exception:
+ return None
+ if scheme.startswith("s3"):
+ return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/"))
+ elif scheme.startswith(("gcs", "gs")):
+ return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/"))
+ elif "/" not in uri:
+ return None
+ return Dataset(namespace=scheme, name=f"{netloc}{path}")
+
+ @staticmethod
+ def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
+ from openlineage.client.facet import (
+ BaseFacet,
+ OwnershipDatasetFacet,
+ OwnershipDatasetFacetOwners,
+ SchemaDatasetFacet,
+ SchemaField,
+ )
+ from openlineage.client.run import Dataset
+
+ facets: dict[str, BaseFacet] = {}
+ if table.columns:
+ facets["schema"] = SchemaDatasetFacet(
+ fields=[
+ SchemaField(
+ name=column.name,
+ type=column.data_type,
+ description=column.description,
+ )
+ for column in table.columns
+ ]
+ )
+ if table.owners:
+ facets["ownership"] = OwnershipDatasetFacet(
+ owners=[
+ OwnershipDatasetFacetOwners(
+ # f.e. "user:John Doe <[email protected]>" or just "user:<[email protected]>"
+ name=f"user:"
+ f"{user.first_name + ' ' if user.first_name else ''}"
+ f"{user.last_name + ' ' if user.last_name else ''}"
+ f"<{user.email}>",
+ type="",
+ )
+ for user in table.owners
+ ]
+ )
+ return Dataset(
+ namespace=f"{table.cluster}",
+ name=f"{table.database}.{table.name}",
+ facets=facets,
+ )
+
+ @staticmethod
+ def convert_to_ol_dataset(obj) -> Dataset | None:
from openlineage.client.run import Dataset
- from airflow.lineage.entities import Table
+ from airflow.lineage.entities import File, Table
if isinstance(obj, Dataset):
return obj
elif isinstance(obj, Table):
- return Dataset(
- namespace=f"{obj.cluster}",
- name=f"{obj.database}.{obj.name}",
- facets={},
- )
+ return ExtractorManager.convert_to_ol_dataset_from_table(obj)
+ elif isinstance(obj, File):
+ return ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(obj.url)
else:
return None
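For illustration, a minimal sketch (not part of the diff above) of how the updated converter behaves, assuming the openlineage provider and openlineage-python client are installed; the bucket and file names below are made up:

    from airflow.lineage.entities import File, Table
    from airflow.providers.openlineage.extractors.manager import ExtractorManager

    # Object-storage URIs map to a bucket-scoped namespace and a path-based name.
    ds = ExtractorManager.convert_to_ol_dataset(File(url="s3://my-bucket/data/file.csv"))
    # ds == Dataset(namespace="s3://my-bucket", name="data/file.csv")

    # Table entities map cluster -> namespace and "database.name" -> name.
    ds = ExtractorManager.convert_to_ol_dataset(Table(cluster="c1", database="d1", name="t1"))
    # ds == Dataset(namespace="c1", name="d1.t1")

    # Anything else (including plain strings) converts to None.
    assert ExtractorManager.convert_to_ol_dataset("not_a_lineage_entity") is None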
diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst b/docs/apache-airflow-providers-openlineage/guides/developer.rst
index 035774e69a..3c7518d6cb 100644
--- a/docs/apache-airflow-providers-openlineage/guides/developer.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -364,10 +364,7 @@ Airflow allows Operators to track lineage by specifying the input and outputs of
`inlets and outlets
<https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage>`_.
OpenLineage will, by default, use inlets and outlets as input/output datasets
if it cannot find any successful extraction from the OpenLineage methods or the
Extractors.
-.. important::
-
- Airflow supports inlets and outlets to be either a Table, Column, File or User entity. However, currently OpenLineage only extracts lineage via Table entity
-
+Airflow supports inlets and outlets to be either a Table, Column, File or User entity and so does OpenLineage.
Example
^^^^^^^
@@ -379,33 +376,50 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like
"""Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""
import pendulum
- import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
- from airflow.lineage.entities import Table, File
+ from airflow.lineage.entities import Table, File, Column, User
+
+
+ t1 = Table(
+ cluster="c1",
+ database="d1",
+ name="t1",
+ owners=[User(email="[email protected]", first_name="Joe", last_name="Doe")],
+ )
+ t2 = Table(
+ cluster="c1",
+ database="d1",
+ name="t2",
+ columns=[
+ Column(name="col1", description="desc1", data_type="type1"),
+ Column(name="col2", description="desc2", data_type="type2"),
+ ],
+ owners=[
+ User(email="[email protected]", first_name="Mike", last_name="Smith"),
+ User(email="[email protected]", first_name="Theo"),
+ User(email="[email protected]", last_name="Smith"),
+ User(email="[email protected]"),
+ ],
+ )
+ t3 = Table(
+ cluster="c1",
+ database="d1",
+ name="t3",
+ columns=[
+ Column(name="col3", description="desc3", data_type="type3"),
+ Column(name="col4", description="desc4", data_type="type4"),
+ ],
+ )
+ t4 = Table(cluster="c1", database="d1", name="t4")
+ f1 = File(url="s3://bucket/dir/file1")
- def create_table(cluster, database, name):
- return Table(
- database=database,
- cluster=cluster,
- name=name,
- )
-
-
- t1 = create_table("c1", "d1", "t1")
- t2 = create_table("c1", "d1", "t2")
- t3 = create_table("c1", "d1", "t3")
- t4 = create_table("c1", "d1", "t4")
- f1 = File(url="http://randomfile")
-
with DAG(
dag_id="example_operator",
- schedule_interval="0 0 * * *",
+ schedule_interval="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
- dagrun_timeout=datetime.timedelta(minutes=60),
- params={"example_key": "example_value"},
) as dag:
task1 = BashOperator(
task_id="task_1_with_inlet_outlet",
@@ -426,8 +440,6 @@ An Operator inside the Airflow DAG can be annotated with inlets and outlets like
if __name__ == "__main__":
dag.cli()
-Note that the ``File`` entity, defined in the example code, is not captured by the lineage event currently as described in the ``important`` box above.
-
Conversion from Airflow Table entity to OpenLineage Dataset is made in the following way:
- ``CLUSTER`` of the table entity becomes the namespace of OpenLineage's Dataset
- The name of the dataset is formed by ``{{DATABASE}}.{{NAME}}`` where ``DATABASE`` and ``NAME`` are attributes specified by Airflow's Table entity.
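For illustration, a minimal sketch (not part of the diff above) of the Table conversion described here, assuming the openlineage-python facet classes imported in manager.py; the table, column, and owner values are made up:

    from airflow.lineage.entities import Column, Table, User
    from airflow.providers.openlineage.extractors.manager import ExtractorManager

    table = Table(
        cluster="c1",
        database="d1",
        name="orders",  # hypothetical table name
        columns=[Column(name="id", description="primary key", data_type="int")],
        owners=[User(email="[email protected]", first_name="Jane", last_name="Doe")],
    )
    dataset = ExtractorManager.convert_to_ol_dataset_from_table(table)
    # dataset.namespace == "c1"                        (CLUSTER)
    # dataset.name == "d1.orders"                      (DATABASE.NAME)
    # dataset.facets["schema"].fields[0].name == "id"  (from Table.columns)
    # dataset.facets["ownership"].owners[0].name == "user:Jane Doe <[email protected]>"  (from Table.owners)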
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index bab56abead..85c2c30655 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -159,7 +159,6 @@ class TestProjectStructure:
"tests/providers/microsoft/azure/operators/test_adls.py",
"tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py",
"tests/providers/mongo/sensors/test_mongo.py",
- "tests/providers/openlineage/extractors/test_manager.py",
"tests/providers/openlineage/plugins/test_adapter.py",
"tests/providers/openlineage/plugins/test_facets.py",
"tests/providers/openlineage/test_sqlparser.py",
diff --git a/tests/providers/openlineage/extractors/test_manager.py b/tests/providers/openlineage/extractors/test_manager.py
new file mode 100644
index 0000000000..d1f794b49d
--- /dev/null
+++ b/tests/providers/openlineage/extractors/test_manager.py
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from openlineage.client.facet import (
+ OwnershipDatasetFacet,
+ OwnershipDatasetFacetOwners,
+ SchemaDatasetFacet,
+ SchemaField,
+)
+from openlineage.client.run import Dataset
+
+from airflow.lineage.entities import Column, File, Table, User
+from airflow.providers.openlineage.extractors.manager import ExtractorManager
+
+
[email protected](
+ ("uri", "dataset"),
+ (
+ ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1",
name="dir1/file1")),
+ ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2",
name="dir2/file2")),
+ ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3",
name="dir3/file3")),
+ ("https://test.com", Dataset(namespace="https", name="test.com")),
+ ("https://test.com?param1=test1¶m2=test2",
Dataset(namespace="https", name="test.com")),
+ ("not_an_url", None),
+ ),
+)
+def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset):
+ result = ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(uri)
+ assert result == dataset
+
+
[email protected](
+ ("obj", "dataset"),
+ (
+ (
+ Dataset(namespace="n1", name="f1"),
+ Dataset(namespace="n1", name="f1"),
+ ),
+ (File(url="s3://bucket1/dir1/file1"), Dataset(namespace="s3://bucket1", name="dir1/file1")),
+ (File(url="gs://bucket2/dir2/file2"), Dataset(namespace="gs://bucket2", name="dir2/file2")),
+ (File(url="https://test.com"), Dataset(namespace="https", name="test.com")),
+ (Table(cluster="c1", database="d1", name="t1"), Dataset(namespace="c1", name="d1.t1")),
+ ("gs://bucket2/dir2/file2", None),
+ ("not_an_url", None),
+ ),
+)
+def test_convert_to_ol_dataset(obj, dataset):
+ result = ExtractorManager.convert_to_ol_dataset(obj)
+ assert result == dataset
+
+
+def test_convert_to_ol_dataset_from_table_with_columns_and_owners():
+ table = Table(
+ cluster="c1",
+ database="d1",
+ name="t1",
+ columns=[
+ Column(name="col1", description="desc1", data_type="type1"),
+ Column(name="col2", description="desc2", data_type="type2"),
+ ],
+ owners=[
+ User(email="[email protected]", first_name="Mike", last_name="Smith"),
+ User(email="[email protected]", first_name="Theo"),
+ User(email="[email protected]", last_name="Smith"),
+ User(email="[email protected]"),
+ ],
+ )
+ expected_facets = {
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(
+ name="col1",
+ type="type1",
+ description="desc1",
+ ),
+ SchemaField(
+ name="col2",
+ type="type2",
+ description="desc2",
+ ),
+ ]
+ ),
+ "ownership": OwnershipDatasetFacet(
+ owners=[
+ OwnershipDatasetFacetOwners(name="user:Mike Smith <[email protected]>", type=""),
+ OwnershipDatasetFacetOwners(name="user:Theo <[email protected]>", type=""),
+ OwnershipDatasetFacetOwners(name="user:Smith <[email protected]>", type=""),
+ OwnershipDatasetFacetOwners(name="user:<[email protected]>", type=""),
+ ]
+ ),
+ }
+ result = ExtractorManager.convert_to_ol_dataset_from_table(table)
+ assert result.namespace == "c1"
+ assert result.name == "d1.t1"
+ assert result.facets == expected_facets
+
+
+def test_convert_to_ol_dataset_table():
+ table = Table(
+ cluster="c1",
+ database="d1",
+ name="t1",
+ columns=[
+ Column(name="col1", description="desc1", data_type="type1"),
+ Column(name="col2", description="desc2", data_type="type2"),
+ ],
+ owners=[
+ User(email="[email protected]", first_name="Mike", last_name="Smith"),
+ User(email="[email protected]", first_name="Theo"),
+ User(email="[email protected]", last_name="Smith"),
+ User(email="[email protected]"),
+ ],
+ )
+ expected_facets = {
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(
+ name="col1",
+ type="type1",
+ description="desc1",
+ ),
+ SchemaField(
+ name="col2",
+ type="type2",
+ description="desc2",
+ ),
+ ]
+ ),
+ "ownership": OwnershipDatasetFacet(
+ owners=[
+ OwnershipDatasetFacetOwners(name="user:Mike Smith <[email protected]>", type=""),
+ OwnershipDatasetFacetOwners(name="user:Theo <[email protected]>", type=""),
+ OwnershipDatasetFacetOwners(name="user:Smith <[email protected]>", type=""),
+ OwnershipDatasetFacetOwners(name="user:<[email protected]>", type=""),
+ ]
+ ),
+ }
+
+ result = ExtractorManager.convert_to_ol_dataset(table)
+ assert result.namespace == "c1"
+ assert result.name == "d1.t1"
+ assert result.facets == expected_facets