This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e358bb2253 tests: Add OpenLineage test cases for File to Dataset
conversion (#37791)
e358bb2253 is described below
commit e358bb2253509dcb3631db7ddffad7dc557ca97e
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Feb 29 13:15:35 2024 +0100
tests: Add OpenLineage test cases for File to Dataset conversion (#37791)
---
airflow/providers/openlineage/extractors/manager.py | 20 ++++++++++++++------
.../providers/openlineage/extractors/test_manager.py | 13 +++++++++++++
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/openlineage/extractors/manager.py
b/airflow/providers/openlineage/extractors/manager.py
index cb77c796ba..405db3d8e5 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -186,16 +186,24 @@ class ExtractorManager(LoggingMixin):
from openlineage.client.run import Dataset
+ if "/" not in uri:
+ return None
+
try:
scheme, netloc, path, params, _, _ = urlparse(uri)
except Exception:
return None
- if scheme.startswith("s3"):
- return Dataset(namespace=f"s3://{netloc}", name=path.lstrip("/"))
- elif scheme.startswith(("gcs", "gs")):
- return Dataset(namespace=f"gs://{netloc}", name=path.lstrip("/"))
- elif "/" not in uri:
- return None
+
+ common_schemas = {
+ "s3": "s3",
+ "gs": "gs",
+ "gcs": "gs",
+ "hdfs": "hdfs",
+ "file": "file",
+ }
+ for found, final in common_schemas.items():
+ if scheme.startswith(found):
+ return Dataset(namespace=f"{final}://{netloc}",
name=path.lstrip("/"))
return Dataset(namespace=scheme, name=f"{netloc}{path}")
@staticmethod
diff --git a/tests/providers/openlineage/extractors/test_manager.py
b/tests/providers/openlineage/extractors/test_manager.py
index d1f794b49d..7a790e8393 100644
--- a/tests/providers/openlineage/extractors/test_manager.py
+++ b/tests/providers/openlineage/extractors/test_manager.py
@@ -36,8 +36,13 @@ from airflow.providers.openlineage.extractors.manager import
ExtractorManager
("s3://bucket1/dir1/file1", Dataset(namespace="s3://bucket1",
name="dir1/file1")),
("gs://bucket2/dir2/file2", Dataset(namespace="gs://bucket2",
name="dir2/file2")),
("gcs://bucket3/dir3/file3", Dataset(namespace="gs://bucket3",
name="dir3/file3")),
+ ("hdfs://namenodehost:8020/file1",
Dataset(namespace="hdfs://namenodehost:8020", name="file1")),
+ ("hdfs://namenodehost/file2", Dataset(namespace="hdfs://namenodehost",
name="file2")),
+ ("file://localhost/etc/fstab", Dataset(namespace="file://localhost",
name="etc/fstab")),
+ ("file:///etc/fstab", Dataset(namespace="file://", name="etc/fstab")),
("https://test.com", Dataset(namespace="https", name="test.com")),
("https://test.com?param1=test1&param2=test2",
Dataset(namespace="https", name="test.com")),
+ ("file:test.csv", None),
("not_an_url", None),
),
)
@@ -55,6 +60,14 @@ def test_convert_to_ol_dataset_from_object_storage_uri(uri,
dataset):
),
(File(url="s3://bucket1/dir1/file1"),
Dataset(namespace="s3://bucket1", name="dir1/file1")),
(File(url="gs://bucket2/dir2/file2"),
Dataset(namespace="gs://bucket2", name="dir2/file2")),
+ (File(url="gcs://bucket3/dir3/file3"),
Dataset(namespace="gs://bucket3", name="dir3/file3")),
+ (
+ File(url="hdfs://namenodehost:8020/file1"),
+ Dataset(namespace="hdfs://namenodehost:8020", name="file1"),
+ ),
+ (File(url="hdfs://namenodehost/file2"),
Dataset(namespace="hdfs://namenodehost", name="file2")),
+ (File(url="file://localhost/etc/fstab"),
Dataset(namespace="file://localhost", name="etc/fstab")),
+ (File(url="file:///etc/fstab"), Dataset(namespace="file://",
name="etc/fstab")),
(File(url="https://test.com"), Dataset(namespace="https",
name="test.com")),
(Table(cluster="c1", database="d1", name="t1"),
Dataset(namespace="c1", name="d1.t1")),
("gs://bucket2/dir2/file2", None),