This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2fc8d2a1d9 Refactor ObjectStorage into a Path (#35612)
2fc8d2a1d9 is described below
commit 2fc8d2a1d90d39e0ade38f969b65511b5cd677c2
Author: Bolke de Bruin <[email protected]>
AuthorDate: Sat Nov 18 21:38:14 2023 +0100
Refactor ObjectStorage into a Path (#35612)
---
airflow/example_dags/tutorial_objectstorage.py | 4 +-
airflow/io/path.py | 397 +++++++++++
airflow/io/store/path.py | 725 ---------------------
{tests/io/store => airflow/io/utils}/__init__.py | 0
airflow/io/{store => utils}/stat.py | 10 +-
.../providers/common/io/operators/file_transfer.py | 2 +-
.../apache-airflow/core-concepts/objectstorage.rst | 119 ++--
docs/spelling_wordlist.txt | 1 +
setup.cfg | 1 +
tests/io/{store/test_store.py => test_path.py} | 144 ++--
.../common/io/example_file_transfer_local_to_s3.py | 4 +-
11 files changed, 564 insertions(+), 843 deletions(-)
diff --git a/airflow/example_dags/tutorial_objectstorage.py
b/airflow/example_dags/tutorial_objectstorage.py
index 5394238c7a..47db595c24 100644
--- a/airflow/example_dags/tutorial_objectstorage.py
+++ b/airflow/example_dags/tutorial_objectstorage.py
@@ -23,7 +23,7 @@ import pendulum
import requests
from airflow.decorators import dag, task
-from airflow.io.store.path import ObjectStoragePath
+from airflow.io.path import ObjectStoragePath
# [END import_module]
@@ -93,7 +93,7 @@ def tutorial_objectstorage():
response.raise_for_status()
# ensure the bucket exists
- base.mkdir(exists_ok=True)
+ base.mkdir(exist_ok=True)
formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"
diff --git a/airflow/io/path.py b/airflow/io/path.py
new file mode 100644
index 0000000000..9fd51091dc
--- /dev/null
+++ b/airflow/io/path.py
@@ -0,0 +1,397 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+import functools
+import os
+import shutil
+import typing
+from pathlib import PurePath
+from urllib.parse import urlsplit
+
+from fsspec.core import split_protocol
+from fsspec.utils import stringify_path
+from upath.implementations.cloud import CloudPath, _CloudAccessor
+from upath.registry import get_upath_class
+
+from airflow.io.store import attach
+from airflow.io.utils.stat import stat_result
+
+if typing.TYPE_CHECKING:
+ from urllib.parse import SplitResult
+
+ from fsspec import AbstractFileSystem
+
+
+PT = typing.TypeVar("PT", bound="ObjectStoragePath")
+
+default = "file"
+
+
+class _AirflowCloudAccessor(_CloudAccessor):
+ __slots__ = ("_store",)
+
+ def __init__(self, parsed_url: SplitResult | None, **kwargs: typing.Any)
-> None:
+ store = kwargs.pop("store", None)
+ conn_id = kwargs.pop("conn_id", None)
+ if store:
+ self._store = store
+ elif parsed_url and parsed_url.scheme:
+ self._store = attach(parsed_url.scheme, conn_id)
+ else:
+ self._store = attach(default, conn_id)
+
+ @property
+ def _fs(self) -> AbstractFileSystem:
+ return self._store.fs
+
+ def __eq__(self, other):
+ return isinstance(other, _AirflowCloudAccessor) and self._store ==
other._store
+
+
+class ObjectStoragePath(CloudPath):
+ """A path-like object for object storage."""
+
+ _accessor: _AirflowCloudAccessor
+
+ __version__: typing.ClassVar[int] = 1
+
+ _default_accessor = _AirflowCloudAccessor
+
+ sep: typing.ClassVar[str] = "/"
+ root_marker: typing.ClassVar[str] = "/"
+
+ _bucket: str
+ _key: str
+ _protocol: str
+ _hash: int | None
+
+ __slots__ = (
+ "_bucket",
+ "_key",
+ "_conn_id",
+ "_protocol",
+ "_hash",
+ )
+
+ def __new__(cls: type[PT], *args: str | os.PathLike, **kwargs: typing.Any)
-> PT:
+ args_list = list(args)
+
+ try:
+ other = args_list.pop(0)
+ except IndexError:
+ other = "."
+ else:
+ other = other or "."
+
+ if isinstance(other, PurePath):
+ _cls: typing.Any = type(other)
+ drv, root, parts = _cls._parse_args(args_list)
+ drv, root, parts = _cls._flavour.join_parsed_parts(
+ other._drv, # type: ignore[attr-defined]
+ other._root, # type: ignore[attr-defined]
+ other._parts, # type: ignore[attr-defined]
+ drv,
+ root,
+ parts, # type: ignore
+ )
+
+ _kwargs = getattr(other, "_kwargs", {})
+ _url = getattr(other, "_url", None)
+ other_kwargs = _kwargs.copy()
+ if _url and _url.scheme:
+ other_kwargs["url"] = _url
+ new_kwargs = _kwargs.copy()
+ new_kwargs.update(kwargs)
+
+ return _cls(_cls._format_parsed_parts(drv, root, parts,
**other_kwargs), **new_kwargs)
+
+ url = stringify_path(other)
+ parsed_url: SplitResult = urlsplit(url)
+ protocol: str | None = split_protocol(url)[0] or parsed_url.scheme
+
+ # allow override of protocol
+ protocol = kwargs.get("scheme", protocol)
+
+ for key in ["scheme", "url"]:
+ val = kwargs.pop(key, None)
+ if val:
+ parsed_url = parsed_url._replace(**{key: val})
+
+ if not parsed_url.path:
+ parsed_url = parsed_url._replace(path="/") # ensure path has root
+
+ if not protocol:
+ args_list.insert(0, url)
+ else:
+ args_list.insert(0, parsed_url.path)
+
+ return cls._from_parts(args_list, url=parsed_url, **kwargs) # type:
ignore
+
+ @functools.lru_cache
+ def __hash__(self) -> int:
+ return hash(self._bucket)
+
+ def samestore(self, other: typing.Any) -> bool:
+ return isinstance(other, ObjectStoragePath) and self._accessor ==
other._accessor
+
+ @property
+ def container(self) -> str:
+ return self.bucket
+
+ @property
+ def bucket(self) -> str:
+ if self._url:
+ return self._url.netloc
+ else:
+ return ""
+
+ @property
+ def key(self) -> str:
+ if self._url:
+ return self._url.path
+ else:
+ return ""
+
+ def stat(self) -> stat_result: # type: ignore[override]
+ """Call ``stat`` and return the result."""
+ return stat_result(
+ self._accessor.stat(self),
+ protocol=self.protocol,
+ conn_id=self._accessor._store.conn_id,
+ )
+
+ def samefile(self, other_path: typing.Any) -> bool:
+ """Return whether other_path is the same or not as this file."""
+ if not isinstance(other_path, ObjectStoragePath):
+ return False
+
+ st = self.stat()
+ other_st = other_path.stat()
+
+ return (
+ st["protocol"] == other_st["protocol"]
+ and st["conn_id"] == other_st["conn_id"]
+ and st["ino"] == other_st["ino"]
+ )
+
+ def _scandir(self):
+ # Emulate os.scandir(), which returns an object that can be used as a
+ # context manager.
+ return contextlib.nullcontext(self.iterdir())
+
+ def replace(self, target) -> ObjectStoragePath:
+ """
+ Rename this path to the target path, overwriting if that path exists.
+
+ The target path may be absolute or relative. Relative paths are
+ interpreted relative to the current working directory, *not* the
+ directory of the Path object.
+
+ Returns the new Path instance pointing to the target path.
+ """
+ return self.rename(target, overwrite=True)
+
+ @classmethod
+ def cwd(cls):
+ if cls is ObjectStoragePath:
+ return get_upath_class("").cwd()
+ else:
+ raise NotImplementedError
+
+ @classmethod
+ def home(cls):
+ if cls is ObjectStoragePath:
+ return get_upath_class("").home()
+ else:
+ raise NotImplementedError
+
+ # EXTENDED OPERATIONS
+
+ def ukey(self) -> str:
+ """Hash of file properties, to tell if it has changed."""
+ return self.fs.ukey(self.path)
+
+ def checksum(self) -> int:
+ """Return the checksum of the file at this path."""
+ # we directly access the fs here to avoid changing the abstract
interface
+ return self.fs.checksum(self.path)
+
+ def read_block(self, offset: int, length: int, delimiter=None):
+ r"""Read a block of bytes.
+
+ Starting at ``offset`` of the file, read ``length`` bytes. If
+ ``delimiter`` is set then we ensure that the read starts and stops at
+ delimiter boundaries that follow the locations ``offset`` and ``offset
+ + length``. If ``offset`` is zero then we start at zero. The
+ bytestring returned WILL include the end delimiter string.
+
+ If offset+length is beyond the eof, reads to eof.
+
+ :param offset: int
+ Byte offset to start read
+ :param length: int
+ Number of bytes to read. If None, read to the end.
+ :param delimiter: bytes (optional)
+ Ensure reading starts and stops at delimiter bytestring
+
+ Examples
+ --------
+ >>> read_block(0, 13)
+ b'Alice, 100\\nBo'
+ >>> read_block(0, 13, delimiter=b'\\n')
+ b'Alice, 100\\nBob, 200\\n'
+
+ Use ``length=None`` to read to the end of the file.
+ >>> read_block(0, None, delimiter=b'\\n')
+ b'Alice, 100\\nBob, 200\\nCharlie, 300'
+
+ See Also
+ --------
+ :func:`fsspec.utils.read_block`
+ """
+ return self.fs.read_block(self.path, offset=offset, length=length,
delimiter=delimiter)
+
+ def sign(self, expiration: int = 100, **kwargs):
+ """Create a signed URL representing the given path.
+
+ Some implementations allow temporary URLs to be generated, as a
+ way of delegating credentials.
+
+ :param path: str
+ The path on the filesystem
+ :param expiration: int
+ Number of seconds to enable the URL for (if
supported)
+
+ :returns URL: str
+ The signed URL
+
+ :raises NotImplementedError: if the method is not implemented for a
store
+ """
+ return self.fs.sign(self.path, expiration=expiration, **kwargs)
+
+ def size(self) -> int:
+ """Size in bytes of the file at this path."""
+ return self.fs.size(self.path)
+
+ def _cp_file(self, dst: ObjectStoragePath, **kwargs):
+ """Copy a single file from this path to another location by streaming
the data."""
+ # create the directory or bucket if required
+ if dst.key.endswith(self.sep) or not dst.key:
+ dst.mkdir(exist_ok=True, parents=True)
+ dst = dst / self.key
+ elif dst.is_dir():
+ dst = dst / self.key
+
+ # streaming copy
+ with self.open("rb") as f1, dst.open("wb") as f2:
+ # make use of system dependent buffer size
+ shutil.copyfileobj(f1, f2, **kwargs)
+
+ def copy(self, dst: str | ObjectStoragePath, recursive: bool = False,
**kwargs) -> None:
+ """Copy file(s) from this path to another location.
+
+ For remote to remote copies, the key used for the destination will be
the same as the source.
+ So that s3://src_bucket/foo/bar will be copied to
gcs://dst_bucket/foo/bar and not
+ gcs://dst_bucket/bar.
+
+ :param dst: Destination path
+ :param recursive: If True, copy directories recursively.
+
+ kwargs: Additional keyword arguments to be passed to the underlying
implementation.
+ """
+ if isinstance(dst, str):
+ dst = ObjectStoragePath(dst)
+
+ # same -> same
+ if self.samestore(dst):
+ self.fs.copy(self.path, dst.path, recursive=recursive, **kwargs)
+ return
+
+ # use optimized path for local -> remote or remote -> local
+ if self.protocol == "file":
+ dst.fs.put(self.path, dst.path, recursive=recursive, **kwargs)
+ return
+
+ if dst.protocol == "file":
+ self.fs.get(self.path, dst.path, recursive=recursive, **kwargs)
+ return
+
+ if not self.exists():
+ raise FileNotFoundError(f"{self} does not exist")
+
+ # remote dir -> remote dir
+ if self.is_dir():
+ if dst.is_file():
+ raise ValueError("Cannot copy directory to a file.")
+
+ dst.mkdir(exist_ok=True, parents=True)
+
+ out = self.fs.expand_path(self.path, recursive=True, **kwargs)
+
+ for path in out:
+ # this check prevents one extra call to is_dir() as
+ # glob returns self as well
+ if path == self.path:
+ continue
+
+ src_obj = ObjectStoragePath(path,
conn_id=self._accessor._store.conn_id)
+
+ # skip directories, empty directories will not be created
+ if src_obj.is_dir():
+ continue
+
+ src_obj._cp_file(dst)
+
+ return
+
+ # remote file -> remote dir
+ self._cp_file(dst, **kwargs)
+
+ def move(self, path: str | ObjectStoragePath, recursive: bool = False,
**kwargs) -> None:
+ """Move file(s) from this path to another location.
+
+ :param path: Destination path
+ :param recursive: bool
+ If True, move directories recursively.
+
+ kwargs: Additional keyword arguments to be passed to the underlying
implementation.
+ """
+ if isinstance(path, str):
+ path = ObjectStoragePath(path)
+
+ if self.samestore(path):
+ return self.fs.move(self.path, path.path, recursive=recursive,
**kwargs)
+
+ # non-local copy
+ self.copy(path, recursive=recursive, **kwargs)
+ self.unlink()
+
+ def serialize(self) -> dict[str, str]:
+ return {
+ "path": str(self),
+ **self._kwargs,
+ }
+
+ @classmethod
+ def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
+ if version > cls.__version__:
+ raise ValueError(f"Cannot deserialize version {version} with
version {cls.__version__}.")
+
+ path = data.pop("path")
+ return ObjectStoragePath(path, **data)
diff --git a/airflow/io/store/path.py b/airflow/io/store/path.py
deleted file mode 100644
index 17e951886f..0000000000
--- a/airflow/io/store/path.py
+++ /dev/null
@@ -1,725 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import contextlib
-import functools
-import os
-import shutil
-import typing
-from io import UnsupportedOperation
-from stat import S_ISLNK
-
-from fsspec.utils import stringify_path
-
-from airflow.io.store import ObjectStore, attach
-from airflow.io.store.stat import stat_result
-
-if typing.TYPE_CHECKING:
- from fsspec import AbstractFileSystem
-
-
-def _rewrite_info(info: dict, store: ObjectStore) -> dict:
- info["name"] = ObjectStoragePath(info["name"], store=store)
- return info
-
-
[email protected]_ordering
-class ObjectStoragePath(os.PathLike):
- """A path-like object for object storage."""
-
- __version__: typing.ClassVar[int] = 1
-
- sep: typing.ClassVar[str] = "/"
- root_marker: typing.ClassVar[str] = "/"
-
- _store: ObjectStore | None
- _bucket: str
- _key: str
- _conn_id: str | None
- _protocol: str
- _hash: int | None
-
- __slots__ = (
- "_store",
- "_bucket",
- "_key",
- "_conn_id",
- "_protocol",
- "_hash",
- )
-
- def __init__(
- self,
- path: str | ObjectStoragePath,
- *,
- conn_id: str | None = None,
- store: ObjectStore | None = None,
- ) -> None:
- self._conn_id = conn_id
- self._store = store
-
- if isinstance(path, ObjectStoragePath):
- self._protocol = path._protocol
- self._bucket = path._bucket
- self._key = path._key
- self._store = path._store
- else:
- self._protocol, self._bucket, self._key = self._split_path(path)
-
- if store:
- self._conn_id = store.conn_id
- self._protocol = self._protocol if self._protocol else
store.protocol
- elif self._protocol and not self._store:
- self._store = attach(self._protocol, conn_id)
-
- @classmethod
- def _split_path(cls, p: typing.Any) -> tuple[str, str, str]:
- protocol, _, path = str(stringify_path(p)).rpartition("://")
-
- if cls.sep not in path:
- bucket = path
- key = ""
- else:
- bucket, key = path.split(cls.sep, 1)
-
- # we don't care about versions etc
- return protocol, bucket, key
-
- def __repr__(self) -> str:
- return f"<{type(self).__name__}('{self}')>"
-
- def __str__(self) -> str:
- return (
- f"{self._protocol}://{self._bucket}/{self._key}"
- if self._protocol
- else f"{self._bucket}/{self._key}"
- )
-
- __fspath__ = __str__
-
- def __lt__(self, other: typing.Any) -> bool:
- if not isinstance(other, ObjectStoragePath):
- return NotImplemented
- return self._bucket < other._bucket
-
- def __eq__(self, other: typing.Any) -> bool:
- if not isinstance(other, ObjectStoragePath):
- return NotImplemented
- return self._bucket == other._bucket
-
- def __ne__(self, other: typing.Any) -> bool:
- if not isinstance(other, ObjectStoragePath):
- return NotImplemented
- return self._bucket != other._bucket
-
- @functools.lru_cache
- def __hash__(self) -> int:
- return hash(self._bucket)
-
- def __truediv__(self, other: typing.Any) -> ObjectStoragePath:
- o_protocol, o_bucket, _ = self._split_path(other)
- if isinstance(other, ObjectStoragePath) and o_bucket and self._bucket
!= o_bucket:
- raise ValueError("Cannot combine paths from different buckets /
containers")
-
- if o_protocol and self._protocol != o_protocol:
- raise ValueError("Cannot combine paths from different protocols")
-
- self_path = str(stringify_path(self))
- other_path = str(stringify_path(other))
-
- path = f"{self_path.rstrip(self.sep)}/{other_path.lstrip(self.sep)}"
- return ObjectStoragePath(path, conn_id=self._conn_id)
-
- def _unsupported(self, method_name: str) -> typing.NoReturn:
- msg = f"{type(self).__name__}.{method_name}() is unsupported"
- raise UnsupportedOperation(msg)
-
- def samestore(self, other: typing.Any) -> bool:
- return isinstance(other, ObjectStoragePath) and self._store ==
other._store
-
- @property
- def container(self) -> str:
- return self._bucket
-
- @property
- def bucket(self) -> str:
- return self._bucket
-
- @property
- def key(self) -> str:
- return self._key
-
- @property
- def store(self) -> ObjectStore:
- if not self._store:
- raise ValueError("Cannot do operations. No store attached.")
-
- return self._store
-
- @property
- def protocol(self) -> str:
- return self._protocol
-
- @property
- def fs(self) -> AbstractFileSystem:
- return self.store.fs
-
- @property
- def parent(self) -> ObjectStoragePath:
- return ObjectStoragePath(self.store.fs._parent(str(self)),
store=self.store)
-
- def stat(self, *, follow_symlinks: bool = True) -> stat_result:
- """Return the result of the `stat()` call.""" # noqa: D402
- return stat_result(self.store.fs.stat(self),
protocol=self.store.protocol, conn_id=self.store.conn_id)
-
- def lstat(self) -> stat_result:
- """Like stat() except that it doesn't follow symlinks."""
- return self.stat(follow_symlinks=False)
-
- def exists(self) -> bool:
- """Whether this path exists."""
- return self.store.fs.exists(self)
-
- def is_dir(self) -> bool:
- """Return True if this path is directory like."""
- return self.store.fs.isdir(self)
-
- def is_file(self) -> bool:
- """Return True if this path is a regular file."""
- return self.store.fs.isfile(self)
-
- def is_mount(self) -> bool:
- self._unsupported("is_mount")
-
- def is_symlink(self) -> bool:
- """Whether this path is a symbolic link."""
- try:
- return S_ISLNK(self.lstat().st_mode)
- except OSError:
- # Path doesn't exist
- return False
- except ValueError:
- # Non-encodable path
- return False
-
- def is_block_device(self) -> bool:
- self._unsupported("is_block_device")
-
- def is_char_device(self) -> bool:
- self._unsupported("is_char_device")
-
- def is_fifo(self) -> bool:
- self._unsupported("is_fifo")
-
- def is_socket(self) -> bool:
- self._unsupported("is_socket")
-
- def samefile(self, other_path: typing.Any) -> bool:
- """Return whether other_path is the same or not as this file."""
- if not isinstance(other_path, ObjectStoragePath):
- return False
-
- st = self.stat()
- other_st = other_path.stat()
-
- return (
- st["protocol"] == other_st["protocol"]
- and st["conn_id"] == other_st["conn_id"]
- and st["ino"] == other_st["ino"]
- )
-
- def checksum(self) -> int:
- """Return the checksum of the file at this path."""
- return self.store.fs.checksum(self)
-
- def open(
- self,
- mode="rb",
- block_size=None,
- cache_options=None,
- compression=None,
- encoding=None,
- errors=None,
- newline=None,
- **kwargs,
- ) -> typing.IO:
- """
- Return a file-like object from the filesystem.
-
- The resultant instance must function correctly in a context 'with'
block.
-
- :param mode: str like 'rb', 'w'
- See builtin 'open()'.
- :param block_size: int
- Some indication of buffering - this is a value in
bytes.
- :param cache_options: dict, optional
- Extra arguments to pass through to the cache.
- :param compression: string or None
- If given, open file using a compression codec. Can
either be a compression
- name (a key in 'fsspec.compression.compr') or 'infer'
to guess the
- compression from the filename suffix.
- :param encoding: passed on to TextIOWrapper for text mode
- :param errors: passed on to TextIOWrapper for text mode
- :param newline: passed on to TextIOWrapper for text mode
-
- kwargs: Additional keyword arguments to be passed on.
- """
- return self.store.fs.open(
- str(self),
- mode=mode,
- block_size=block_size,
- cache_options=cache_options,
- compression=compression,
- encoding=encoding,
- errors=errors,
- newline=newline,
- **kwargs,
- )
-
- def read_bytes(self, start: int | None = None, end: int | None = None) ->
bytes:
- """Open the file in bytes mode, read it, and close the file."""
- return self.store.fs.read_bytes(str(self), start=start, end=end)
-
- def read_text(self, encoding=None, errors=None, newline=None, **kwargs) ->
str:
- """Open the file in text mode, read it, and close the file."""
- return self.store.fs.read_text(str(self), encoding=encoding,
errors=errors, newline=newline, **kwargs)
-
- def write_bytes(self, data, **kwargs) -> int:
- """Open the file in bytes mode, write to it, and close the file."""
- return self.store.fs.pipe_file(self, value=data, **kwargs)
-
- def write_text(self, data, encoding=None, errors=None, newline=None,
**kwargs) -> int:
- """Open the file in text mode, write to it, and close the file."""
- return self.store.fs.write_text(
- str(self), value=data, encoding=encoding, errors=errors,
newline=newline, **kwargs
- )
-
- def iterdir(self):
- """Iterate over the files in this directory."""
- self._unsupported("iterdir")
-
- def _scandir(self):
- # Emulate os.scandir(), which returns an object that can be used as a
- # context manager.
- return contextlib.nullcontext(self.iterdir())
-
- def glob(self, pattern: str, maxdepth: int | None = None, **kwargs):
- """
- Find files by glob-matching.
-
- If the path ends with '/', only folders are returned.
-
- We support ``"**"``,
- ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.
-
- The `maxdepth` option is applied on the first `**` found in the path.
-
- Search path names that contain embedded characters special to this
- implementation of glob may not produce expected results;
- e.g., 'foo/bar/*starredfilename*'.
-
- :param pattern: str
- The glob pattern to match against.
- :param maxdepth: int or None
- The maximum depth to search. If None, there is no
depth limit.
-
- kwargs: Additional keyword arguments to be passed on.
- """
- path = os.path.join(self._bucket, pattern)
-
- detail = kwargs.get("detail", False)
- items = self.store.fs.glob(path, maxdepth=maxdepth, **kwargs)
- if detail:
- t = {
- ObjectStoragePath(k, store=self.store): _rewrite_info(v,
self.store) for k, v in items.items()
- }
- return t
- else:
- return [ObjectStoragePath(c, store=self.store) for c in items]
-
- def rglob(self, maxdepth: int | None = None, **kwargs):
- self._unsupported("rglob")
-
- def walk(self, maxdepth: int | None = None, topdown: bool = True,
on_error: str = "omit", **kwargs):
- """
- Return all files below path.
-
- List all files, recursing into subdirectories; output is
iterator-style,
- like ``os.walk()``. For a simple list of files, ``find()`` is
available.
-
- When topdown is True, the caller can modify the dirnames list in-place
(perhaps
- using del or slice assignment), and walk() will
- only recurse into the subdirectories whose names remain in dirnames;
- this can be used to prune the search, impose a specific order of
visiting,
- or even to inform walk() about directories the caller creates or
renames before
- it resumes walk() again.
- Modifying dirnames when topdown is False has no effect. (see os.walk)
-
- Note that the "files" outputted will include anything that is not
- a directory, such as links.
-
- :param maxdepth: int or None
- Maximum recursion depth. None means limitless, but not
recommended
- on link-based file-systems.
- :param topdown: bool (True)
- Whether to walk the directory tree from the top
downwards or from
- the bottom upwards.
- :param on_error: "omit", "raise", a callable
- if omit (default), path with exception will simply be
empty;
- If raise, an underlying exception will be raised;
- if callable, it will be called with a single OSError
instance as argument
- kwargs: Additional keyword arguments to be passed on.
- """
- detail = kwargs.get("detail", False)
- items = self.store.fs.walk(str(self), maxdepth=maxdepth,
topdown=topdown, on_error=on_error, **kwargs)
- if not detail:
- for path, dirs, files in items:
- yield ObjectStoragePath(path, store=self.store), dirs, files
- else:
- for path, dirs, files in items:
- yield (
- ObjectStoragePath(path, store=self.store),
- {k: _rewrite_info(v, self.store) for k, v in dirs.items()},
- {k: _rewrite_info(v, self.store) for k, v in
files.items()},
- )
-
- def ls(self, detail: bool = True, **kwargs) -> list[ObjectStoragePath] |
list[dict]:
- """
- List files at path.
-
- :param detail: bool
- If True, return a dict containing details about each
entry, otherwise
- return a list of paths.
-
- kwargs: Additional keyword arguments to be passed on.
- """
- items = self.store.fs.ls(str(self), detail=detail, **kwargs)
-
- if detail:
- return [_rewrite_info(c, self.store) for c in items]
- else:
- return [ObjectStoragePath(c, store=self.store) for c in items]
-
- def absolute(self) -> ObjectStoragePath:
- """Return an absolute version of this path. Resolving any aliases."""
- return ObjectStoragePath(f"{self.store.protocol}://{self._key}")
-
- def touch(self, truncate: bool = True) -> None:
- """Create an empty file, or update the timestamp.
-
- :param truncate: bool (True)
- If True, always set the file size to 0; if False,
update the timestamp and
- leave the file unchanged, if the backend allows this.
- """
- self.store.fs.touch(str(self), truncate=truncate)
-
- def mkdir(self, create_parents: bool = True, exists_ok: bool = False,
**kwargs) -> None:
- """
- Create a directory entry at the specified path or within a
bucket/container.
-
- For systems that don't have true directories, it may create a
directory entry
- for this instance only and not affect the real filesystem.
-
- :param create_parents: bool
- if True, this is equivalent to 'makedirs'.
- :param exists_ok: bool
- if True, do not raise an error if the target
directory already exists.
-
- kwargs: Additional keyword arguments, which may include permissions,
etc.
- """
- if not exists_ok and self.exists():
- raise FileExistsError(f"Target {self} exists")
- with contextlib.suppress(FileExistsError):
- self.store.fs.mkdir(str(self), create_parents=create_parents,
**kwargs)
-
- def unlink(self, recursive: bool = False, maxdepth: int | None = None) ->
None:
- """
- Remove this file or link.
-
- If the path is a directory, use rmdir() instead.
- """
- self.store.fs.rm(str(self), recursive=recursive, maxdepth=maxdepth)
-
- rm = unlink
- """
- Remove this file or link.
-
- Alias of unlink
- """
-
- def rmdir(self) -> None:
- """Remove this directory. The directory must be empty."""
- self.store.fs.rmdir(str(self))
-
- def rename(self, target: str | ObjectStoragePath, overwrite=False) ->
ObjectStoragePath:
- """
- Rename this path to the target path.
-
- The target path may be absolute or relative. Relative paths are
- interpreted relative to the current working directory, *not* the
- directory of the Path object.
-
- Returns the new Path instance pointing to the target path.
- """
- if isinstance(target, str):
- target = ObjectStoragePath(target, store=self.store)
-
- if not self.samestore(target):
- raise ValueError("You can only rename within the same store")
-
- if not overwrite and self.store.fs.exists(target):
- raise FileExistsError(f"Target {target} exists")
-
- return ObjectStoragePath(self.store.fs.mv(str(self), target),
store=self._store)
-
- def replace(self, target: str | ObjectStoragePath) -> ObjectStoragePath:
- """
- Rename this path to the target path, overwriting if that path exists.
-
- The target path may be absolute or relative. Relative paths are
- interpreted relative to the current working directory, *not* the
- directory of the Path object.
-
- Returns the new Path instance pointing to the target path.
- """
- return self.rename(target, overwrite=True)
-
- # EXTENDED OPERATIONS
-
- def ukey(self) -> str:
- """Hash of file properties, to tell if it has changed."""
- return self.store.fs.ukey(str(self))
-
- def read_block(self, offset: int, length: int, delimiter=None):
- r"""Read a block of bytes.
-
- Starting at ``offset`` of the file, read ``length`` bytes. If
- ``delimiter`` is set then we ensure that the read starts and stops at
- delimiter boundaries that follow the locations ``offset`` and ``offset
- + length``. If ``offset`` is zero then we start at zero. The
- bytestring returned WILL include the end delimiter string.
-
- If offset+length is beyond the eof, reads to eof.
-
- :param offset: int
- Byte offset to start read
- :param length: int
- Number of bytes to read. If None, read to the end.
- :param delimiter: bytes (optional)
- Ensure reading starts and stops at delimiter bytestring
-
- Examples
- --------
- >>> read_block(0, 13)
- b'Alice, 100\\nBo'
- >>> read_block(0, 13, delimiter=b'\\n')
- b'Alice, 100\\nBob, 200\\n'
-
- Use ``length=None`` to read to the end of the file.
- >>> read_block(0, None, delimiter=b'\\n')
- b'Alice, 100\\nBob, 200\\nCharlie, 300'
-
- See Also
- --------
- :func:`fsspec.utils.read_block`
- """
- return self.store.fs.read_block(str(self), offset, length,
delimiter=delimiter)
-
- def sign(self, expiration: int = 100, **kwargs):
- """Create a signed URL representing the given path.
-
- Some implementations allow temporary URLs to be generated, as a
- way of delegating credentials.
-
- :param path: str
- The path on the filesystem
- :param expiration: int
- Number of seconds to enable the URL for (if
supported)
-
- :returns URL: str
- The signed URL
-
- :raises NotImplementedError: if the method is not implemented for a
store
- """
- return self.store.fs.sign(str(self), expiration=expiration, **kwargs)
-
- def size(self) -> int:
- """Size in bytes of the file at this path."""
- return self.store.fs.size(self)
-
- def du(self, total: bool = True, maxdepth: int | None = None, withdirs:
bool = False, **kwargs):
- """Space used by files and optionally directories within a path.
-
- Directory size does not include the size of its contents.
-
- :param total: bool
- Whether to sum all the file sizes
- :param maxdepth: int or None
- Maximum number of directory levels to descend, None
for unlimited.
- :param withdirs: bool
- Whether to include directory paths in the output.
-
- kwargs: Additional keyword arguments to be passed on.
-
- :returns: Dict of {path: size} if total=False, or int otherwise, where
numbers
- refer to bytes used.
- """
- return self.store.fs.du(str(self), total=total, maxdepth=maxdepth,
withdirs=withdirs, **kwargs)
-
- def find(
- self, path: str, maxdepth: int | None = None, withdirs: bool = False,
detail: bool = False, **kwargs
- ):
- """List all files below the specified path.
-
- Like posix ``find`` command without conditions.
-
- :param path: str
- Path pattern to search.
- :param maxdepth: int or None
- If not None, the maximum number of levels to descend.
- :param withdirs: bool
- Whether to include directory paths in the output.
This is True
- when used by glob, but users usually only want files.
- :param detail: bool
- Whether to include file info.
-
- kwargs: Additional keyword arguments to be passed to ``ls``.
- """
- path = self.sep.join([str(self), path.lstrip("/")])
- items = self.store.fs.find(path, maxdepth=maxdepth, withdirs=withdirs,
detail=detail, **kwargs)
-
- if detail:
- return {
- ObjectStoragePath(k, store=self.store): _rewrite_info(v,
self.store) for k, v in items.items()
- }
- else:
- return [ObjectStoragePath(c, store=self.store) for c in items]
-
- def _cp_file(self, dst: str | ObjectStoragePath, **kwargs):
- """Copy a single file from this path to another location by streaming
the data."""
- if isinstance(dst, str):
- dst = ObjectStoragePath(dst)
-
- # create the directory or bucket if required
- if dst.key.endswith(self.sep) or not dst.key:
- dst.mkdir(exists_ok=True, create_parents=True)
- dst = dst / self.key
- elif dst.is_dir():
- dst = dst / self.key
-
- # streaming copy
- with self.open("rb") as f1, dst.open("wb") as f2:
- # make use of system dependent buffer size
- shutil.copyfileobj(f1, f2, **kwargs)
-
- def copy(self, dst: str | ObjectStoragePath, recursive: bool = False,
**kwargs) -> None:
- """Copy file(s) from this path to another location.
-
- For remote to remote copies, the key used for the destination will be
the same as the source.
- So that s3://src_bucket/foo/bar will be copied to
gcs://dst_bucket/foo/bar and not
- gcs://dst_bucket/bar.
-
- :param dst: Destination path
- :param recursive: If True, copy directories recursively.
-
- kwargs: Additional keyword arguments to be passed to the underlying
implementation.
- """
- if isinstance(dst, str):
- dst = ObjectStoragePath(dst)
-
- # same -> same
- if self.samestore(dst):
- self.store.fs.copy(str(self), dst, recursive=recursive, **kwargs)
- return
-
- # use optimized path for local -> remote or remote -> local
- if self.store.protocol == "file":
- lpath = self.store.fs._strip_protocol(str(self))
- dst.store.fs.put(lpath, str(dst), recursive=recursive, **kwargs)
- return
-
- if dst.store.protocol == "file":
- rpath = dst.store.fs._strip_protocol(str(dst))
- self.store.fs.get(str(self), rpath, recursive=recursive, **kwargs)
- return
-
- if not self.exists():
- raise FileNotFoundError(f"{self} does not exist")
-
- # remote dir -> remote dir
- if self.is_dir():
- if dst.is_file():
- raise ValueError("Cannot copy directory to a file.")
-
- dst.mkdir(exists_ok=True, create_parents=True)
-
- out = self.store.fs.expand_path(str(self), recursive=True,
**kwargs)
- source_stripped = self.store.fs._strip_protocol(str(self))
-
- for path in out:
- # this check prevents one extra call to is_dir() as
- # glob returns self as well
- if path == source_stripped:
- continue
-
- src_obj = ObjectStoragePath(path, store=self.store)
-
- # skip directories, empty directories will not be created
- if src_obj.is_dir():
- continue
-
- src_obj._cp_file(dst)
-
- return
-
- # remote file -> remote dir
- if self.is_file():
- self._cp_file(dst, **kwargs)
- return
-
- def move(self, path: str | ObjectStoragePath, recursive: bool = False,
**kwargs) -> None:
- """Move file(s) from this path to another location.
-
- :param path: Destination path
- :param recursive: bool
- If True, move directories recursively.
-
- kwargs: Additional keyword arguments to be passed to the underlying
implementation.
- """
- if isinstance(path, str):
- path = ObjectStoragePath(path)
-
- if self.samestore(path):
- return self.store.fs.move(str(self), str(path),
recursive=recursive, **kwargs)
-
- # non-local copy
- self.copy(path, recursive=recursive, **kwargs)
- self.unlink(recursive=recursive)
-
- def serialize(self) -> dict[str, str | ObjectStore]:
- return {
- "path": str(self),
- "store": self.store,
- }
-
- @classmethod
- def deserialize(cls, data: dict, version: int) -> ObjectStoragePath:
- if version > cls.__version__:
- raise ValueError(f"Cannot deserialize version {version} with
version {cls.__version__}.")
-
- return ObjectStoragePath(**data)
diff --git a/tests/io/store/__init__.py b/airflow/io/utils/__init__.py
similarity index 100%
rename from tests/io/store/__init__.py
rename to airflow/io/utils/__init__.py
diff --git a/airflow/io/store/stat.py b/airflow/io/utils/stat.py
similarity index 87%
rename from airflow/io/store/stat.py
rename to airflow/io/utils/stat.py
index 75131cda95..6d873b614d 100644
--- a/airflow/io/store/stat.py
+++ b/airflow/io/utils/stat.py
@@ -36,19 +36,19 @@ class stat_result(dict):
st_dev = property(lambda self: 0)
"""device"""
- st_size = property(lambda self: self._info.get("size", 0))
+ st_size = property(lambda self: self.get("size", 0))
"""total size, in bytes"""
- st_gid = property(lambda self: self._info.get("gid", 0))
+ st_gid = property(lambda self: self.get("gid", 0))
"""group ID of owner"""
- st_uid = property(lambda self: self._info.get("uid", 0))
+ st_uid = property(lambda self: self.get("uid", 0))
"""user ID of owner"""
- st_ino = property(lambda self: self._info.get("ino", 0))
+ st_ino = property(lambda self: self.get("ino", 0))
"""inode"""
- st_nlink = property(lambda self: self._info.get("nlink", 0))
+ st_nlink = property(lambda self: self.get("nlink", 0))
"""number of hard links"""
@property
diff --git a/airflow/providers/common/io/operators/file_transfer.py
b/airflow/providers/common/io/operators/file_transfer.py
index f23fa422d9..e720f78666 100644
--- a/airflow/providers/common/io/operators/file_transfer.py
+++ b/airflow/providers/common/io/operators/file_transfer.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Sequence
-from airflow.io.store.path import ObjectStoragePath
+from airflow.io.path import ObjectStoragePath
from airflow.models import BaseOperator
if TYPE_CHECKING:
diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst
b/docs/apache-airflow/core-concepts/objectstorage.rst
index 194db0477e..046cb48522 100644
--- a/docs/apache-airflow/core-concepts/objectstorage.rst
+++ b/docs/apache-airflow/core-concepts/objectstorage.rst
@@ -23,6 +23,8 @@ Object Storage
.. versionadded:: 2.8.0
+|experimental|
+
All major cloud providers offer persistent data storage in object stores.
These are not classic
"POSIX" file systems. In order to store hundreds of petabytes of data without
any single points
of failure, object stores replace the classic file system directory tree with
a simpler model
@@ -65,12 +67,12 @@ However, you should be aware of the limitations of object
storage when designing
Basic Use
---------
-To use object storage, you need to instantiate a Path-like (see below) object
with the URI of the
+To use object storage, you need to instantiate a Path (see below) object with
the URI of the
object you want to interact with. For example, to point to a bucket in s3, you
would do the following:
.. code-block:: python
- from airflow.io.store.path import ObjectStoragePath
+ from airflow.io.path import ObjectStoragePath
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default") #
conn_id is optional
@@ -149,7 +151,7 @@ would do the following:
.. code-block:: python
- from airflow.io.store.path import ObjectStoragePath
+ from airflow.io.path import ObjectStoragePath
from airflow.io.store import attach
from fsspec.implementations.dbfs import DBFSFileSystem
@@ -165,27 +167,15 @@ would do the following:
.. _concepts:api:
-Path-like API
+Path API
-------------
-The object storage abstraction is implemented as a `Path-like API
<https://docs.python.org/3/library/pathlib.html>`_.
-This means that you can mostly use the same API to interact with object
storage as you would with a local filesystem.
-In this section we only list the differences between the two APIs. Extended
operations beyond the standard Path API
-, like copying and moving, are listed in the next section. For details about
each operation, like what arguments
-they take, see the documentation of the
:class:`~airflow.io.store.path.ObjectStoragePath` class.
-
-
-stat
-^^^^
-
-Returns a ``stat_result`` like object that supports the following attributes:
``st_size``, ``st_mtime``, ``st_mode``,
-but also acts like a dictionary that can provide additional metadata about the
object. For example, for s3 it will,
-return the additional keys like: ``['ETag', 'ContentType']``. If your code
needs to be portable across different object
-store do not rely on the extended metadata.
-
-.. note::
- While ``stat`` does accept the ``follow_symlinks`` argument, it is not
passed on to the object storage backend as
- not all object storage does not support symlinks.
+The object storage abstraction is implemented as a `Path API
<https://docs.python.org/3/library/pathlib.html>`_.
+and builds upon `Universal Pathlib
<https://github.com/fsspec/universal_pathlib>`_ This means that you can mostly
use
+the same API to interact with object storage as you would with a local
filesystem. In this section we only list the
+differences between the two APIs. Extended operations beyond the standard Path
API, like copying and moving, are listed
+in the next section. For details about each operation, like what arguments
they take, see the documentation of
+the :class:`~airflow.io.path.ObjectStoragePath` class.
mkdir
@@ -194,57 +184,85 @@ mkdir
Create a directory entry at the specified path or within a bucket/container.
For systems that don't have true
directories, it may create a directory entry for this instance only and not
affect the real filesystem.
-If ``create_parents`` is ``True`` (the default), any missing parents of this
path are created as needed.
+If ``parents`` is ``True``, any missing parents of this path are created as
needed.
touch
^^^^^
-Create an empty file, or update the timestamp. If ``truncate`` is ``True``,
the file is truncated, which is the
-default.
+Create a file at this given path, or update the timestamp. If ``truncate`` is
``True``, the file is truncated, which is
+the default. If the file already exists, the function succeeds if
``exists_ok`` is true (and its modification time is
+updated to the current time), otherwise ``FileExistsError`` is raised.
+
+
+stat
+^^^^
+
+Returns a ``stat_result`` like object that supports the following attributes:
``st_size``, ``st_mtime``, ``st_mode``,
+but also acts like a dictionary that can provide additional metadata about the
object. For example, for s3 it will,
+return the additional keys like: ``['ETag', 'ContentType']``. If your code
needs to be portable across different object
+stores do not rely on the extended metadata.
.. _concepts:extended-operations:
-Extended Operations
--------------------
+Extensions
+----------
The following operations are not part of the standard Path API, but are
supported by the object storage abstraction.
-ukey
-^^^^
+bucket
+^^^^^^
-Hash of file properties, to tell if it has changed.
+Returns the bucket name.
checksum
^^^^^^^^
-Return the checksum of the file.
+Returns the checksum of the file.
-read_block
-^^^^^^^^^^
+container
+^^^^^^^^^
-Read a block of bytes from the file. This is useful for reading large files in
chunks.
+Alias of bucket
-du
+fs
^^
-Space used by files and optionally directories within a path.
+Convenience attribute to access an instantiated filesystem
+
+
+key
+^^^
+Returns the object key.
-find
+
+path
^^^^
+the ``fsspec`` compatible path for use with filesystem instances
-Find files and optionally directories within a path.
+protocol
+^^^^^^^^
-ls
-^^
+the filesystem_spec protocol.
+
+
+read_block
+^^^^^^^^^^
+
+Read a block of bytes from the file at this given path.
-List files within a path.
+Starting at offset of the file, read length bytes. If delimiter is set then we
ensure
+that the read starts and stops at delimiter boundaries that follow the
locations offset
+and offset + length. If offset is zero then we start at zero. The bytestring
returned
+WILL include the end delimiter string.
+
+If offset+length is beyond the eof, reads to eof.
sign
@@ -254,17 +272,22 @@ Create a signed URL representing the given path. Some
implementations allow temp
way of delegating credentials.
-copy
+size
^^^^
-Copy a file from one path to another. If the destination is a directory, the
file will be copied into it. If the
-destination is a file, it will be overwritten.
+Returns the size in bytes of the file at the given path.
+
-move
+storage_options
+^^^^^^^^^^^^^^^
+
+The storage options for instantiating the underlying filesystem.
+
+
+ukey
^^^^
-Move a file from one path to another. If the destination is a directory, the
file will be moved into it. If the
-destination is a file, it will be overwritten.
+Hash of file properties, to tell if it has changed.
.. _concepts:copying-and-moving:
@@ -291,7 +314,7 @@ are used to connect to s3 and a parquet file, indicated by
a ``ObjectStoragePath
.. code-block:: python
import duckdb
- from airflow.io.store.path import ObjectStoragePath
+ from airflow.io.path import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet",
conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 65bbc8cb6f..02380d9884 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -550,6 +550,7 @@ env
envFrom
EnvVar
envvar
+eof
eol
eols
eRevalue
diff --git a/setup.cfg b/setup.cfg
index 2cda0924cc..1bbf5fb8f1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -163,6 +163,7 @@ install_requires =
# See https://github.com/apache/airflow/pull/31693
# We should also remove "licenses/LICENSE-unicodecsv.txt" file when we
remove this dependency
unicodecsv>=0.14.1
+ universal_pathlib>=0.1.4
werkzeug>=2.0
[options.packages.find]
diff --git a/tests/io/store/test_store.py b/tests/io/test_path.py
similarity index 65%
rename from tests/io/store/test_store.py
rename to tests/io/test_path.py
index 7a7018e7ec..832f8ae663 100644
--- a/tests/io/store/test_store.py
+++ b/tests/io/test_path.py
@@ -18,14 +18,16 @@
from __future__ import annotations
import uuid
+from stat import S_ISDIR, S_ISREG
+from tempfile import NamedTemporaryFile
from unittest import mock
import pytest
from fsspec.implementations.local import LocalFileSystem
from fsspec.utils import stringify_path
+from airflow.io.path import ObjectStoragePath
from airflow.io.store import _STORE_CACHE, ObjectStore, attach
-from airflow.io.store.path import ObjectStoragePath
from airflow.utils.module_loading import qualname
FAKE = "file:///fake"
@@ -58,19 +60,18 @@ class TestFs:
def test_init_objectstoragepath(self):
path = ObjectStoragePath("file://bucket/key/part1/part2")
assert path.bucket == "bucket"
- assert path.key == "key/part1/part2"
- assert path._protocol == "file"
+ assert path.key == "/key/part1/part2"
+ assert path.protocol == "file"
path2 = ObjectStoragePath(path / "part3")
assert path2.bucket == "bucket"
- assert path2.key == "key/part1/part2/part3"
- assert path2._protocol == "file"
+ assert path2.key == "/key/part1/part2/part3"
+ assert path2.protocol == "file"
- # check if we can append a non string to the path
- path3 = ObjectStoragePath(path2 / 2023)
+ path3 = ObjectStoragePath(path2 / "2023")
assert path3.bucket == "bucket"
- assert path3.key == "key/part1/part2/part3/2023"
- assert path3._protocol == "file"
+ assert path3.key == "/key/part1/part2/part3/2023"
+ assert path3.protocol == "file"
def test_read_write(self):
o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
@@ -87,69 +88,43 @@ class TestFs:
filename = str(uuid.uuid4())
d = ObjectStoragePath(f"file:///tmp/{dirname}")
- d.mkdir(create_parents=True)
+ d.mkdir(parents=True)
o = d / filename
o.touch()
- data = d.ls()
+ data = list(d.iterdir())
assert len(data) == 1
- assert data[0]["name"] == o
+ assert data[0] == o
- data = d.ls(detail=False)
- assert data == [o]
-
- d.unlink(recursive=True)
+ d.rmdir(recursive=True)
assert not o.exists()
- def test_find(self):
- dirname = str(uuid.uuid4())
- filename = str(uuid.uuid4())
-
- d = ObjectStoragePath(f"file:///tmp/{dirname}")
- d.mkdir(create_parents=True)
- o = d / filename
- o.touch()
-
- data = d.find("")
- assert len(data) == 1
- assert data == [o]
-
- data = d.ls(detail=True)
- assert len(data) == 1
- assert data[0]["name"] == o
-
- d.unlink(recursive=True)
-
@pytest.mark.parametrize(
"fn, args, fn2, path, expected_args, expected_kwargs",
[
- ("du", {}, "du", FOO, BAR, {"total": True, "maxdepth": None,
"withdirs": False}),
- ("exists", {}, "exists", FOO, ObjectStoragePath(BAR), {}),
- ("checksum", {}, "checksum", FOO, ObjectStoragePath(BAR), {}),
- ("size", {}, "size", FOO, ObjectStoragePath(BAR), {}),
- ("is_dir", {}, "isdir", FOO, ObjectStoragePath(BAR), {}),
- ("is_file", {}, "isfile", FOO, ObjectStoragePath(BAR), {}),
- # ("is_symlink", {}, "islink", FOO, ObjectStoragePath(BAR), {}),
- ("touch", {}, "touch", FOO, BAR, {"truncate": True}),
- ("mkdir", {"exists_ok": True}, "mkdir", FOO, BAR,
{"create_parents": True}),
- ("read_text", {}, "read_text", FOO, BAR, {"encoding": None,
"errors": None, "newline": None}),
- ("read_bytes", {}, "read_bytes", FOO, BAR, {"start": None, "end":
None}),
- ("rm", {}, "rm", FOO, BAR, {"maxdepth": None, "recursive": False}),
- ("rmdir", {}, "rmdir", FOO, BAR, {}),
- ("write_bytes", {"data": b"foo"}, "pipe_file", FOO,
ObjectStoragePath(BAR), {"value": b"foo"}),
+ ("checksum", {}, "checksum", FOO,
FakeRemoteFileSystem._strip_protocol(BAR), {}),
+ ("size", {}, "size", FOO,
FakeRemoteFileSystem._strip_protocol(BAR), {}),
+ (
+ "sign",
+ {"expiration": 200, "extra": "xtra"},
+ "sign",
+ FOO,
+ FakeRemoteFileSystem._strip_protocol(BAR),
+ {"expiration": 200, "extra": "xtra"},
+ ),
+ ("ukey", {}, "ukey", FOO,
FakeRemoteFileSystem._strip_protocol(BAR), {}),
(
- "write_text",
- {"data": "foo"},
- "write_text",
+ "read_block",
+ {"offset": 0, "length": 1},
+ "read_block",
FOO,
- BAR,
- {"value": "foo", "encoding": None, "errors": None, "newline":
None},
+ FakeRemoteFileSystem._strip_protocol(BAR),
+ {"delimiter": None, "length": 1, "offset": 0},
),
- ("ukey", {}, "ukey", FOO, BAR, {}),
],
)
- def test_standard_api(self, fn, args, fn2, path, expected_args,
expected_kwargs):
+ def test_standard_extended_api(self, fn, args, fn2, path, expected_args,
expected_kwargs):
_fs = mock.Mock()
_fs._strip_protocol.return_value = "/"
_fs.conn_id = "fake"
@@ -160,6 +135,46 @@ class TestFs:
getattr(o, fn)(**args)
getattr(store.fs, fn2).assert_called_once_with(expected_args,
**expected_kwargs)
+ def test_stat(self):
+ with NamedTemporaryFile() as f:
+ o = ObjectStoragePath(f"file://{f.name}")
+ assert o.stat().st_size == 0
+ assert S_ISREG(o.stat().st_mode)
+ assert S_ISDIR(o.parent.stat().st_mode)
+
+ def test_bucket_key_protocol(self):
+ bucket = "bkt"
+ key = "yek"
+ protocol = "s3"
+
+ o = ObjectStoragePath(f"{protocol}://{bucket}/{key}")
+ assert o.bucket == bucket
+ assert o.container == bucket
+ assert o.key == f"/{key}"
+ assert o.protocol == protocol
+
+ def test_cwd_home(self):
+ assert ObjectStoragePath.cwd()
+ assert ObjectStoragePath.home()
+
+ def test_replace(self):
+ o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
+ i = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
+
+ o.touch()
+ i.touch()
+
+ assert i.size() == 0
+
+ txt = "foo"
+ o.write_text(txt)
+ e = o.replace(i)
+ assert o.exists() is False
+ assert i == e
+ assert e.size() == len(txt)
+
+ e.unlink()
+
def test_move_local(self):
_from = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
_to = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
@@ -210,18 +225,27 @@ class TestFs:
assert (_to / _from.key / key).exists()
assert (_to / _from.key / key).is_file()
- _from.unlink(recursive=True)
- _to.unlink(recursive=True)
+ _from.rmdir(recursive=True)
+ _to.rmdir(recursive=True)
def test_serde_objectstoragepath(self):
path = "file://bucket/key/part1/part2"
o = ObjectStoragePath(path)
- s = o.serialize()
- d = ObjectStoragePath.deserialize(s, 1)
+ s = o.serialize()
assert s["path"] == path
+ d = ObjectStoragePath.deserialize(s, 1)
assert o == d
+ o = ObjectStoragePath(path, my_setting="foo")
+ s = o.serialize()
+ assert s["my_setting"] == "foo"
+
+ store = attach("filex", conn_id="mock")
+ o = ObjectStoragePath(path, store=store)
+ s = o.serialize()
+ assert s["store"] == store
+
def test_serde_store(self):
store = attach("file", conn_id="mock")
s = store.serialize()
diff --git
a/tests/system/providers/common/io/example_file_transfer_local_to_s3.py
b/tests/system/providers/common/io/example_file_transfer_local_to_s3.py
index ff5bfa0471..13c495e620 100644
--- a/tests/system/providers/common/io/example_file_transfer_local_to_s3.py
+++ b/tests/system/providers/common/io/example_file_transfer_local_to_s3.py
@@ -23,7 +23,7 @@ from typing import cast
from airflow import DAG
from airflow.decorators import task
-from airflow.io.store.path import ObjectStoragePath
+from airflow.io.path import ObjectStoragePath
from airflow.providers.common.io.operators.file_transfer import
FileTransferOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -57,7 +57,7 @@ def delete_temp_file(path: ObjectStoragePath):
@task
def remove_bucket():
- AWS_BUCKET.unlink(recursive=True)
+ AWS_BUCKET.rmdir(recursive=True)
with DAG(