Repository: arrow
Updated Branches:
  refs/heads/master ff744ef13 -> 56f1e91d2
ARROW-771: [Python] Add read_row_group / num_row_groups to ParquetFile

Requires PARQUET-946 (https://github.com/apache/parquet-cpp/pull/291).

cc @cpcloud @jreback @mrocklin

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #494 from wesm/ARROW-771 and squashes the following commits:

126789a [Wes McKinney] Fix docstring
1009423 [Wes McKinney] Add read_row_group / num_row_groups to ParquetFile


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/56f1e91d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/56f1e91d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/56f1e91d

Branch: refs/heads/master
Commit: 56f1e91d2961a13b7f677785fa705bed06d9639d
Parents: ff744ef
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Thu Apr 6 13:49:32 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Thu Apr 6 13:49:32 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          | 17 +++++++---
 python/pyarrow/_parquet.pyx          | 37 ++++++++++++++++-----
 python/pyarrow/parquet.py            | 53 +++++++++++++++++++++++--------
 python/pyarrow/tests/test_parquet.py | 29 +++++++++++++++++
 4 files changed, 109 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index f12c86f..1ac1f69 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -179,7 +179,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
 
-        shared_ptr[CFileMetaData] metadata();
+        shared_ptr[CFileMetaData] metadata()
 
 
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -211,11 +211,18 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     cdef cppclass FileReader:
         FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
 
-        CStatus ReadColumn(int i, shared_ptr[CArray]* out);
-        CStatus ReadTable(shared_ptr[CTable]* out);
+        CStatus ReadColumn(int i, shared_ptr[CArray]* out)
+
+        int num_row_groups()
+        CStatus ReadRowGroup(int i, shared_ptr[CTable]* out)
+        CStatus ReadRowGroup(int i, const vector[int]& column_indices,
+                             shared_ptr[CTable]* out)
+
+        CStatus ReadTable(shared_ptr[CTable]* out)
         CStatus ReadTable(const vector[int]& column_indices,
-                          shared_ptr[CTable]* out);
-        const ParquetFileReader* parquet_reader();
+                          shared_ptr[CTable]* out)
+
+        const ParquetFileReader* parquet_reader()
 
         void set_num_threads(int num_threads)


http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index cfd2816..079bf5e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
 from pyarrow.io import NativeFile
 from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table
+from pyarrow.table cimport Table, table_from_ctable
 
 from pyarrow.io cimport NativeFile, get_reader, get_writer
 
@@ -381,16 +381,39 @@ cdef class ParquetReader:
         result.init(metadata)
         return result
 
-    def read(self, column_indices=None, nthreads=1):
+    property num_row_groups:
+
+        def __get__(self):
+            return self.reader.get().num_row_groups()
+
+    def set_num_threads(self, int nthreads):
+        self.reader.get().set_num_threads(nthreads)
+
+    def read_row_group(self, int i, column_indices=None):
         cdef:
-            Table table = Table()
             shared_ptr[CTable] ctable
             vector[int] c_column_indices
 
-        self.reader.get().set_num_threads(nthreads)
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadRowGroup(i, c_column_indices, &ctable))
+        else:
+            # Read all columns
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadRowGroup(i, &ctable))
+        return table_from_ctable(ctable)
+
+    def read_all(self, column_indices=None):
+        cdef:
+            shared_ptr[CTable] ctable
+            vector[int] c_column_indices
 
         if column_indices is not None:
-            # Read only desired column indices
             for index in column_indices:
                 c_column_indices.push_back(index)
 
@@ -402,9 +425,7 @@ cdef class ParquetReader:
             with nogil:
                 check_status(self.reader.get()
                              .ReadTable(&ctable))
