This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8eaf1bf77b Revert "Update ObjectStoragePath for
universal_pathlib>=v0.2.1 (#37524)" (#37567)
8eaf1bf77b is described below
commit 8eaf1bf77bdee94a9d94cc9e4e775a7794d1539a
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Feb 20 19:52:42 2024 +0100
Revert "Update ObjectStoragePath for universal_pathlib>=v0.2.1 (#37524)"
(#37567)
This reverts commit 08bc0f44904fe0d8bc8779e0e892e4d42def3983.
u
---
airflow/io/path.py | 169 ++++++++++++++++---------
airflow/providers/common/io/xcom/backend.py | 4 +-
pyproject.toml | 9 +-
tests/io/test_path.py | 135 +++++++-------------
tests/providers/common/io/xcom/test_backend.py | 2 +-
5 files changed, 165 insertions(+), 154 deletions(-)
diff --git a/airflow/io/path.py b/airflow/io/path.py
index cb4c48c476..d65d837e7e 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -17,20 +17,24 @@
from __future__ import annotations
import contextlib
+import functools
import os
import shutil
import typing
-from typing import Any, Mapping
+from pathlib import PurePath
from urllib.parse import urlsplit
+from fsspec.core import split_protocol
from fsspec.utils import stringify_path
-from upath.implementations.cloud import CloudPath
+from upath.implementations.cloud import CloudPath, _CloudAccessor
from upath.registry import get_upath_class
from airflow.io.store import attach
from airflow.io.utils.stat import stat_result
if typing.TYPE_CHECKING:
+ from urllib.parse import SplitResult
+
from fsspec import AbstractFileSystem
@@ -39,68 +43,124 @@ PT = typing.TypeVar("PT", bound="ObjectStoragePath")
default = "file"
+class _AirflowCloudAccessor(_CloudAccessor):
+ __slots__ = ("_store",)
+
+ def __init__(
+ self,
+ parsed_url: SplitResult | None,
+ conn_id: str | None = None,
+ **kwargs: typing.Any,
+ ) -> None:
+ # warning: we are not calling super().__init__ here
+ # as it will try to create a new fs from a different
+ # set of registered filesystems
+ if parsed_url and parsed_url.scheme:
+ self._store = attach(parsed_url.scheme, conn_id)
+ else:
+ self._store = attach("file", conn_id)
+
+ @property
+ def _fs(self) -> AbstractFileSystem:
+ return self._store.fs
+
+ def __eq__(self, other):
+ return isinstance(other, _AirflowCloudAccessor) and self._store ==
other._store
+
+
class ObjectStoragePath(CloudPath):
"""A path-like object for object storage."""
+ _accessor: _AirflowCloudAccessor
+
__version__: typing.ClassVar[int] = 1
- _protocol_dispatch = False
+ _default_accessor = _AirflowCloudAccessor
sep: typing.ClassVar[str] = "/"
root_marker: typing.ClassVar[str] = "/"
- __slots__ = ("_hash_cached",)
-
- @classmethod
- def _transform_init_args(
- cls,
- args: tuple[str | os.PathLike, ...],
- protocol: str,
- storage_options: dict[str, Any],
- ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
- """Extract conn_id from the URL and set it as a storage option."""
- if args:
- arg0 = args[0]
- parsed_url = urlsplit(stringify_path(arg0))
- userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
- if have_info:
- storage_options.setdefault("conn_id", userinfo or None)
- parsed_url = parsed_url._replace(netloc=hostinfo)
- args = (parsed_url.geturl(),) + args[1:]
- protocol = protocol or parsed_url.scheme
- return args, protocol, storage_options
+ _bucket: str
+ _key: str
+ _protocol: str
+ _hash: int | None
+
+ __slots__ = (
+ "_bucket",
+ "_key",
+ "_conn_id",
+ "_protocol",
+ "_hash",
+ )
+
+ def __new__(
+ cls: type[PT],
+ *args: str | os.PathLike,
+ scheme: str | None = None,
+ conn_id: str | None = None,
+ **kwargs: typing.Any,
+ ) -> PT:
+ args_list = list(args)
+
+ if args_list:
+ other = args_list.pop(0) or "."
+ else:
+ other = "."
+
+ if isinstance(other, PurePath):
+ _cls: typing.Any = type(other)
+ drv, root, parts = _cls._parse_args(args_list)
+ drv, root, parts = _cls._flavour.join_parsed_parts(
+ other._drv, # type: ignore[attr-defined]
+ other._root, # type: ignore[attr-defined]
+ other._parts, # type: ignore[attr-defined]
+ drv,
+ root,
+ parts, # type: ignore
+ )
+
+ _kwargs = getattr(other, "_kwargs", {})
+ _url = getattr(other, "_url", None)
+ other_kwargs = _kwargs.copy()
+ if _url and _url.scheme:
+ other_kwargs["url"] = _url
+ new_kwargs = _kwargs.copy()
+ new_kwargs.update(kwargs)
+
+ return _cls(_cls._format_parsed_parts(drv, root, parts,
**other_kwargs), **new_kwargs)
+
+ url = stringify_path(other)
+ parsed_url: SplitResult = urlsplit(url)
+
+ if scheme: # allow override of protocol
+ parsed_url = parsed_url._replace(scheme=scheme)
+
+ if not parsed_url.path: # ensure path has root
+ parsed_url = parsed_url._replace(path="/")
+
+ if not parsed_url.scheme and not split_protocol(url)[0]:
+ args_list.insert(0, url)
+ else:
+ args_list.insert(0, parsed_url.path)
- @classmethod
- def _parse_storage_options(
- cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
- ) -> dict[str, Any]:
- fs = attach(protocol or "file",
conn_id=storage_options.get("conn_id")).fs
- pth_storage_options = type(fs)._get_kwargs_from_urls(urlpath)
- return {**pth_storage_options, **storage_options}
+ # This matches the parsing logic in urllib.parse; see:
+ #
https://github.com/python/cpython/blob/46adf6b701c440e047abf925df9a75a/Lib/urllib/parse.py#L194-L203
+ userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
+ if have_info:
+ conn_id = conn_id or userinfo or None
+ parsed_url = parsed_url._replace(netloc=hostinfo)
- @classmethod
- def _fs_factory(
- cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
- ) -> AbstractFileSystem:
- return attach(protocol or "file", storage_options.get("conn_id")).fs
+ return cls._from_parts(args_list, url=parsed_url, conn_id=conn_id,
**kwargs) # type: ignore
+ @functools.lru_cache
def __hash__(self) -> int:
- self._hash_cached: int
- try:
- return self._hash_cached
- except AttributeError:
- self._hash_cached = hash(str(self))
- return self._hash_cached
+ return hash(str(self))
def __eq__(self, other: typing.Any) -> bool:
return self.samestore(other) and str(self) == str(other)
def samestore(self, other: typing.Any) -> bool:
- return (
- isinstance(other, ObjectStoragePath)
- and self.protocol == other.protocol
- and self.storage_options.get("conn_id") ==
other.storage_options.get("conn_id")
- )
+ return isinstance(other, ObjectStoragePath) and self._accessor ==
other._accessor
@property
def container(self) -> str:
@@ -126,17 +186,12 @@ class ObjectStoragePath(CloudPath):
def namespace(self) -> str:
return f"{self.protocol}://{self.bucket}" if self.bucket else
self.protocol
- def open(self, mode="r", **kwargs):
- """Open the file pointed to by this path."""
- kwargs.setdefault("block_size", kwargs.pop("buffering", None))
- return self.fs.open(self.path, mode=mode, **kwargs)
-
def stat(self) -> stat_result: # type: ignore[override]
"""Call ``stat`` and return the result."""
return stat_result(
- self.fs.stat(self.path),
+ self._accessor.stat(self),
protocol=self.protocol,
- conn_id=self.storage_options.get("conn_id"),
+ conn_id=self._accessor._store.conn_id,
)
def samefile(self, other_path: typing.Any) -> bool:
@@ -313,11 +368,7 @@ class ObjectStoragePath(CloudPath):
if path == self.path:
continue
- src_obj = ObjectStoragePath(
- path,
- protocol=self.protocol,
- conn_id=self.storage_options.get("conn_id"),
- )
+ src_obj = ObjectStoragePath(path,
conn_id=self._accessor._store.conn_id)
# skip directories, empty directories will not be created
if src_obj.is_dir():
@@ -350,7 +401,7 @@ class ObjectStoragePath(CloudPath):
self.unlink()
def serialize(self) -> dict[str, typing.Any]:
- _kwargs = {**self.storage_options}
+ _kwargs = self._kwargs.copy()
conn_id = _kwargs.pop("conn_id", None)
return {
diff --git a/airflow/providers/common/io/xcom/backend.py
b/airflow/providers/common/io/xcom/backend.py
index 3028a49be2..6e995c30e1 100644
--- a/airflow/providers/common/io/xcom/backend.py
+++ b/airflow/providers/common/io/xcom/backend.py
@@ -132,7 +132,7 @@ class XComObjectStoreBackend(BaseXCom):
if not p.parent.exists():
p.parent.mkdir(parents=True, exist_ok=True)
- with p.open(mode="wb", compression=compression) as f:
+ with p.open("wb", compression=compression) as f:
f.write(s_val)
return BaseXCom.serialize_value(str(p))
@@ -152,7 +152,7 @@ class XComObjectStoreBackend(BaseXCom):
try:
p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data)
- return json.load(p.open(mode="rb", compression="infer"),
cls=XComDecoder)
+ return json.load(p.open("rb", compression="infer"),
cls=XComDecoder)
except TypeError:
return data
except ValueError:
diff --git a/pyproject.toml b/pyproject.toml
index 42265978a7..f53c1002a3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -148,7 +148,14 @@ dependencies = [
# We should also remove "licenses/LICENSE-unicodecsv.txt" file when we
remove this dependency
"unicodecsv>=0.14.1",
# The Universal Pathlib provides Pathlib-like interface for FSSPEC
- "universal-pathlib>=0.2.1",
+ # In 0.1.* it was not very well defined for extension, so the way we
use it for 0.1.*
+ # so we used a lot of private methods and attributes that were not defined
in the interface
+ # and they are broken with version 0.2.0 which is much better suited for
extension and supports
+ # Python 3.12. We should limit it, until we migrate to 0.2.0
+ # See:
https://github.com/fsspec/universal_pathlib/pull/173#issuecomment-1937090528
+ # This is a prerequisite to make Airflow compatible with Python 3.12
+ # Tracked in https://github.com/apache/airflow/pull/36755
+ "universal-pathlib>=0.1.4,<0.2.0",
# Werkzug 3 breaks Flask-Login 0.6.2, also connexion needs to be updated
to >= 3.0
# we should remove this limitation when FAB supports Flask 2.3 and we
migrate connexion to 3+
"werkzeug>=2.0,<3",
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index e03b40e0e4..deb8d412cc 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -20,13 +20,11 @@ from __future__ import annotations
import uuid
from stat import S_ISDIR, S_ISREG
from tempfile import NamedTemporaryFile
-from typing import Any, ClassVar
from unittest import mock
import pytest
from fsspec.implementations.local import LocalFileSystem
-from fsspec.implementations.memory import MemoryFileSystem
-from fsspec.registry import _registry as _fsspec_registry,
register_implementation
+from fsspec.utils import stringify_path
from airflow.datasets import Dataset
from airflow.io import _register_filesystems, get_fs
@@ -40,46 +38,19 @@ FOO = "file:///mnt/warehouse/foo"
BAR = FOO
-class FakeLocalFileSystem(MemoryFileSystem):
- protocol = ("file", "local")
- root_marker = "/"
- store: ClassVar[dict[str, Any]] = {}
- pseudo_dirs = [""]
+class FakeRemoteFileSystem(LocalFileSystem):
+ id = "fakefs"
+ auto_mk_dir = True
- def __init__(self, *args, **kwargs):
- self.conn_id = kwargs.pop("conn_id", None)
- super().__init__(*args, **kwargs)
+ @property
+ def fsid(self):
+ return self.id
@classmethod
- def _strip_protocol(cls, path):
- for protocol in cls.protocol:
- if path.startswith(f"{protocol}://"):
- return path[len(f"{protocol}://") :]
- if "::" in path or "://" in path:
- return path.rstrip("/")
- path = path.lstrip("/").rstrip("/")
- return "/" + path if path else ""
-
-
-class FakeRemoteFileSystem(MemoryFileSystem):
- protocol = ("s3", "fakefs", "ffs", "ffs2")
- root_marker = ""
- store: ClassVar[dict[str, Any]] = {}
- pseudo_dirs = [""]
-
- def __init__(self, *args, **kwargs):
- self.conn_id = kwargs.pop("conn_id", None)
- super().__init__(*args, **kwargs)
-
- @classmethod
- def _strip_protocol(cls, path):
- for protocol in cls.protocol:
- if path.startswith(f"{protocol}://"):
- return path[len(f"{protocol}://") :]
- if "::" in path or "://" in path:
- return path.rstrip("/")
- path = path.lstrip("/").rstrip("/")
- return "/" + path if path else ""
+ def _strip_protocol(cls, path) -> str:
+ path = stringify_path(path)
+ i = path.find("://")
+ return path[i + 3 :] if i > 0 else path
def get_fs_no_storage_options(_: str):
@@ -89,15 +60,10 @@ def get_fs_no_storage_options(_: str):
class TestFs:
def setup_class(self):
self._store_cache = _STORE_CACHE.copy()
- self._fsspec_registry = _fsspec_registry.copy()
- for protocol in FakeRemoteFileSystem.protocol:
- register_implementation(protocol, FakeRemoteFileSystem,
clobber=True)
def teardown(self):
_STORE_CACHE.clear()
_STORE_CACHE.update(self._store_cache)
- _fsspec_registry.clear()
- _fsspec_registry.update(self._fsspec_registry)
def test_alias(self):
store = attach("file", alias="local")
@@ -105,24 +71,22 @@ class TestFs:
assert "local" in _STORE_CACHE
def test_init_objectstoragepath(self):
- attach("s3", fs=FakeRemoteFileSystem())
-
- path = ObjectStoragePath("s3://bucket/key/part1/part2")
+ path = ObjectStoragePath("file://bucket/key/part1/part2")
assert path.bucket == "bucket"
assert path.key == "key/part1/part2"
- assert path.protocol == "s3"
+ assert path.protocol == "file"
assert path.path == "bucket/key/part1/part2"
path2 = ObjectStoragePath(path / "part3")
assert path2.bucket == "bucket"
assert path2.key == "key/part1/part2/part3"
- assert path2.protocol == "s3"
+ assert path2.protocol == "file"
assert path2.path == "bucket/key/part1/part2/part3"
path3 = ObjectStoragePath(path2 / "2023")
assert path3.bucket == "bucket"
assert path3.key == "key/part1/part2/part3/2023"
- assert path3.protocol == "s3"
+ assert path3.protocol == "file"
assert path3.path == "bucket/key/part1/part2/part3/2023"
def test_read_write(self):
@@ -152,57 +116,49 @@ class TestFs:
assert not o.exists()
- def test_objectstoragepath_init_conn_id_in_uri(self):
- attach(protocol="fake", conn_id="fake",
fs=FakeRemoteFileSystem(conn_id="fake"))
+ @pytest.fixture()
+ def fake_fs(self):
+ fs = mock.Mock()
+ fs._strip_protocol.return_value = "/"
+ fs.conn_id = "fake"
+ return fs
+
+ def test_objectstoragepath_init_conn_id_in_uri(self, fake_fs):
+ fake_fs.stat.return_value = {"stat": "result"}
+ attach(protocol="fake", conn_id="fake", fs=fake_fs)
p = ObjectStoragePath("fake://fake@bucket/path")
- p.touch()
- fsspec_info = p.fs.info(p.path)
- assert p.stat() == {**fsspec_info, "conn_id": "fake", "protocol":
"fake"}
-
- @pytest.fixture
- def fake_local_files(self):
- obj = FakeLocalFileSystem()
- obj.touch(FOO)
- try:
- yield
- finally:
- FakeLocalFileSystem.store.clear()
- FakeLocalFileSystem.pseudo_dirs[:] = [""]
+ assert p.stat() == {"stat": "result", "conn_id": "fake", "protocol":
"fake"}
@pytest.mark.parametrize(
"fn, args, fn2, path, expected_args, expected_kwargs",
[
- ("checksum", {}, "checksum", FOO,
FakeLocalFileSystem._strip_protocol(BAR), {}),
- ("size", {}, "size", FOO,
FakeLocalFileSystem._strip_protocol(BAR), {}),
+ ("checksum", {}, "checksum", FOO,
FakeRemoteFileSystem._strip_protocol(BAR), {}),
+ ("size", {}, "size", FOO,
FakeRemoteFileSystem._strip_protocol(BAR), {}),
(
"sign",
{"expiration": 200, "extra": "xtra"},
"sign",
FOO,
- FakeLocalFileSystem._strip_protocol(BAR),
+ FakeRemoteFileSystem._strip_protocol(BAR),
{"expiration": 200, "extra": "xtra"},
),
- ("ukey", {}, "ukey", FOO,
FakeLocalFileSystem._strip_protocol(BAR), {}),
+ ("ukey", {}, "ukey", FOO,
FakeRemoteFileSystem._strip_protocol(BAR), {}),
(
"read_block",
{"offset": 0, "length": 1},
"read_block",
FOO,
- FakeLocalFileSystem._strip_protocol(BAR),
+ FakeRemoteFileSystem._strip_protocol(BAR),
{"delimiter": None, "length": 1, "offset": 0},
),
],
)
- def test_standard_extended_api(
- self, fake_local_files, fn, args, fn2, path, expected_args,
expected_kwargs
- ):
- fs = FakeLocalFileSystem()
- with mock.patch.object(fs, fn2) as method:
- attach(protocol="file", conn_id="fake", fs=fs)
- o = ObjectStoragePath(path, conn_id="fake")
+ def test_standard_extended_api(self, fake_fs, fn, args, fn2, path,
expected_args, expected_kwargs):
+ store = attach(protocol="file", conn_id="fake", fs=fake_fs)
+ o = ObjectStoragePath(path, conn_id="fake")
- getattr(o, fn)(**args)
- method.assert_called_once_with(expected_args, **expected_kwargs)
+ getattr(o, fn)(**args)
+ getattr(store.fs, fn2).assert_called_once_with(expected_args,
**expected_kwargs)
def test_stat(self):
with NamedTemporaryFile() as f:
@@ -212,8 +168,6 @@ class TestFs:
assert S_ISDIR(o.parent.stat().st_mode)
def test_bucket_key_protocol(self):
- attach(protocol="s3", fs=FakeRemoteFileSystem())
-
bucket = "bkt"
key = "yek"
protocol = "s3"
@@ -273,23 +227,24 @@ class TestFs:
_to.unlink()
def test_copy_remote_remote(self):
- attach("ffs", fs=FakeRemoteFileSystem(skip_instance_cache=True))
- attach("ffs2", fs=FakeRemoteFileSystem(skip_instance_cache=True))
+ # foo = xxx added to prevent same fs token
+ attach("ffs", fs=FakeRemoteFileSystem(auto_mkdir=True, foo="bar"))
+ attach("ffs2", fs=FakeRemoteFileSystem(auto_mkdir=True, foo="baz"))
- dir_src = f"bucket1/{str(uuid.uuid4())}"
- dir_dst = f"bucket2/{str(uuid.uuid4())}"
+ dir_src = f"/tmp/{str(uuid.uuid4())}"
+ dir_dst = f"/tmp/{str(uuid.uuid4())}"
key = "foo/bar/baz.txt"
+ # note we are dealing with object storage characteristics
+ # while working on a local filesystem, so it might feel not intuitive
_from = ObjectStoragePath(f"ffs://{dir_src}")
_from_file = _from / key
_from_file.touch()
- assert _from.bucket == "bucket1"
assert _from_file.exists()
_to = ObjectStoragePath(f"ffs2://{dir_dst}")
_from.copy(_to)
- assert _to.bucket == "bucket2"
assert _to.exists()
assert _to.is_dir()
assert (_to / _from.key / key).exists()
@@ -299,7 +254,7 @@ class TestFs:
_to.rmdir(recursive=True)
def test_serde_objectstoragepath(self):
- path = "file:///bucket/key/part1/part2"
+ path = "file://bucket/key/part1/part2"
o = ObjectStoragePath(path)
s = o.serialize()
@@ -357,8 +312,6 @@ class TestFs:
_register_filesystems.cache_clear()
def test_dataset(self):
- attach("s3", fs=FakeRemoteFileSystem())
-
p = "s3"
f = "/tmp/foo"
i = Dataset(uri=f"{p}://{f}", extra={"foo": "bar"})
diff --git a/tests/providers/common/io/xcom/test_backend.py
b/tests/providers/common/io/xcom/test_backend.py
index 0641e18fe0..fce5ed985e 100644
--- a/tests/providers/common/io/xcom/test_backend.py
+++ b/tests/providers/common/io/xcom/test_backend.py
@@ -181,7 +181,7 @@ class TestXcomObjectStoreBackend:
run_id=task_instance.run_id,
session=session,
)
- assert str(p) == qry.first().value
+ assert self.path in qry.first().value
@pytest.mark.db_test
def test_clear(self, task_instance, session):