This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d8977165d6 ARROW-16413: [Python] Certain dataset APIs hang with a
python filesystem
d8977165d6 is described below
commit d8977165d610d3b828eea0923d733cc5a1cf2c4e
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Tue May 3 18:50:17 2022 +0200
ARROW-16413: [Python] Certain dataset APIs hang with a python filesystem
Closes #13033 from jorisvandenbossche/ARROW-16413
Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Krisztián Szűcs <[email protected]>
---
python/pyarrow/_dataset.pyx | 8 ++++--
python/pyarrow/_dataset_parquet.pyx | 7 ++---
python/pyarrow/tests/test_dataset.py | 51 ++++++++++++++++++++++++++++++++++++
3 files changed, 61 insertions(+), 5 deletions(-)
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 9cc93e4e7f..10aeafbdba 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -761,8 +761,12 @@ cdef class FileFormat(_Weakrefable):
schema : Schema
The schema inferred from the file
"""
- c_source = _make_file_source(file, filesystem)
- c_schema = GetResultValue(self.format.Inspect(c_source))
+ cdef:
+ CFileSource c_source = _make_file_source(file, filesystem)
+ CResult[shared_ptr[CSchema]] c_result
+ with nogil:
+ c_result = self.format.Inspect(c_source)
+ c_schema = GetResultValue(c_result)
return pyarrow_wrap_schema(move(c_schema))
def make_fragment(self, file, filesystem=None,
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 9f097947c1..7b91d4c2c7 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -788,7 +788,7 @@ cdef class ParquetDatasetFactory(DatasetFactory):
FileFormat format not None,
ParquetFactoryOptions options=None):
cdef:
- c_string path
+ c_string c_path
shared_ptr[CFileSystem] c_filesystem
shared_ptr[CParquetFileFormat] c_format
CResult[shared_ptr[CDatasetFactory]] result
@@ -801,8 +801,9 @@ cdef class ParquetDatasetFactory(DatasetFactory):
options = options or ParquetFactoryOptions()
c_options = options.unwrap()
- result = CParquetDatasetFactory.MakeFromMetaDataPath(
- c_path, c_filesystem, c_format, c_options)
+ with nogil:
+ result = CParquetDatasetFactory.MakeFromMetaDataPath(
+ c_path, c_filesystem, c_format, c_options)
self.init(GetResultValue(result))
cdef init(self, shared_ptr[CDatasetFactory]& sp):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 44769b4ec0..6eda764f27 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -21,6 +21,7 @@ import posixpath
import datetime
import pathlib
import pickle
+import sys
import textwrap
import tempfile
import threading
@@ -2582,6 +2583,32 @@ def test_open_dataset_from_fsspec(tempdir):
assert dataset.schema.equals(table.schema)
+@pytest.mark.parquet
+def test_file_format_inspect_fsspec(tempdir):
+ # https://issues.apache.org/jira/browse/ARROW-16413
+ fsspec = pytest.importorskip("fsspec")
+
+ # create bucket + file with pyarrow
+ table = pa.table({'a': [1, 2, 3]})
+ path = tempdir / "data.parquet"
+ pq.write_table(table, path)
+
+ # read using fsspec filesystem
+ fsspec_fs = fsspec.filesystem("file")
+ assert fsspec_fs.ls(tempdir)[0].endswith("data.parquet")
+
+ # inspect using dataset file format
+ format = ds.ParquetFileFormat()
+ # manually creating a PyFileSystem instead of using fs._ensure_filesystem
+ # which would convert an fsspec local filesystem to a native one
+ filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))
+ schema = format.inspect(path, filesystem)
+ assert schema.equals(table.schema)
+
+ fragment = format.make_fragment(path, filesystem)
+ assert fragment.physical_schema.equals(table.schema)
+
+
@pytest.mark.pandas
def test_filter_timestamp(tempdir, dataset_reader):
# ARROW-11379
@@ -3094,6 +3121,30 @@ def test_parquet_dataset_factory(tempdir):
assert result.num_rows == 40
+@pytest.mark.parquet
+@pytest.mark.pandas # write_to_dataset currently requires pandas
+@pytest.mark.skipif(sys.platform == 'win32',
+ reason="Results in FileNotFoundError on Windows")
+def test_parquet_dataset_factory_fsspec(tempdir):
+ # https://issues.apache.org/jira/browse/ARROW-16413
+ fsspec = pytest.importorskip("fsspec")
+
+ # create dataset with pyarrow
+ root_path = tempdir / "test_parquet_dataset"
+ metadata_path, table = _create_parquet_dataset_simple(root_path)
+
+ # read using fsspec filesystem
+ fsspec_fs = fsspec.filesystem("file")
+ # manually creating a PyFileSystem, because passing the local fsspec
+ # filesystem would internally be converted to native LocalFileSystem
+ filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))
+ dataset = ds.parquet_dataset(metadata_path, filesystem=filesystem)
+ assert dataset.schema.equals(table.schema)
+ assert len(dataset.files) == 4
+ result = dataset.to_table()
+ assert result.num_rows == 40
+
+
@pytest.mark.parquet
@pytest.mark.pandas # write_to_dataset currently requires pandas
@pytest.mark.parametrize('use_legacy_dataset', [False, True])