This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d8977165d6 ARROW-16413: [Python] Certain dataset APIs hang with a 
python filesystem
d8977165d6 is described below

commit d8977165d610d3b828eea0923d733cc5a1cf2c4e
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Tue May 3 18:50:17 2022 +0200

    ARROW-16413: [Python] Certain dataset APIs hang with a python filesystem
    
    Closes #13033 from jorisvandenbossche/ARROW-16413
    
    Authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Krisztián Szűcs <[email protected]>
---
 python/pyarrow/_dataset.pyx          |  8 ++++--
 python/pyarrow/_dataset_parquet.pyx  |  7 ++---
 python/pyarrow/tests/test_dataset.py | 51 ++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 9cc93e4e7f..10aeafbdba 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -761,8 +761,12 @@ cdef class FileFormat(_Weakrefable):
         schema : Schema
             The schema inferred from the file
         """
-        c_source = _make_file_source(file, filesystem)
-        c_schema = GetResultValue(self.format.Inspect(c_source))
+        cdef:
+            CFileSource c_source = _make_file_source(file, filesystem)
+            CResult[shared_ptr[CSchema]] c_result
+        with nogil:
+            c_result = self.format.Inspect(c_source)
+        c_schema = GetResultValue(c_result)
         return pyarrow_wrap_schema(move(c_schema))
 
     def make_fragment(self, file, filesystem=None,
diff --git a/python/pyarrow/_dataset_parquet.pyx 
b/python/pyarrow/_dataset_parquet.pyx
index 9f097947c1..7b91d4c2c7 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -788,7 +788,7 @@ cdef class ParquetDatasetFactory(DatasetFactory):
                  FileFormat format not None,
                  ParquetFactoryOptions options=None):
         cdef:
-            c_string path
+            c_string c_path
             shared_ptr[CFileSystem] c_filesystem
             shared_ptr[CParquetFileFormat] c_format
             CResult[shared_ptr[CDatasetFactory]] result
@@ -801,8 +801,9 @@ cdef class ParquetDatasetFactory(DatasetFactory):
         options = options or ParquetFactoryOptions()
         c_options = options.unwrap()
 
-        result = CParquetDatasetFactory.MakeFromMetaDataPath(
-            c_path, c_filesystem, c_format, c_options)
+        with nogil:
+            result = CParquetDatasetFactory.MakeFromMetaDataPath(
+                c_path, c_filesystem, c_format, c_options)
         self.init(GetResultValue(result))
 
     cdef init(self, shared_ptr[CDatasetFactory]& sp):
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index 44769b4ec0..6eda764f27 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -21,6 +21,7 @@ import posixpath
 import datetime
 import pathlib
 import pickle
+import sys
 import textwrap
 import tempfile
 import threading
@@ -2582,6 +2583,32 @@ def test_open_dataset_from_fsspec(tempdir):
     assert dataset.schema.equals(table.schema)
 
 
[email protected]
+def test_file_format_inspect_fsspec(tempdir):
+    # https://issues.apache.org/jira/browse/ARROW-16413
+    fsspec = pytest.importorskip("fsspec")
+
+    # create bucket + file with pyarrow
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / "data.parquet"
+    pq.write_table(table, path)
+
+    # read using fsspec filesystem
+    fsspec_fs = fsspec.filesystem("file")
+    assert fsspec_fs.ls(tempdir)[0].endswith("data.parquet")
+
+    # inspect using dataset file format
+    format = ds.ParquetFileFormat()
+    # manually creating a PyFileSystem instead of using fs._ensure_filesystem
+    # which would convert an fsspec local filesystem to a native one
+    filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))
+    schema = format.inspect(path, filesystem)
+    assert schema.equals(table.schema)
+
+    fragment = format.make_fragment(path, filesystem)
+    assert fragment.physical_schema.equals(table.schema)
+
+
 @pytest.mark.pandas
 def test_filter_timestamp(tempdir, dataset_reader):
     # ARROW-11379
@@ -3094,6 +3121,30 @@ def test_parquet_dataset_factory(tempdir):
     assert result.num_rows == 40
 
 
[email protected]
[email protected]  # write_to_dataset currently requires pandas
[email protected](sys.platform == 'win32',
+                    reason="Results in FileNotFoundError on Windows")
+def test_parquet_dataset_factory_fsspec(tempdir):
+    # https://issues.apache.org/jira/browse/ARROW-16413
+    fsspec = pytest.importorskip("fsspec")
+
+    # create dataset with pyarrow
+    root_path = tempdir / "test_parquet_dataset"
+    metadata_path, table = _create_parquet_dataset_simple(root_path)
+
+    # read using fsspec filesystem
+    fsspec_fs = fsspec.filesystem("file")
+    # manually creating a PyFileSystem, because passing the local fsspec
+    # filesystem would internally be converted to native LocalFileSystem
+    filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))
+    dataset = ds.parquet_dataset(metadata_path, filesystem=filesystem)
+    assert dataset.schema.equals(table.schema)
+    assert len(dataset.files) == 4
+    result = dataset.to_table()
+    assert result.num_rows == 40
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas  # write_to_dataset currently requires pandas
 @pytest.mark.parametrize('use_legacy_dataset', [False, True])

Reply via email to