This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4938ac04b6 feat: Add OpenLineage support for File and User Airflow's 
lineage entities (#37744)
4938ac04b6 is described below

commit 4938ac04b606ab00d70c3b887e08f76a2b3ea857
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Feb 29 11:40:12 2024 +0100

    feat: Add OpenLineage support for File and User Airflow's lineage entities 
(#37744)
---
 .../providers/openlineage/extractors/manager.py    |  76 +++++++++-
 .../guides/developer.rst                           |  62 ++++----
 tests/always/test_project_structure.py             |   1 -
 .../openlineage/extractors/test_manager.py         | 159 +++++++++++++++++++++
 4 files changed, 265 insertions(+), 33 deletions(-)

diff --git a/airflow/providers/openlineage/extractors/manager.py 
b/airflow/providers/openlineage/extractors/manager.py
index a5654d8bbf..cb77c796ba 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -34,6 +34,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string
 
 if TYPE_CHECKING:
+    from openlineage.client.run import Dataset
+
+    from airflow.lineage.entities import Table
     from airflow.models import Operator
 
 
@@ -178,19 +181,78 @@ class ExtractorManager(LoggingMixin):
                 task_metadata.outputs.append(d)
 
     @staticmethod
-    def convert_to_ol_dataset(obj):
+    def convert_to_ol_dataset_from_object_storage_uri(uri: str) -> Dataset | 
None:
+        from urllib.parse import urlparse
+
+        from openlineage.client.run import Dataset
+
+        try:
+            scheme, netloc, path, params, _, _ = urlparse(uri)
+        except Exception:
+            return None
+        if scheme.startswith("s3"):
+            return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/"))
+        elif scheme.startswith(("gcs", "gs")):
+            return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/"))
+        elif "/" not in uri:
+            return None
+        return Dataset(namespace=scheme, name=f"{netloc}{path}")
+
+    @staticmethod
+    def convert_to_ol_dataset_from_table(table: Table) -> Dataset:
+        from openlineage.client.facet import (
+            BaseFacet,
+            OwnershipDatasetFacet,
+            OwnershipDatasetFacetOwners,
+            SchemaDatasetFacet,
+            SchemaField,
+        )
+        from openlineage.client.run import Dataset
+
+        facets: dict[str, BaseFacet] = {}
+        if table.columns:
+            facets["schema"] = SchemaDatasetFacet(
+                fields=[
+                    SchemaField(
+                        name=column.name,
+                        type=column.data_type,
+                        description=column.description,
+                    )
+                    for column in table.columns
+                ]
+            )
+        if table.owners:
+            facets["ownership"] = OwnershipDatasetFacet(
+                owners=[
+                    OwnershipDatasetFacetOwners(
+                        # f.e. "user:John Doe <[email protected]>" or just 
"user:<[email protected]>"
+                        name=f"user:"
+                        f"{user.first_name + ' ' if user.first_name else ''}"
+                        f"{user.last_name + ' ' if user.last_name else ''}"
+                        f"<{user.email}>",
+                        type="",
+                    )
+                    for user in table.owners
+                ]
+            )
+        return Dataset(
+            namespace=f"{table.cluster}",
+            name=f"{table.database}.{table.name}",
+            facets=facets,
+        )
+
+    @staticmethod
+    def convert_to_ol_dataset(obj) -> Dataset | None:
         from openlineage.client.run import Dataset
 
-        from airflow.lineage.entities import Table
+        from airflow.lineage.entities import File, Table
 
         if isinstance(obj, Dataset):
             return obj
         elif isinstance(obj, Table):
-            return Dataset(
-                namespace=f"{obj.cluster}",
-                name=f"{obj.database}.{obj.name}",
-                facets={},
-            )
+            return ExtractorManager.convert_to_ol_dataset_from_table(obj)
+        elif isinstance(obj, File):
+            return 
ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(obj.url)
         else:
             return None
 
diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst 
b/docs/apache-airflow-providers-openlineage/guides/developer.rst
index 035774e69a..3c7518d6cb 100644
--- a/docs/apache-airflow-providers-openlineage/guides/developer.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -364,10 +364,7 @@ Airflow allows Operators to track lineage by specifying 
the input and outputs of
 `inlets and outlets 
<https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage>`_.
 OpenLineage will, by default, use inlets and outlets as input/output datasets 
if it cannot find any successful extraction from the OpenLineage methods or the 
Extractors.
 
-.. important::
-
-    Airflow supports inlets and outlets to be either a Table, Column, File or 
User entity. However, currently OpenLineage only extracts lineage via Table 
entity
-
+Airflow supports inlets and outlets to be either a Table, Column, File or User 
entity and so does OpenLineage.
 
 Example
 ^^^^^^^
@@ -379,33 +376,50 @@ An Operator inside the Airflow DAG can be annotated with 
inlets and outlets like
     """Example DAG demonstrating the usage of the extraction via Inlets and 
Outlets."""
 
     import pendulum
-    import datetime
 
     from airflow import DAG
     from airflow.operators.bash import BashOperator
-    from airflow.lineage.entities import Table, File
+    from airflow.lineage.entities import Table, File, Column, User
+
+
+    t1 = Table(
+        cluster="c1",
+        database="d1",
+        name="t1",
+        owners=[User(email="[email protected]", first_name="Joe", last_name="Doe")],
+    )
+    t2 = Table(
+        cluster="c1",
+        database="d1",
+        name="t2",
+        columns=[
+            Column(name="col1", description="desc1", data_type="type1"),
+            Column(name="col2", description="desc2", data_type="type2"),
+        ],
+        owners=[
+            User(email="[email protected]", first_name="Mike", 
last_name="Smith"),
+            User(email="[email protected]", first_name="Theo"),
+            User(email="[email protected]", last_name="Smith"),
+            User(email="[email protected]"),
+        ],
+    )
+    t3 = Table(
+        cluster="c1",
+        database="d1",
+        name="t3",
+        columns=[
+            Column(name="col3", description="desc3", data_type="type3"),
+            Column(name="col4", description="desc4", data_type="type4"),
+        ],
+    )
+    t4 = Table(cluster="c1", database="d1", name="t4")
+    f1 = File(url="s3://bucket/dir/file1")
 
 
-    def create_table(cluster, database, name):
-        return Table(
-            database=database,
-            cluster=cluster,
-            name=name,
-        )
-
-
-    t1 = create_table("c1", "d1", "t1")
-    t2 = create_table("c1", "d1", "t2")
-    t3 = create_table("c1", "d1", "t3")
-    t4 = create_table("c1", "d1", "t4")
-    f1 = File(url="http://randomfile")
-
     with DAG(
         dag_id="example_operator",
-        schedule_interval="0 0 * * *",
+        schedule_interval="@once",
         start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-        dagrun_timeout=datetime.timedelta(minutes=60),
-        params={"example_key": "example_value"},
     ) as dag:
         task1 = BashOperator(
             task_id="task_1_with_inlet_outlet",
@@ -426,8 +440,6 @@ An Operator inside the Airflow DAG can be annotated with 
inlets and outlets like
     if __name__ == "__main__":
         dag.cli()
 
-Note that the ``File`` entity, defined in the example code, is not captured by 
the lineage event currently as described in the ``important`` box above.
-
 Conversion from Airflow Table entity to OpenLineage Dataset is made in the 
following way:
 - ``CLUSTER`` of the table entity becomes the namespace of OpenLineage's 
Dataset
 - The name of the dataset is formed by ``{{DATABASE}}.{{NAME}}`` where 
``DATABASE`` and ``NAME`` are attributes specified by Airflow's Table entity.
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index bab56abead..85c2c30655 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -159,7 +159,6 @@ class TestProjectStructure:
             "tests/providers/microsoft/azure/operators/test_adls.py",
             
"tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py",
             "tests/providers/mongo/sensors/test_mongo.py",
-            "tests/providers/openlineage/extractors/test_manager.py",
             "tests/providers/openlineage/plugins/test_adapter.py",
             "tests/providers/openlineage/plugins/test_facets.py",
             "tests/providers/openlineage/test_sqlparser.py",
diff --git a/tests/providers/openlineage/extractors/test_manager.py 
b/tests/providers/openlineage/extractors/test_manager.py
new file mode 100644
index 0000000000..d1f794b49d
--- /dev/null
+++ b/tests/providers/openlineage/extractors/test_manager.py
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from openlineage.client.facet import (
+    OwnershipDatasetFacet,
+    OwnershipDatasetFacetOwners,
+    SchemaDatasetFacet,
+    SchemaField,
+)
+from openlineage.client.run import Dataset
+
+from airflow.lineage.entities import Column, File, Table, User
+from airflow.providers.openlineage.extractors.manager import ExtractorManager
+
+
[email protected](
+    ("uri", "dataset"),
+    (
+        ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", 
name="dir1/file1")),
+        ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", 
name="dir2/file2")),
+        ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", 
name="dir3/file3")),
+        ("https://test.com", Dataset(namespace="https", name="test.com")),
+        ("https://test.com?param1=test1&param2=test2", 
Dataset(namespace="https", name="test.com")),
+        ("not_an_url", None),
+    ),
+)
+def test_convert_to_ol_dataset_from_object_storage_uri(uri, dataset):
+    result = 
ExtractorManager.convert_to_ol_dataset_from_object_storage_uri(uri)
+    assert result == dataset
+
+
[email protected](
+    ("obj", "dataset"),
+    (
+        (
+            Dataset(namespace="n1", name="f1"),
+            Dataset(namespace="n1", name="f1"),
+        ),
+        (File(url="s3://bucket1/dir1/file1"), 
Dataset(namespace="s3://bucket1", name="dir1/file1")),
+        (File(url="gs://bucket2/dir2/file2"), 
Dataset(namespace="gs://bucket2", name="dir2/file2")),
+        (File(url="https://test.com"), Dataset(namespace="https", 
name="test.com")),
+        (Table(cluster="c1", database="d1", name="t1"), 
Dataset(namespace="c1", name="d1.t1")),
+        ("gs://bucket2/dir2/file2", None),
+        ("not_an_url", None),
+    ),
+)
+def test_convert_to_ol_dataset(obj, dataset):
+    result = ExtractorManager.convert_to_ol_dataset(obj)
+    assert result == dataset
+
+
+def test_convert_to_ol_dataset_from_table_with_columns_and_owners():
+    table = Table(
+        cluster="c1",
+        database="d1",
+        name="t1",
+        columns=[
+            Column(name="col1", description="desc1", data_type="type1"),
+            Column(name="col2", description="desc2", data_type="type2"),
+        ],
+        owners=[
+            User(email="[email protected]", first_name="Mike", 
last_name="Smith"),
+            User(email="[email protected]", first_name="Theo"),
+            User(email="[email protected]", last_name="Smith"),
+            User(email="[email protected]"),
+        ],
+    )
+    expected_facets = {
+        "schema": SchemaDatasetFacet(
+            fields=[
+                SchemaField(
+                    name="col1",
+                    type="type1",
+                    description="desc1",
+                ),
+                SchemaField(
+                    name="col2",
+                    type="type2",
+                    description="desc2",
+                ),
+            ]
+        ),
+        "ownership": OwnershipDatasetFacet(
+            owners=[
+                OwnershipDatasetFacetOwners(name="user:Mike Smith 
<[email protected]>", type=""),
+                OwnershipDatasetFacetOwners(name="user:Theo 
<[email protected]>", type=""),
+                OwnershipDatasetFacetOwners(name="user:Smith 
<[email protected]>", type=""),
+                OwnershipDatasetFacetOwners(name="user:<[email protected]>", 
type=""),
+            ]
+        ),
+    }
+    result = ExtractorManager.convert_to_ol_dataset_from_table(table)
+    assert result.namespace == "c1"
+    assert result.name == "d1.t1"
+    assert result.facets == expected_facets
+
+
+def test_convert_to_ol_dataset_table():
+    table = Table(
+        cluster="c1",
+        database="d1",
+        name="t1",
+        columns=[
+            Column(name="col1", description="desc1", data_type="type1"),
+            Column(name="col2", description="desc2", data_type="type2"),
+        ],
+        owners=[
+            User(email="[email protected]", first_name="Mike", 
last_name="Smith"),
+            User(email="[email protected]", first_name="Theo"),
+            User(email="[email protected]", last_name="Smith"),
+            User(email="[email protected]"),
+        ],
+    )
+    expected_facets = {
+        "schema": SchemaDatasetFacet(
+            fields=[
+                SchemaField(
+                    name="col1",
+                    type="type1",
+                    description="desc1",
+                ),
+                SchemaField(
+                    name="col2",
+                    type="type2",
+                    description="desc2",
+                ),
+            ]
+        ),
+        "ownership": OwnershipDatasetFacet(
+            owners=[
+                OwnershipDatasetFacetOwners(name="user:Mike Smith 
<[email protected]>", type=""),
+                OwnershipDatasetFacetOwners(name="user:Theo 
<[email protected]>", type=""),
+                OwnershipDatasetFacetOwners(name="user:Smith 
<[email protected]>", type=""),
+                OwnershipDatasetFacetOwners(name="user:<[email protected]>", 
type=""),
+            ]
+        ),
+    }
+
+    result = ExtractorManager.convert_to_ol_dataset(table)
+    assert result.namespace == "c1"
+    assert result.name == "d1.t1"
+    assert result.facets == expected_facets

Reply via email to