Repository: arrow Updated Branches: refs/heads/master e4845c447 -> 1f26040f5
ARROW-548: [Python] Add nthreads to Filesystem.read_parquet and pass through Author: Wes McKinney <[email protected]> Closes #337 from wesm/ARROW-548 and squashes the following commits: b9aeaeb [Wes McKinney] Add nthreads to Filesystem.read_parquet and pass through Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/1f26040f Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/1f26040f Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/1f26040f Branch: refs/heads/master Commit: 1f26040f55eb54e00dc5e67ce0c1df64e51a1567 Parents: e4845c4 Author: Wes McKinney <[email protected]> Authored: Mon Feb 13 09:52:59 2017 +0100 Committer: Uwe L. Korn <[email protected]> Committed: Mon Feb 13 09:52:59 2017 +0100 ---------------------------------------------------------------------- python/pyarrow/filesystem.py | 9 +++++++-- python/pyarrow/parquet.py | 4 ++-- python/pyarrow/tests/test_parquet.py | 8 +++++++- 3 files changed, 16 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/1f26040f/python/pyarrow/filesystem.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py index 55bcad0..e820806 100644 --- a/python/pyarrow/filesystem.py +++ b/python/pyarrow/filesystem.py @@ -62,7 +62,8 @@ class Filesystem(object): """ raise NotImplementedError - def read_parquet(self, path, columns=None, metadata=None, schema=None): + def read_parquet(self, path, columns=None, metadata=None, schema=None, + nthreads=1): """ Read Parquet data from path in file system. Can read from a single file or a directory of files @@ -78,6 +79,9 @@ class Filesystem(object): schema : pyarrow.parquet.Schema Known schema to validate files against. Alternative to metadata argument + nthreads : int, default 1 + Number of columns to read in parallel. If > 1, requires that the + underlying file source is threadsafe Returns ------- @@ -95,7 +99,8 @@ class Filesystem(object): return read_multiple_files(paths_to_read, columns=columns, filesystem=self, schema=schema, - metadata=metadata) + metadata=metadata, + nthreads=nthreads) class LocalFilesystem(Filesystem): http://git-wip-us.apache.org/repos/asf/arrow/blob/1f26040f/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 9766ff6..fa96f95 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -59,8 +59,8 @@ class ParquetFile(object): columns: list If not None, only these columns will be read from the file. nthreads : int, default 1 - Number of columns to read in parallel. Requires that the underlying - file source is threadsafe + Number of columns to read in parallel. If > 1, requires that the + underlying file source is threadsafe Returns ------- http://git-wip-us.apache.org/repos/asf/arrow/blob/1f26040f/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 969f68b..96f2d15 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -320,17 +320,20 @@ def test_compare_schemas(): assert fileh.schema[0].equals(fileh.schema[0]) assert not fileh.schema[0].equals(fileh.schema[1]) + @parquet def test_column_of_lists(tmpdir): df, schema = dataframe_with_arrays() filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True, schema=schema) + arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True, + schema=schema) pq.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() pdt.assert_frame_equal(df, df_read) + @parquet def test_multithreaded_read(): df = alltypes_sample(size=10000) @@ -418,6 +421,9 @@ def test_read_multiple_files(tmpdir): expected = pa.Table.from_arrays(to_read) assert result.equals(expected) + # Read with multiple threads + pa.localfs.read_parquet(dirpath, nthreads=2) + # Test failure modes with non-uniform metadata bad_apple = _test_dataframe(size, seed=i).iloc[:, :4] bad_apple_path = tmpdir.join('{0}.parquet'.format(guid())).strpath
