Repository: arrow
Updated Branches:
  refs/heads/master c90ca60c1 -> 61a54f8a6

ARROW-509: [Python] Add support for multithreaded Parquet reads

I'm getting very nice speedups on a Parquet file storing a ~4.5 GB dataset:

```
In [1]: import pyarrow.parquet as pq

In [2]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq')
CPU times: user 8.21 s, sys: 468 ms, total: 8.68 s
Wall time: 8.68 s

In [3]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq', nthreads=4)
CPU times: user 8.84 s, sys: 4.28 s, total: 13.1 s
Wall time: 3.91 s

In [4]: %time table = pq.read_table('/home/wesm/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq', nthreads=8)
CPU times: user 13.3 s, sys: 1.15 s, total: 14.4 s
Wall time: 2.86 s
```

This requires a bugfix in parquet-cpp that will come soon in a patch for PARQUET-836.

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #301 from wesm/ARROW-509 and squashes the following commits:

9816689 [Wes McKinney] Update docs slightly, flake8 warning
239b086 [Wes McKinney] Add support for nthreads option in parquet::arrow, unit tests


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/61a54f8a
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/61a54f8a
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/61a54f8a

Branch: refs/heads/master
Commit: 61a54f8a619efc4fd256c446be29905d6484c5e9
Parents: c90ca60
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Tue Jan 24 08:30:37 2017 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Tue Jan 24 08:30:37 2017 -0500

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          |  4 ++++
 python/pyarrow/_parquet.pyx          | 21 ++++++++++++++----
 python/pyarrow/parquet.py            | 36 ++++++++++++++++++-----------
 python/pyarrow/tests/test_parquet.py | 18 ++++++++++++++++
 4 files changed, 62 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index cf1da1c..fabee5d 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -213,8 +213,12 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
         CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
+        CStatus ReadFlatTable(const vector[int]& column_indices,
+                              shared_ptr[CTable]* out);
         const ParquetFileReader* parquet_reader();
 
+        void set_num_threads(int num_threads)
+
 
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
     CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,


http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index b11cee3..3f847e9 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -382,14 +382,27 @@ cdef class ParquetReader:
         result.init(metadata)
         return result
 
-    def read_all(self):
+    def read(self, column_indices=None, nthreads=1):
         cdef:
             Table table = Table()
             shared_ptr[CTable] ctable
+            vector[int] c_column_indices
 
-        with nogil:
-            check_status(self.reader.get()
-                         .ReadFlatTable(&ctable))
+        self.reader.get().set_num_threads(nthreads)
+
+        if column_indices is not None:
+            # Read only desired column indices
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadFlatTable(c_column_indices, &ctable))
+        else:
+            # Read all columns
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadFlatTable(&ctable))
 
         table.init(ctable)
         return table
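As a rough illustration of the effect of the new option, a standalone version of the benchmark in the commit message might look like the sketch below; the file path is a placeholder for any large local Parquet file.

```
import time

import pyarrow.parquet as pq

# Placeholder path: substitute any large local Parquet file
PATH = '/path/to/large_file.parquet'

for nthreads in (1, 4, 8):
    start = time.time()
    table = pq.read_table(PATH, nthreads=nthreads)
    print('nthreads=%d: %.2f s wall time' % (nthreads, time.time() - start))
```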
http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index cbe1c6e..6654b77 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -18,7 +18,7 @@
 from pyarrow._parquet import (ParquetReader, FileMetaData,  # noqa
                               RowGroupMetaData, Schema, ParquetWriter)
 import pyarrow._parquet as _parquet  # noqa
-from pyarrow.table import Table, concat_tables
+from pyarrow.table import concat_tables
 
 
 class ParquetFile(object):
@@ -45,7 +45,7 @@ class ParquetFile(object):
     def schema(self):
         return self.metadata.schema
 
-    def read(self, nrows=None, columns=None):
+    def read(self, nrows=None, columns=None, nthreads=1):
         """
         Read a Table from Parquet format
 
@@ -53,6 +53,9 @@
         ----------
         columns: list
             If not None, only these columns will be read from the file.
+        nthreads : int, default 1
+            Number of columns to read in parallel. Requires that the underlying
+            file source is threadsafe
 
         Returns
         -------
@@ -63,16 +66,16 @@ class ParquetFile(object):
             raise NotImplementedError("nrows argument")
 
         if columns is None:
-            return self.reader.read_all()
+            column_indices = None
         else:
-            column_idxs = [self.reader.column_name_idx(column)
-                           for column in columns]
-            arrays = [self.reader.read_column(column_idx)
-                      for column_idx in column_idxs]
-            return Table.from_arrays(arrays, names=columns)
+            column_indices = [self.reader.column_name_idx(column)
+                              for column in columns]
 
+        return self.reader.read(column_indices=column_indices,
+                                nthreads=nthreads)
 
-def read_table(source, columns=None, metadata=None):
+
+def read_table(source, columns=None, nthreads=1, metadata=None):
     """
     Read a Table from Parquet format
 
@@ -83,6 +86,9 @@
         pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
     columns: list
         If not None, only these columns will be read from the file.
+    nthreads : int, default 1
+        Number of columns to read in parallel. Requires that the underlying
+        file source is threadsafe
     metadata : FileMetaData
         If separately computed
 
@@ -91,11 +97,12 @@
     pyarrow.Table
         Content of the file as a table (of columns)
     """
-    return ParquetFile(source, metadata=metadata).read(columns=columns)
+    pf = ParquetFile(source, metadata=metadata)
+    return pf.read(columns=columns, nthreads=nthreads)
 
 
-def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
-                        schema=None):
+def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
+                        metadata=None, schema=None):
     """
     Read multiple Parquet files as a single pyarrow.Table
 
@@ -108,6 +115,9 @@
     filesystem : Filesystem, default None
         If nothing passed, paths assumed to be found in the local on-disk
         filesystem
+    nthreads : int, default 1
+        Number of columns to read in parallel. Requires that the underlying
+        file source is threadsafe
     metadata : pyarrow.parquet.FileMetaData
         Use metadata obtained elsewhere to validate file schemas
     schema : pyarrow.parquet.Schema
@@ -147,7 +157,7 @@
     tables = []
     for path, path_metadata in zip(paths, all_file_metadata):
         reader = open_file(path, meta=path_metadata)
-        table = reader.read(columns=columns)
+        table = reader.read(columns=columns, nthreads=nthreads)
         tables.append(table)
 
     all_data = concat_tables(tables)
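Taken together, these changes let column selection and multithreaded reads compose through both public entry points. A minimal usage sketch (the path and column names are hypothetical):

```
import pyarrow.parquet as pq

# One-shot read through the module-level helper
table = pq.read_table('example.parquet', columns=['a', 'b'], nthreads=4)

# Equivalent, via an explicit ParquetFile; column names are resolved to
# indices in ParquetFile.read before the Cython-level read is invoked
pf = pq.ParquetFile('example.parquet')
table = pf.read(columns=['a', 'b'], nthreads=4)
```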
http://git-wip-us.apache.org/repos/asf/arrow/blob/61a54f8a/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index a94fe45..d85f0e5 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -321,6 +321,24 @@ def test_compare_schemas():
 
 
 @parquet
+def test_multithreaded_read():
+    df = alltypes_sample(size=10000)
+
+    table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+    buf = io.BytesIO()
+    pq.write_table(table, buf, compression='SNAPPY', version='2.0')
+
+    buf.seek(0)
+    table1 = pq.read_table(buf, nthreads=4)
+
+    buf.seek(0)
+    table2 = pq.read_table(buf, nthreads=1)
+
+    assert table1.equals(table2)
+
+
+@parquet
 def test_pass_separate_metadata():
     # ARROW-471
     df = alltypes_sample(size=10000)
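The new `nthreads` option also flows through `read_multiple_files`, where each file is read with the requested thread count before the per-file tables are concatenated. A sketch with placeholder paths, assuming the files share a schema:

```
import pyarrow.parquet as pq

# Placeholder paths; each file is read with 4 threads, and the
# resulting tables are concatenated into a single pyarrow.Table
table = pq.read_multiple_files(['part-0.parquet', 'part-1.parquet'],
                               nthreads=4)
```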