Repository: arrow
Updated Branches:
  refs/heads/master 243328931 -> 2660dda40
ARROW-1377: [Python] Add ParquetFile.scan_contents function to use for benchmarking

Requires PARQUET-1087: https://github.com/apache/parquet-cpp/pull/387

Author: Wes McKinney <[email protected]>

Closes #1039 from wesm/ARROW-1377 and squashes the following commits:

39caa344 [Wes McKinney] Add ParquetFile.scan_contents function to use for benchmarking


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2660dda4
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2660dda4
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2660dda4

Branch: refs/heads/master
Commit: 2660dda4082a5b74d82d773b1c35feb69ddbf447
Parents: 2433289
Author: Wes McKinney <[email protected]>
Authored: Tue Sep 5 19:13:06 2017 -0400
Committer: Wes McKinney <[email protected]>
Committed: Tue Sep 5 19:13:06 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          |  3 +++
 python/pyarrow/_parquet.pyx          | 19 +++++++++++++++++++
 python/pyarrow/parquet.py            | 27 +++++++++++++++++++++++++--
 python/pyarrow/tests/test_parquet.py | 19 +++++++++++++++++++
 4 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index ced6549..5094232 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -231,6 +231,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
         CStatus ReadTable(const vector[int]& column_indices,
                           shared_ptr[CTable]* out)
 
+        CStatus ScanContents(vector[int] columns, int32_t column_batch_size,
+                             int64_t* num_rows)
+
         const ParquetFileReader* parquet_reader()
 
         void set_num_threads(int num_threads)


http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index f3b7875..aea6fb6 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -467,6 +467,25 @@ cdef class ParquetReader:
                          .ReadTable(&ctable))
         return pyarrow_wrap_table(ctable)
 
+    def scan_contents(self, column_indices=None, batch_size=65536):
+        cdef:
+            vector[int] c_column_indices
+            int32_t c_batch_size
+            int64_t c_num_rows
+
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+        c_batch_size = batch_size
+
+        with nogil:
+            check_status(self.reader.get()
+                         .ScanContents(c_column_indices, c_batch_size,
+                                       &c_num_rows))
+
+        return c_num_rows
+
     def column_name_idx(self, column_name):
         """
         Find the matching index of a column in the schema.


http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 5dabca9..568aad4 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -118,6 +118,27 @@ class ParquetFile(object):
         return self.reader.read_all(column_indices=column_indices,
                                     nthreads=nthreads)
 
+    def scan_contents(self, columns=None, batch_size=65536):
+        """
+        Read contents of file with a single thread for indicated columns and
+        batch size. Number of rows in file is returned. This function is used
+        for benchmarking
+
+        Parameters
+        ----------
+        columns : list of integers, default None
+            If None, scan all columns
+        batch_size : int, default 64K
+            Number of rows to read at a time internally
+
+        Returns
+        -------
+        num_rows : number of rows in file
+        """
+        column_indices = self._get_column_indices(columns)
+        return self.reader.scan_contents(column_indices,
+                                         batch_size=batch_size)
+
     def _get_column_indices(self, column_names, use_pandas_metadata=False):
         if column_names is None:
             return None
@@ -648,12 +669,14 @@ class ParquetDataset(object):
 
 def _ensure_filesystem(fs):
     fs_type = type(fs)
-    # If the arrow filesystem was subclassed, assume it supports the full interface and return it
+    # If the arrow filesystem was subclassed, assume it supports the full
+    # interface and return it
     if not issubclass(fs_type, FileSystem):
         for mro in inspect.getmro(fs_type):
             if mro.__name__ is 'S3FileSystem':
                 return S3FSWrapper(fs)
-            # In case its a simple LocalFileSystem (e.g. dask) use native arrow FS
+            # In case its a simple LocalFileSystem (e.g. dask) use native arrow
+            # FS
             elif mro.__name__ is 'LocalFileSystem':
                 return LocalFileSystem.get_instance()


http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index de6f431..fa9455b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -676,6 +676,25 @@ def test_read_single_row_group_with_column_subset():
 
 
 @parquet
+def test_scan_contents():
+    import pyarrow.parquet as pq
+
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    assert pf.scan_contents() == 10000
+    assert pf.scan_contents(df.columns[:4]) == 10000
+
+
+@parquet
 def test_parquet_piece_read(tmpdir):
     import pyarrow.parquet as pq
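Editor's note: the sketch below shows one way the new ParquetFile.scan_contents
API could be used for the kind of benchmarking the commit message describes.
Only scan_contents and its columns/batch_size arguments come from this patch;
the sample data, column names, sizes, and timing harness are illustrative
assumptions, and the column-name arguments follow the pattern used in the new
unit test rather than any documented contract.

    import io
    import time

    import numpy as np
    import pandas as pd

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Build a small sample file in memory (sizes/names chosen for illustration)
    df = pd.DataFrame({'f0': np.random.randn(100000),
                       'f1': np.random.randn(100000)})
    table = pa.Table.from_pandas(df)

    buf = io.BytesIO()
    pq.write_table(table, buf, row_group_size=25000)
    buf.seek(0)

    pf = pq.ParquetFile(buf)

    # scan_contents decodes the file with a single thread and returns only the
    # row count, so it exercises raw read/decode throughput without the cost of
    # assembling an Arrow Table
    start = time.perf_counter()
    num_rows = pf.scan_contents(batch_size=65536)
    print('scanned %d rows in %.3f s' % (num_rows, time.perf_counter() - start))

    # Restrict the scan to a subset of columns, passed by name as in the new
    # test_scan_contents unit test
    print(pf.scan_contents(df.columns[:1]))

Because nothing is materialized beyond the row count, timings from a loop over
scan_contents isolate the Parquet decoding path, which is the stated purpose of
the function in this change.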
