This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ea7f79541 GH-37857: [Python][Dataset] Expose file size to python 
dataset (#37868)
2ea7f79541 is described below

commit 2ea7f79541f2024ef41aa823fd8ad2603dc2b6f8
Author: Eero Lihavainen <[email protected]>
AuthorDate: Tue Dec 5 18:26:59 2023 +0200

    GH-37857: [Python][Dataset] Expose file size to python dataset (#37868)
    
    
    
    ### Rationale for this change
    
    Allow passing known file sizes to `make_fragment`, to avoid potential 
network requests.
    
    ### What changes are included in this PR?
    
    ### Are these changes tested?
    
    Yes, tests with S3 that file size gets used.
    
    ### Are there any user-facing changes?
    
    Yes, new function arguments.
    
    * Closes: #37857
    
    Lead-authored-by: Eero Lihavainen <[email protected]>
    Co-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Eero Lihavainen <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 python/pyarrow/_dataset.pxd                  |  5 +--
 python/pyarrow/_dataset.pyx                  | 25 ++++++++----
 python/pyarrow/_dataset_parquet.pyx          | 11 +++---
 python/pyarrow/includes/libarrow_dataset.pxd |  1 +
 python/pyarrow/tests/test_dataset.py         | 58 ++++++++++++++++++++++++++++
 5 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/python/pyarrow/_dataset.pxd b/python/pyarrow/_dataset.pxd
index bee9fc1f09..220ab6b19a 100644
--- a/python/pyarrow/_dataset.pxd
+++ b/python/pyarrow/_dataset.pxd
@@ -22,11 +22,10 @@
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.lib cimport *
-from pyarrow._fs cimport FileSystem
+from pyarrow._fs cimport FileSystem, FileInfo
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=*)
-
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, 
object file_size=*)
 
 cdef class DatasetFactory(_Weakrefable):
 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 029948a609..b93f71969e 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -32,7 +32,7 @@ from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow._acero cimport ExecNodeOptions
 from pyarrow._compute cimport Expression, _bind
 from pyarrow._compute import _forbid_instantiation
-from pyarrow._fs cimport FileSystem, FileSelector
+from pyarrow._fs cimport FileSystem, FileSelector, FileInfo
 from pyarrow._csv cimport (
     ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
 from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
@@ -96,27 +96,33 @@ def _get_parquet_symbol(name):
     return _dataset_pq and getattr(_dataset_pq, name)
 
 
-cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, 
object file_size=None):
 
     cdef:
         CFileSource c_source
         shared_ptr[CFileSystem] c_filesystem
+        CFileInfo c_info
         c_string c_path
         shared_ptr[CRandomAccessFile] c_file
         shared_ptr[CBuffer] c_buffer
+        int64_t c_size
 
     if isinstance(file, Buffer):
         c_buffer = pyarrow_unwrap_buffer(file)
         c_source = CFileSource(move(c_buffer))
-
     elif _is_path_like(file):
         if filesystem is None:
             raise ValueError("cannot construct a FileSource from "
                              "a path without a FileSystem")
         c_filesystem = filesystem.unwrap()
         c_path = tobytes(_stringify_path(file))
-        c_source = CFileSource(move(c_path), move(c_filesystem))
 
+        if file_size is not None:
+            c_size = file_size
+            c_info = FileInfo(c_path, size=c_size).unwrap()
+            c_source = CFileSource(move(c_info), move(c_filesystem))
+        else:
+            c_source = CFileSource(move(c_path), move(c_filesystem))
     elif hasattr(file, 'read'):
         # Optimistically hope this is file-like
         c_file = get_native_file(file, False).get_random_access_file()
@@ -1230,7 +1236,7 @@ cdef class FileFormat(_Weakrefable):
             The schema inferred from the file
         """
         cdef:
-            CFileSource c_source = _make_file_source(file, filesystem)
+            CFileSource c_source = _make_file_source(file, filesystem, 
file_size=None)
             CResult[shared_ptr[CSchema]] c_result
         with nogil:
             c_result = self.format.Inspect(c_source)
@@ -1238,7 +1244,8 @@ cdef class FileFormat(_Weakrefable):
         return pyarrow_wrap_schema(move(c_schema))
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None):
+                      Expression partition_expression=None,
+                      *, file_size=None):
         """
         Make a FileFragment from a given file.
 
@@ -1252,6 +1259,9 @@ cdef class FileFormat(_Weakrefable):
         partition_expression : Expression, optional
             An expression that is guaranteed true for all rows in the 
fragment.  Allows
             fragment to be potentially skipped while scanning with a filter.
+        file_size : int, optional
+            The size of the file in bytes. Can improve performance with 
high-latency filesystems
+            when file size needs to be known before reading.
 
         Returns
         -------
