[
https://issues.apache.org/jira/browse/ARROW-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267572#comment-16267572
]
ASF GitHub Bot commented on ARROW-1783:
---------------------------------------
wesm closed pull request #1362: ARROW-1783: [Python] Provide a "component" dict
representation of a serialized Python object with minimal allocation
URL: https://github.com/apache/arrow/pull/1362
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index d9c84b495..d52b6dc5d 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -240,6 +240,9 @@ BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
BufferReader::BufferReader(const uint8_t* data, int64_t size)
: buffer_(nullptr), data_(data), size_(size), position_(0) {}
+BufferReader::BufferReader(const Buffer& buffer)
+ : BufferReader(buffer.data(), buffer.size()) {}
+
Status BufferReader::Close() {
// no-op
return Status::OK();
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 3aec91f72..51471a25a 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -107,6 +107,7 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
class ARROW_EXPORT BufferReader : public RandomAccessFile {
public:
explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
+ explicit BufferReader(const Buffer& buffer);
BufferReader(const uint8_t* data, int64_t size);
Status Close() override;
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 21d6a69a2..1835cefde 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -236,11 +236,35 @@ Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
// ----------------------------------------------------------------------
// Implement InputStream message reader
-Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr<Message>* message) {
- return ReadMessage(stream_, message);
+/// \brief Implementation of MessageReader that reads from InputStream
+class InputStreamMessageReader : public MessageReader {
+ public:
+  explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {}
+
+  explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& owned_stream)
+ : InputStreamMessageReader(owned_stream.get()) {
+ owned_stream_ = owned_stream;
+ }
+
+ ~InputStreamMessageReader() {}
+
+ Status ReadNextMessage(std::unique_ptr<Message>* message) {
+ return ReadMessage(stream_, message);
+ }
+
+ private:
+ io::InputStream* stream_;
+ std::shared_ptr<io::InputStream> owned_stream_;
+};
+
+std::unique_ptr<MessageReader> MessageReader::Open(io::InputStream* stream) {
+ return std::unique_ptr<MessageReader>(new InputStreamMessageReader(stream));
}
-InputStreamMessageReader::~InputStreamMessageReader() {}
+std::unique_ptr<MessageReader> MessageReader::Open(
+ const std::shared_ptr<io::InputStream>& owned_stream) {
+  return std::unique_ptr<MessageReader>(new InputStreamMessageReader(owned_stream));
+}
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 495474e50..159b39a81 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -144,6 +144,13 @@ class ARROW_EXPORT MessageReader {
public:
virtual ~MessageReader() = default;
+ /// \brief Create MessageReader that reads from InputStream
+ static std::unique_ptr<MessageReader> Open(io::InputStream* stream);
+
+ /// \brief Create MessageReader that reads from owned InputStream
+ static std::unique_ptr<MessageReader> Open(
+ const std::shared_ptr<io::InputStream>& owned_stream);
+
/// \brief Read next Message from the interface
///
/// \param[out] message an arrow::ipc::Message instance
@@ -151,26 +158,6 @@ class ARROW_EXPORT MessageReader {
virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0;
};
-/// \brief Implementation of MessageReader that reads from InputStream
-/// \since 0.5.0
-class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
- public:
- explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {}
-
- explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& owned_stream)
- : InputStreamMessageReader(owned_stream.get()) {
- owned_stream_ = owned_stream;
- }
-
- ~InputStreamMessageReader();
-
- Status ReadNextMessage(std::unique_ptr<Message>* message) override;
-
- private:
- io::InputStream* stream_;
- std::shared_ptr<io::InputStream> owned_stream_;
-};
-
/// \brief Read encapulated RPC message from position in file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 5960e8188..ae0f8f398 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -480,14 +480,12 @@ Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_read
Status RecordBatchStreamReader::Open(io::InputStream* stream,
std::shared_ptr<RecordBatchReader>* out) {
- std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
- return Open(std::move(message_reader), out);
+ return Open(MessageReader::Open(stream), out);
}
Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<RecordBatchReader>* out) {
- std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
- return Open(std::move(message_reader), out);
+ return Open(MessageReader::Open(stream), out);
}
std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
@@ -717,14 +715,17 @@ Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
+ return ReadTensor(*message, out);
+}
+Status ReadTensor(const Message& message, std::shared_ptr<Tensor>* out) {
std::shared_ptr<DataType> type;
std::vector<int64_t> shape;
std::vector<int64_t> strides;
std::vector<std::string> dim_names;
- RETURN_NOT_OK(internal::GetTensorMetadata(*message->metadata(), &type, &shape, &strides,
+ RETURN_NOT_OK(internal::GetTensorMetadata(*message.metadata(), &type, &shape, &strides,
&dim_names));
- *out = std::make_shared<Tensor>(type, message->body(), shape, strides, dim_names);
+ *out = std::make_shared<Tensor>(type, message.body(), shape, strides, dim_names);
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 627f67e25..019c9bc1f 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -219,7 +219,7 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
int max_recursion_depth, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out);
-/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
+/// \brief EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
///
/// \param[in] offset the file location of the start of the message
/// \param[in] file the file where the batch is located
@@ -229,6 +229,14 @@ ARROW_EXPORT
Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
std::shared_ptr<Tensor>* out);
+/// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message
+///
+/// \param[in] message a Message containing the tensor metadata and body
+/// \param[out] out the read tensor
+/// \return Status
+ARROW_EXPORT
+Status ReadTensor(const Message& message, std::shared_ptr<Tensor>* out);
+
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 3c1db0615..3aacabaef 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -560,9 +560,18 @@ Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offs
pool, kMaxNestingDepth, true);
}
-static Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
- const Tensor& tensor, uint8_t* scratch_space,
- io::OutputStream* dst) {
+namespace {
+
+Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length) {
+ std::shared_ptr<Buffer> metadata;
+ RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
+ return internal::WriteMessage(*metadata, dst, metadata_length);
+}
+
+Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
+ const Tensor& tensor, uint8_t* scratch_space,
+ io::OutputStream* dst) {
if (dim_index == tensor.ndim() - 1) {
const uint8_t* data_ptr = tensor.raw_data() + offset;
const int64_t stride = tensor.strides()[dim_index];
@@ -580,16 +589,37 @@ Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_siz
return Status::OK();
}
-Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length) {
- RETURN_NOT_OK(AlignStreamPosition(dst));
- std::shared_ptr<Buffer> metadata;
- RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
- return internal::WriteMessage(*metadata, dst, metadata_length);
+Status GetContiguousTensor(const Tensor& tensor, MemoryPool* pool,
+ std::unique_ptr<Tensor>* out) {
+ const auto& type = static_cast<const FixedWidthType&>(*tensor.type());
+ const int elem_size = type.bit_width() / 8;
+
+ // TODO(wesm): Do we care enough about this temporary allocation to pass in
+ // a MemoryPool to this function?
+ std::shared_ptr<Buffer> scratch_space;
+ RETURN_NOT_OK(AllocateBuffer(default_memory_pool(),
+ tensor.shape()[tensor.ndim() - 1] * elem_size,
+ &scratch_space));
+
+ std::shared_ptr<ResizableBuffer> contiguous_data;
+ RETURN_NOT_OK(
+ AllocateResizableBuffer(pool, tensor.size() * elem_size, &contiguous_data));
+
+ io::BufferOutputStream stream(contiguous_data);
+ RETURN_NOT_OK(WriteStridedTensorData(0, 0, elem_size, tensor,
+ scratch_space->mutable_data(), &stream));
+
+ out->reset(new Tensor(tensor.type(), contiguous_data, tensor.shape()));
+
+ return Status::OK();
}
+} // namespace
+
Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length) {
+ RETURN_NOT_OK(AlignStreamPosition(dst));
+
if (tensor.is_contiguous()) {
RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length, body_length));
auto data = tensor.data();
@@ -619,6 +649,22 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
}
}
+Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
+ std::unique_ptr<Message>* out) {
+ const Tensor* tensor_to_write = &tensor;
+ std::unique_ptr<Tensor> temp_tensor;
+
+ if (!tensor.is_contiguous()) {
+ RETURN_NOT_OK(GetContiguousTensor(tensor, pool, &temp_tensor));
+ tensor_to_write = temp_tensor.get();
+ }
+
+ std::shared_ptr<Buffer> metadata;
+ RETURN_NOT_OK(internal::WriteTensorMessage(*tensor_to_write, 0, &metadata));
+ out->reset(new Message(metadata, tensor_to_write->data()));
+ return Status::OK();
+}
+
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
int64_t buffer_start_offset, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length,
MemoryPool* pool) {
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index cedac45e7..11c01db8b 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -239,6 +239,17 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
ARROW_EXPORT
Status GetTensorSize(const Tensor& tensor, int64_t* size);
+/// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory
+/// allocation
+///
+/// \param[in] tensor the Tensor to write
+/// \param[in] pool MemoryPool to allocate space for metadata
+/// \param[out] out the resulting Message
+/// \return Status
+ARROW_EXPORT
+Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
+ std::unique_ptr<Message>* out);
+
/// \brief EXPERIMENTAL: Write arrow::Tensor as a contiguous message
///
/// \param[in] tensor the Tensor to write
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index 8814fc190..096bbd55c 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -480,7 +480,7 @@ inline Status ConvertStruct(PandasOptions options, const ChunkedArray& data,
Py_INCREF(Py_None);
field_value.reset(Py_None);
}
- // PyDict_SetItemString does not steal the value reference
+ // PyDict_SetItemString increments reference count
auto setitem_result =
PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj());
RETURN_IF_PYERROR();
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index 9686050b9..ce539a597 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -29,15 +29,17 @@
#include "arrow/array.h"
#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/util/logging.h"
+
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/pyarrow.h"
#include "arrow/python/python_to_arrow.h"
#include "arrow/python/util/datetime.h"
-#include "arrow/table.h"
-#include "arrow/util/logging.h"
namespace arrow {
namespace py {
@@ -286,5 +288,59 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj
obj, out);
}
+Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
+ SerializedPyObject* out) {
+ PyAcquireGIL gil;
+ const Py_ssize_t data_length = PyList_Size(data);
+ RETURN_IF_PYERROR();
+
+ const Py_ssize_t expected_data_length = 1 + num_tensors * 2 + num_buffers;
+ if (data_length != expected_data_length) {
+ return Status::Invalid("Invalid number of buffers in data");
+ }
+
+ auto GetBuffer = [&data](Py_ssize_t index, std::shared_ptr<Buffer>* out) {
+ PyObject* py_buf = PyList_GET_ITEM(data, index);
+ return unwrap_buffer(py_buf, out);
+ };
+
+ Py_ssize_t buffer_index = 0;
+
+ // Read the union batch describing object structure
+ {
+ std::shared_ptr<Buffer> data_buffer;
+ RETURN_NOT_OK(GetBuffer(buffer_index++, &data_buffer));
+ gil.release();
+ io::BufferReader buf_reader(data_buffer);
+ std::shared_ptr<RecordBatchReader> reader;
+ RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(&buf_reader, &reader));
+ RETURN_NOT_OK(reader->ReadNext(&out->batch));
+ gil.acquire();
+ }
+
+ // Zero-copy reconstruct tensors
+ for (int i = 0; i < num_tensors; ++i) {
+ std::shared_ptr<Buffer> metadata;
+ std::shared_ptr<Buffer> body;
+ std::shared_ptr<Tensor> tensor;
+ RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata));
+ RETURN_NOT_OK(GetBuffer(buffer_index++, &body));
+
+ ipc::Message message(metadata, body);
+
+ RETURN_NOT_OK(ReadTensor(message, &tensor));
+ out->tensors.emplace_back(std::move(tensor));
+ }
+
+ // Unwrap and append buffers
+ for (int i = 0; i < num_buffers; ++i) {
+ std::shared_ptr<Buffer> buffer;
+ RETURN_NOT_OK(GetBuffer(buffer_index++, &buffer));
+ out->buffers.emplace_back(std::move(buffer));
+ }
+
+ return Status::OK();
+}
+
} // namespace py
} // namespace arrow
diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h
index 7509f30eb..9440ffb32 100644
--- a/cpp/src/arrow/python/arrow_to_python.h
+++ b/cpp/src/arrow/python/arrow_to_python.h
@@ -48,6 +48,19 @@ namespace py {
ARROW_EXPORT
Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
+/// \brief Reconstruct SerializedPyObject from representation produced by
+/// SerializedPyObject::GetComponents.
+///
+/// \param[in] num_tensors
+/// \param[in] num_buffers
+/// \param[in] data a list containing pyarrow.Buffer instances. Must be 1 +
+/// num_tensors * 2 + num_buffers in length
+/// \param[out] out the reconstructed object
+/// \return Status
+ARROW_EXPORT
+Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
+ SerializedPyObject* out);
+
/// \brief Reconstruct Python object from Arrow-serialized representation
/// \param[in] context Serialization context which contains custom serialization
/// and deserialization callbacks. Can be any Python object with a
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 72cc5b6e1..253e9d9a7 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -31,7 +31,9 @@
#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
#include "arrow/ipc/writer.h"
+#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/tensor.h"
#include "arrow/util/logging.h"
@@ -710,27 +712,89 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
return Status::OK();
}
-Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) {
- int32_t num_tensors = static_cast<int32_t>(obj.tensors.size());
- int32_t num_buffers = static_cast<int32_t>(obj.buffers.size());
- RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_tensors), sizeof(int32_t)));
- RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_buffers), sizeof(int32_t)));
- RETURN_NOT_OK(ipc::WriteRecordBatchStream({obj.batch}, dst));
+Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
+ int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
+ int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
+ RETURN_NOT_OK(
+ dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
+ RETURN_NOT_OK(
+ dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));
+ RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));
int32_t metadata_length;
int64_t body_length;
- for (const auto& tensor : obj.tensors) {
+ for (const auto& tensor : this->tensors) {
RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
}
- for (const auto& buffer : obj.buffers) {
+ for (const auto& buffer : this->buffers) {
int64_t size = buffer->size();
- RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&size), sizeof(int64_t)));
+ RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
RETURN_NOT_OK(dst->Write(buffer->data(), size));
}
return Status::OK();
}
+Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) {
+ PyAcquireGIL py_gil;
+
+ ScopedRef result(PyDict_New());
+ PyObject* buffers = PyList_New(0);
+
+ // TODO(wesm): Not sure how pedantic we need to be about checking the return
+ // values of these functions. There are other places where we do not check
+ // PyDict_SetItem/SetItemString return value, but these failures would be
+ // quite esoteric
+ PyDict_SetItemString(result.get(), "num_tensors",
+ PyLong_FromSize_t(this->tensors.size()));
+ PyDict_SetItemString(result.get(), "num_buffers",
+ PyLong_FromSize_t(this->buffers.size()));
+ PyDict_SetItemString(result.get(), "data", buffers);
+ RETURN_IF_PYERROR();
+
+ Py_DECREF(buffers);
+
+ auto PushBuffer = [&buffers](const std::shared_ptr<Buffer>& buffer) {
+ PyObject* wrapped_buffer = wrap_buffer(buffer);
+ RETURN_IF_PYERROR();
+ if (PyList_Append(buffers, wrapped_buffer) < 0) {
+ Py_DECREF(wrapped_buffer);
+ RETURN_IF_PYERROR();
+ }
+ Py_DECREF(wrapped_buffer);
+ return Status::OK();
+ };
+
+ constexpr int64_t kInitialCapacity = 1024;
+
+ // Write the record batch describing the object structure
+ std::shared_ptr<io::BufferOutputStream> stream;
+ std::shared_ptr<Buffer> buffer;
+
+ py_gil.release();
+ RETURN_NOT_OK(io::BufferOutputStream::Create(kInitialCapacity, memory_pool, &stream));
+ RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, stream.get()));
+ RETURN_NOT_OK(stream->Finish(&buffer));
+ py_gil.acquire();
+
+ RETURN_NOT_OK(PushBuffer(buffer));
+
+ // For each tensor, get a metadata buffer and a buffer for the body
+ for (const auto& tensor : this->tensors) {
+ std::unique_ptr<ipc::Message> message;
+ RETURN_NOT_OK(ipc::GetTensorMessage(*tensor, memory_pool, &message));
+ RETURN_NOT_OK(PushBuffer(message->metadata()));
+ RETURN_NOT_OK(PushBuffer(message->body()));
+ }
+
+ for (const auto& buf : this->buffers) {
+ RETURN_NOT_OK(PushBuffer(buf));
+ }
+
+ *out = result.release();
+ return Status::OK();
+}
+
} // namespace py
} // namespace arrow
diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h
index c5b639614..ce7aefa0e 100644
--- a/cpp/src/arrow/python/python_to_arrow.h
+++ b/cpp/src/arrow/python/python_to_arrow.h
@@ -30,6 +30,7 @@
namespace arrow {
+class MemoryPool;
class RecordBatch;
class Tensor;
@@ -45,6 +46,26 @@ struct ARROW_EXPORT SerializedPyObject {
std::shared_ptr<RecordBatch> batch;
std::vector<std::shared_ptr<Tensor>> tensors;
std::vector<std::shared_ptr<Buffer>> buffers;
+
+ /// \brief Write serialized Python object to OutputStream
+ /// \param[in,out] dst an OutputStream
+ /// \return Status
+ Status WriteTo(io::OutputStream* dst);
+
+ /// \brief Convert SerializedPyObject to a dict containing the message
+ /// components as Buffer instances with minimal memory allocation
+ ///
+ /// {
+ /// 'num_tensors': N,
+ /// 'num_buffers': K,
+ /// 'data': [Buffer]
+ /// }
+ ///
+ /// Each tensor is written as two buffers, one for the metadata and one for
+ /// the body. Therefore, the number of buffers in 'data' is 2 * N + K + 1,
+ /// with the first buffer containing the serialized record batch containing
+ /// the UnionArray that describes the whole object
+ Status GetComponents(MemoryPool* pool, PyObject** out);
};
/// \brief Serialize Python sequence as a RecordBatch plus
@@ -62,13 +83,6 @@ struct ARROW_EXPORT SerializedPyObject {
ARROW_EXPORT
Status SerializeObject(PyObject* context, PyObject* sequence,
SerializedPyObject* out);
-/// \brief Write serialized Python object to OutputStream
-/// \param[in] object a serialized Python object to write out
-/// \param[out] dst an OutputStream
-/// \return Status
-ARROW_EXPORT
-Status WriteSerializedObject(const SerializedPyObject& object, io::OutputStream* dst);
-
} // namespace py
} // namespace arrow
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index bb2a0420b..636f41d67 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -245,6 +245,7 @@ Serialization and IPC
serialize
serialize_to
deserialize
+ deserialize_components
deserialize_from
read_serialized
SerializedPyObject
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 0456a658f..bd31b21c1 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -101,6 +101,7 @@
# Serialization
from pyarrow.lib import (deserialize_from, deserialize,
+ deserialize_components,
serialize, serialize_to, read_serialized,
SerializedPyObject, SerializationContext,
SerializationCallbackError,
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 5d68607ef..024d1475d 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -682,11 +682,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
c_string FormatMessageType(MessageType type)
cdef cppclass CMessageReader" arrow::ipc::MessageReader":
- CStatus ReadNextMessage(unique_ptr[CMessage]* out)
+ @staticmethod
+ unique_ptr[CMessageReader] Open(const shared_ptr[InputStream]& stream)
- cdef cppclass CInputStreamMessageReader \
- " arrow::ipc::InputStreamMessageReader":
- CInputStreamMessageReader(const shared_ptr[InputStream]& stream)
+ CStatus ReadNextMessage(unique_ptr[CMessage]* out)
cdef cppclass CRecordBatchWriter" arrow::ipc::RecordBatchWriter":
CStatus Close()
@@ -908,12 +907,12 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
shared_ptr[CRecordBatch] batch
vector[shared_ptr[CTensor]] tensors
+ CStatus WriteTo(OutputStream* dst)
+ CStatus GetComponents(CMemoryPool* pool, PyObject** dst)
+
CStatus SerializeObject(object context, object sequence,
CSerializedPyObject* out)
- CStatus WriteSerializedObject(const CSerializedPyObject& obj,
- OutputStream* dst)
-
CStatus DeserializeObject(object context,
const CSerializedPyObject& obj,
PyObject* base, PyObject** out)
@@ -921,6 +920,10 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
CStatus ReadSerializedObject(RandomAccessFile* src,
CSerializedPyObject* out)
+ CStatus GetSerializedFromComponents(int num_tensors, int num_buffers,
+ object buffers,
+ CSerializedPyObject* out)
+
cdef extern from 'arrow/python/init.h':
int arrow_init_numpy() except -1
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 27e916775..b568cd46d 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -125,9 +125,11 @@ cdef class MessageReader:
def open_stream(source):
cdef MessageReader result = MessageReader()
cdef shared_ptr[InputStream] in_stream
+ cdef unique_ptr[CMessageReader] reader
get_input_stream(source, &in_stream)
with nogil:
- result.reader.reset(new CInputStreamMessageReader(in_stream))
+ reader = CMessageReader.Open(in_stream)
+ result.reader.reset(reader.release())
return result
diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi
index 9776f2ad7..2fdb606a7 100644
--- a/python/pyarrow/public-api.pxi
+++ b/python/pyarrow/public-api.pxi
@@ -44,7 +44,7 @@ cdef public api object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf):
cdef public api object pyarrow_wrap_resizable_buffer(
- const shared_ptr[CResizableBuffer]& buf):
+ const shared_ptr[CResizableBuffer]& buf):
cdef ResizableBuffer result = ResizableBuffer()
result.init_rz(buf)
return result
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index 6b7227797..c8bd6daec 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -165,7 +165,7 @@ cdef class SerializedPyObject:
def __get__(self):
cdef CMockOutputStream mock_stream
with nogil:
- check_status(WriteSerializedObject(self.data, &mock_stream))
+ check_status(self.data.WriteTo(&mock_stream))
return mock_stream.GetExtentBytesWritten()
@@ -179,7 +179,7 @@ cdef class SerializedPyObject:
cdef _write_to(self, OutputStream* stream):
with nogil:
- check_status(WriteSerializedObject(self.data, stream))
+ check_status(self.data.WriteTo(stream))
def deserialize(self, SerializationContext context=None):
"""
@@ -209,6 +209,46 @@ cdef class SerializedPyObject:
self.write_to(sink)
return output
+ @staticmethod
+ def from_components(components):
+ """
+ Reconstruct SerializedPyObject from output of
+ SerializedPyObject.to_components
+ """
+ cdef:
+ int num_tensors = components['num_tensors']
+ int num_buffers = components['num_buffers']
+ list buffers = components['data']
+ SerializedPyObject result = SerializedPyObject()
+
+ with nogil:
+ check_status(GetSerializedFromComponents(num_tensors, num_buffers,
+ buffers, &result.data))
+
+ return result
+
+ def to_components(self, memory_pool=None):
+ """
+ Return the decomposed dict representation of the serialized object
+ containing a collection of Buffer objects which maximize opportunities
+ for zero-copy
+
+ Parameters
+ ----------
+ memory_pool : MemoryPool, default None
+ Pool to use for necessary allocations
+
+ Returns
+ -------
+ dict
+ """
+ cdef PyObject* result
+ cdef CMemoryPool* c_pool = maybe_unbox_memory_pool(memory_pool)
+
+ with nogil:
+ check_status(self.data.GetComponents(c_pool, &result))
+
+ return PyObject_to_object(result)
+
def serialize(object value, SerializationContext context=None):
"""EXPERIMENTAL: Serialize a Python sequence
@@ -301,6 +341,24 @@ def deserialize_from(source, object base, SerializationContext context=None):
return serialized.deserialize(context)
+def deserialize_components(components, SerializationContext context=None):
+ """
+ Reconstruct Python object from output of SerializedPyObject.to_components
+
+ Parameters
+ ----------
+ components : dict
+ Output of SerializedPyObject.to_components
+ context : SerializationContext, default None
+
+ Returns
+ -------
+ object : the Python object that was originally serialized
+ """
+ serialized = SerializedPyObject.from_components(components)
+ return serialized.deserialize(context)
+
+
def deserialize(obj, SerializationContext context=None):
"""
EXPERIMENTAL: Deserialize Python object from Buffer or other Python object
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index b0c5bc49e..cda7a5565 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -216,6 +216,17 @@ def serialization_roundtrip(value, f):
result = pa.deserialize_from(f, None, serialization_context)
assert_equal(value, result)
+ _check_component_roundtrip(value)
+
+
+def _check_component_roundtrip(value):
+ # Test to/from components
+ serialized = pa.serialize(value)
+ components = serialized.to_components()
+ from_comp = pa.SerializedPyObject.from_components(components)
+ recons = from_comp.deserialize()
+ assert_equal(value, recons)
+
@pytest.yield_fixture(scope='session')
def large_memory_map(tmpdir_factory, size=100*1024*1024):
@@ -482,3 +493,25 @@ def test_serialize_subclasses():
deserialized = serialized.deserialize()
assert type(deserialized).__name__ == SerializableClass.__name__
assert deserialized.value == 3
+
+
+def test_serialize_to_components_invalid_cases():
+ buf = pa.frombuffer(b'hello')
+
+ components = {
+ 'num_tensors': 0,
+ 'num_buffers': 1,
+ 'data': [buf]
+ }
+
+ with pytest.raises(pa.ArrowException):
+ pa.deserialize_components(components)
+
+ components = {
+ 'num_tensors': 1,
+ 'num_buffers': 0,
+ 'data': [buf, buf]
+ }
+
+ with pytest.raises(pa.ArrowException):
+ pa.deserialize_components(components)
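As a usage illustration (not part of the diff above), here is a minimal Python sketch of the round trip this change enables. It assumes the default serialization context is registered so NumPy arrays are handled out of the box; the dict keys follow the docstring added in python_to_arrow.h:
{code:python}
import numpy as np
import pyarrow as pa

# Serialize a value, then decompose it into its component buffers
value = np.arange(100)
serialized = pa.serialize(value)

# components is a dict {'num_tensors': N, 'num_buffers': K, 'data': [...]},
# where 'data' holds 1 + 2 * N + K pyarrow.Buffer objects: the record batch
# describing the object, a metadata/body pair per tensor, then plain buffers
components = serialized.to_components()

# Each buffer is memoryview-compatible, so a transport layer can ship the
# parts without first copying them into one contiguous payload
views = [memoryview(buf) for buf in components['data']]

# Reconstruct the original object on the receiving side
restored = pa.deserialize_components(components)
assert np.array_equal(restored, value)
{code}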
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [Python] Convert SerializedPyObject to/from sequence of component buffers
> with minimal memory allocation / copying
> ------------------------------------------------------------------------------------------------------------------
>
> Key: ARROW-1783
> URL: https://issues.apache.org/jira/browse/ARROW-1783
> Project: Apache Arrow
> Issue Type: New Feature
> Components: Python
> Reporter: Wes McKinney
> Assignee: Wes McKinney
> Labels: pull-request-available
> Fix For: 0.8.0
>
>
> See discussion on Dask org:
> https://github.com/dask/distributed/pull/931
> It would be valuable for downstream users to compute the serialized payload
> as a sequence of memoryview-compatible objects without having to allocate new
> memory on write. This means that the component tensor messages must have
> their metadata and bodies in separate buffers. This will require a bit of
> work internally to reassemble the object from a collection of
> {{pyarrow.Buffer}} objects.
> see also ARROW-1509
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)