ARROW-312: Read and write Arrow IPC file format from Python. This also adds some IO scaffolding for interacting with `arrow::Buffer` objects from Python, and assorted additions to help with testing.
Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #164 from wesm/ARROW-312 and squashes the following commits: 7df3e5f [Wes McKinney] Set BUILD_WITH_INSTALL_RPATH on arrow_ipc be8cee0 [Wes McKinney] Link Cython modules to libarrow* libraries 5716601 [Wes McKinney] Fix accidental deletion 77fb03b [Wes McKinney] Add / test Buffer wrapper. Test that we can write an arrow file to a wrapped buffer. Resize buffer in BufferOutputStream on close 316537d [Wes McKinney] Get ready to wrap Arrow buffers in a Python object 4822d32 [Wes McKinney] Implement RecordBatch::Equals, compare in Python ipc file writes a931e49 [Wes McKinney] Permit buffers (write padding) in a non-multiple of 64 in an IPC context, to allow zero-copy writing of NumPy arrays 2c49cd4 [Wes McKinney] Some debugging ca1562b [Wes McKinney] Draft implementations of Arrow file read/write from Python Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a9747cea Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a9747cea Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a9747cea Branch: refs/heads/master Commit: a9747ceac2b6399c6acf027de8074d8661d5eb1d Parents: 17cd7a6 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Mon Oct 10 11:21:49 2016 -0400 Committer: Julien Le Dem <jul...@dremio.com> Committed: Mon Oct 10 18:42:05 2016 -0700 ---------------------------------------------------------------------- cpp/src/arrow/io/io-memory-test.cc | 25 ++ cpp/src/arrow/io/memory.cc | 13 +- cpp/src/arrow/ipc/CMakeLists.txt | 7 + cpp/src/arrow/ipc/adapter.cc | 16 +- cpp/src/arrow/ipc/util.h | 6 +- cpp/src/arrow/table-test.cc | 27 ++ cpp/src/arrow/table.cc | 16 ++ cpp/src/arrow/table.h | 2 + cpp/src/arrow/types/primitive-test.cc | 3 +- cpp/src/arrow/util/bit-util.h | 13 + cpp/src/arrow/util/buffer.cc | 16 +- cpp/src/arrow/util/buffer.h | 1 - cpp/src/arrow/util/logging.h | 4 +- python/CMakeLists.txt | 8 +- 
python/cmake_modules/FindArrow.cmake | 11 + python/pyarrow/__init__.py | 3 +- python/pyarrow/array.pyx | 44 +--- python/pyarrow/includes/common.pxd | 4 - python/pyarrow/includes/libarrow.pxd | 29 ++- python/pyarrow/includes/libarrow_io.pxd | 14 +- python/pyarrow/includes/libarrow_ipc.pxd | 52 ++++ python/pyarrow/includes/pyarrow.pxd | 13 +- python/pyarrow/io.pxd | 6 + python/pyarrow/io.pyx | 340 ++++++++++++++++---------- python/pyarrow/ipc.pyx | 155 ++++++++++++ python/pyarrow/table.pxd | 17 +- python/pyarrow/table.pyx | 194 +++++++++++---- python/pyarrow/tests/test_array.py | 4 + python/pyarrow/tests/test_io.py | 41 ++++ python/pyarrow/tests/test_ipc.py | 116 +++++++++ python/pyarrow/tests/test_table.py | 82 +++---- python/setup.py | 1 + python/src/pyarrow/adapters/builtin.cc | 2 +- python/src/pyarrow/adapters/pandas.cc | 8 + python/src/pyarrow/common.cc | 2 +- python/src/pyarrow/common.h | 20 +- python/src/pyarrow/io.cc | 6 +- 37 files changed, 1012 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/io/io-memory-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc index 6de35da..a49faf3 100644 --- a/cpp/src/arrow/io/io-memory-test.cc +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -121,5 +121,30 @@ TEST_F(TestMemoryMappedFile, InvalidFile) { IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result)); } +class TestBufferOutputStream : public ::testing::Test { + public: + void SetUp() { + buffer_.reset(new PoolBuffer(default_memory_pool())); + stream_.reset(new BufferOutputStream(buffer_)); + } + + protected: + std::shared_ptr<PoolBuffer> buffer_; + std::unique_ptr<OutputStream> stream_; +}; + +TEST_F(TestBufferOutputStream, CloseResizes) { + std::string data = "data123456"; + + const int64_t nbytes = 
static_cast<int64_t>(data.size()); + const int K = 100; + for (int i = 0; i < K; ++i) { + EXPECT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes)); + } + + ASSERT_OK(stream_->Close()); + ASSERT_EQ(K * nbytes, buffer_->size()); +} + } // namespace io } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 7d6e02e..c7d0ae5 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -212,7 +212,11 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b mutable_data_(buffer->mutable_data()) {} Status BufferOutputStream::Close() { - return Status::OK(); + if (position_ < capacity_) { + return buffer_->Resize(position_); + } else { + return Status::OK(); + } } Status BufferOutputStream::Tell(int64_t* position) { @@ -228,8 +232,11 @@ Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) { } Status BufferOutputStream::Reserve(int64_t nbytes) { - while (position_ + nbytes > capacity_) { - int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2); + int64_t new_capacity = capacity_; + while (position_ + nbytes > new_capacity) { + new_capacity = std::max(kBufferMinimumSize, new_capacity * 2); + } + if (new_capacity > capacity_) { RETURN_NOT_OK(buffer_->Resize(new_capacity)); capacity_ = new_capacity; } http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index bde8c5b..8dcd9ac 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -57,6 +57,13 @@ SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES LINKER_LANGUAGE CXX LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}") +if (APPLE) + 
set_target_properties(arrow_ipc + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") +endif() + ADD_ARROW_TEST(ipc-adapter-test) ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test ${ARROW_IPC_TEST_LINK_LIBS}) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 99974a4..cd8ab53 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -162,15 +162,14 @@ class RecordBatchWriter { for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); int64_t size = 0; + int64_t padding = 0; // The buffer might be null if we are handling zero row lengths. if (buffer) { - // We use capacity here, because size might not reflect the padding - // requirements of buffers but capacity always should. - size = buffer->capacity(); - // check that padding is appropriate - RETURN_NOT_OK(CheckMultipleOf64(size)); + size = buffer->size(); + padding = util::RoundUpToMultipleOf64(size) - size; } + // TODO(wesm): We currently have no notion of shared memory page id's, // but we've included it in the metadata IDL for when we have it in the // future. Use page=0 for now @@ -179,12 +178,17 @@ class RecordBatchWriter { // are using from any OS-level shared memory. 
The thought is that systems // may (in the future) associate integer page id's with physical memory // pages (according to whatever is the desired shared memory mechanism) - buffer_meta_.push_back(flatbuf::Buffer(0, position, size)); + buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding)); if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); position += size; } + + if (padding > 0) { + RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); + position += padding; + } } *body_end_offset = position; http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h index 94079a3..9000d1b 100644 --- a/cpp/src/arrow/ipc/util.h +++ b/cpp/src/arrow/ipc/util.h @@ -29,7 +29,11 @@ namespace ipc { // Align on 8-byte boundaries static constexpr int kArrowAlignment = 8; -static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0}; + +// Buffers are padded to 64-byte boundaries (for SIMD) +static constexpr int kArrowBufferAlignment = 64; + +static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0}; static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) { return ((nbytes + alignment - 1) / alignment) * alignment; http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index 385e7d8..743fb66 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -123,4 +123,31 @@ TEST_F(TestTable, InvalidColumns) { ASSERT_RAISES(Invalid, table_->ValidateColumns()); } +class TestRecordBatch : public TestBase {}; + +TEST_F(TestRecordBatch, Equals) { + const int length = 10; + + auto f0 = std::make_shared<Field>("f0", INT32); + auto f1 = std::make_shared<Field>("f1", UINT8); + auto f2 = 
std::make_shared<Field>("f2", INT16); + + vector<shared_ptr<Field>> fields = {f0, f1, f2}; + auto schema = std::make_shared<Schema>(fields); + + auto a0 = MakePrimitive<Int32Array>(length); + auto a1 = MakePrimitive<UInt8Array>(length); + auto a2 = MakePrimitive<Int16Array>(length); + + RecordBatch b1(schema, length, {a0, a1, a2}); + RecordBatch b2(schema, 5, {a0, a1, a2}); + RecordBatch b3(schema, length, {a0, a1}); + RecordBatch b4(schema, length, {a0, a1, a1}); + + ASSERT_TRUE(b1.Equals(b1)); + ASSERT_FALSE(b1.Equals(b2)); + ASSERT_FALSE(b1.Equals(b3)); + ASSERT_FALSE(b1.Equals(b4)); +} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 3a250df..af84f27 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -21,6 +21,7 @@ #include <memory> #include <sstream> +#include "arrow/array.h" #include "arrow/column.h" #include "arrow/schema.h" #include "arrow/util/status.h" @@ -35,6 +36,21 @@ const std::string& RecordBatch::column_name(int i) const { return schema_->field(i)->name; } +bool RecordBatch::Equals(const RecordBatch& other) const { + if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) { + return false; + } + + for (int i = 0; i < num_columns(); ++i) { + if (!column(i)->Equals(other.column(i))) { return false; } + } + + return true; +} + +// ---------------------------------------------------------------------- +// Table methods + Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema, const std::vector<std::shared_ptr<Column>>& columns) : name_(name), schema_(schema), columns_(columns) { http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 
36b3c8e..1a856c8 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -43,6 +43,8 @@ class ARROW_EXPORT RecordBatch { RecordBatch(const std::shared_ptr<Schema>& schema, int32_t num_rows, const std::vector<std::shared_ptr<Array>>& columns); + bool Equals(const RecordBatch& other) const; + // @returns: the table's schema const std::shared_ptr<Schema>& schema() const { return schema_; } http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/types/primitive-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc index ffebb92..87eb0fe 100644 --- a/cpp/src/arrow/types/primitive-test.cc +++ b/cpp/src/arrow/types/primitive-test.cc @@ -238,8 +238,7 @@ void TestPrimitiveBuilder<PBoolean>::Check( } typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16, - PInt32, PInt64, PFloat, PDouble> - Primitives; + PInt32, PInt64, PFloat, PDouble> Primitives; TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives); http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/bit-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index 873a195..3087ce7 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -19,6 +19,7 @@ #define ARROW_UTIL_BIT_UTIL_H #include <cstdint> +#include <limits> #include <memory> #include <vector> @@ -77,6 +78,18 @@ static inline bool is_multiple_of_64(int64_t n) { return (n & 63) == 0; } +inline int64_t RoundUpToMultipleOf64(int64_t num) { + // TODO(wesm): is this definitely needed? 
+ // DCHECK_GE(num, 0); + constexpr int64_t round_to = 64; + constexpr int64_t force_carry_addend = round_to - 1; + constexpr int64_t truncate_bitmask = ~(round_to - 1); + constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to; + if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; } + // handle overflow case. This should result in a malloc error upstream + return num; +} + void bytes_to_bits(const std::vector<uint8_t>& bytes, uint8_t* bits); ARROW_EXPORT Status bytes_to_bits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*); http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/buffer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/buffer.cc b/cpp/src/arrow/util/buffer.cc index 703ef83..6faa048 100644 --- a/cpp/src/arrow/util/buffer.cc +++ b/cpp/src/arrow/util/buffer.cc @@ -20,25 +20,13 @@ #include <cstdint> #include <limits> +#include "arrow/util/bit-util.h" #include "arrow/util/logging.h" #include "arrow/util/memory-pool.h" #include "arrow/util/status.h" namespace arrow { -namespace { -int64_t RoundUpToMultipleOf64(int64_t num) { - DCHECK_GE(num, 0); - constexpr int64_t round_to = 64; - constexpr int64_t force_carry_addend = round_to - 1; - constexpr int64_t truncate_bitmask = ~(round_to - 1); - constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to; - if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; } - // handle overflow case. 
This should result in a malloc error upstream - return num; -} -} // namespace - Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) { data_ = parent->data() + offset; size_ = size; @@ -64,7 +52,7 @@ PoolBuffer::~PoolBuffer() { Status PoolBuffer::Reserve(int64_t new_capacity) { if (!mutable_data_ || new_capacity > capacity_) { uint8_t* new_data; - new_capacity = RoundUpToMultipleOf64(new_capacity); + new_capacity = util::RoundUpToMultipleOf64(new_capacity); if (mutable_data_) { RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data)); memcpy(new_data, mutable_data_, size_); http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/buffer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/buffer.h b/cpp/src/arrow/util/buffer.h index 1aeebc6..01e4259 100644 --- a/cpp/src/arrow/util/buffer.h +++ b/cpp/src/arrow/util/buffer.h @@ -23,7 +23,6 @@ #include <cstring> #include <memory> -#include "arrow/util/bit-util.h" #include "arrow/util/macros.h" #include "arrow/util/status.h" #include "arrow/util/visibility.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/logging.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index b22f07d..06ee841 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -118,9 +118,9 @@ class CerrLog { class FatalLog : public CerrLog { public: explicit FatalLog(int /* severity */) // NOLINT - : CerrLog(ARROW_FATAL){} // NOLINT + : CerrLog(ARROW_FATAL) {} // NOLINT - [[noreturn]] ~FatalLog() { + [[noreturn]] ~FatalLog() { if (has_logged_) { std::cerr << std::endl; } std::exit(1); } http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 
77a771a..55f6d05 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -352,6 +352,8 @@ ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_LIB}) ADD_THIRDPARTY_LIB(arrow_io SHARED_LIB ${ARROW_IO_SHARED_LIB}) +ADD_THIRDPARTY_LIB(arrow_ipc + SHARED_LIB ${ARROW_IPC_SHARED_LIB}) ############################################################ # Linker setup @@ -415,6 +417,8 @@ if (UNIX) set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) endif() +SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) + add_subdirectory(src/pyarrow) add_subdirectory(src/pyarrow/util) @@ -423,6 +427,7 @@ set(CYTHON_EXTENSIONS config error io + ipc scalar schema table @@ -442,6 +447,7 @@ set(PYARROW_SRCS set(LINK_LIBS arrow arrow_io + arrow_ipc ) if(PARQUET_FOUND AND PARQUET_ARROW_FOUND) @@ -455,8 +461,6 @@ if(PARQUET_FOUND AND PARQUET_ARROW_FOUND) parquet) endif() -SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) - add_library(pyarrow SHARED ${PYARROW_SRCS}) target_link_libraries(pyarrow ${LINK_LIBS}) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/cmake_modules/FindArrow.cmake ---------------------------------------------------------------------- diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake index 9919746..3c359aa 100644 --- a/python/cmake_modules/FindArrow.cmake +++ b/python/cmake_modules/FindArrow.cmake @@ -47,10 +47,16 @@ find_library(ARROW_IO_LIB_PATH NAMES arrow_io ${ARROW_SEARCH_LIB_PATH} NO_DEFAULT_PATH) +find_library(ARROW_IPC_LIB_PATH NAMES arrow_ipc + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) set(ARROW_FOUND TRUE) set(ARROW_LIB_NAME libarrow) set(ARROW_IO_LIB_NAME libarrow_io) + set(ARROW_IPC_LIB_NAME libarrow_ipc) set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH}) set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a) @@ -58,9 +64,14 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH) set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a) set(ARROW_IO_SHARED_LIB 
${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + + set(ARROW_IPC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IPC_LIB_NAME}.a) + set(ARROW_IPC_SHARED_LIB ${ARROW_LIBS}/${ARROW_IPC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + if (NOT Arrow_FIND_QUIETLY) message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}") message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}") + message(STATUS "Found the Arrow IPC library: ${ARROW_IPC_LIB_PATH}") endif () else () if (NOT Arrow_FIND_QUIETLY) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 7561f6d..8b131aa 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -41,5 +41,4 @@ from pyarrow.schema import (null, bool_, list_, struct, field, DataType, Field, Schema, schema) -from pyarrow.array import RowBatch -from pyarrow.table import Column, Table, from_pandas_dataframe +from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/array.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index cdbe73a..84ab4a4 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -37,7 +37,7 @@ import pyarrow.schema as schema def total_allocated_bytes(): - cdef MemoryPool* pool = pyarrow.GetMemoryPool() + cdef MemoryPool* pool = pyarrow.get_memory_pool() return pool.bytes_allocated() @@ -243,12 +243,14 @@ def from_pandas_series(object series, object mask=None, timestamps_to_ms=False): series_values = series_values.astype('datetime64[ms]') if mask is None: - check_status(pyarrow.PandasToArrow(pyarrow.GetMemoryPool(), - series_values, &out)) + with nogil: + check_status(pyarrow.PandasToArrow(pyarrow.get_memory_pool(), + 
series_values, &out)) else: mask = series_as_ndarray(mask) - check_status(pyarrow.PandasMaskedToArrow( - pyarrow.GetMemoryPool(), series_values, mask, &out)) + with nogil: + check_status(pyarrow.PandasMaskedToArrow( + pyarrow.get_memory_pool(), series_values, mask, &out)) return box_arrow_array(out) @@ -262,35 +264,3 @@ cdef object series_as_ndarray(object obj): result = obj return result - -#---------------------------------------------------------------------- -# Table-like data structures - -cdef class RowBatch: - """ - - """ - cdef readonly: - Schema schema - int num_rows - list arrays - - def __cinit__(self, Schema schema, int num_rows, list arrays): - self.schema = schema - self.num_rows = num_rows - self.arrays = arrays - - if len(self.schema) != len(arrays): - raise ValueError('Mismatch number of data arrays and ' - 'schema fields') - - def __len__(self): - return self.num_rows - - property num_columns: - - def __get__(self): - return len(self.arrays) - - def __getitem__(self, i): - return self.arrays[i] http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/common.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 133797b..05c0123 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -47,7 +47,3 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: c_bool IsKeyError() c_bool IsNotImplemented() c_bool IsInvalid() - - cdef cppclass Buffer: - uint8_t* data() - int64_t size() http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 854d07d..3ae1789 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -54,6 +54,18 @@ cdef extern from 
"arrow/api.h" namespace "arrow" nogil: cdef cppclass MemoryPool" arrow::MemoryPool": int64_t bytes_allocated() + cdef cppclass CBuffer" arrow::Buffer": + uint8_t* data() + int64_t size() + + cdef cppclass ResizableBuffer(CBuffer): + CStatus Resize(int64_t nbytes) + CStatus Reserve(int64_t nbytes) + + cdef cppclass PoolBuffer(ResizableBuffer): + PoolBuffer() + PoolBuffer(MemoryPool*) + cdef MemoryPool* default_memory_pool() cdef cppclass CListType" arrow::ListType"(CDataType): @@ -149,6 +161,21 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: const shared_ptr[CDataType]& type() const shared_ptr[CChunkedArray]& data() + cdef cppclass CRecordBatch" arrow::RecordBatch": + CRecordBatch(const shared_ptr[CSchema]& schema, int32_t num_rows, + const vector[shared_ptr[CArray]]& columns) + + c_bool Equals(const CRecordBatch& other) + + const shared_ptr[CSchema]& schema() + const shared_ptr[CArray]& column(int i) + const c_string& column_name(int i) + + const vector[shared_ptr[CArray]]& columns() + + int num_columns() + int32_t num_rows() + cdef cppclass CTable" arrow::Table": CTable(const c_string& name, const shared_ptr[CSchema]& schema, const vector[shared_ptr[CColumn]]& columns) @@ -186,7 +213,7 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil: MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH" cdef cppclass Message: - CStatus Open(const shared_ptr[Buffer]& buf, + CStatus Open(const shared_ptr[CBuffer]& buf, shared_ptr[Message]* out) int64_t body_length() MessageType type() http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow_io.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index 56d8d4c..8074915 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -18,7 +18,7 @@ # distutils: language = c++ from 
pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport MemoryPool +from pyarrow.includes.libarrow cimport * cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: enum FileMode" arrow::io::FileMode::type": @@ -36,7 +36,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: FileMode mode() cdef cppclass Readable: - CStatus ReadB" Read"(int64_t nbytes, shared_ptr[Buffer]* out) + CStatus ReadB" Read"(int64_t nbytes, shared_ptr[CBuffer]* out) CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) cdef cppclass Seekable: @@ -57,7 +57,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: CStatus ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) CStatus ReadAt(int64_t position, int64_t nbytes, - int64_t* bytes_read, shared_ptr[Buffer]* out) + int64_t* bytes_read, shared_ptr[CBuffer]* out) cdef cppclass WriteableFileInterface(OutputStream, Seekable): CStatus WriteAt(int64_t position, const uint8_t* data, @@ -143,9 +143,9 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil: - cdef cppclass BufferReader(ReadableFileInterface): - BufferReader(const uint8_t* data, int64_t nbytes) + cdef cppclass CBufferReader" arrow::io::BufferReader"\ + (ReadableFileInterface): + CBufferReader(const uint8_t* data, int64_t nbytes) cdef cppclass BufferOutputStream(OutputStream): - # TODO(wesm) - pass + BufferOutputStream(const shared_ptr[ResizableBuffer]& buffer) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow_ipc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd new file mode 100644 index 0000000..eda5b9b --- /dev/null +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under 
one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema, + CRecordBatch) +from pyarrow.includes.libarrow_io cimport (OutputStream, ReadableFileInterface) + +cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil: + + cdef cppclass CFileWriter " arrow::ipc::FileWriter": + @staticmethod + CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, + shared_ptr[CFileWriter]* out) + + CStatus WriteRecordBatch(const vector[shared_ptr[CArray]]& columns, + int32_t num_rows) + + CStatus Close() + + cdef cppclass CFileReader " arrow::ipc::FileReader": + + @staticmethod + CStatus Open(const shared_ptr[ReadableFileInterface]& file, + shared_ptr[CFileReader]* out) + + @staticmethod + CStatus Open2" Open"(const shared_ptr[ReadableFileInterface]& file, + int64_t footer_offset, shared_ptr[CFileReader]* out) + + const shared_ptr[CSchema]& schema() + + int num_dictionaries() + int num_record_batches() + + CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/pyarrow.pxd ---------------------------------------------------------------------- diff --git 
a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index 4c97166..2fa5a7d 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -18,8 +18,8 @@ # distutils: language = c++ from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, CStatus, - Type, MemoryPool) +from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn, + CDataType, CStatus, Type, MemoryPool) cimport pyarrow.includes.libarrow_io as arrow_io @@ -53,7 +53,12 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref, PyObject** out) - MemoryPool* GetMemoryPool() + MemoryPool* get_memory_pool() + + +cdef extern from "pyarrow/common.h" namespace "pyarrow" nogil: + cdef cppclass PyBytesBuffer(CBuffer): + PyBytesBuffer(object o) cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil: @@ -63,5 +68,5 @@ cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil: cdef cppclass PyOutputStream(arrow_io.OutputStream): PyOutputStream(object fo) - cdef cppclass PyBytesReader(arrow_io.BufferReader): + cdef cppclass PyBytesReader(arrow_io.CBufferReader): PyBytesReader(object fo) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/io.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd index 1dbb3fd..d6966cd 100644 --- a/python/pyarrow/io.pxd +++ b/python/pyarrow/io.pxd @@ -22,6 +22,11 @@ from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_io cimport (ReadableFileInterface, OutputStream) +cdef class Buffer: + cdef: + shared_ptr[CBuffer] buffer + + cdef init(self, const shared_ptr[CBuffer]& buffer) cdef class NativeFile: cdef: @@ -29,6 +34,7 @@ cdef class NativeFile: shared_ptr[OutputStream] wr_file bint is_readonly bint is_open + bint own_file # By implementing these "virtual" functions (all functions in 
Cython # extension classes are technically virtual in the C++ sense) we can expose http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index e6e2b62..00a492f 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -36,6 +36,217 @@ import re import sys import threading + +cdef class NativeFile: + + def __cinit__(self): + self.is_open = False + self.own_file = False + + def __dealloc__(self): + if self.is_open and self.own_file: + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + self.close() + + def close(self): + if self.is_open: + with nogil: + if self.is_readonly: + check_cstatus(self.rd_file.get().Close()) + else: + check_cstatus(self.wr_file.get().Close()) + self.is_open = False + + cdef read_handle(self, shared_ptr[ReadableFileInterface]* file): + self._assert_readable() + file[0] = <shared_ptr[ReadableFileInterface]> self.rd_file + + cdef write_handle(self, shared_ptr[OutputStream]* file): + self._assert_writeable() + file[0] = <shared_ptr[OutputStream]> self.wr_file + + def _assert_readable(self): + if not self.is_readonly: + raise IOError("only valid on readonly files") + + if not self.is_open: + raise IOError("file not open") + + def _assert_writeable(self): + if self.is_readonly: + raise IOError("only valid on writeonly files") + + if not self.is_open: + raise IOError("file not open") + + def size(self): + cdef int64_t size + self._assert_readable() + with nogil: + check_cstatus(self.rd_file.get().GetSize(&size)) + return size + + def tell(self): + cdef int64_t position + with nogil: + if self.is_readonly: + check_cstatus(self.rd_file.get().Tell(&position)) + else: + check_cstatus(self.wr_file.get().Tell(&position)) + return position + + def seek(self, int64_t position): + self._assert_readable() + with nogil: + 
check_cstatus(self.rd_file.get().Seek(position)) + + def write(self, data): + """ + Write bytes-like (unicode, encoded to UTF-8) to file + """ + self._assert_writeable() + + data = tobytes(data) + + cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data) + cdef int64_t bufsize = len(data) + with nogil: + check_cstatus(self.wr_file.get().Write(buf, bufsize)) + + def read(self, int nbytes): + cdef: + int64_t bytes_read = 0 + uint8_t* buf + shared_ptr[CBuffer] out + + self._assert_readable() + + with nogil: + check_cstatus(self.rd_file.get() + .ReadB(nbytes, &out)) + + result = cp.PyBytes_FromStringAndSize( + <const char*>out.get().data(), out.get().size()) + + return result + + +# ---------------------------------------------------------------------- +# Python file-like objects + +cdef class PythonFileInterface(NativeFile): + cdef: + object handle + + def __cinit__(self, handle, mode='w'): + self.handle = handle + + if mode.startswith('w'): + self.wr_file.reset(new pyarrow.PyOutputStream(handle)) + self.is_readonly = 0 + elif mode.startswith('r'): + self.rd_file.reset(new pyarrow.PyReadableFile(handle)) + self.is_readonly = 1 + else: + raise ValueError('Invalid file mode: {0}'.format(mode)) + + self.is_open = True + + +cdef class BytesReader(NativeFile): + cdef: + object obj + + def __cinit__(self, obj): + if not isinstance(obj, bytes): + raise ValueError('Must pass bytes object') + + self.obj = obj + self.is_readonly = 1 + self.is_open = True + + self.rd_file.reset(new pyarrow.PyBytesReader(obj)) + +# ---------------------------------------------------------------------- +# Arrow buffers + + +cdef class Buffer: + + def __cinit__(self): + pass + + cdef init(self, const shared_ptr[CBuffer]& buffer): + self.buffer = buffer + + def __len__(self): + return self.size + + property size: + + def __get__(self): + return self.buffer.get().size() + + def __getitem__(self, key): + # TODO(wesm): buffer slicing + raise NotImplementedError + + def to_pybytes(self): + 
return cp.PyBytes_FromStringAndSize( + <const char*>self.buffer.get().data(), + self.buffer.get().size()) + + +cdef shared_ptr[PoolBuffer] allocate_buffer(): + cdef shared_ptr[PoolBuffer] result + result.reset(new PoolBuffer(pyarrow.get_memory_pool())) + return result + + +cdef class InMemoryOutputStream(NativeFile): + + cdef: + shared_ptr[PoolBuffer] buffer + + def __cinit__(self): + self.buffer = allocate_buffer() + self.wr_file.reset(new BufferOutputStream( + <shared_ptr[ResizableBuffer]> self.buffer)) + self.is_readonly = 0 + self.is_open = True + + def get_result(self): + cdef Buffer result = Buffer() + + check_cstatus(self.wr_file.get().Close()) + result.init(<shared_ptr[CBuffer]> self.buffer) + + self.is_open = False + return result + + +def buffer_from_bytes(object obj): + """ + Construct an Arrow buffer from a Python bytes object + """ + if not isinstance(obj, bytes): + raise ValueError('Must pass bytes object') + + cdef shared_ptr[CBuffer] buf + buf.reset(new pyarrow.PyBytesBuffer(obj)) + + cdef Buffer result = Buffer() + result.init(buf) + return result + +# ---------------------------------------------------------------------- +# HDFS IO implementation + _HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)') try: @@ -274,6 +485,7 @@ cdef class HdfsClient: out.buffer_size = c_buffer_size out.parent = self out.is_open = True + out.own_file = True return out @@ -322,134 +534,6 @@ cdef class HdfsClient: f.download(stream) -cdef class NativeFile: - - def __cinit__(self): - self.is_open = False - - def __dealloc__(self): - if self.is_open: - self.close() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, tb): - self.close() - - def close(self): - if self.is_open: - with nogil: - if self.is_readonly: - check_cstatus(self.rd_file.get().Close()) - else: - check_cstatus(self.wr_file.get().Close()) - self.is_open = False - - cdef read_handle(self, shared_ptr[ReadableFileInterface]* file): - self._assert_readable() - file[0] = 
<shared_ptr[ReadableFileInterface]> self.rd_file - - cdef write_handle(self, shared_ptr[OutputStream]* file): - self._assert_writeable() - file[0] = <shared_ptr[OutputStream]> self.wr_file - - def _assert_readable(self): - if not self.is_readonly: - raise IOError("only valid on readonly files") - - def _assert_writeable(self): - if self.is_readonly: - raise IOError("only valid on writeonly files") - - def size(self): - cdef int64_t size - self._assert_readable() - with nogil: - check_cstatus(self.rd_file.get().GetSize(&size)) - return size - - def tell(self): - cdef int64_t position - with nogil: - if self.is_readonly: - check_cstatus(self.rd_file.get().Tell(&position)) - else: - check_cstatus(self.wr_file.get().Tell(&position)) - return position - - def seek(self, int64_t position): - self._assert_readable() - with nogil: - check_cstatus(self.rd_file.get().Seek(position)) - - def write(self, data): - """ - Write bytes-like (unicode, encoded to UTF-8) to file - """ - self._assert_writeable() - - data = tobytes(data) - - cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data) - cdef int64_t bufsize = len(data) - with nogil: - check_cstatus(self.wr_file.get().Write(buf, bufsize)) - - def read(self, int nbytes): - cdef: - int64_t bytes_read = 0 - uint8_t* buf - shared_ptr[Buffer] out - - self._assert_readable() - - with nogil: - check_cstatus(self.rd_file.get() - .ReadB(nbytes, &out)) - - result = cp.PyBytes_FromStringAndSize( - <const char*>out.get().data(), out.get().size()) - - return result - - -# ---------------------------------------------------------------------- -# Python file-like objects - -cdef class PythonFileInterface(NativeFile): - cdef: - object handle - - def __cinit__(self, handle, mode='w'): - self.handle = handle - - if mode.startswith('w'): - self.wr_file.reset(new pyarrow.PyOutputStream(handle)) - self.is_readonly = 0 - elif mode.startswith('r'): - self.rd_file.reset(new pyarrow.PyReadableFile(handle)) - self.is_readonly = 1 - else: 
- raise ValueError('Invalid file mode: {0}'.format(mode)) - - self.is_open = True - - -cdef class BytesReader(NativeFile): - cdef: - object obj - - def __cinit__(self, obj): - if not isinstance(obj, bytes): - raise ValueError('Must pass bytes object') - - self.obj = obj - self.is_readonly = 1 - self.is_open = True - - self.rd_file.reset(new pyarrow.PyBytesReader(obj)) - # ---------------------------------------------------------------------- # Specialization for HDFS http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/ipc.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx new file mode 100644 index 0000000..f8da3a7 --- /dev/null +++ b/python/pyarrow/ipc.pyx @@ -0,0 +1,155 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Cython wrappers for arrow::ipc + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_io cimport * +from pyarrow.includes.libarrow_ipc cimport * +cimport pyarrow.includes.pyarrow as pyarrow + +from pyarrow.error cimport check_cstatus +from pyarrow.io cimport NativeFile +from pyarrow.schema cimport Schema +from pyarrow.table cimport RecordBatch + +from pyarrow.compat import frombytes, tobytes +import pyarrow.io as io + +cimport cpython as cp + + +cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader): + cdef NativeFile nf + + if isinstance(source, bytes): + source = io.BytesReader(source) + elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'): + # Optimistically hope this is file-like + source = io.PythonFileInterface(source, mode='r') + + if isinstance(source, NativeFile): + nf = source + + # TODO: what about read-write sources (e.g. memory maps) + if not nf.is_readonly: + raise IOError('Native file is not readable') + + nf.read_handle(reader) + else: + raise TypeError('Unable to read from object of type: {0}' + .format(type(source))) + + +cdef get_writer(source, shared_ptr[OutputStream]* writer): + cdef NativeFile nf + + if not isinstance(source, io.NativeFile) and hasattr(source, 'write'): + # Optimistically hope this is file-like + source = io.PythonFileInterface(source, mode='w') + + if isinstance(source, io.NativeFile): + nf = source + + if nf.is_readonly: + raise IOError('Native file is not writeable') + + nf.write_handle(writer) + else: + raise TypeError('Unable to read from object of type: {0}' + .format(type(source))) + + +cdef class ArrowFileWriter: + cdef: + shared_ptr[CFileWriter] writer + shared_ptr[OutputStream] sink + bint closed + + def __cinit__(self, sink, Schema schema): + self.closed = True + get_writer(sink, &self.sink) + + with nogil: + check_cstatus(CFileWriter.Open(self.sink.get(), schema.sp_schema, 
+ &self.writer)) + + self.closed = False + + def __dealloc__(self): + if not self.closed: + self.close() + + def write_record_batch(self, RecordBatch batch): + cdef CRecordBatch* bptr = batch.batch + with nogil: + check_cstatus(self.writer.get() + .WriteRecordBatch(bptr.columns(), bptr.num_rows())) + + def close(self): + with nogil: + check_cstatus(self.writer.get().Close()) + self.closed = True + + +cdef class ArrowFileReader: + cdef: + shared_ptr[CFileReader] reader + + def __cinit__(self, source, footer_offset=None): + cdef shared_ptr[ReadableFileInterface] reader + get_reader(source, &reader) + + cdef int64_t offset = 0 + if footer_offset is not None: + offset = footer_offset + + with nogil: + if offset != 0: + check_cstatus(CFileReader.Open2(reader, offset, &self.reader)) + else: + check_cstatus(CFileReader.Open(reader, &self.reader)) + + property num_dictionaries: + + def __get__(self): + return self.reader.get().num_dictionaries() + + property num_record_batches: + + def __get__(self): + return self.reader.get().num_record_batches() + + def get_record_batch(self, int i): + cdef: + shared_ptr[CRecordBatch] batch + RecordBatch result + + if i < 0 or i >= self.num_record_batches: + raise ValueError('Batch number {0} out of range'.format(i)) + + with nogil: + check_cstatus(self.reader.get().GetRecordBatch(i, &batch)) + + result = RecordBatch() + result.init(batch) + + return result http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/table.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd index 0a5c122..79c9ae3 100644 --- a/python/pyarrow/table.pxd +++ b/python/pyarrow/table.pxd @@ -16,7 +16,10 @@ # under the License. 
from pyarrow.includes.common cimport shared_ptr -from pyarrow.includes.libarrow cimport CChunkedArray, CColumn, CTable +from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable, + CRecordBatch) + +from pyarrow.schema cimport Schema cdef class ChunkedArray: @@ -41,6 +44,16 @@ cdef class Table: cdef: shared_ptr[CTable] sp_table CTable* table - + cdef init(self, const shared_ptr[CTable]& table) cdef _check_nullptr(self) + + +cdef class RecordBatch: + cdef: + shared_ptr[CRecordBatch] sp_batch + CRecordBatch* batch + Schema _schema + + cdef init(self, const shared_ptr[CRecordBatch]& table) + cdef _check_nullptr(self) http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index ade82aa..a1cadcd 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -19,6 +19,8 @@ # distutils: language = c++ # cython: embedsignature = True +from cython.operator cimport dereference as deref + from pyarrow.includes.libarrow cimport * cimport pyarrow.includes.pyarrow as pyarrow @@ -45,8 +47,8 @@ cdef class ChunkedArray: cdef _check_nullptr(self): if self.chunked_array == NULL: - raise ReferenceError("ChunkedArray object references a NULL pointer." - "Not initialized.") + raise ReferenceError("ChunkedArray object references a NULL " + "pointer. 
Not initialized.") def length(self): self._check_nullptr() @@ -144,6 +146,130 @@ cdef class Column: return chunked_array +cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema): + cdef: + Array arr + c_string c_name + vector[shared_ptr[CField]] fields + + cdef int K = len(arrays) + + fields.resize(K) + for i in range(K): + arr = arrays[i] + c_name = tobytes(names[i]) + fields[i].reset(new CField(c_name, arr.type.sp_type, True)) + + schema.reset(new CSchema(fields)) + + + +cdef _dataframe_to_arrays(df, name, timestamps_to_ms): + from pyarrow.array import from_pandas_series + + cdef: + list names = [] + list arrays = [] + + for name in df.columns: + col = df[name] + arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms) + + names.append(name) + arrays.append(arr) + + return names, arrays + + +cdef class RecordBatch: + + def __cinit__(self): + self.batch = NULL + self._schema = None + + cdef init(self, const shared_ptr[CRecordBatch]& batch): + self.sp_batch = batch + self.batch = batch.get() + + cdef _check_nullptr(self): + if self.batch == NULL: + raise ReferenceError("Object not initialized") + + def __len__(self): + self._check_nullptr() + return self.batch.num_rows() + + property num_columns: + + def __get__(self): + self._check_nullptr() + return self.batch.num_columns() + + property num_rows: + + def __get__(self): + return len(self) + + property schema: + + def __get__(self): + cdef Schema schema + self._check_nullptr() + if self._schema is None: + schema = Schema() + schema.init_schema(self.batch.schema()) + self._schema = schema + + return self._schema + + def __getitem__(self, i): + cdef Array arr = Array() + arr.init(self.batch.column(i)) + return arr + + def equals(self, RecordBatch other): + self._check_nullptr() + other._check_nullptr() + + return self.batch.Equals(deref(other.batch)) + + @classmethod + def from_pandas(cls, df): + """ + Convert pandas.DataFrame to an Arrow RecordBatch + """ + names, arrays = 
_dataframe_to_arrays(df, None, False) + return cls.from_arrays(names, arrays) + + @staticmethod + def from_arrays(names, arrays): + cdef: + Array arr + RecordBatch result + c_string c_name + shared_ptr[CSchema] schema + shared_ptr[CRecordBatch] batch + vector[shared_ptr[CArray]] c_arrays + int32_t num_rows + + if len(arrays) == 0: + raise ValueError('Record batch cannot contain no arrays (for now)') + + num_rows = len(arrays[0]) + _schema_from_arrays(arrays, names, &schema) + + for i in range(len(arrays)): + arr = arrays[i] + c_arrays.push_back(arr.sp_array) + + batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) + + result = RecordBatch() + result.init(batch) + + return result + + cdef class Table: ''' Do not call this class's constructor directly. @@ -161,38 +287,50 @@ cdef class Table: raise ReferenceError("Table object references a NULL pointer." "Not initialized.") - @staticmethod - def from_pandas(df, name=None): - return from_pandas_dataframe(df, name=name) + @classmethod + def from_pandas(cls, df, name=None, timestamps_to_ms=False): + """ + Convert pandas.DataFrame to an Arrow Table + + Parameters + ---------- + df: pandas.DataFrame + + name: str + + timestamps_to_ms: bool + Convert datetime columns to ms resolution. This is needed for + compability with other functionality like Parquet I/O which + only supports milliseconds. 
+ """ + names, arrays = _dataframe_to_arrays(df, name=name, + timestamps_to_ms=timestamps_to_ms) + return cls.from_arrays(names, arrays, name=name) @staticmethod def from_arrays(names, arrays, name=None): cdef: Array arr - Table result c_string c_name vector[shared_ptr[CField]] fields vector[shared_ptr[CColumn]] columns + Table result shared_ptr[CSchema] schema shared_ptr[CTable] table - cdef int K = len(arrays) + _schema_from_arrays(arrays, names, &schema) - fields.resize(K) + cdef int K = len(arrays) columns.resize(K) for i in range(K): arr = arrays[i] - c_name = tobytes(names[i]) - - fields[i].reset(new CField(c_name, arr.type.sp_type, True)) - columns[i].reset(new CColumn(fields[i], arr.sp_array)) + columns[i].reset(new CColumn(schema.get().field(i), arr.sp_array)) if name is None: c_name = '' else: c_name = tobytes(name) - schema.reset(new CSchema(fields)) table.reset(new CTable(c_name, schema, columns)) result = Table() @@ -268,32 +406,4 @@ cdef class Table: -def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False): - """ - Convert pandas.DataFrame to an Arrow Table - - Parameters - ---------- - df: pandas.DataFrame - - name: str - - timestamps_to_ms: bool - Convert datetime columns to ms resolution. This is needed for - compability with other functionality like Parquet I/O which - only supports milliseconds. 
- """ - from pyarrow.array import from_pandas_series - - cdef: - list names = [] - list arrays = [] - - for name in df.columns: - col = df[name] - arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms) - - names.append(name) - arrays.append(arr) - - return Table.from_arrays(names, arrays, name=name) +from_pandas_dataframe = Table.from_pandas http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_array.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 86147f8..0a17f69 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -19,6 +19,10 @@ import pyarrow import pyarrow.formatting as fmt +def test_total_bytes_allocated(): + assert pyarrow.total_allocated_bytes() == 0 + + def test_repr_on_pre_init_array(): arr = pyarrow.array.Array() assert len(repr(arr)) > 0 http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_io.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index 9a41ebe..211a12b 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -98,3 +98,44 @@ def test_bytes_reader(): def test_bytes_reader_non_bytes(): with pytest.raises(ValueError): io.BytesReader(u('some sample data')) + + + +# ---------------------------------------------------------------------- +# Buffers + + +def test_buffer_bytes(): + val = b'some data' + + buf = io.buffer_from_bytes(val) + assert isinstance(buf, io.Buffer) + + result = buf.to_pybytes() + + assert result == val + + +def test_memory_output_stream(): + # 10 bytes + val = b'dataabcdef' + + f = io.InMemoryOutputStream() + + K = 1000 + for i in range(K): + f.write(val) + + buf = f.get_result() + + assert len(buf) == len(val) * K + assert buf.to_pybytes() == val * K + + +def 
test_inmemory_write_after_closed(): + f = io.InMemoryOutputStream() + f.write(b'ok') + f.get_result() + + with pytest.raises(IOError): + f.write(b'not ok') http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py new file mode 100644 index 0000000..b9e9e6e --- /dev/null +++ b/python/pyarrow/tests/test_ipc.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import io + +import numpy as np +import pandas as pd + +import pyarrow as A +import pyarrow.io as arrow_io +import pyarrow.ipc as ipc + + +class RoundtripTest(object): + # Also tests writing zero-copy NumPy array with additional padding + + def __init__(self): + self.sink = self._get_sink() + + def _get_sink(self): + return io.BytesIO() + + def _get_source(self): + return self.sink.getvalue() + + def run(self): + nrows = 5 + df = pd.DataFrame({ + 'one': np.random.randn(nrows), + 'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']}) + + batch = A.RecordBatch.from_pandas(df) + writer = ipc.ArrowFileWriter(self.sink, batch.schema) + + num_batches = 5 + frames = [] + batches = [] + for i in range(num_batches): + unique_df = df.copy() + unique_df['one'] = np.random.randn(nrows) + + batch = A.RecordBatch.from_pandas(unique_df) + writer.write_record_batch(batch) + frames.append(unique_df) + batches.append(batch) + + writer.close() + + file_contents = self._get_source() + reader = ipc.ArrowFileReader(file_contents) + + assert reader.num_record_batches == num_batches + + for i in range(num_batches): + # it works. 
Must convert back to DataFrame + batch = reader.get_record_batch(i) + assert batches[i].equals(batch) + + +class InMemoryStreamTest(RoundtripTest): + + def _get_sink(self): + return arrow_io.InMemoryOutputStream() + + def _get_source(self): + return self.sink.get_result() + + +def test_ipc_file_simple_roundtrip(): + helper = RoundtripTest() + helper.run() + + +# XXX: For benchmarking + +def big_batch(): + df = pd.DataFrame( + np.random.randn(2**4, 2**20).T, + columns=[str(i) for i in range(2**4)] + ) + + df = pd.concat([df] * 2 ** 3, ignore_index=True) + + return A.RecordBatch.from_pandas(df) + + +def write_to_memory(batch): + sink = io.BytesIO() + write_file(batch, sink) + return sink.getvalue() + + +def write_file(batch, sink): + writer = ipc.ArrowFileWriter(sink, batch.schema) + writer.write_record_batch(batch) + writer.close() + + +def read_file(source): + reader = ipc.ArrowFileReader(source) + return [reader.get_record_batch(i) + for i in range(reader.num_record_batches)] http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_table.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index abf1431..c513032 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -15,60 +15,52 @@ # specific language governing permissions and limitations # under the License. 
-from pyarrow.compat import unittest import pyarrow as A -class TestRowBatch(unittest.TestCase): +def test_recordbatch_basics(): + data = [ + A.from_pylist(range(5)), + A.from_pylist([-10, -5, 0, 5, 10]) + ] - def test_basics(self): - data = [ - A.from_pylist(range(5)), - A.from_pylist([-10, -5, 0, 5, 10]) - ] - num_rows = 5 + batch = A.RecordBatch.from_arrays(['c0', 'c1'], data) - descr = A.schema([A.field('c0', data[0].type), - A.field('c1', data[1].type)]) + assert len(batch) == 5 + assert batch.num_rows == 5 + assert batch.num_columns == len(data) - batch = A.RowBatch(descr, num_rows, data) - assert len(batch) == num_rows - assert batch.num_rows == num_rows - assert batch.num_columns == len(data) +def test_table_basics(): + data = [ + A.from_pylist(range(5)), + A.from_pylist([-10, -5, 0, 5, 10]) + ] + table = A.Table.from_arrays(('a', 'b'), data, 'table_name') + assert table.name == 'table_name' + assert len(table) == 5 + assert table.num_rows == 5 + assert table.num_columns == 2 + assert table.shape == (5, 2) + for col in table.itercolumns(): + for chunk in col.data.iterchunks(): + assert chunk is not None -class TestTable(unittest.TestCase): - def test_basics(self): - data = [ - A.from_pylist(range(5)), - A.from_pylist([-10, -5, 0, 5, 10]) - ] - table = A.Table.from_arrays(('a', 'b'), data, 'table_name') - assert table.name == 'table_name' - assert len(table) == 5 - assert table.num_rows == 5 - assert table.num_columns == 2 - assert table.shape == (5, 2) +def test_table_pandas(): + data = [ + A.from_pylist(range(5)), + A.from_pylist([-10, -5, 0, 5, 10]) + ] + table = A.Table.from_arrays(('a', 'b'), data, 'table_name') - for col in table.itercolumns(): - for chunk in col.data.iterchunks(): - assert chunk is not None + # TODO: Use this part once from_pandas is implemented + # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]} + # df = pd.DataFrame(data) + # A.Table.from_pandas(df) - def test_pandas(self): - data = [ - A.from_pylist(range(5)), - 
A.from_pylist([-10, -5, 0, 5, 10]) - ] - table = A.Table.from_arrays(('a', 'b'), data, 'table_name') - - # TODO: Use this part once from_pandas is implemented - # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]} - # df = pd.DataFrame(data) - # A.Table.from_pandas(df) - - df = table.to_pandas() - assert set(df.columns) == set(('a', 'b')) - assert df.shape == (5, 2) - assert df.ix[0, 'b'] == -10 + df = table.to_pandas() + assert set(df.columns) == set(('a', 'b')) + assert df.shape == (5, 2) + assert df.loc[0, 'b'] == -10 http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index d1be122..d040ea7 100644 --- a/python/setup.py +++ b/python/setup.py @@ -102,6 +102,7 @@ class build_ext(_build_ext): 'config', 'error', 'io', + 'ipc', 'parquet', 'scalar', 'schema', http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/adapters/builtin.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc index 78ef1b3..680f3a5 100644 --- a/python/src/pyarrow/adapters/builtin.cc +++ b/python/src/pyarrow/adapters/builtin.cc @@ -426,7 +426,7 @@ Status ConvertPySequence(PyObject* obj, std::shared_ptr<arrow::Array>* out) { // Give the sequence converter an array builder std::shared_ptr<ArrayBuilder> builder; - RETURN_ARROW_NOT_OK(arrow::MakeBuilder(GetMemoryPool(), type, &builder)); + RETURN_ARROW_NOT_OK(arrow::MakeBuilder(get_memory_pool(), type, &builder)); converter->Init(builder); PY_RETURN_NOT_OK(converter->AppendData(obj)); http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index d224074..ae24b7e 100644 --- 
a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -602,6 +602,8 @@ class ArrowDeserializer { } Status AllocateOutput(int type) { + PyAcquireGIL lock; + npy_intp dims[1] = {col_->length()}; out_ = reinterpret_cast<PyArrayObject*>(PyArray_SimpleNew(1, dims, type)); @@ -616,6 +618,8 @@ class ArrowDeserializer { } Status OutputFromData(int type, void* data) { + PyAcquireGIL lock; + // Zero-Copy. We can pass the data pointer directly to NumPy. Py_INCREF(py_ref_); OwnedRef py_ref(py_ref_); @@ -706,6 +710,8 @@ class ArrowDeserializer { inline typename std::enable_if< arrow_traits<T2>::is_boolean, Status>::type ConvertValues(const std::shared_ptr<Array>& arr) { + PyAcquireGIL lock; + arrow::BooleanArray* bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); if (arr->null_count() > 0) { @@ -743,6 +749,8 @@ class ArrowDeserializer { inline typename std::enable_if< T2 == arrow::Type::STRING, Status>::type ConvertValues(const std::shared_ptr<Array>& arr) { + PyAcquireGIL lock; + RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)); http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/common.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc index 82b14fd..09f3efb 100644 --- a/python/src/pyarrow/common.cc +++ b/python/src/pyarrow/common.cc @@ -63,7 +63,7 @@ class PyArrowMemoryPool : public arrow::MemoryPool { int64_t bytes_allocated_; }; -arrow::MemoryPool* GetMemoryPool() { +arrow::MemoryPool* get_memory_pool() { static PyArrowMemoryPool memory_pool; return &memory_pool; } http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/common.h ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index bc599f8..96eed16 100644 --- 
a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -109,7 +109,8 @@ class PyGILGuard { return Status::UnknownError(message); \ } -PYARROW_EXPORT arrow::MemoryPool* GetMemoryPool(); +// Return the common PyArrow memory pool +PYARROW_EXPORT arrow::MemoryPool* get_memory_pool(); class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { public: @@ -120,6 +121,7 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_)); size_ = PyArray_SIZE(arr_); + capacity_ = size_ * PyArray_DESCR(arr_)->elsize; } virtual ~NumPyBuffer() { @@ -139,6 +141,22 @@ class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer { PyObject* obj_; }; + +class PyAcquireGIL { + public: + PyAcquireGIL() { + state_ = PyGILState_Ensure(); + } + + ~PyAcquireGIL() { + PyGILState_Release(state_); + } + + private: + PyGILState_STATE state_; + DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL); +}; + } // namespace pyarrow #endif // PYARROW_COMMON_H http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/io.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 35054e9..9879b34 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -47,9 +47,9 @@ static arrow::Status CheckPyError() { PyErr_Fetch(&exc_type, &exc_value, &traceback); PyObjectStringify stringified(exc_value); std::string message(stringified.bytes); - Py_DECREF(exc_type); - Py_DECREF(exc_value); - Py_DECREF(traceback); + Py_XDECREF(exc_type); + Py_XDECREF(exc_value); + Py_XDECREF(traceback); PyErr_Clear(); return arrow::Status::IOError(message); }