Repository: arrow Updated Branches: refs/heads/master 874666a61 -> b4892fd9f
ARROW-528: [Python] Utilize improved Parquet writer C++ API, add write_metadata function, test _metadata files Author: Wes McKinney <[email protected]> Closes #539 from wesm/ARROW-528 and squashes the following commits: 848ff93 [Wes McKinney] Add test for _metadata file 8b8f333 [Wes McKinney] Refactor to use APIs introduced in PARQUET-953. Add write_metadata function Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b4892fd9 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b4892fd9 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b4892fd9 Branch: refs/heads/master Commit: b4892fd9fb676a678a966da51407b3ce4ba3ec65 Parents: 874666a Author: Wes McKinney <[email protected]> Authored: Fri Apr 14 12:15:57 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Fri Apr 14 12:15:57 2017 -0400 ---------------------------------------------------------------------- python/pyarrow/_parquet.pxd | 16 +++++++--- python/pyarrow/_parquet.pyx | 52 ++++++++++++++++++------------- python/pyarrow/parquet.py | 34 +++++++++++++++++--- python/pyarrow/tests/test_parquet.py | 24 ++++++++++++++ 4 files changed, 94 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/_parquet.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 1ac1f69..9f6edc0 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -235,8 +235,14 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil: cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: - cdef CStatus WriteTable( - const CTable& table, CMemoryPool* pool, - const shared_ptr[OutputStream]& sink, - int64_t chunk_size, - const shared_ptr[WriterProperties]& properties) + cdef cppclass FileWriter: + + @staticmethod + CStatus Open(const CSchema& schema, CMemoryPool* pool, + const shared_ptr[OutputStream]& sink, + const shared_ptr[WriterProperties]& properties, + unique_ptr[FileWriter]* writer) + + CStatus WriteTable(const CTable& table, int64_t chunk_size) + CStatus NewRowGroup(int64_t chunk_size) + CStatus Close() http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/_parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 5418e1d..b7358a6 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -23,7 +23,7 @@ from cython.operator cimport dereference as deref from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * cimport pyarrow.includes.pyarrow as pyarrow -from pyarrow._array cimport Array +from pyarrow._array cimport Array, Schema from pyarrow._error cimport check_status from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool from pyarrow._table cimport Table, table_from_ctable @@ -108,7 +108,7 @@ cdef class FileMetaData: if self._schema is not None: return self._schema - cdef Schema schema = Schema() + cdef ParquetSchema schema = ParquetSchema() schema.init_from_filemeta(self) self._schema = schema return schema @@ -160,7 +160,7 @@ cdef class FileMetaData: return result -cdef class Schema: +cdef class ParquetSchema: cdef: object parent # the FileMetaData owning the SchemaDescriptor const SchemaDescriptor* schema @@ -194,7 +194,7 @@ cdef class Schema: def __getitem__(self, i): return self.column(i) - def equals(self, Schema other): + def equals(self, ParquetSchema other): """ Returns True if the Parquet schemas are equal """ @@ -217,7 +217,7 @@ cdef class ColumnSchema: def __cinit__(self): self.descr = NULL - cdef init_from_schema(self, Schema schema, int i): + cdef init_from_schema(self, ParquetSchema schema, int i): self.parent = schema self.descr = schema.schema.Column(i) @@ -373,7 +373,8 @@ cdef class ParquetReader: if self._metadata is not None: return self._metadata - metadata = self.reader.get().parquet_reader().metadata() + with nogil: + metadata = self.reader.get().parquet_reader().metadata() self._metadata = result = FileMetaData() result.init(metadata) @@ -487,9 +488,7 @@ cdef ParquetCompression compression_from_name(object name): cdef class ParquetWriter: cdef: - shared_ptr[WriterProperties] properties - shared_ptr[OutputStream] sink - CMemoryPool* allocator + unique_ptr[FileWriter] writer cdef readonly: object use_dictionary @@ -497,28 +496,34 @@ cdef class ParquetWriter: object version int row_group_size - def __cinit__(self, where, use_dictionary=None, compression=None, - version=None, MemoryPool memory_pool=None): - cdef shared_ptr[FileOutputStream] filestream + def __cinit__(self, where, Schema schema, use_dictionary=None, + compression=None, version=None, + MemoryPool memory_pool=None): + cdef: + shared_ptr[FileOutputStream] filestream + shared_ptr[OutputStream] sink + shared_ptr[WriterProperties] properties if isinstance(where, six.string_types): check_status(FileOutputStream.Open(tobytes(where), &filestream)) - self.sink = <shared_ptr[OutputStream]> filestream + sink = <shared_ptr[OutputStream]> filestream else: - get_writer(where, &self.sink) - self.allocator = maybe_unbox_memory_pool(memory_pool) + get_writer(where, &sink) self.use_dictionary = use_dictionary self.compression = compression self.version = version - self._setup_properties() - cdef _setup_properties(self): cdef WriterProperties.Builder properties_builder self._set_version(&properties_builder) self._set_compression_props(&properties_builder) self._set_dictionary_props(&properties_builder) - self.properties = properties_builder.build() + properties = properties_builder.build() + + check_status( + FileWriter.Open(deref(schema.schema), + maybe_unbox_memory_pool(memory_pool), + sink, properties, &self.writer)) cdef _set_version(self, WriterProperties.Builder* props): if self.version is not None: @@ -546,12 +551,16 @@ cdef class ParquetWriter: props.enable_dictionary() else: props.disable_dictionary() - else: + elif self.use_dictionary is not None: # Deactivate dictionary encoding by default props.disable_dictionary() for column in self.use_dictionary: props.enable_dictionary(column) + def close(self): + with nogil: + check_status(self.writer.get().Close()) + def write_table(self, Table table, row_group_size=None): cdef CTable* ctable = table.table @@ -563,6 +572,5 @@ cdef class ParquetWriter: cdef int c_row_group_size = row_group_size with nogil: - check_status(WriteTable(deref(ctable), self.allocator, - self.sink, c_row_group_size, - self.properties)) + check_status(self.writer.get() + .WriteTable(deref(ctable), c_row_group_size)) http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index aaec43a..4ff7e03 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -21,7 +21,8 @@ import numpy as np from pyarrow.filesystem import LocalFilesystem from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa - RowGroupMetaData, Schema, ParquetWriter) + RowGroupMetaData, ParquetSchema, + ParquetWriter) import pyarrow._parquet as _parquet # noqa import pyarrow._array as _array import pyarrow._table as _table @@ -471,7 +472,8 @@ class ParquetDataset(object): else: self.fs = filesystem - self.pieces, self.partitions = _make_manifest(path_or_paths, self.fs) + (self.pieces, self.partitions, + self.metadata_path) = _make_manifest(path_or_paths, self.fs) self.metadata = metadata self.schema = schema @@ -488,7 +490,10 @@ class ParquetDataset(object): open_file = self._get_open_file_func() if self.metadata is None and self.schema is None: - self.schema = self.pieces[0].get_metadata(open_file).schema + if self.metadata_path is not None: + self.schema = open_file(self.metadata_path).schema + else: + self.schema = self.pieces[0].get_metadata(open_file).schema elif self.schema is None: self.schema = self.metadata.schema @@ -543,10 +548,12 @@ class ParquetDataset(object): def _make_manifest(path_or_paths, fs, pathsep='/'): partitions = None + metadata_path = None if is_string(path_or_paths) and fs.isdir(path_or_paths): manifest = ParquetManifest(path_or_paths, filesystem=fs, pathsep=pathsep) + metadata_path = manifest.metadata_path pieces = manifest.pieces partitions = manifest.partitions else: @@ -565,7 +572,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'): piece = ParquetDatasetPiece(path) pieces.append(piece) - return pieces, partitions + return pieces, partitions, metadata_path def read_table(source, columns=None, nthreads=1, metadata=None): @@ -622,7 +629,24 @@ def write_table(table, where, row_group_size=None, version='1.0', Specify the compression codec, either on a general basis or per-column. """ row_group_size = kwargs.get('chunk_size', row_group_size) - writer = ParquetWriter(where, use_dictionary=use_dictionary, + writer = ParquetWriter(where, table.schema, + use_dictionary=use_dictionary, compression=compression, version=version) writer.write_table(table, row_group_size=row_group_size) + writer.close() + + +def write_metadata(schema, where, version='1.0'): + """ + Write metadata-only Parquet file from schema + + Parameters + ---------- + schema : pyarrow.Schema + where: string or pyarrow.io.NativeFile + version : {"1.0", "2.0"}, default "1.0" + The Parquet format version, defaults to 1.0 + """ + writer = ParquetWriter(where, schema, version=version) + writer.close() http://git-wip-us.apache.org/repos/asf/arrow/blob/b4892fd9/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index a5c70aa..ca6ae2d 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -529,6 +529,30 @@ def _generate_partition_directories(base_dir, partition_spec, df): _visit_level(base_dir, 0, []) +@parquet +def test_read_common_metadata_files(tmpdir): + N = 100 + df = pd.DataFrame({ + 'index': np.arange(N), + 'values': np.random.randn(N) + }, columns=['index', 'values']) + + base_path = str(tmpdir) + data_path = pjoin(base_path, 'data.parquet') + + table = pa.Table.from_pandas(df) + pq.write_table(table, data_path) + + metadata_path = pjoin(base_path, '_metadata') + pq.write_metadata(table.schema, metadata_path) + + dataset = pq.ParquetDataset(base_path) + assert dataset.metadata_path == metadata_path + + pf = pq.ParquetFile(data_path) + assert dataset.schema.equals(pf.schema) + + def _filter_partition(df, part_keys): predicate = np.ones(len(df), dtype=bool)