-
-        table.init(ctable)
-        return table
+        return table_from_ctable(ctable)
 
     def column_name_idx(self, column_name):
         """


http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2985316..d95c3b3 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -50,7 +50,32 @@ class ParquetFile(object):
     def schema(self):
         return self.metadata.schema
 
-    def read(self, nrows=None, columns=None, nthreads=1):
+    @property
+    def num_row_groups(self):
+        return self.reader.num_row_groups
+
+    def read_row_group(self, i, columns=None, nthreads=1):
+        """
+        Read a single row group from a Parquet file
+
+        Parameters
+        ----------
+        columns: list
+            If not None, only these columns will be read from the row group.
+        nthreads : int, default 1
+            Number of columns to read in parallel. If > 1, requires that the
+            underlying file source is threadsafe
+
+        Returns
+        -------
+        pyarrow.table.Table
+            Content of the row group as a table (of columns)
+        """
+        column_indices = self._get_column_indices(columns)
+        self.reader.set_num_threads(nthreads)
+        return self.reader.read_row_group(i, column_indices=column_indices)
+
+    def read(self, columns=None, nthreads=1):
         """
         Read a Table from Parquet format
 
@@ -67,17 +92,16 @@ class ParquetFile(object):
         pyarrow.table.Table
             Content of the file as a table (of columns)
         """
-        if nrows is not None:
-            raise NotImplementedError("nrows argument")
+        column_indices = self._get_column_indices(columns)
+        self.reader.set_num_threads(nthreads)
+        return self.reader.read_all(column_indices=column_indices)
 
-        if columns is None:
-            column_indices = None
+    def _get_column_indices(self, column_names):
+        if column_names is None:
+            return None
         else:
-            column_indices = [self.reader.column_name_idx(column)
-                              for column in columns]
-
-        return self.reader.read(column_indices=column_indices,
-                                nthreads=nthreads)
+            return [self.reader.column_name_idx(column)
+                    for column in column_names]
 
 
 def read_table(source, columns=None, nthreads=1, metadata=None):
@@ -178,8 +202,8 @@ def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
     return all_data
 
 
-def write_table(table, sink, chunk_size=None, version='1.0',
-                use_dictionary=True, compression='snappy'):
+def write_table(table, sink, row_group_size=None, version='1.0',
+                use_dictionary=True, compression='snappy', **kwargs):
     """
     Write a Table to Parquet format
 
@@ -187,7 +211,7 @@ def write_table(table, sink, chunk_size=None, version='1.0',
     ----------
     table : pyarrow.Table
     sink: string or pyarrow.io.NativeFile
-    chunk_size : int, default None
+    row_group_size : int, default None
        The maximum number of rows in each Parquet RowGroup. As a default,
        we will write a single RowGroup per file.
    version : {"1.0", "2.0"}, default "1.0"
@@ -198,7 +222,8 @@ def write_table(table, sink, chunk_size=None, version='1.0',
     compression : str or dict
         Specify the compression codec, either on a general basis or per-column.
""" + row_group_size = kwargs.get('chunk_size', row_group_size) writer = ParquetWriter(sink, use_dictionary=use_dictionary, compression=compression, version=version) - writer.write_table(table, row_group_size=chunk_size) + writer.write_table(table, row_group_size=row_group_size) http://git-wip-us.apache.org/repos/asf/arrow/blob/56f1e91d/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index b8b2800..86165be 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -403,6 +403,35 @@ def test_pass_separate_metadata(): @parquet +def test_read_single_row_group(): + # ARROW-471 + N, K = 10000, 4 + df = alltypes_sample(size=N) + + a_table = pa.Table.from_pandas(df, timestamps_to_ms=True) + + buf = io.BytesIO() + pq.write_table(a_table, buf, row_group_size=N / K, + compression='snappy', version='2.0') + + buf.seek(0) + + pf = pq.ParquetFile(buf) + + assert pf.num_row_groups == K + + row_groups = [pf.read_row_group(i) for i in range(K)] + result = pa.concat_tables(row_groups) + pdt.assert_frame_equal(df, result.to_pandas()) + + cols = df.columns[:2] + row_groups = [pf.read_row_group(i, columns=cols) + for i in range(K)] + result = pa.concat_tables(row_groups) + pdt.assert_frame_equal(df[cols], result.to_pandas()) + + +@parquet def test_read_multiple_files(tmpdir): nfiles = 10 size = 5