@@ -1260,8 +1270,7 @@ cdef class FileFormat(_Weakrefable):
         """
         if partition_expression is None:
             partition_expression = _true
-
-        c_source = _make_file_source(file, filesystem)
+        c_source = _make_file_source(file, filesystem, file_size)
         c_fragment = <shared_ptr[CFragment]> GetResultValue(
             self.format.MakeFragment(move(c_source),
                                      partition_expression.unwrap(),
diff --git a/python/pyarrow/_dataset_parquet.pyx 
b/python/pyarrow/_dataset_parquet.pyx
index f83b78d933..d458ac4ee7 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
         return f"<ParquetFileFormat read_options={self.read_options}>"
 
     def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None, row_groups=None):
+                      Expression partition_expression=None, row_groups=None, 
*, file_size=None):
         """
         Make a FileFragment from a given file.
 
@@ -251,6 +251,9 @@ cdef class ParquetFileFormat(FileFormat):
             fragment to be potentially skipped while scanning with a filter.
         row_groups : Iterable, optional
             The indices of the row groups to include
+        file_size : int, optional
+            The size of the file in bytes. Can improve performance with 
high-latency filesystems
+            when file size needs to be known before reading.
 
         Returns
         -------
@@ -259,15 +262,13 @@ cdef class ParquetFileFormat(FileFormat):
         """
         cdef:
             vector[int] c_row_groups
-
         if partition_expression is None:
             partition_expression = _true
-
         if row_groups is None:
             return super().make_fragment(file, filesystem,
-                                         partition_expression)
+                                         partition_expression, 
file_size=file_size)
 
-        c_source = _make_file_source(file, filesystem)
+        c_source = _make_file_source(file, filesystem, file_size)
         c_row_groups = [<int> row_group for row_group in set(row_groups)]
 
         c_fragment = <shared_ptr[CFragment]> GetResultValue(
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index 8901d763e3..4566cb5004 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
         const c_string& path() const
         const shared_ptr[CFileSystem]& filesystem() const
         const shared_ptr[CBuffer]& buffer() const
+        const int64_t size() const
         # HACK: Cython can't handle all the overloads so don't declare them.
         # This means invalid construction of CFileSource won't be caught in
         # the C++ generation phase (though it will still be caught when
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index d5e7015a5d..f3c25ee8c5 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -988,6 +988,64 @@ def test_make_fragment(multisourcefs):
         assert row_group_fragment.row_groups == [0]
 
 
[email protected]
[email protected]
+def test_make_fragment_with_size(s3_example_simple):
+    """
+    Test passing file_size to make_fragment. Not all FS implementations make 
use
+    of the file size (by implementing an OpenInputFile that takes a FileInfo), 
but
+    s3 does, which is why it's used here.
+    """
+    table, path, fs, uri, host, port, access_key, secret_key = 
s3_example_simple
+
+    file_format = ds.ParquetFileFormat()
+    paths = [path]
+
+    fragments = [file_format.make_fragment(path, fs)
+                 for path in paths]
+    dataset = ds.FileSystemDataset(
+        fragments, format=file_format, schema=table.schema, filesystem=fs
+    )
+
+    tbl = dataset.to_table()
+    assert tbl.equals(table)
+
+    # true sizes -> works
+    sizes_true = [dataset.filesystem.get_file_info(x).size for x in 
dataset.files]
+    fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
+                           for path, size in zip(paths, sizes_true)]
+    dataset_with_size = ds.FileSystemDataset(
+        fragments_with_size, format=file_format, schema=table.schema, 
filesystem=fs
+    )
+    tbl = dataset.to_table()
+    assert tbl.equals(table)
+
+    # too small sizes -> error
+    sizes_toosmall = [1 for path in paths]
+    fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
+                           for path, size in zip(paths, sizes_toosmall)]
+
+    dataset_with_size = ds.FileSystemDataset(
+        fragments_with_size, format=file_format, schema=table.schema, 
filesystem=fs
+    )
+
+    with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 
bytes'):
+        table = dataset_with_size.to_table()
+
+    # too large sizes -> error
+    sizes_toolarge = [1000000 for path in paths]
+    fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
+                           for path, size in zip(paths, sizes_toolarge)]
+
+    dataset_with_size = ds.FileSystemDataset(
+        fragments_with_size, format=file_format, schema=table.schema, 
filesystem=fs
+    )
+
+    # invalid range
+    with pytest.raises(OSError, match='HTTP status 416'):
+        table = dataset_with_size.to_table()
+
+
 def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
     content = textwrap.dedent("""
         alpha,num,animal

Reply via email to