Repository: arrow
Updated Branches:
  refs/heads/master 33c731dbd -> 06be7aed0
ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #214 from xhochy/ARROW-389 and squashes the following commits:

e66c895 [Uwe L. Korn] Switch image to deprecated group
876cd65 [Uwe L. Korn] ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/06be7aed
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/06be7aed
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/06be7aed

Branch: refs/heads/master
Commit: 06be7aed062aca32b683f2ab3a94a201ae54b4f3
Parents: 33c731d
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Fri Dec 2 11:48:24 2016 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Fri Dec 2 11:48:24 2016 -0500

----------------------------------------------------------------------
 .travis.yml                          |  1 +
 python/pyarrow/includes/parquet.pxd  |  7 +++++--
 python/pyarrow/parquet.pyx           | 18 ++++++++++++------
 python/pyarrow/tests/test_parquet.py | 27 +++++++++++++++++++++++++++
 4 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 052c22c..bfc2f26 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,6 +24,7 @@ matrix:
   - compiler: gcc
     language: cpp
     os: linux
+    group: deprecated
     before_script:
     - export CC="gcc-4.9"
     - export CXX="g++-4.9"


http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 57c35ba..cb791e1 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -19,7 +19,7 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream
 
 
 cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
         ParquetReadSource(ParquetAllocator* allocator)
         Open(const shared_ptr[ReadableFileInterface]& file)
 
+    cdef cppclass ParquetWriteSink:
+        ParquetWriteSink(const shared_ptr[OutputStream]& file)
+
 
 cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
@@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
 cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
     cdef CStatus WriteFlatTable(
         const CTable* table, MemoryPool* pool,
-        const shared_ptr[ParquetOutputStream]& sink,
+        const shared_ptr[ParquetWriteSink]& sink,
         int64_t chunk_size,
         const shared_ptr[WriterProperties]& properties)


http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index a6e3ac3..83fddb2 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -21,7 +21,7 @@
 
 from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
 cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.array cimport Array
@@ -151,7 +151,7 @@ def read_table(source, columns=None):
     return Table.from_arrays(columns, arrays)
 
 
-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
                 use_dictionary=True, compression=None):
     """
     Write a Table to Parquet format
@@ -159,7 +159,7 @@ def write_table(table, filename, chunk_size=None, version=None,
     Parameters
     ----------
     table : pyarrow.Table
-    filename : string
+    sink: string or pyarrow.io.NativeFile
     chunk_size : int
         The maximum number of rows in each Parquet RowGroup. As a default,
         we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
     """
     cdef Table table_ = table
    cdef CTable* ctable_ = table_.table
-    cdef shared_ptr[ParquetOutputStream] sink
+    cdef shared_ptr[ParquetWriteSink] sink_
+    cdef shared_ptr[FileOutputStream] filesink_
     cdef WriterProperties.Builder properties_builder
     cdef int64_t chunk_size_ = 0
     if chunk_size is None:
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
         else:
             raise ArrowException("Unsupport compression codec")
 
-    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    if isinstance(sink, six.string_types):
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+    elif isinstance(sink, NativeFile):
+        sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
+
     with nogil:
-        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
                                     chunk_size_, properties_builder.build()))


http://git-wip-us.apache.org/repos/asf/arrow/blob/06be7aed/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index c1d44ce..841830f 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -18,6 +18,7 @@
 import pytest
 
 import pyarrow as A
+import pyarrow.io as paio
 import numpy as np
 import pandas as pd
 
@@ -132,6 +133,32 @@ def test_pandas_column_selection(tmpdir):
     pdt.assert_frame_equal(df[['uint8']], df_read)
 
 @parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int16),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    arrow_table = A.from_pandas_dataframe(df)
+    imos = paio.InMemoryOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = paio.BufferReader(buf)
+    df_read = pq.read_table(reader).to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+
+
+@parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
     np.random.seed(0)
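
----------------------------------------------------------------------

With this change, write_table() accepts either a filesystem path or a
pyarrow.io.NativeFile: a string sink is opened via FileOutputStream,
while a NativeFile's wr_file handle is wrapped directly in a
ParquetWriteSink. Below is a minimal usage sketch mirroring the new
test; the API names (from_pandas_dataframe, InMemoryOutputStream,
get_result, BufferReader) are as of this commit and may differ in
later releases.

    import pandas as pd

    import pyarrow as A
    import pyarrow.io as paio
    import pyarrow.parquet as pq

    df = pd.DataFrame({'x': [1, 2, 3], 'y': [0.1, 0.2, 0.3]})
    table = A.from_pandas_dataframe(df)

    # Write to an in-memory NativeFile instead of a filesystem path.
    sink = paio.InMemoryOutputStream()
    pq.write_table(table, sink, version='2.0')

    # get_result() returns the written buffer; wrap it in a
    # BufferReader to read the table straight back.
    buf = sink.get_result()
    df_read = pq.read_table(paio.BufferReader(buf)).to_pandas()
    assert df.equals(df_read)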