This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e166926c9b2 Update universal-pathlib to `>=0.3.8` and use upath.extensions.ProxyUPath (#60519)
e166926c9b2 is described below

commit e166926c9b26d9330ea0556a64fca05331d573db
Author: Andreas Poehlmann <[email protected]>
AuthorDate: Thu Jan 29 16:21:51 2026 +0100

    Update universal-pathlib to `>=0.3.8` and use upath.extensions.ProxyUPath (#60519)
    
    * pyproject.toml: update universal-pathlib version boundaries
    
    * task_sdk.io.path: fix test regarding relative_to
    
    * tests: adjust the lazy_load test to reflect caching is done via STORE_CACHE
    
    * tests: update tests to register fake remote filesystem in fsspec
    
    * airflow.sdk.io.path: implement ObjectStoragePath via ProxyUPath
    
    * airflow.sdk.io.path: provide a basic implementation for copy_into and move_into
    
    * airflow.sdk.io.path: fix __str__ method
    
    * airflow.sdk.io.path: docstring fixes
    
    * update spelling_wordlist.txt
---
 airflow-core/pyproject.toml             |   2 +-
 docs/spelling_wordlist.txt              |   2 +
 task-sdk/src/airflow/sdk/io/path.py     | 123 ++++++++++++++++++++------------
 task-sdk/tests/task_sdk/io/test_path.py |  34 ++++++---
 4 files changed, 105 insertions(+), 56 deletions(-)

diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index 0eced80cc15..e964e101c05 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -144,7 +144,7 @@ dependencies = [
     "termcolor>=3.0.0",
     "typing-extensions>=4.14.1",
     # https://github.com/apache/airflow/issues/56369 , rework universal-pathlib usage
-    "universal-pathlib>=0.2.6,<0.3.0",
+    "universal-pathlib>=0.3.8",
     "uuid6>=2024.7.10",
     "apache-airflow-task-sdk<1.3.0,>=1.2.0",
     # pre-installed providers
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 686489106be..7390ccf842c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1,5 +1,6 @@
 aarch
 abc
+AbstractFileSystem
 accessor
 AccessSecretVersionResponse
 accountmaking
@@ -736,6 +737,7 @@ fqdn
 frontend
 fs
 fsGroup
+fsspec
 fullname
 func
 Fundera
diff --git a/task-sdk/src/airflow/sdk/io/path.py b/task-sdk/src/airflow/sdk/io/path.py
index 3f87e6a1d95..89c49759c77 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -17,22 +17,21 @@
 
 from __future__ import annotations
 
-import contextlib
-import os
 import shutil
-from collections.abc import Mapping
 from typing import TYPE_CHECKING, Any, ClassVar
 from urllib.parse import urlsplit
 
 from fsspec.utils import stringify_path
-from upath.implementations.cloud import CloudPath
-from upath.registry import get_upath_class
+from upath import UPath
+from upath.extensions import ProxyUPath
 
 from airflow.sdk.io.stat import stat_result
 from airflow.sdk.io.store import attach
 
 if TYPE_CHECKING:
     from fsspec import AbstractFileSystem
+    from typing_extensions import Self
+    from upath.types import JoinablePathLike
 
 
 class _TrackingFileWrapper:
@@ -77,42 +76,48 @@ class _TrackingFileWrapper:
         self._obj.__exit__(exc_type, exc_val, exc_tb)
 
 
-class ObjectStoragePath(CloudPath):
+class ObjectStoragePath(ProxyUPath):
     """A path-like object for object storage."""
 
     __version__: ClassVar[int] = 1
 
-    _protocol_dispatch = False
-
     sep: ClassVar[str] = "/"
     root_marker: ClassVar[str] = "/"
 
     __slots__ = ("_hash_cached",)
 
-    @classmethod
-    def _transform_init_args(
-        cls,
-        args: tuple[str | os.PathLike, ...],
-        protocol: str,
-        storage_options: dict[str, Any],
-    ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
-        """Extract conn_id from the URL and set it as a storage option."""
+    def __init__(
+        self,
+        *args: JoinablePathLike,
+        protocol: str | None = None,
+        conn_id: str | None = None,
+        **storage_options: Any,
+    ) -> None:
+        # ensure conn_id is always set in storage_options
+        storage_options.setdefault("conn_id", None)
+        # parse conn_id from args if provided
         if args:
             arg0 = args[0]
-            parsed_url = urlsplit(stringify_path(arg0))
-            userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
-            if have_info:
-                storage_options.setdefault("conn_id", userinfo or None)
-                parsed_url = parsed_url._replace(netloc=hostinfo)
-            args = (parsed_url.geturl(),) + args[1:]
-            protocol = protocol or parsed_url.scheme
-        return args, protocol, storage_options
+            if isinstance(arg0, type(self)):
+                storage_options["conn_id"] = arg0.storage_options.get("conn_id")
+            else:
+                parsed_url = urlsplit(stringify_path(arg0))
+                userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
+                if have_info:
+                    conn_id = storage_options["conn_id"] = userinfo or None
+                    parsed_url = parsed_url._replace(netloc=hostinfo)
+                args = (parsed_url.geturl(),) + args[1:]
+                protocol = protocol or parsed_url.scheme
+        # override conn_id if explicitly provided
+        if conn_id is not None:
+            storage_options["conn_id"] = conn_id
+        super().__init__(*args, protocol=protocol, **storage_options)
 
-    @classmethod
-    def _fs_factory(
-        cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
-    ) -> AbstractFileSystem:
-        return attach(protocol or "file", storage_options.get("conn_id")).fs
+    @property
+    def fs(self) -> AbstractFileSystem:
+        """Return the filesystem for this path, using airflow's attach mechanism."""
+        conn_id = self.storage_options.get("conn_id")
+        return attach(self.protocol or "file", conn_id).fs
 
     def __hash__(self) -> int:
         self._hash_cached: int
@@ -181,12 +186,7 @@ class ObjectStoragePath(CloudPath):
             and st["ino"] == other_st["ino"]
         )
 
-    def _scandir(self):
-        # Emulate os.scandir(), which returns an object that can be used as a
-        # context manager.
-        return contextlib.nullcontext(self.iterdir())
-
-    def replace(self, target) -> ObjectStoragePath:
+    def replace(self, target) -> Self:
         """
         Rename this path to the target path, overwriting if that path exists.
 
@@ -199,16 +199,12 @@ class ObjectStoragePath(CloudPath):
         return self.rename(target)
 
     @classmethod
-    def cwd(cls):
-        if cls is ObjectStoragePath:
-            return get_upath_class("").cwd()
-        raise NotImplementedError
+    def cwd(cls) -> Self:
+        return cls._from_upath(UPath.cwd())
 
     @classmethod
-    def home(cls):
-        if cls is ObjectStoragePath:
-            return get_upath_class("").home()
-        raise NotImplementedError
+    def home(cls) -> Self:
+        return cls._from_upath(UPath.home())
 
     # EXTENDED OPERATIONS
 
@@ -299,7 +295,7 @@ class ObjectStoragePath(CloudPath):
             # make use of system dependent buffer size
             shutil.copyfileobj(f1, f2, **kwargs)
 
-    def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:
+    def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:  # type: ignore[override]
         """
         Copy file(s) from this path to another location.
 
@@ -370,7 +366,23 @@ class ObjectStoragePath(CloudPath):
         # remote file -> remote dir
         self._cp_file(dst, **kwargs)
 
-    def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:
+    def copy_into(self, target_dir: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:  # type: ignore[override]
+        """
+        Copy file(s) from this path into another directory.
+
+        :param target_dir: Destination directory
+        :param recursive: If True, copy directories recursively.
+
+        kwargs: Additional keyword arguments to be passed to the underlying implementation.
+        """
+        if isinstance(target_dir, str):
+            target_dir = ObjectStoragePath(target_dir)
+        if not target_dir.is_dir():
+            raise NotADirectoryError(f"Destination {target_dir} is not a directory.")
+        dst_path = target_dir / self.name
+        self.copy(dst_path, recursive=recursive, **kwargs)
+
+    def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:  # type: ignore[override]
         """
         Move file(s) from this path to another location.
 
@@ -394,6 +406,23 @@ class ObjectStoragePath(CloudPath):
         self.copy(path, recursive=recursive, **kwargs)
         self.unlink()
 
+    def move_into(self, target_dir: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None:  # type: ignore[override]
+        """
+        Move file(s) from this path into another directory.
+
+        :param target_dir: Destination directory
+        :param recursive: bool
+                         If True, move directories recursively.
+
+        kwargs: Additional keyword arguments to be passed to the underlying implementation.
+        """
+        if isinstance(target_dir, str):
+            target_dir = ObjectStoragePath(target_dir)
+        if not target_dir.is_dir():
+            raise NotADirectoryError(f"Destination {target_dir} is not a directory.")
+        dst_path = target_dir / self.name
+        self.move(dst_path, recursive=recursive, **kwargs)
+
     def serialize(self) -> dict[str, Any]:
         _kwargs = {**self.storage_options}
         conn_id = _kwargs.pop("conn_id", None)
@@ -417,6 +446,6 @@ class ObjectStoragePath(CloudPath):
 
     def __str__(self):
         conn_id = self.storage_options.get("conn_id")
-        if self._protocol and conn_id:
-            return f"{self._protocol}://{conn_id}@{self.path}"
+        if self.protocol and conn_id:
+            return f"{self.protocol}://{conn_id}@{self.path}"
         return super().__str__()
diff --git a/task-sdk/tests/task_sdk/io/test_path.py b/task-sdk/tests/task_sdk/io/test_path.py
index dd338e09f9d..01fc925de30 100644
--- a/task-sdk/tests/task_sdk/io/test_path.py
+++ b/task-sdk/tests/task_sdk/io/test_path.py
@@ -70,16 +70,19 @@ def test_home():
 def test_lazy_load():
     o = ObjectStoragePath("file:///tmp/foo")
     with pytest.raises(AttributeError):
-        assert o._fs_cached
+        assert o.__wrapped__._fs_cached
 
+    # ObjectStoragePath overrides .fs and provides cached filesystems via the STORE_CACHE
     assert o.fs is not None
-    assert o._fs_cached
+
+    with pytest.raises(AttributeError):
+        assert o.__wrapped__._fs_cached
     # Clear the cache to avoid side effects in other tests below
     _STORE_CACHE.clear()
 
 
 class _FakeRemoteFileSystem(MemoryFileSystem):
-    protocol = ("s3", "fakefs", "ffs", "ffs2")
+    protocol = ("s3", "fake", "fakefs", "ffs", "ffs2")
     root_marker = ""
     store: ClassVar[dict[str, Any]] = {}
     pseudo_dirs = [""]
@@ -99,6 +102,21 @@ class _FakeRemoteFileSystem(MemoryFileSystem):
         return path
 
 
[email protected](scope="module", autouse=True)
+def register_fake_remote_filesystem():
+    # Register the fake filesystem with fsspec so UPath can discover it
+    from fsspec.registry import _registry as fsspec_implementation_registry, register_implementation
+
+    old_registry = fsspec_implementation_registry.copy()
+    try:
+        for proto in _FakeRemoteFileSystem.protocol:
+            register_implementation(proto, _FakeRemoteFileSystem, clobber=True)
+        yield
+    finally:
+        fsspec_implementation_registry.clear()
+        fsspec_implementation_registry.update(old_registry)
+
+
 class TestAttach:
     FAKE = "ffs:///fake"
     MNT = "ffs:///mnt/warehouse"
@@ -168,10 +186,6 @@ class TestAttach:
 
 
 class TestRemotePath:
-    @pytest.fixture(autouse=True)
-    def fake_fs(self, monkeypatch):
-        monkeypatch.setattr(ObjectStoragePath, "_fs_factory", lambda *a, **k: _FakeRemoteFileSystem())
-
     def test_bucket_key_protocol(self):
         bucket = "bkt"
         key = "yek"
@@ -262,7 +276,11 @@ class TestLocalPath:
         o1 = ObjectStoragePath(f"file://{target}")
         o2 = ObjectStoragePath(f"file://{tmp_path.as_posix()}")
         o3 = ObjectStoragePath(f"file:///{uuid.uuid4()}")
-        assert o1.relative_to(o2) == o1
+        # relative_to returns the relative path from o2 to o1
+        relative = o1.relative_to(o2)
+        # The relative path should be the basename (uuid) of the target
+        expected_relative = target.split("/")[-1]
+        assert str(relative) == expected_relative
         with pytest.raises(ValueError, match="is not in the subpath of"):
             o1.relative_to(o3)
 

Reply via email to