This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f3b7cfc992 Make Datasets Pathlike (#36947)
f3b7cfc992 is described below
commit f3b7cfc9925d4a0abec29698a7398c2750ca5a15
Author: Bolke de Bruin <[email protected]>
AuthorDate: Tue Jan 23 14:51:37 2024 +0100
Make Datasets Pathlike (#36947)
This makes datasets inherit from os.PathLike so they can directly be used by
the Object Storage API.
---
airflow/datasets/__init__.py | 6 +++++-
tests/datasets/test_dataset.py | 8 ++++++++
tests/io/test_path.py | 9 +++++++++
3 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 0dc635a00b..a4a127e3f7 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import os
from typing import Any, ClassVar
from urllib.parse import urlsplit
@@ -23,7 +24,7 @@ import attr
@attr.define()
-class Dataset:
+class Dataset(os.PathLike):
"""A Dataset is used for marking data dependencies between workflows."""
uri: str = attr.field(validator=[attr.validators.min_len(1),
attr.validators.max_len(3000)])
@@ -42,3 +43,6 @@ class Dataset:
parsed = urlsplit(uri)
if parsed.scheme and parsed.scheme.lower() == "airflow":
raise ValueError(f"{attr.name!r} scheme `airflow` is reserved")
+
+ def __fspath__(self):
+ return self.uri
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index f707be0792..9e9ca99513 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+import os
+
import pytest
from airflow.datasets import Dataset
@@ -46,3 +48,9 @@ def test_uri_with_scheme():
def test_uri_without_scheme():
dataset = Dataset(uri="example_dataset")
EmptyOperator(task_id="task1", outlets=[dataset])
+
+
+def test_fspath():
+ uri = "s3://example_dataset"
+ dataset = Dataset(uri=uri)
+ assert os.fspath(dataset) == uri
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index ab143b038e..deb8d412cc 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -26,6 +26,7 @@ import pytest
from fsspec.implementations.local import LocalFileSystem
from fsspec.utils import stringify_path
+from airflow.datasets import Dataset
from airflow.io import _register_filesystems, get_fs
from airflow.io.path import ObjectStoragePath
from airflow.io.store import _STORE_CACHE, ObjectStore, attach
@@ -309,3 +310,11 @@ class TestFs:
finally:
# Reset the cache to avoid side effects
_register_filesystems.cache_clear()
+
+ def test_dataset(self):
+ p = "s3"
+ f = "/tmp/foo"
+ i = Dataset(uri=f"{p}://{f}", extra={"foo": "bar"})
+ o = ObjectStoragePath(i)
+ assert o.protocol == p
+ assert o.path == f