This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e358bb2253 tests: Add OpenLineage test cases for File to Dataset conversion (#37791)
e358bb2253 is described below

commit e358bb2253509dcb3631db7ddffad7dc557ca97e
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Feb 29 13:15:35 2024 +0100

    tests: Add OpenLineage test cases for File to Dataset conversion (#37791)
---
 airflow/providers/openlineage/extractors/manager.py  | 20 ++++++++++++++------
 .../providers/openlineage/extractors/test_manager.py | 13 +++++++++++++
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/openlineage/extractors/manager.py 
b/airflow/providers/openlineage/extractors/manager.py
index cb77c796ba..405db3d8e5 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -186,16 +186,24 @@ class ExtractorManager(LoggingMixin):
 
         from openlineage.client.run import Dataset
 
+        if "/" not in uri:
+            return None
+
         try:
             scheme, netloc, path, params, _, _ = urlparse(uri)
         except Exception:
             return None
-        if scheme.startswith("s3"):
-            return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/"))
-        elif scheme.startswith(("gcs", "gs")):
-            return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/"))
-        elif "/" not in uri:
-            return None
+
+        common_schemas = {
+            "s3": "s3",
+            "gs": "gs",
+            "gcs": "gs",
+            "hdfs": "hdfs",
+            "file": "file",
+        }
+        for found, final in common_schemas.items():
+            if scheme.startswith(found):
+                return Dataset(namespace=f"{final}://{netloc}", 
name=path.lstrip("/"))
         return Dataset(namespace=scheme, name=f"{netloc}{path}")
 
     @staticmethod
diff --git a/tests/providers/openlineage/extractors/test_manager.py 
b/tests/providers/openlineage/extractors/test_manager.py
index d1f794b49d..7a790e8393 100644
--- a/tests/providers/openlineage/extractors/test_manager.py
+++ b/tests/providers/openlineage/extractors/test_manager.py
@@ -36,8 +36,13 @@ from airflow.providers.openlineage.extractors.manager import 
ExtractorManager
         ("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1", 
name="dir1/file1")),
         ("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2", 
name="dir2/file2")),
         ("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3", 
name="dir3/file3")),
+        ("hdfs://namenodehost:8020/file1", 
Dataset(namespace="hdfs://namenodehost:8020", name="file1")),
+        ("hdfs://namenodehost/file2", Dataset(namespace="hdfs://namenodehost", 
name="file2")),
+        ("file://localhost/etc/fstab", Dataset(namespace="file://localhost", 
name="etc/fstab")),
+        ("file:///etc/fstab", Dataset(namespace="file://", name="etc/fstab")),
         ("https://test.com";, Dataset(namespace="https", name="test.com")),
         ("https://test.com?param1=test1&param2=test2";, 
Dataset(namespace="https", name="test.com")),
+        ("file:test.csv", None),
         ("not_an_url", None),
     ),
 )
@@ -55,6 +60,14 @@ def test_convert_to_ol_dataset_from_object_storage_uri(uri, 
dataset):
         ),
         (File(url="s3://bucket1/dir1/file1"), 
Dataset(namespace="s3://bucket1", name="dir1/file1")),
         (File(url="gs://bucket2/dir2/file2"), 
Dataset(namespace="gs://bucket2", name="dir2/file2")),
+        (File(url="gcs://bucket3/dir3/file3"), 
Dataset(namespace="gs://bucket3", name="dir3/file3")),
+        (
+            File(url="hdfs://namenodehost:8020/file1"),
+            Dataset(namespace="hdfs://namenodehost:8020", name="file1"),
+        ),
+        (File(url="hdfs://namenodehost/file2"), 
Dataset(namespace="hdfs://namenodehost", name="file2")),
+        (File(url="file://localhost/etc/fstab"), 
Dataset(namespace="file://localhost", name="etc/fstab")),
+        (File(url="file:///etc/fstab"), Dataset(namespace="file://", 
name="etc/fstab")),
         (File(url="https://test.com";), Dataset(namespace="https", 
name="test.com")),
         (Table(cluster="c1", database="d1", name="t1"), 
Dataset(namespace="c1", name="d1.t1")),
         ("gs://bucket2/dir2/file2", None),

Reply via email to