This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 58318efac4 GH-44900: [Python] Support explicit `fsspec+{protocol}` and
`hf://` filesystem URIs (#45089)
58318efac4 is described below
commit 58318efac40af22ff76c43f9ffbc125ac7c7218e
Author: Krisztián Szűcs <[email protected]>
AuthorDate: Tue Jul 1 17:14:14 2025 +0200
GH-44900: [Python] Support explicit `fsspec+{protocol}` and `hf://`
filesystem URIs (#45089)
### Rationale for this change
Make filesystem operations more convenient for users by supporting:
- `fsspec` filesystems explicitly, by adding an `fsspec+` prefix to the
filesystem scheme
- the `hf://` scheme explicitly, by loading the Hugging Face fsspec
filesystem adapter
### What changes are included in this PR?
`FileSystem.from_uri()`, and therefore `_resolve_filesystem_and_path()`, now
recognizes the `fsspec+{protocol}` and `hf://` schemes, in which case it
attempts to load the specific fsspec filesystem implementation. This also
includes some refactoring to consolidate the filesystem-from-string logic.
### Are these changes tested?
Yes. `fsspec+{protocol}` URIs are tested using fsspec's in-memory
`MemoryFileSystem`, whereas the `hf://` scheme is tested by mocking out
the `huggingface_hub.HfFileSystem` class. I also tested the Hugging Face
integration against a live Hugging Face repository.
### Are there any user-facing changes?
Various user-facing functions now support more filesystem implementations:
```py
pyarrow.fs.copy_files()
ParquetFile()
ParquetDataset()
ds.dataset()
pq.read_metadata()
pq.write_metadata()
pq.read_table()
pq.read_schema()
orc.read_table()
```
This applies both to paths that use the new URI schemes and to the
`filesystem=` argument.
* GitHub Issue: #44900
Authored-by: Krisztian Szucs <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
docs/source/python/filesystems.rst | 26 +++++++
python/pyarrow/_fs.pyx | 55 +++++++++++----
python/pyarrow/fs.py | 85 +++++++++++------------
python/pyarrow/parquet/core.py | 15 ++--
python/pyarrow/tests/parquet/test_parquet_file.py | 65 +++++++++++++++++
python/pyarrow/tests/test_fs.py | 35 ++++++++++
6 files changed, 215 insertions(+), 66 deletions(-)
diff --git a/docs/source/python/filesystems.rst
b/docs/source/python/filesystems.rst
index aaf5dfe4dd..ebb3664d82 100644
--- a/docs/source/python/filesystems.rst
+++ b/docs/source/python/filesystems.rst
@@ -388,6 +388,32 @@ Then all the functionalities of :class:`FileSystem` are
accessible::
ds.dataset("data/", filesystem=pa_fs)
+Using fsspec-compatible filesystem URIs
+---------------------------------------
+
+PyArrow can automatically instantiate fsspec filesystems by prefixing the URI
+scheme with ``fsspec+``. This allows you to use the fsspec-compatible
+filesystems directly with PyArrow's IO functions without needing to manually
+create a filesystem object. Example writing and reading a Parquet file
+using an in-memory filesystem provided by `fsspec`_::
+
+ import pyarrow as pa
+ import pyarrow.parquet as pq
+
+ table = pa.table({'a': [1, 2, 3]})
+ pq.write_table(table, "fsspec+memory://path/to/my_table.parquet")
+ pq.read_table("fsspec+memory://path/to/my_table.parquet")
+
+Example reading parquet file from GitHub directly::
+
+
pq.read_table("fsspec+github://apache:arrow-testing@/data/parquet/alltypes-java.parquet")
+
+Hugging Face URIs are explicitly allowed as a shortcut without needing to
prefix
+with ``fsspec+``. This is useful for reading datasets hosted on Hugging Face::
+
+
pq.read_table("hf://datasets/stanfordnlp/imdb/plain_text/train-00000-of-00001.parquet")
+
+
Using Arrow filesystems with fsspec
-----------------------------------
diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx
index 660d4e56a6..40294c6b53 100644
--- a/python/pyarrow/_fs.pyx
+++ b/python/pyarrow/_fs.pyx
@@ -424,6 +424,38 @@ cdef class FileSystem(_Weakrefable):
return fs
@staticmethod
+ def _fsspec_from_uri(uri):
+ """Instantiate FSSpecHandler and path for the given URI."""
+ try:
+ import fsspec
+ except ImportError:
+ raise ImportError(
+ "`fsspec` is required to handle `fsspec+<filesystem>://` and
`hf://` URIs."
+ )
+ from .fs import FSSpecHandler
+
+ uri = uri.removeprefix("fsspec+")
+ fs, path = fsspec.url_to_fs(uri)
+ fs = PyFileSystem(FSSpecHandler(fs))
+
+ return fs, path
+
+ @staticmethod
+ def _native_from_uri(uri):
+ """Instantiate native FileSystem and path for the given URI."""
+ cdef:
+ c_string c_path
+ c_string c_uri
+ CResult[shared_ptr[CFileSystem]] result
+
+ if isinstance(uri, pathlib.Path):
+ # Make absolute
+ uri = uri.resolve().absolute()
+ c_uri = tobytes(_stringify_path(uri))
+ with nogil:
+ result = CFileSystemFromUriOrPath(c_uri, &c_path)
+ return FileSystem.wrap(GetResultValue(result)), frombytes(c_path)
+
def from_uri(uri):
"""
Create a new FileSystem from URI or Path.
@@ -458,19 +490,16 @@ cdef class FileSystem(_Weakrefable):
>>> fs.FileSystem.from_uri("s3://usgs-landsat/collection02/")
(<pyarrow._s3fs.S3FileSystem object at ...>,
'usgs-landsat/collection02')
- """
- cdef:
- c_string c_path
- c_string c_uri
- CResult[shared_ptr[CFileSystem]] result
- if isinstance(uri, pathlib.Path):
- # Make absolute
- uri = uri.resolve().absolute()
- c_uri = tobytes(_stringify_path(uri))
- with nogil:
- result = CFileSystemFromUriOrPath(c_uri, &c_path)
- return FileSystem.wrap(GetResultValue(result)), frombytes(c_path)
+ Or from an fsspec+ URI:
+
+ >>> fs.FileSystem.from_uri("fsspec+memory:///path/to/file")
+ (<pyarrow._fs.PyFileSystem object at ...>, '/path/to/file')
+ """
+ if isinstance(uri, str) and uri.startswith(("fsspec+", "hf://")):
+ return FileSystem._fsspec_from_uri(uri)
+ else:
+ return FileSystem._native_from_uri(uri)
cdef init(self, const shared_ptr[CFileSystem]& wrapped):
self.wrapped = wrapped
@@ -1016,7 +1045,7 @@ cdef class LocalFileSystem(FileSystem):
Create a FileSystem object inferred from a URI of the saved file:
- >>> local_new, path = fs.LocalFileSystem().from_uri('/tmp/local_fs.dat')
+ >>> local_new, path = fs.LocalFileSystem.from_uri('/tmp/local_fs.dat')
>>> local_new
<pyarrow._fs.LocalFileSystem object at ...
>>> path
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index f94233e894..157dbdf938 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -82,58 +82,55 @@ def __getattr__(name):
)
-def _filesystem_from_str(uri):
- # instantiate the file system from an uri, if the uri has a path
- # component then it will be treated as a path prefix
- filesystem, prefix = FileSystem.from_uri(uri)
- prefix = filesystem.normalize_path(prefix)
- if prefix:
- # validate that the prefix is pointing to a directory
- prefix_info = filesystem.get_file_info([prefix])[0]
- if prefix_info.type != FileType.Directory:
- raise ValueError(
- "The path component of the filesystem URI must point to a "
- f"directory but it has a type: `{prefix_info.type.name}`. The
path "
- f"component is `{prefix_info.path}` and the given filesystem
URI is "
- f"`{uri}`"
- )
- filesystem = SubTreeFileSystem(prefix, filesystem)
- return filesystem
-
-
def _ensure_filesystem(filesystem, *, use_mmap=False):
if isinstance(filesystem, FileSystem):
return filesystem
elif isinstance(filesystem, str):
+ # create a filesystem from a URI string, note that the `path` part of
the URI
+ # is treated as a prefix if specified, so the filesystem is wrapped in
a
+ # SubTreeFileSystem
if use_mmap:
raise ValueError(
"Specifying to use memory mapping not supported for "
"filesystem specified as an URI string"
)
- return _filesystem_from_str(filesystem)
-
- # handle fsspec-compatible filesystems
- try:
- import fsspec
- except ImportError:
- pass
+ fs, path = FileSystem.from_uri(filesystem)
+ prefix = fs.normalize_path(path)
+ if prefix:
+ # validate that the prefix is pointing to a directory
+ prefix_info = fs.get_file_info([prefix])[0]
+ if prefix_info.type != FileType.Directory:
+ raise ValueError(
+ "The path component of the filesystem URI must point to a "
+ f"directory but it has a type: `{prefix_info.type.name}`.
The path "
+ f"component is `{prefix_info.path}` and the given
filesystem URI "
+ f"is `{filesystem}`"
+ )
+ fs = SubTreeFileSystem(prefix, fs)
+ return fs
else:
- if isinstance(filesystem, fsspec.AbstractFileSystem):
- if type(filesystem).__name__ == 'LocalFileSystem':
- # In case its a simple LocalFileSystem, use native arrow one
- return LocalFileSystem(use_mmap=use_mmap)
- return PyFileSystem(FSSpecHandler(filesystem))
-
- raise TypeError(
- f"Unrecognized filesystem: {type(filesystem)}. `filesystem` argument
must be a "
- "FileSystem instance or a valid file system URI"
- )
+ # handle fsspec-compatible filesystems
+ try:
+ import fsspec
+ except ImportError:
+ pass
+ else:
+ if isinstance(filesystem, fsspec.AbstractFileSystem):
+ if type(filesystem).__name__ == 'LocalFileSystem':
+ # In case its a simple LocalFileSystem, use native arrow
one
+ return LocalFileSystem(use_mmap=use_mmap)
+ return PyFileSystem(FSSpecHandler(filesystem))
+
+ raise TypeError(
+ f"Unrecognized filesystem: {type(filesystem)}. `filesystem`
argument must "
+ "be a FileSystem instance or a valid file system URI"
+ )
def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False):
"""
Return filesystem/path from path which could be an URI or a plain
- filesystem path.
+ filesystem path or a combination of fsspec protocol and URI.
"""
if not _is_path_like(path):
if filesystem is not None:
@@ -176,12 +173,14 @@ def _resolve_filesystem_and_path(path, filesystem=None,
*, memory_map=False):
try:
filesystem, path = FileSystem.from_uri(path)
except ValueError as e:
- # neither an URI nor a locally existing path, so assume that
- # local path was given and propagate a nicer file not found error
- # instead of a more confusing scheme parsing error
- if "empty scheme" not in str(e) \
- and "Cannot parse URI" not in str(e):
- raise
+ msg = str(e)
+ if "empty scheme" in msg or "Cannot parse URI" in msg:
+ # neither an URI nor a locally existing path, so assume that
+ # local path was given and propagate a nicer file not found
+ # error instead of a more confusing scheme parsing error
+ pass
+ else:
+ raise e
else:
path = filesystem.normalize_path(path)
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index e98a813395..823a0ca774 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -44,8 +44,8 @@ from pyarrow._parquet import (ParquetReader, Statistics, #
noqa
FileEncryptionProperties,
FileDecryptionProperties,
SortingColumn)
-from pyarrow.fs import (LocalFileSystem, FileSystem, FileType,
- _resolve_filesystem_and_path, _ensure_filesystem)
+from pyarrow.fs import (LocalFileSystem, FileType,
_resolve_filesystem_and_path,
+ _ensure_filesystem)
from pyarrow.util import guid, _is_path_like, _stringify_path, _deprecate_api
@@ -1392,14 +1392,9 @@ Examples
self._base_dir = None
if not isinstance(path_or_paths, list):
if _is_path_like(path_or_paths):
- path_or_paths = _stringify_path(path_or_paths)
- if filesystem is None:
- # path might be a URI describing the FileSystem as well
- try:
- filesystem, path_or_paths = FileSystem.from_uri(
- path_or_paths)
- except ValueError:
- filesystem = LocalFileSystem(use_mmap=memory_map)
+ filesystem, path_or_paths = _resolve_filesystem_and_path(
+ path_or_paths, filesystem, memory_map=memory_map
+ )
finfo = filesystem.get_file_info(path_or_paths)
if finfo.type == FileType.Directory:
self._base_dir = path_or_paths
diff --git a/python/pyarrow/tests/parquet/test_parquet_file.py
b/python/pyarrow/tests/parquet/test_parquet_file.py
index a30bbc1f6f..a6da3f7dac 100644
--- a/python/pyarrow/tests/parquet/test_parquet_file.py
+++ b/python/pyarrow/tests/parquet/test_parquet_file.py
@@ -17,9 +17,12 @@
import io
import os
+import re
import sys
+import types
import pytest
+from unittest import mock
import pyarrow as pa
@@ -365,3 +368,65 @@ def test_read_undefined_logical_type(parquet_test_datadir):
b"unknown string 2",
b"unknown string 3"
]
+
+
+def test_parquet_file_fsspec_support():
+ pytest.importorskip("fsspec")
+
+ table = pa.table({"a": range(10)})
+ pq.write_table(table, "fsspec+memory://example.parquet")
+ table2 = pq.read_table("fsspec+memory://example.parquet")
+ assert table.equals(table2)
+
+ msg = "Unrecognized filesystem type in URI"
+ with pytest.raises(pa.ArrowInvalid, match=msg):
+ pq.read_table("non-existing://example.parquet")
+
+
+def test_parquet_file_fsspec_support_through_filesystem_argument():
+ try:
+ from fsspec.implementations.memory import MemoryFileSystem
+ except ImportError:
+ pytest.skip("fsspec is not installed, skipping test")
+
+ table = pa.table({"b": range(10)})
+
+ fs = MemoryFileSystem()
+ fs.mkdir("/path/to/prefix", create_parents=True)
+ assert fs.exists("/path/to/prefix")
+
+ fs_str = "fsspec+memory://path/to/prefix"
+ pq.write_table(table, "b.parquet", filesystem=fs_str)
+ table2 = pq.read_table("fsspec+memory://path/to/prefix/b.parquet")
+ assert table.equals(table2)
+
+
+def test_parquet_file_hugginface_support():
+ try:
+ from fsspec.implementations.memory import MemoryFileSystem
+ except ImportError:
+ pytest.skip("fsspec is not installed, skipping Hugging Face test")
+
+ fake_hf_module = types.ModuleType("huggingface_hub")
+ fake_hf_module.HfFileSystem = MemoryFileSystem
+ with mock.patch.dict("sys.modules", {"huggingface_hub": fake_hf_module}):
+ uri = "hf://datasets/apache/arrow/test.parquet"
+ table = pa.table({"a": range(10)})
+ pq.write_table(table, uri)
+ table2 = pq.read_table(uri)
+ assert table.equals(table2)
+
+
+def test_fsspec_uri_raises_if_fsspec_is_not_available():
+ # sadly cannot patch sys.modules because cython will still be able to
import fsspec
+ try:
+ import fsspec # noqa: F401
+ except ImportError:
+ pass
+ else:
+ pytest.skip("fsspec is available, skipping test")
+
+ msg = re.escape(
+ "`fsspec` is required to handle `fsspec+<filesystem>://` and `hf://`
URIs.")
+ with pytest.raises(ImportError, match=msg):
+ pq.read_table("fsspec+memory://example.parquet")
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 0d61ea6e0c..775738a5a1 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -2140,3 +2140,38 @@ def test_uwsgi_integration():
proc.terminate()
# ... and uwsgi should gracefully shutdown after it's been asked above
assert proc.wait() == 30 # UWSGI_END_CODE = 30
+
+
+def test_fsspec_filesystem_from_uri():
+ try:
+ from fsspec.implementations.local import LocalFileSystem
+ from fsspec.implementations.memory import MemoryFileSystem
+ except ImportError:
+ pytest.skip("fsspec not installed")
+
+ fs, path = FileSystem.from_uri("fsspec+memory://path/to/data.parquet")
+ expected_fs = PyFileSystem(FSSpecHandler(MemoryFileSystem()))
+ assert fs == expected_fs
+ assert path == "/path/to/data.parquet"
+
+ # check that if fsspec+ is specified than we don't coerce to the native
+ # arrow local filesystem
+ uri = "file:///tmp/my.file"
+ fs, _ = FileSystem.from_uri(f"fsspec+{uri}")
+ expected_fs = PyFileSystem(FSSpecHandler(LocalFileSystem()))
+ assert fs == expected_fs
+
+
+def test_huggingface_filesystem_from_uri():
+ pytest.importorskip("fsspec")
+ try:
+ from huggingface_hub import HfFileSystem
+ except ImportError:
+ pytest.skip("huggingface_hub not installed")
+
+ fs, path = FileSystem.from_uri(
+
"hf://datasets/stanfordnlp/imdb/plain_text/train-00000-of-00001.parquet"
+ )
+ expected_fs = PyFileSystem(FSSpecHandler(HfFileSystem()))
+ assert fs == expected_fs
+ assert path ==
"datasets/stanfordnlp/imdb/plain_text/train-00000-of-00001.parquet"