This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
commit b92c435b8f64f98593267fd72ecd61d26c23ffc0 Author: Wes McKinney <wes.mckin...@twosigma.com> AuthorDate: Tue Nov 28 20:07:11 2017 -0500 ARROW-1684: [Python] Support selecting nested Parquet fields by any path prefix Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #1366 from wesm/ARROW-1684 and squashes the following commits: e63e42aa [Wes McKinney] Support selecting nested Parquet fields by any path prefix --- python/pyarrow/_parquet.pxd | 1 + python/pyarrow/_parquet.pyx | 29 ++++++++++++++++++++----- python/pyarrow/parquet.py | 41 +++++++++++++++++++++++++++++++----- python/pyarrow/tests/test_parquet.py | 22 +++++++++++++++++++ 4 files changed, 83 insertions(+), 10 deletions(-) diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 7e5e575..55b66b5 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -37,6 +37,7 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: cdef cppclass ColumnPath: c_string ToDotString() + vector[c_string] ToDotVector() cdef extern from "parquet/api/schema.h" namespace "parquet" nogil: diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index eca6b20..147af21 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -600,9 +600,11 @@ cdef class ParquetReader: object source CMemoryPool* allocator unique_ptr[FileReader] reader - column_idx_map FileMetaData _metadata + cdef public: + _column_idx_map + def __cinit__(self, MemoryPool memory_pool=None): self.allocator = maybe_unbox_memory_pool(memory_pool) self._metadata = None @@ -624,6 +626,23 @@ cdef class ParquetReader: check_status(OpenFile(rd_handle, self.allocator, properties, c_metadata, &self.reader)) + property column_paths: + + def __get__(self): + cdef: + FileMetaData container = self.metadata + const CFileMetaData* metadata = container._metadata + vector[c_string] path + int i = 0 + + paths = [] + for i in range(0, metadata.num_columns()): + path = (metadata.schema().Column(i) + .path().get().ToDotVector()) + paths.append([frombytes(x) for x in path]) + + return paths + @property def metadata(self): cdef: @@ -729,14 +748,14 @@ cdef class ParquetReader: const CFileMetaData* metadata = container._metadata int i = 0 - if self.column_idx_map is None: - self.column_idx_map = {} + if self._column_idx_map is None: + self._column_idx_map = {} for i in range(0, metadata.num_columns()): col_bytes = tobytes(metadata.schema().Column(i) .path().get().ToDotString()) - self.column_idx_map[col_bytes] = i + self._column_idx_map[col_bytes] = i - return self.column_idx_map[tobytes(column_name)] + return self._column_idx_map[tobytes(column_name)] def read_column(self, int column_index): cdef: diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 37da662..9fb890c 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +from collections import defaultdict import os import inspect import json @@ -54,6 +55,24 @@ class ParquetFile(object): self.reader = ParquetReader() self.reader.open(source, metadata=metadata) self.common_metadata = common_metadata + self._nested_paths_by_prefix = self._build_nested_paths() + + def _build_nested_paths(self): + paths = self.reader.column_paths + + result = defaultdict(list) + + def _visit_piece(i, key, rest): + result[key].append(i) + + if len(rest) > 0: + nested_key = '.'.join((key, rest[0])) + _visit_piece(i, nested_key, rest[1:]) + + for i, path in enumerate(paths): + _visit_piece(i, path[0], path[1:]) + + return result @property def metadata(self): @@ -75,7 +94,9 @@ class ParquetFile(object): Parameters ---------- columns: list - If not None, only these columns will be read from the row group. + If not None, only these columns will be read from the row group. A + column name may be a prefix of a nested field, e.g. 'a' will select + 'a.b', 'a.c', and 'a.d.e' nthreads : int, default 1 Number of columns to read in parallel. If > 1, requires that the underlying file source is threadsafe @@ -100,7 +121,9 @@ class ParquetFile(object): Parameters ---------- columns: list - If not None, only these columns will be read from the file. + If not None, only these columns will be read from the file. A + column name may be a prefix of a nested field, e.g. 'a' will select + 'a.b', 'a.c', and 'a.d.e' nthreads : int, default 1 Number of columns to read in parallel. If > 1, requires that the underlying file source is threadsafe @@ -143,7 +166,11 @@ class ParquetFile(object): if column_names is None: return None - indices = list(map(self.reader.column_name_idx, column_names)) + indices = [] + + for name in column_names: + if name in self._nested_paths_by_prefix: + indices.extend(self._nested_paths_by_prefix[name]) if use_pandas_metadata: file_keyvalues = self.metadata.metadata @@ -837,7 +864,9 @@ def read_table(source, columns=None, nthreads=1, metadata=None, name or directory name. For passing Python file objects or byte buffers, see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. columns: list - If not None, only these columns will be read from the file. + If not None, only these columns will be read from the file. A column + name may be a prefix of a nested field, e.g. 'a' will select 'a.b', + 'a.c', and 'a.d.e' nthreads : int, default 1 Number of columns to read in parallel. Requires that the underlying file source is threadsafe @@ -875,7 +904,9 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None): name. For passing Python file objects or byte buffers, see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. columns: list - If not None, only these columns will be read from the file. + If not None, only these columns will be read from the file. A column + name may be a prefix of a nested field, e.g. 'a' will select 'a.b', + 'a.c', and 'a.d.e' nthreads : int, default 1 Number of columns to read in parallel. Requires that the underlying file source is threadsafe diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 274ff45..9004fc0 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -1469,6 +1469,28 @@ def test_index_column_name_duplicate(tmpdir): @parquet +def test_parquet_nested_convenience(tmpdir): + # ARROW-1684 + import pyarrow.parquet as pq + + df = pd.DataFrame({ + 'a': [[1, 2, 3], None, [4, 5], []], + 'b': [[1.], None, None, [6., 7.]], + }) + + path = str(tmpdir / 'nested_convenience.parquet') + + table = pa.Table.from_pandas(df, preserve_index=False) + _write_table(table, path) + + read = pq.read_table(path, columns=['a']) + tm.assert_frame_equal(read.to_pandas(), df[['a']]) + + read = pq.read_table(path, columns=['a', 'b']) + tm.assert_frame_equal(read.to_pandas(), df) + + +@parquet def test_backwards_compatible_index_naming(): expected_string = b"""\ carat cut color clarity depth table price x y z -- To stop receiving notification emails like this one, please contact "commits@arrow.apache.org" <commits@arrow.apache.org>.