This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch release-8.0.0 in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 0d30a05212b1448f53233f2ab325924311d76e54 Author: Joris Van den Bossche <[email protected]> AuthorDate: Tue May 3 18:50:17 2022 +0200 ARROW-16413: [Python] Certain dataset APIs hang with a python filesystem Closes #13033 from jorisvandenbossche/ARROW-16413 Authored-by: Joris Van den Bossche <[email protected]> Signed-off-by: Krisztián Szűcs <[email protected]> --- python/pyarrow/_dataset.pyx | 8 ++++-- python/pyarrow/_dataset_parquet.pyx | 7 ++--- python/pyarrow/tests/test_dataset.py | 51 ++++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 9cc93e4e7f..10aeafbdba 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -761,8 +761,12 @@ cdef class FileFormat(_Weakrefable): schema : Schema The schema inferred from the file """ - c_source = _make_file_source(file, filesystem) - c_schema = GetResultValue(self.format.Inspect(c_source)) + cdef: + CFileSource c_source = _make_file_source(file, filesystem) + CResult[shared_ptr[CSchema]] c_result + with nogil: + c_result = self.format.Inspect(c_source) + c_schema = GetResultValue(c_result) return pyarrow_wrap_schema(move(c_schema)) def make_fragment(self, file, filesystem=None, diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index 9f097947c1..7b91d4c2c7 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -788,7 +788,7 @@ cdef class ParquetDatasetFactory(DatasetFactory): FileFormat format not None, ParquetFactoryOptions options=None): cdef: - c_string path + c_string c_path shared_ptr[CFileSystem] c_filesystem shared_ptr[CParquetFileFormat] c_format CResult[shared_ptr[CDatasetFactory]] result @@ -801,8 +801,9 @@ cdef class ParquetDatasetFactory(DatasetFactory): options = options or ParquetFactoryOptions() c_options = options.unwrap() - result = CParquetDatasetFactory.MakeFromMetaDataPath( - c_path, 
c_filesystem, c_format, c_options) + with nogil: + result = CParquetDatasetFactory.MakeFromMetaDataPath( + c_path, c_filesystem, c_format, c_options) self.init(GetResultValue(result)) cdef init(self, shared_ptr[CDatasetFactory]& sp): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 44769b4ec0..6eda764f27 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -21,6 +21,7 @@ import posixpath import datetime import pathlib import pickle +import sys import textwrap import tempfile import threading @@ -2582,6 +2583,32 @@ def test_open_dataset_from_fsspec(tempdir): assert dataset.schema.equals(table.schema) +@pytest.mark.parquet +def test_file_format_inspect_fsspec(tempdir): + # https://issues.apache.org/jira/browse/ARROW-16413 + fsspec = pytest.importorskip("fsspec") + + # create bucket + file with pyarrow + table = pa.table({'a': [1, 2, 3]}) + path = tempdir / "data.parquet" + pq.write_table(table, path) + + # read using fsspec filesystem + fsspec_fs = fsspec.filesystem("file") + assert fsspec_fs.ls(tempdir)[0].endswith("data.parquet") + + # inspect using dataset file format + format = ds.ParquetFileFormat() + # manually creating a PyFileSystem instead of using fs._ensure_filesystem + # which would convert an fsspec local filesystem to a native one + filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs)) + schema = format.inspect(path, filesystem) + assert schema.equals(table.schema) + + fragment = format.make_fragment(path, filesystem) + assert fragment.physical_schema.equals(table.schema) + + @pytest.mark.pandas def test_filter_timestamp(tempdir, dataset_reader): # ARROW-11379 @@ -3094,6 +3121,30 @@ def test_parquet_dataset_factory(tempdir): assert result.num_rows == 40 +@pytest.mark.parquet +@pytest.mark.pandas # write_to_dataset currently requires pandas +@pytest.mark.skipif(sys.platform == 'win32', + reason="Results in FileNotFoundError on Windows") +def 
test_parquet_dataset_factory_fsspec(tempdir): + # https://issues.apache.org/jira/browse/ARROW-16413 + fsspec = pytest.importorskip("fsspec") + + # create dataset with pyarrow + root_path = tempdir / "test_parquet_dataset" + metadata_path, table = _create_parquet_dataset_simple(root_path) + + # read using fsspec filesystem + fsspec_fs = fsspec.filesystem("file") + # manually creating a PyFileSystem, because passing the local fsspec + # filesystem would internally be converted to native LocalFileSystem + filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs)) + dataset = ds.parquet_dataset(metadata_path, filesystem=filesystem) + assert dataset.schema.equals(table.schema) + assert len(dataset.files) == 4 + result = dataset.to_table() + assert result.num_rows == 40 + + @pytest.mark.parquet @pytest.mark.pandas # write_to_dataset currently requires pandas @pytest.mark.parametrize('use_legacy_dataset', [False, True])
