Repository: arrow
Updated Branches:
  refs/heads/master c327b5fd2 -> 1f81adcc8
ARROW-503: [Python] Implement Python interface to streaming file format

See the new `StreamWriter` and `StreamReader` classes. This patch is stacked
on top of the patch for ARROW-475. Will rebase when that is merged.

Author: Wes McKinney <[email protected]>

Closes #299 from wesm/ARROW-503 and squashes the following commits:

e9d918e [Wes McKinney] Close BufferOutputStream after completing file or stream writes
31e519f [Wes McKinney] Add function alias to preserve backwards compatibility
faac28c [Wes McKinney] Fix small bug in BinaryArray::Equals, add rudimentary StreamReader/Writer interface and tests
d9fb3dc [Wes McKinney] Refactoring, consolidate IPC code into io.pyx

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/1f81adcc
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/1f81adcc
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/1f81adcc

Branch: refs/heads/master
Commit: 1f81adcc88b138c6ae5f5ffb3250f87239c89dc1
Parents: c327b5f
Author: Wes McKinney <[email protected]>
Authored: Mon Jan 23 09:10:18 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Mon Jan 23 09:10:18 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/array.cc                   |   2 +-
 cpp/src/arrow/ipc/ipc-file-test.cc       |   2 +
 cpp/src/arrow/ipc/stream.cc              |   6 +-
 cpp/src/arrow/ipc/stream.h               |   3 +
 python/CMakeLists.txt                    |   1 -
 python/pyarrow/__init__.py               |   2 +
 python/pyarrow/includes/libarrow_ipc.pxd |  29 +-
 python/pyarrow/io.pyx                    | 367 ++++++++++++++++++--------
 python/pyarrow/ipc.py                    |  83 ++++++
 python/pyarrow/ipc.pyx                   | 115 --------
 python/pyarrow/schema.pyx                |   1 +
 python/pyarrow/table.pxd                 |   1 +
 python/pyarrow/tests/test_ipc.py         | 120 ++++-----
 python/setup.py                          |   1 -
 14 files changed, 438 insertions(+), 295 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 7509520..aa4a692 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -359,7 +359,7 @@ bool BinaryArray::EqualsExact(const BinaryArray& other) const {
   if (!data_buffer_ && !(other.data_buffer_)) { return true; }
-  return data_buffer_->Equals(*other.data_buffer_, data_buffer_->size());
+  return data_buffer_->Equals(*other.data_buffer_, raw_offsets()[length_]);
 }
 
 bool BinaryArray::Equals(const std::shared_ptr<Array>& arr) const {

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 15ceb80..7cd8054 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -75,6 +75,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
       RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
     }
     RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
 
     // Current offset into stream is the end of the file
     int64_t footer_offset;
@@ -138,6 +139,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
       RETURN_NOT_OK(writer->WriteRecordBatch(batch));
     }
     RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
 
     // Open the file
     auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
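For orientation before the C++ and Cython changes below, a minimal round-trip
with the new Python classes (every name here appears in this patch; pandas is
assumed, as in the patch's tests):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'one': [1.0, 2.0], 'two': ['foo', 'bar']})
    batch = pa.RecordBatch.from_pandas(df)

    # Write a stream into an in-memory sink
    sink = pa.InMemoryOutputStream()
    writer = pa.StreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    # Read the batches back; StreamReader is iterable and terminates
    # cleanly at end of stream
    reader = pa.StreamReader(pa.BufferReader(sink.get_result()))
    for result in reader:
        assert result.equals(batch)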
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
index a2ca672..c9057e8 100644
--- a/cpp/src/arrow/ipc/stream.cc
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -117,9 +117,9 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
 }
 
 Status StreamWriter::Close() {
-  // Close the stream
-  RETURN_NOT_OK(CheckStarted());
-  return sink_->Close();
+  // Write the schema if not already written
+  // User is responsible for closing the OutputStream
+  return CheckStarted();
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
index 0b0e62f..53f51dc 100644
--- a/cpp/src/arrow/ipc/stream.h
+++ b/cpp/src/arrow/ipc/stream.h
@@ -54,6 +54,9 @@ class ARROW_EXPORT StreamWriter {
       std::shared_ptr<StreamWriter>* out);
 
   virtual Status WriteRecordBatch(const RecordBatch& batch);
+
+  /// Perform any logic necessary to finish the stream. User is responsible for
+  /// closing the actual OutputStream
   virtual Status Close();
 
  protected:

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index b3735b1..d63fff4 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -409,7 +409,6 @@ set(CYTHON_EXTENSIONS
   config
   error
   io
-  ipc
   scalar
   schema
   table

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index d563c7a..7c521db 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -46,6 +46,8 @@ from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
 from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
                         Buffer, InMemoryOutputStream, BufferReader)
 
+from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
+
 from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
                             BooleanValue,
                             Int8Value, Int16Value, Int32Value, Int64Value,

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index 8295760..bfece14 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -20,18 +20,37 @@ from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema,
                                         CRecordBatch)
-from pyarrow.includes.libarrow_io cimport (OutputStream, ReadableFileInterface)
+from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream,
+                                           ReadableFileInterface)
 
-cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
-    cdef cppclass CFileWriter " arrow::ipc::FileWriter":
+cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" nogil:
+
+    cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CFileWriter]* out)
+                     shared_ptr[CStreamWriter]* out)
+
+        CStatus Close()
         CStatus WriteRecordBatch(const CRecordBatch& batch)
-        CStatus Close()
 
+    cdef cppclass CStreamReader " arrow::ipc::StreamReader":
+
+        @staticmethod
+        CStatus Open(const shared_ptr[InputStream]& stream,
+                     shared_ptr[CStreamReader]* out)
+
+        shared_ptr[CSchema] schema()
+
+        CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+
+
+cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
+
+    cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
+        @staticmethod
+        CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+                     shared_ptr[CFileWriter]* out)
 
     cdef cppclass CFileReader " arrow::ipc::FileReader":
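Since CFileWriter is now declared as deriving from CStreamWriter, the file and
stream writers share one write_batch/close interface on the Python side, so
batch-producing code can be written once against either. An illustrative
sketch (the helper name is mine, not part of the patch):

    import pyarrow as pa

    def write_all(writer, batches):
        # Works with pa.StreamWriter and pa.FileWriter alike, since the
        # file writer inherits the stream writer's interface
        for batch in batches:
            writer.write_batch(batch)
        writer.close()

    # write_all(pa.StreamWriter(sink, schema), batches)
    # write_all(pa.FileWriter(sink, schema), batches)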
+ """ + cdef: + int64_t bytes_read = 0 + uint8_t* buf + self._assert_readable() + + buffer_size = buffer_size or DEFAULT_BUFFER_SIZE + + write_queue = Queue(50) + + if not hasattr(stream_or_path, 'read'): + stream = open(stream_or_path, 'wb') + cleanup = lambda: stream.close() + else: + stream = stream_or_path + cleanup = lambda: None + + done = False + exc_info = None + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue + stream.write(buf) + except Exception as e: + exc_info = sys.exc_info() + finally: + cleanup() + + self.seek(0) + + writer_thread = threading.Thread(target=bg_write) + + # This isn't ideal -- PyBytes_FromStringAndSize copies the data from + # the passed buffer, so it's hard for us to avoid doubling the memory + buf = <uint8_t*> malloc(buffer_size) + if buf == NULL: + raise MemoryError("Failed to allocate {0} bytes" + .format(buffer_size)) + + writer_thread.start() + + cdef int64_t total_bytes = 0 + cdef int32_t c_buffer_size = buffer_size + + try: + while True: + with nogil: + check_status(self.rd_file.get() + .Read(c_buffer_size, &bytes_read, buf)) + + total_bytes += bytes_read + + # EOF + if bytes_read == 0: + break + + pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf, + bytes_read) + + write_queue.put_nowait(pybuf) + finally: + free(buf) + done = True + + writer_thread.join() + if exc_info is not None: + raise exc_info[0], exc_info[1], exc_info[2] + + def upload(self, stream, buffer_size=None): + """ + Pipe file-like object to file + """ + write_queue = Queue(50) + self._assert_writeable() + + buffer_size = buffer_size or DEFAULT_BUFFER_SIZE + + done = False + exc_info = None + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue + + self.write(buf) + + except Exception as e: + exc_info = sys.exc_info() + + writer_thread = threading.Thread(target=bg_write) + writer_thread.start() + + try: + while True: + buf = stream.read(buffer_size) + if not buf: + break + + if writer_thread.is_alive(): + while write_queue.full(): + time.sleep(0.01) + else: + break + + write_queue.put_nowait(buf) + finally: + done = True + + writer_thread.join() + if exc_info is not None: + raise exc_info[0], exc_info[1], exc_info[2] + # ---------------------------------------------------------------------- # Python file-like objects @@ -679,58 +813,17 @@ cdef class _HdfsClient: return out - def upload(self, path, stream, buffer_size=2**16): + def download(self, path, stream, buffer_size=None): + with self.open(path, 'rb') as f: + f.download(stream, buffer_size=buffer_size) + + def upload(self, path, stream, buffer_size=None): """ Upload file-like object to HDFS path """ - write_queue = Queue(50) - with self.open(path, 'wb') as f: - done = False - exc_info = None - def bg_write(): - try: - while not done or write_queue.qsize() > 0: - try: - buf = write_queue.get(timeout=0.01) - except QueueEmpty: - continue - - f.write(buf) - - except Exception as e: - exc_info = sys.exc_info() - - writer_thread = threading.Thread(target=bg_write) - writer_thread.start() + f.upload(stream, buffer_size=buffer_size) - try: - while True: - buf = stream.read(buffer_size) - if not buf: - break - - if writer_thread.is_alive(): - while write_queue.full(): - time.sleep(0.01) - else: - break - - write_queue.put_nowait(buf) - finally: - done = True - - writer_thread.join() - if exc_info is not None: - raise exc_info[0], exc_info[1], 
@@ -679,58 +813,17 @@ cdef class _HdfsClient:
 
         return out
 
-    def upload(self, path, stream, buffer_size=2**16):
+    def download(self, path, stream, buffer_size=None):
+        with self.open(path, 'rb') as f:
+            f.download(stream, buffer_size=buffer_size)
+
+    def upload(self, path, stream, buffer_size=None):
         """
         Upload file-like object to HDFS path
         """
-        write_queue = Queue(50)
-
         with self.open(path, 'wb') as f:
-            done = False
-            exc_info = None
-            def bg_write():
-                try:
-                    while not done or write_queue.qsize() > 0:
-                        try:
-                            buf = write_queue.get(timeout=0.01)
-                        except QueueEmpty:
-                            continue
-
-                        f.write(buf)
-
-                except Exception as e:
-                    exc_info = sys.exc_info()
-
-            writer_thread = threading.Thread(target=bg_write)
-            writer_thread.start()
+            f.upload(stream, buffer_size=buffer_size)
 
-            try:
-                while True:
-                    buf = stream.read(buffer_size)
-                    if not buf:
-                        break
-
-                    if writer_thread.is_alive():
-                        while write_queue.full():
-                            time.sleep(0.01)
-                    else:
-                        break
-
-                    write_queue.put_nowait(buf)
-            finally:
-                done = True
-
-            writer_thread.join()
-            if exc_info is not None:
-                raise exc_info[0], exc_info[1], exc_info[2]
-
-    def download(self, path, stream, buffer_size=None):
-        with self.open(path, 'rb', buffer_size=buffer_size) as f:
-            f.download(stream)
-
-
-# ----------------------------------------------------------------------
-# Specialization for HDFS
 
 # ARROW-404: Helper class to ensure that files are closed before the
 # client. During deallocation of the extension class, the attributes are
@@ -766,75 +859,139 @@ cdef class HdfsFile(NativeFile):
     def __dealloc__(self):
         self.parent = None
 
-    def download(self, stream_or_path):
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+    cdef:
+        shared_ptr[CStreamWriter] writer
+        shared_ptr[OutputStream] sink
+        bint closed
+
+    def __cinit__(self):
+        self.closed = True
+
+    def __dealloc__(self):
+        if not self.closed:
+            self.close()
+
+    def _open(self, sink, Schema schema):
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+                                            &self.writer))
+
+        self.closed = False
+
+    def write_batch(self, RecordBatch batch):
+        with nogil:
+            check_status(self.writer.get()
+                         .WriteRecordBatch(deref(batch.batch)))
+
+    def close(self):
+        with nogil:
+            check_status(self.writer.get().Close())
+        self.closed = True
+
+
+cdef class _StreamReader:
+    cdef:
+        shared_ptr[CStreamReader] reader
+
+    cdef readonly:
+        Schema schema
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source):
+        cdef:
+            shared_ptr[ReadableFileInterface] reader
+            shared_ptr[InputStream] in_stream
+
+        get_reader(source, &reader)
+        in_stream = <shared_ptr[InputStream]> reader
+
+        with nogil:
+            check_status(CStreamReader.Open(in_stream, &self.reader))
+
+        schema = Schema()
+        schema.init_schema(self.reader.get().schema())
+
+    def get_next_batch(self):
         """
-        Read file completely to local path (rather than reading completely into
-        memory). First seeks to the beginning of the file.
+        Read next RecordBatch from the stream. Raises StopIteration at end of
+        stream
         """
-        cdef:
-            int64_t bytes_read = 0
-            uint8_t* buf
-        self._assert_readable()
+        cdef shared_ptr[CRecordBatch] batch
 
-        write_queue = Queue(50)
+        with nogil:
+            check_status(self.reader.get().GetNextRecordBatch(&batch))
 
-        if not hasattr(stream_or_path, 'read'):
-            stream = open(stream_or_path, 'wb')
-            cleanup = lambda: stream.close()
-        else:
-            stream = stream_or_path
-            cleanup = lambda: None
+        if batch.get() == NULL:
+            raise StopIteration
 
-        done = False
-        exc_info = None
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        buf = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
-                    stream.write(buf)
-            except Exception as e:
-                exc_info = sys.exc_info()
-            finally:
-                cleanup()
+        return batch_from_cbatch(batch)
 
-        self.seek(0)
 
-        writer_thread = threading.Thread(target=bg_write)
+cdef class _FileWriter(_StreamWriter):
 
-        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
-        # the passed buffer, so it's hard for us to avoid doubling the memory
-        buf = <uint8_t*> malloc(self.buffer_size)
-        if buf == NULL:
-            raise MemoryError("Failed to allocate {0} bytes"
-                              .format(self.buffer_size))
+    def _open(self, sink, Schema schema):
+        cdef shared_ptr[CFileWriter] writer
+        get_writer(sink, &self.sink)
 
-        writer_thread.start()
+        with nogil:
+            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+                                          &writer))
 
-        cdef int64_t total_bytes = 0
+        # Cast to base class, because has same interface
+        self.writer = <shared_ptr[CStreamWriter]> writer
+        self.closed = False
 
-        try:
-            while True:
-                with nogil:
-                    check_status(self.rd_file.get()
-                                 .Read(self.buffer_size, &bytes_read, buf))
 
-                total_bytes += bytes_read
+cdef class _FileReader:
+    cdef:
+        shared_ptr[CFileReader] reader
 
-                # EOF
-                if bytes_read == 0:
-                    break
+    def __cinit__(self):
+        pass
 
-                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
-                                                     bytes_read)
+    def _open(self, source, footer_offset=None):
+        cdef shared_ptr[ReadableFileInterface] reader
+        get_reader(source, &reader)
 
-                write_queue.put_nowait(pybuf)
-        finally:
-            free(buf)
-            done = True
+        cdef int64_t offset = 0
+        if footer_offset is not None:
+            offset = footer_offset
 
-        writer_thread.join()
-        if exc_info is not None:
-            raise exc_info[0], exc_info[1], exc_info[2]
+        with nogil:
+            if offset != 0:
+                check_status(CFileReader.Open2(reader, offset, &self.reader))
+            else:
+                check_status(CFileReader.Open(reader, &self.reader))
+
+    property num_dictionaries:
+
+        def __get__(self):
+            return self.reader.get().num_dictionaries()
+
+    property num_record_batches:
+
+        def __get__(self):
+            return self.reader.get().num_record_batches()
+
+    def get_batch(self, int i):
+        cdef shared_ptr[CRecordBatch] batch
+
+        if i < 0 or i >= self.num_record_batches:
+            raise ValueError('Batch number {0} out of range'.format(i))
+
+        with nogil:
+            check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+        return batch_from_cbatch(batch)
+
+    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+    # time has passed
+    get_record_batch = get_batch
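_StreamReader maps a NULL batch from GetNextRecordBatch to StopIteration,
which is what lets the public StreamReader double as a Python iterator.
Consuming it by hand looks like this (a sketch; source stands for any
readable file or buffer):

    import pyarrow as pa

    reader = pa.StreamReader(source)  # source: hypothetical readable input

    while True:
        try:
            batch = reader.get_next_batch()
        except StopIteration:
            break
        # process batch here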
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
new file mode 100644
index 0000000..5a56165
--- /dev/null
+++ b/python/pyarrow/ipc.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Arrow file and stream reader/writer classes, and other messaging tools
+
+import pyarrow.io as io
+
+
+class StreamReader(io._StreamReader):
+    """
+    Reader for the Arrow streaming binary format
+
+    Parameters
+    ----------
+    source : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a readable file object
+    """
+    def __init__(self, source):
+        self._open(source)
+
+    def __iter__(self):
+        while True:
+            yield self.get_next_batch()
+
+
+class StreamWriter(io._StreamWriter):
+    """
+    Writer for the Arrow streaming binary format
+
+    Parameters
+    ----------
+    sink : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a writeable file object
+    schema : pyarrow.Schema
+        The Arrow schema for data to be written to the file
+    """
+    def __init__(self, sink, schema):
+        self._open(sink, schema)
+
+
+class FileReader(io._FileReader):
+    """
+    Class for reading Arrow record batch data from the Arrow binary file
+    format
+
+    Parameters
+    ----------
+    source : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a readable file object
+    footer_offset : int, default None
+        If the file is embedded in some larger file, this is the byte offset
+        to the very end of the file data
+    """
+    def __init__(self, source, footer_offset=None):
+        self._open(source, footer_offset=footer_offset)
+
+
+class FileWriter(io._FileWriter):
+    """
+    Writer to create the Arrow binary file format
+
+    Parameters
+    ----------
+    sink : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a writeable file object
+    schema : pyarrow.Schema
+        The Arrow schema for data to be written to the file
+    """
+    def __init__(self, sink, schema):
+        self._open(sink, schema)
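Unlike the stream classes, the file classes support random access: the file
footer records the batch count, and batches can be fetched by index. A sketch
given the bytes of an Arrow file (file_contents is a stand-in):

    import pyarrow as pa

    reader = pa.FileReader(pa.BufferReader(file_contents))

    for i in range(reader.num_record_batches):
        batch = reader.get_batch(i)
        # reader.get_record_batch(i) also works, via the deprecated alias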
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
deleted file mode 100644
index 22069a7..0000000
--- a/python/pyarrow/ipc.pyx
+++ /dev/null
@@ -1,115 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_io cimport *
-from pyarrow.includes.libarrow_ipc cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-
-from pyarrow.error cimport check_status
-from pyarrow.io cimport NativeFile, get_reader, get_writer
-from pyarrow.schema cimport Schema
-from pyarrow.table cimport RecordBatch
-
-from pyarrow.compat import frombytes, tobytes
-import pyarrow.io as io
-
-cimport cpython as cp
-
-
-cdef class ArrowFileWriter:
-    cdef:
-        shared_ptr[CFileWriter] writer
-        shared_ptr[OutputStream] sink
-        bint closed
-
-    def __cinit__(self, sink, Schema schema):
-        self.closed = True
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                          &self.writer))
-
-        self.closed = False
-
-    def __dealloc__(self):
-        if not self.closed:
-            self.close()
-
-    def write_record_batch(self, RecordBatch batch):
-        with nogil:
-            check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch)))
-
-    def close(self):
-        with nogil:
-            check_status(self.writer.get().Close())
-        self.closed = True
-
-
-cdef class ArrowFileReader:
-    cdef:
-        shared_ptr[CFileReader] reader
-
-    def __cinit__(self, source, footer_offset=None):
-        cdef shared_ptr[ReadableFileInterface] reader
-        get_reader(source, &reader)
-
-        cdef int64_t offset = 0
-        if footer_offset is not None:
-            offset = footer_offset
-
-        with nogil:
-            if offset != 0:
-                check_status(CFileReader.Open2(reader, offset, &self.reader))
-            else:
-                check_status(CFileReader.Open(reader, &self.reader))
-
-    property num_dictionaries:
-
-        def __get__(self):
-            return self.reader.get().num_dictionaries()
-
-    property num_record_batches:
-
-        def __get__(self):
-            return self.reader.get().num_record_batches()
-
-    def get_record_batch(self, int i):
-        cdef:
-            shared_ptr[CRecordBatch] batch
-            RecordBatch result
-
-        if i < 0 or i >= self.num_record_batches:
-            raise ValueError('Batch number {0} out of range'.format(i))
-
-        with nogil:
-            check_status(self.reader.get().GetRecordBatch(i, &batch))
-
-        result = RecordBatch()
-        result.init(batch)
-
-        return result

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 2bcfec1..52eeeaf 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -112,6 +112,7 @@ cdef class Field:
         def __get__(self):
             return frombytes(self.field.name)
 
+
 cdef class Schema:
 
     def __cinit__(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index df3687d..389727b 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -59,3 +59,4 @@ cdef class RecordBatch:
     cdef _check_nullptr(self)
 
 cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index bbd6c6a..819d1b7 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -16,21 +16,20 @@
 # under the License.
 
 import io
+import pytest
 
 import numpy as np
 
 from pandas.util.testing import assert_frame_equal
 import pandas as pd
 
-import pyarrow as A
-import pyarrow.io as aio
-import pyarrow.ipc as ipc
+from pyarrow.compat import unittest
+import pyarrow as pa
 
 
-class RoundtripTest(object):
-    # Also tests writing zero-copy NumPy array with additional padding
+class MessagingTest(object):
 
-    def __init__(self):
+    def setUp(self):
         self.sink = self._get_sink()
 
     def _get_sink(self):
@@ -39,14 +38,15 @@ class RoundtripTest(object):
     def _get_source(self):
         return self.sink.getvalue()
 
-    def run(self):
+    def write_batches(self):
         nrows = 5
         df = pd.DataFrame({
             'one': np.random.randn(nrows),
             'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
-        batch = A.RecordBatch.from_pandas(df)
 
-        writer = ipc.ArrowFileWriter(self.sink, batch.schema)
+        batch = pa.RecordBatch.from_pandas(df)
+
+        writer = self._get_writer(self.sink, batch.schema)
 
         num_batches = 5
         frames = []
@@ -55,46 +55,73 @@ class RoundtripTest(object):
             unique_df = df.copy()
             unique_df['one'] = np.random.randn(nrows)
 
-            batch = A.RecordBatch.from_pandas(unique_df)
-            writer.write_record_batch(batch)
+            batch = pa.RecordBatch.from_pandas(unique_df)
+            writer.write_batch(batch)
             frames.append(unique_df)
             batches.append(batch)
 
         writer.close()
+        return batches
+
+
+class TestFile(MessagingTest, unittest.TestCase):
+    # Also tests writing zero-copy NumPy array with additional padding
+
+    def _get_writer(self, sink, schema):
+        return pa.FileWriter(sink, schema)
 
+    def test_simple_roundtrip(self):
+        batches = self.write_batches()
         file_contents = self._get_source()
-        reader = ipc.ArrowFileReader(aio.BufferReader(file_contents))
 
-        assert reader.num_record_batches == num_batches
+        reader = pa.FileReader(pa.BufferReader(file_contents))
 
-        for i in range(num_batches):
+        assert reader.num_record_batches == len(batches)
+
+        for i, batch in enumerate(batches):
             # it works. Must convert back to DataFrame
-            batch = reader.get_record_batch(i)
+            batch = reader.get_batch(i)
             assert batches[i].equals(batch)
 
 
-class InMemoryStreamTest(RoundtripTest):
+class TestStream(MessagingTest, unittest.TestCase):
+
+    def _get_writer(self, sink, schema):
+        return pa.StreamWriter(sink, schema)
+
+    def test_simple_roundtrip(self):
+        batches = self.write_batches()
+        file_contents = self._get_source()
+        reader = pa.StreamReader(pa.BufferReader(file_contents))
+
+        total = 0
+        for i, next_batch in enumerate(reader):
+            assert next_batch.equals(batches[i])
+            total += 1
+
+        assert total == len(batches)
+
+        with pytest.raises(StopIteration):
+            reader.get_next_batch()
+
+
+class TestInMemoryFile(TestFile):
 
     def _get_sink(self):
-        return aio.InMemoryOutputStream()
+        return pa.InMemoryOutputStream()
 
     def _get_source(self):
         return self.sink.get_result()
 
 
-def test_ipc_file_simple_roundtrip():
-    helper = RoundtripTest()
-    helper.run()
-
-
 def test_ipc_zero_copy_numpy():
     df = pd.DataFrame({'foo': [1.5]})
 
-    batch = A.RecordBatch.from_pandas(df)
-    sink = aio.InMemoryOutputStream()
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.InMemoryOutputStream()
     write_file(batch, sink)
     buffer = sink.get_result()
 
-    reader = aio.BufferReader(buffer)
+    reader = pa.BufferReader(buffer)
 
     batches = read_file(reader)
 
@@ -103,48 +130,13 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
-# XXX: For benchmarking
-
-def big_batch():
-    K = 2**4
-    N = 2**20
-    df = pd.DataFrame(
-        np.random.randn(K, N).T,
-        columns=[str(i) for i in range(K)]
-    )
-
-    df = pd.concat([df] * 2 ** 3, ignore_index=True)
-    return df
-
-
-def write_to_memory2(batch):
-    sink = aio.InMemoryOutputStream()
-    write_file(batch, sink)
-    return sink.get_result()
-
-
-def write_to_memory(batch):
-    sink = io.BytesIO()
-    write_file(batch, sink)
-    return sink.getvalue()
-
-
 def write_file(batch, sink):
-    writer = ipc.ArrowFileWriter(sink, batch.schema)
-    writer.write_record_batch(batch)
+    writer = pa.FileWriter(sink, batch.schema)
+    writer.write_batch(batch)
     writer.close()
 
 
 def read_file(source):
-    reader = ipc.ArrowFileReader(source)
-    return [reader.get_record_batch(i)
+    reader = pa.FileReader(source)
+    return [reader.get_batch(i)
             for i in range(reader.num_record_batches)]
-
-# df = big_batch()
-# batch = A.RecordBatch.from_pandas(df)
-# mem = write_to_memory(batch)
-# batches = read_file(mem)
-# data = batches[0].to_pandas()
-# rdf = pd.DataFrame(data)
-
-# [x.to_pandas() for x in batches]

http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index de59a92..9c63e93 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -94,7 +94,6 @@ class build_ext(_build_ext):
         'config',
         'error',
         'io',
-        'ipc',
        '_parquet',
        'scalar',
        'schema',
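The tests drive both a plain io.BytesIO sink and Arrow's InMemoryOutputStream
through the same code path; the write_file/read_file helpers above work
unchanged with either, for example:

    import io
    import pandas as pd
    import pyarrow as pa

    batch = pa.RecordBatch.from_pandas(pd.DataFrame({'foo': [1.5]}))

    # A Python file object works as a sink...
    sink = io.BytesIO()
    write_file(batch, sink)
    contents = sink.getvalue()

    # ...and so does an Arrow-native stream
    sink2 = pa.InMemoryOutputStream()
    write_file(batch, sink2)
    contents2 = sink2.get_result()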
