Repository: arrow
Updated Branches:
  refs/heads/master 750b77dc6 -> f50f2eacb
http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/python/arrow_to_python.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h
index 559ce18..e187d59 100644
--- a/cpp/src/arrow/python/arrow_to_python.h
+++ b/cpp/src/arrow/python/arrow_to_python.h
@@ -46,8 +46,7 @@ namespace py {
 /// \param[out] out the reconstructed data
 /// \return Status
 ARROW_EXPORT
-Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
-                            SerializedPyObject* out);
+Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
 
 /// \brief Reconstruct Python object from Arrow-serialized representation
 /// \param[in] object

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/python/python_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index a1ccd99..e00f194 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -53,18 +53,18 @@ namespace py {
 /// scalar Python types, lists, tuples, dictionaries and tensors.
 class SequenceBuilder {
  public:
-  explicit SequenceBuilder(MemoryPool* pool = nullptr)
+  explicit SequenceBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
       : pool_(pool),
-        types_(pool, ::arrow::int8()),
-        offsets_(pool, ::arrow::int32()),
+        types_(::arrow::int8(), pool),
+        offsets_(::arrow::int32(), pool),
         nones_(pool),
-        bools_(pool, ::arrow::boolean()),
-        ints_(pool, ::arrow::int64()),
-        bytes_(pool, ::arrow::binary()),
+        bools_(::arrow::boolean(), pool),
+        ints_(::arrow::int64(), pool),
+        bytes_(::arrow::binary(), pool),
         strings_(pool),
-        floats_(pool, ::arrow::float32()),
-        doubles_(pool, ::arrow::float64()),
-        tensor_indices_(pool, ::arrow::int32()),
+        floats_(::arrow::float32(), pool),
+        doubles_(::arrow::float64(), pool),
+        tensor_indices_(::arrow::int32(), pool),
         list_offsets_({0}),
         tuple_offsets_({0}),
         dict_offsets_({0}) {}
@@ -184,7 +184,7 @@ class SequenceBuilder {
     if (data != nullptr) {
       DCHECK(data->length() == offsets.back());
       std::shared_ptr<Array> offset_array;
-      Int32Builder builder(pool_, std::make_shared<Int32Type>());
+      Int32Builder builder(::arrow::int32(), pool_);
       RETURN_NOT_OK(builder.Append(offsets.data(), offsets.size()));
       RETURN_NOT_OK(builder.Finish(&offset_array));
       std::shared_ptr<Array> list_array;

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 82cd137..b7963b8 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -313,6 +313,8 @@ Status Schema::AddMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata)
   return Status::OK();
 }
 
+std::shared_ptr<const KeyValueMetadata> Schema::metadata() const { return metadata_; }
+
 std::shared_ptr<Schema> Schema::RemoveMetadata() const {
   return std::make_shared<Schema>(fields_);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 30cd71e..4cd17bc 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -730,7 +730,7 @@ class ARROW_EXPORT Schema {
   /// \brief The custom key-value metadata, if any
   ///
   /// \return metadata may be nullptr
-  std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; }
+  std::shared_ptr<const KeyValueMetadata> metadata() const;
 
   /// \brief Render a string representation of the schema suitable for debugging
   std::string ToString() const;

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/util/key_value_metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/key_value_metadata.cc b/cpp/src/arrow/util/key_value_metadata.cc
index 6877a6a..0497f65 100644
--- a/cpp/src/arrow/util/key_value_metadata.cc
+++ b/cpp/src/arrow/util/key_value_metadata.cc
@@ -46,7 +46,9 @@ KeyValueMetadata::KeyValueMetadata() : keys_(), values_() {}
 
 KeyValueMetadata::KeyValueMetadata(
     const std::unordered_map<std::string, std::string>& map)
-    : keys_(UnorderedMapKeys(map)), values_(UnorderedMapValues(map)) {}
+    : keys_(UnorderedMapKeys(map)), values_(UnorderedMapValues(map)) {
+  DCHECK_EQ(keys_.size(), values_.size());
+}
 
 KeyValueMetadata::KeyValueMetadata(const std::vector<std::string>& keys,
                                    const std::vector<std::string>& values)

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index d00286d..68ae017 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -100,8 +100,8 @@ import pyarrow.hdfs as hdfs
 from pyarrow.ipc import (Message, MessageReader,
                          RecordBatchFileReader, RecordBatchFileWriter,
                          RecordBatchStreamReader, RecordBatchStreamWriter,
-                         read_message, read_record_batch, read_tensor,
-                         write_tensor,
+                         read_message, read_record_batch, read_schema,
+                         read_tensor, write_tensor,
                          get_record_batch_size, get_tensor_size,
                          open_stream,
                          open_file,

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 082fb61..fcf27da 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -632,8 +632,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     cdef cppclass CRecordBatchStreamReader \
             " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
         @staticmethod
-        CStatus Open(const shared_ptr[InputStream]& stream,
-                     shared_ptr[CRecordBatchStreamReader]* out)
+        CStatus Open(const InputStream* stream,
+                     shared_ptr[CRecordBatchReader]* out)
 
         @staticmethod
         CStatus Open2" Open"(unique_ptr[CMessageReader] message_reader,
@@ -643,22 +643,22 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
             " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CRecordBatchStreamWriter]* out)
+                     shared_ptr[CRecordBatchWriter]* out)
 
     cdef cppclass CRecordBatchFileWriter \
             " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter):
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CRecordBatchFileWriter]* out)
+                     shared_ptr[CRecordBatchWriter]* out)
 
     cdef cppclass CRecordBatchFileReader \
             " arrow::ipc::RecordBatchFileReader":
         @staticmethod
-        CStatus Open(const shared_ptr[RandomAccessFile]& file,
+        CStatus Open(RandomAccessFile* file,
                      shared_ptr[CRecordBatchFileReader]* out)
 
         @staticmethod
-        CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
+        CStatus Open2" Open"(RandomAccessFile* file,
                              int64_t footer_offset,
                              shared_ptr[CRecordBatchFileReader]* out)
@@ -684,6 +684,19 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
                             const shared_ptr[CSchema]& schema,
                             shared_ptr[CRecordBatch]* out)
 
+    CStatus SerializeSchema(const CSchema& schema, CMemoryPool* pool,
+                            shared_ptr[CBuffer]* out)
+
+    CStatus SerializeRecordBatch(const CRecordBatch& schema,
+                                 CMemoryPool* pool,
+                                 shared_ptr[CBuffer]* out)
+
+    CStatus ReadSchema(InputStream* stream, shared_ptr[CSchema]* out)
+
+    CStatus ReadRecordBatch(const shared_ptr[CSchema]& schema,
+                            InputStream* stream,
+                            shared_ptr[CRecordBatch]* out)
+
 
 cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
@@ -792,7 +805,7 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
     CStatus DeserializeObject(const CSerializedPyObject& obj,
                               PyObject* base,
                               PyObject** out)
 
-    CStatus ReadSerializedObject(shared_ptr[RandomAccessFile] src,
+    CStatus ReadSerializedObject(RandomAccessFile* src,
                                  CSerializedPyObject* out)
 
     void set_serialization_callbacks(object serialize_callback,
http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/ipc.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index ceed4b0..027a00d 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -70,7 +70,7 @@ cdef class Message:
 
     def serialize(self, memory_pool=None):
         """
-        Write message to Buffer with length-prefixed metadata, then body
+        Write message as encapsulated IPC message
 
         Parameters
         ----------
@@ -166,18 +166,13 @@ cdef class _RecordBatchWriter:
         pass
 
     def _open(self, sink, Schema schema):
-        cdef:
-            shared_ptr[CRecordBatchStreamWriter] writer
-
         get_writer(sink, &self.sink)
 
         with nogil:
             check_status(
                 CRecordBatchStreamWriter.Open(self.sink.get(),
                                               schema.sp_schema,
-                                              &writer))
-
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
+                                              &self.writer))
         self.closed = False
 
     def write_batch(self, RecordBatch batch):
@@ -215,6 +210,7 @@ cdef get_input_stream(object source, shared_ptr[InputStream]* out):
 cdef class _RecordBatchReader:
     cdef:
         shared_ptr[CRecordBatchReader] reader
+        shared_ptr[InputStream] in_stream
 
     cdef readonly:
         Schema schema
@@ -223,16 +219,11 @@ cdef class _RecordBatchReader:
         pass
 
     def _open(self, source):
-        cdef:
-            shared_ptr[InputStream] in_stream
-            shared_ptr[CRecordBatchStreamReader] reader
-
-        get_input_stream(source, &in_stream)
-
+        get_input_stream(source, &self.in_stream)
         with nogil:
-            check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
+            check_status(CRecordBatchStreamReader.Open(
+                self.in_stream.get(), &self.reader))
 
-        self.reader = <shared_ptr[CRecordBatchReader]> reader
         self.schema = Schema()
         self.schema.init_schema(self.reader.get().schema())
 
@@ -285,22 +276,20 @@ cdef class _RecordBatchReader:
 cdef class _RecordBatchFileWriter(_RecordBatchWriter):
 
     def _open(self, sink, Schema schema):
-        cdef shared_ptr[CRecordBatchFileWriter] writer
        get_writer(sink, &self.sink)
 
        with nogil:
            check_status(
                CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                           &writer))
+                                           &self.writer))
 
-        # Cast to base class, because has same interface
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
        self.closed = False
 
 
 cdef class _RecordBatchFileReader:
     cdef:
         shared_ptr[CRecordBatchFileReader] reader
+        shared_ptr[RandomAccessFile] file
 
     cdef readonly:
         Schema schema
@@ -309,8 +298,7 @@ cdef class _RecordBatchFileReader:
         pass
 
     def _open(self, source, footer_offset=None):
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
+        get_reader(source, &self.file)
 
         cdef int64_t offset = 0
         if footer_offset is not None:
@@ -318,10 +306,12 @@ cdef class _RecordBatchFileReader:
 
         with nogil:
             if offset != 0:
-                check_status(CRecordBatchFileReader.Open2(
-                    reader, offset, &self.reader))
+                check_status(
+                    CRecordBatchFileReader.Open2(self.file.get(), offset,
+                                                 &self.reader))
             else:
-                check_status(CRecordBatchFileReader.Open(reader, &self.reader))
+                check_status(
+                    CRecordBatchFileReader.Open(self.file.get(), &self.reader))
 
         self.schema = pyarrow_wrap_schema(self.reader.get().schema())
 
@@ -476,24 +466,57 @@ def read_message(source):
     return result
 
 
-def read_record_batch(Message batch_message, Schema schema):
+def read_schema(obj):
+    """
+    Read Schema from message or buffer
+
+    Parameters
+    ----------
+    obj : buffer or Message
+
+    Returns
+    -------
+    schema : Schema
+    """
+    cdef:
+        shared_ptr[CSchema] result
+        shared_ptr[RandomAccessFile] cpp_file
+
+    if isinstance(obj, Message):
+        raise NotImplementedError(type(obj))
+
+    get_reader(obj, &cpp_file)
+
+    with nogil:
+        check_status(ReadSchema(cpp_file.get(), &result))
+
+    return pyarrow_wrap_schema(result)
+
+
+def read_record_batch(obj, Schema schema):
     """
     Read RecordBatch from message, given a known schema
 
     Parameters
     ----------
-    batch_message : Message
-        Such as that obtained from read_message
+    obj : Message or Buffer-like
     schema : Schema
 
     Returns
     -------
     batch : RecordBatch
     """
-    cdef shared_ptr[CRecordBatch] result
+    cdef:
+        shared_ptr[CRecordBatch] result
+        Message message
+
+    if isinstance(obj, Message):
+        message = obj
+    else:
+        message = read_message(obj)
 
     with nogil:
-        check_status(ReadRecordBatch(deref(batch_message.message.get()),
+        check_status(ReadRecordBatch(deref(message.message.get()),
                                      schema.sp_schema, &result))
 
     return pyarrow_wrap_batch(result)

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index d527722..6eb4979 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -20,7 +20,7 @@
 import pyarrow as pa
 
 from pyarrow.lib import (Message, MessageReader,  # noqa
-                         read_message, read_record_batch,
+                         read_message, read_record_batch, read_schema,
                          read_tensor, write_tensor,
                          get_record_batch_size, get_tensor_size)
 import pyarrow.lib as lib

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/serialization.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index a6c955b..3ee34ee 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -237,7 +237,7 @@ def read_serialized(source, base=None):
     cdef SerializedPyObject serialized = SerializedPyObject()
     serialized.base = base
     with nogil:
-        check_status(ReadSerializedObject(stream, &serialized.data))
+        check_status(ReadSerializedObject(stream.get(), &serialized.data))
 
     return serialized
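With read_schema added and read_record_batch generalized, both functions accept a buffer as well as a Message. A short sketch of the two call styles, assuming read_message can consume a serialized buffer directly (which get_input_stream suggests):

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['f0'])
    buf = batch.serialize()  # encapsulated IPC message in a Buffer

    # Style 1: go through an explicit Message
    message = pa.read_message(buf)
    batch1 = pa.read_record_batch(message, batch.schema)

    # Style 2: pass the buffer straight in; read_record_batch calls
    # read_message internally when given a non-Message object
    batch2 = pa.read_record_batch(buf, batch.schema)

    assert batch1.equals(batch) and batch2.equals(batch)
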
http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 976f429..dd3359e 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -453,6 +453,28 @@ cdef class RecordBatch:
         else:
             return self.column(key)
 
+    def serialize(self, memory_pool=None):
+        """
+        Write RecordBatch to Buffer as encapsulated IPC message
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            Uses default memory pool if not specified
+
+        Returns
+        -------
+        serialized : Buffer
+        """
+        cdef:
+            shared_ptr[CBuffer] buffer
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        with nogil:
+            check_status(SerializeRecordBatch(deref(self.batch),
+                                              pool, &buffer))
+        return pyarrow_wrap_buffer(buffer)
+
     def slice(self, offset=0, length=None):
         """
         Compute zero-copy slice of this RecordBatch

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 120a982..ecdbe62 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -384,6 +384,21 @@ def test_pandas_serialize_round_trip_not_string_columns():
     assert_frame_equal(result, df)
 
 
+def test_schema_batch_serialize_methods():
+    nrows = 5
+    df = pd.DataFrame({
+        'one': np.random.randn(nrows),
+        'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
+    batch = pa.RecordBatch.from_pandas(df)
+
+    s_schema = batch.schema.serialize()
+    s_batch = batch.serialize()
+
+    recons_schema = pa.read_schema(s_schema)
+    recons_batch = pa.read_record_batch(s_batch, recons_schema)
+    assert recons_batch.equals(batch)
+
+
 def write_file(batch, sink):
     writer = pa.RecordBatchFileWriter(sink, batch.schema)
     writer.write_batch(batch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 28b98f0..424e518 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -67,6 +67,8 @@ def test_recordbatch_basics():
 
     batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])
 
+    batch.schema.metadata
+
     assert len(batch) == 5
     assert batch.num_rows == 5
     assert batch.num_columns == len(data)
@@ -80,6 +82,16 @@ def test_recordbatch_basics():
         batch[2]
 
 
+def test_recordbatch_empty_metadata():
+    data = [
+        pa.array(range(5)),
+        pa.array([-10, -5, 0, 5, 10])
+    ]
+
+    batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])
+    assert batch.schema.metadata is None
+
+
 def test_recordbatch_slice_getitem():
     data = [
         pa.array(range(5)),
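The new tests pin down Schema.metadata in both the present and absent cases. A small sketch of that behavior; add_metadata corresponds to the AddMetadata path shown in type.cc, and the bytes keys and values are an assumption about how box_metadata presents the map:

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['f0'])

    # Without metadata, the property now returns None instead of
    # dereferencing a null KeyValueMetadata pointer
    assert batch.schema.metadata is None

    # add_metadata returns a new Schema carrying the key-value pairs
    schema2 = batch.schema.add_metadata({b'origin': b'example'})
    assert schema2.metadata == {b'origin': b'example'}
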
http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/types.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 592db4f..30c3aa6 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -235,7 +235,9 @@ cdef class Field:
 
         def __get__(self):
             self._check_null()
-            return box_metadata(self.field.metadata().get())
+            cdef shared_ptr[const CKeyValueMetadata] metadata = (
+                self.field.metadata())
+            return box_metadata(metadata.get())
 
     def _check_null(self):
         if self.field == NULL:
@@ -306,6 +308,11 @@ cdef class Schema:
 
         return result
 
+    def _check_null(self):
+        if self.schema == NULL:
+            raise ReferenceError(
+                'Schema not initialized (references NULL pointer)')
+
     cdef void init(self, const vector[shared_ptr[CField]]& fields):
         self.schema = new CSchema(fields)
         self.sp_schema.reset(self.schema)
@@ -327,7 +334,10 @@ cdef class Schema:
 
     property metadata:
 
         def __get__(self):
-            return box_metadata(self.schema.metadata().get())
+            self._check_null()
+            cdef shared_ptr[const CKeyValueMetadata] metadata = (
+                self.schema.metadata())
+            return box_metadata(metadata.get())
 
     def equals(self, other):
         """
@@ -377,6 +387,28 @@ cdef class Schema:
 
         return pyarrow_wrap_schema(new_schema)
 
+    def serialize(self, memory_pool=None):
+        """
+        Write Schema to Buffer as encapsulated IPC message
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            Uses default memory pool if not specified
+
+        Returns
+        -------
+        serialized : Buffer
+        """
+        cdef:
+            shared_ptr[CBuffer] buffer
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        with nogil:
+            check_status(SerializeSchema(deref(self.schema),
+                                         pool, &buffer))
+        return pyarrow_wrap_buffer(buffer)
+
     def remove_metadata(self):
         """
         Create new schema without metadata, if any
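Schema.serialize mirrors RecordBatch.serialize, and together with read_schema it gives a standalone round trip; remove_metadata composes with it naturally. A minimal sketch using only APIs shown in this patch:

    import pyarrow as pa

    schema = pa.schema([pa.field('one', pa.float64()),
                        pa.field('two', pa.string())])

    # Round-trip the schema through an encapsulated IPC message
    buf = schema.serialize()
    assert pa.read_schema(buf).equals(schema)

    # remove_metadata returns a copy without any key-value metadata
    assert schema.remove_metadata().metadata is None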