This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 9514e2e225cdb84da09476cb5707dbb9443239eb Author: Wes McKinney <wesm+...@apache.org> AuthorDate: Wed Aug 28 11:58:52 2019 -0500 ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format This also moves the alignment multiple to `IpcOptions` and adds the `IpcOptions` argument to more functions. Closes #5211 from wesm/ARROW-6314 and squashes the following commits: df3b910bd <Wes McKinney> Fix MSVC narrowing warning 62758b614 <Wes McKinney> Code review comments. Copy metadata always in prefix_length==4 legacy case 857a57155 <Antoine Pitrou> Fix CUDA IPC 71b2fad4f <Wes McKinney> Add tests exercising backwards compatibility write and read path 4777d2bc8 <Wes McKinney> Implement backwards compatibility and compatibility mode, pass IpcOptions in more APIs 1a3843215 <Wes McKinney> Revert changes to submodule 69883cf63 <Micah Kornfield> verify 8 bytes alignment fixes ubsan for ipc Lead-authored-by: Wes McKinney <wesm+...@apache.org> Co-authored-by: Antoine Pitrou <anto...@python.org> Co-authored-by: Micah Kornfield <emkornfi...@gmail.com> Signed-off-by: Wes McKinney <wesm+...@apache.org> --- cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 26 +----- cpp/src/arrow/ipc/message.cc | 154 ++++++++++++++++++++++++++++----- cpp/src/arrow/ipc/message.h | 47 +++++++--- cpp/src/arrow/ipc/metadata_internal.cc | 32 ------- cpp/src/arrow/ipc/metadata_internal.h | 16 ---- cpp/src/arrow/ipc/options.h | 8 ++ cpp/src/arrow/ipc/read_write_test.cc | 119 ++++++++++++++++++------- cpp/src/arrow/ipc/writer.cc | 69 +++++++-------- cpp/src/arrow/ipc/writer.h | 22 ++--- python/pyarrow/includes/libarrow.pxd | 7 +- python/pyarrow/ipc.pxi | 5 +- 11 files changed, 309 insertions(+), 196 deletions(-) diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc index 34488a1..0fb81bc 100644 --- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc +++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc @@ -63,31 +63,7 @@ Status 
SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx, Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool, std::unique_ptr<ipc::Message>* out) { - int32_t message_length = 0; - int64_t bytes_read = 0; - - RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read, - reinterpret_cast<uint8_t*>(&message_length))); - if (bytes_read != sizeof(int32_t)) { - *out = nullptr; - return Status::OK(); - } - - if (message_length == 0) { - // Optional 0 EOS control message - *out = nullptr; - return Status::OK(); - } - - std::shared_ptr<Buffer> metadata; - RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata)); - RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data())); - if (bytes_read != message_length) { - return Status::IOError("Expected ", message_length, " metadata bytes, but only got ", - bytes_read); - } - - return ipc::Message::ReadFrom(metadata, reader, out); + return ipc::ReadMessageCopy(reader, pool, out); } Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index dad1f98..a281b0d 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -32,10 +32,14 @@ #include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/util/logging.h" +#include "arrow/util/ubsan.h" namespace arrow { namespace ipc { +// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message +constexpr int32_t kIpcContinuationToken = -1; + class Message::MessageImpl { public: explicit MessageImpl(const std::shared_ptr<Buffer>& metadata, @@ -142,12 +146,19 @@ bool Message::Equals(const Message& other) const { } } -Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream, - std::unique_ptr<Message>* out) { +Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) { + // Check metadata memory alignment in debug builds + DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) 
% 8); const flatbuf::Message* fb_message; - RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message)); + RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message)); + *body_length = fb_message->bodyLength(); + return Status::OK(); +} - int64_t body_length = fb_message->bodyLength(); +Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream, + std::unique_ptr<Message>* out) { + int64_t body_length = -1; + RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length)); std::shared_ptr<Buffer> body; RETURN_NOT_OK(stream->Read(body_length, &body)); @@ -161,9 +172,8 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata, io::RandomAccessFile* file, std::unique_ptr<Message>* out) { - const flatbuf::Message* fb_message; - RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message)); - int64_t body_length = fb_message->bodyLength(); + int64_t body_length = -1; + RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length)); std::shared_ptr<Buffer> body; RETURN_NOT_OK(file->ReadAt(offset, body_length, &body)); @@ -184,10 +194,10 @@ Status WritePadding(io::OutputStream* stream, int64_t nbytes) { return Status::OK(); } -Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment, +Status Message::SerializeTo(io::OutputStream* stream, const IpcOptions& options, int64_t* output_length) const { int32_t metadata_length = 0; - RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length)); + RETURN_NOT_OK(WriteMessage(*metadata(), options, stream, &metadata_length)); *output_length = metadata_length; @@ -237,15 +247,47 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile " metadata bytes but got ", buffer->size()); } - int32_t flatbuffer_size = *reinterpret_cast<const 
int32_t*>(buffer->data()); + const int32_t continuation = util::SafeLoadAs<int32_t>(buffer->data()); + + // The size of the Flatbuffer including padding + int32_t flatbuffer_length = -1; + int32_t prefix_size = -1; + if (continuation == kIpcContinuationToken) { + if (metadata_length < 8) { + return Status::Invalid( + "Corrupted IPC message, had continuation token " + " but length ", + metadata_length); + } - if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) { - return Status::Invalid("flatbuffer size ", metadata_length, + // Valid IPC message, parse the message length now + flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4); + prefix_size = 8; + } else if (continuation == 0) { + // EOS + *message = nullptr; + return Status::OK(); + } else { + // ARROW-6314: Backwards compatibility for reading old IPC + // messages produced prior to version 0.15.0 + flatbuffer_length = continuation; + prefix_size = 4; + } + + if (flatbuffer_length + prefix_size != metadata_length) { + return Status::Invalid("flatbuffer size ", flatbuffer_length, " invalid. File offset: ", offset, ", metadata length: ", metadata_length); } - auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4); + std::shared_ptr<Buffer> metadata = + SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size); + if (prefix_size == 4) { + // ARROW-6314: For old messages we copy the metadata to fix UBSAN + // issues with Flatbuffers. 
For new messages, they are already + // aligned + RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata)); + } return Message::ReadFrom(offset + metadata_length, metadata, file, message); } @@ -269,39 +311,105 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) { int64_t current_position; ARROW_RETURN_NOT_OK(stream->Tell(&current_position)); if (current_position % alignment != 0) { - return Status::Invalid("Stream is not aligned"); + return Status::Invalid("Stream is not aligned pos: ", current_position, + " alignment: ", alignment); } else { return Status::OK(); } } -Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) { - int32_t message_length = 0; +namespace { + +Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata, + std::unique_ptr<Message>* message) { + int32_t continuation = 0; int64_t bytes_read = 0; RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read, - reinterpret_cast<uint8_t*>(&message_length))); + reinterpret_cast<uint8_t*>(&continuation))); if (bytes_read != sizeof(int32_t)) { + // EOS *message = nullptr; return Status::OK(); } - if (message_length == 0) { - // Optional 0 EOS control message + int32_t flatbuffer_length = -1; + bool legacy_format = false; + if (continuation == kIpcContinuationToken) { + // Valid IPC message, read the message length now + RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read, + reinterpret_cast<uint8_t*>(&flatbuffer_length))); + } else if (continuation == 0) { + // EOS *message = nullptr; return Status::OK(); + } else { + // ARROW-6314: Backwards compatibility for reading old IPC + // messages produced prior to version 0.15.0 + flatbuffer_length = continuation; + legacy_format = true; } std::shared_ptr<Buffer> metadata; - RETURN_NOT_OK(file->Read(message_length, &metadata)); - if (metadata->size() != message_length) { - return Status::Invalid("Expected to read ", message_length, " metadata bytes, but ", - "only read ", metadata->size()); + if 
(legacy_format || copy_metadata) { + DCHECK_NE(pool, nullptr); + RETURN_NOT_OK(AllocateBuffer(pool, flatbuffer_length, &metadata)); + RETURN_NOT_OK(file->Read(flatbuffer_length, &bytes_read, metadata->mutable_data())); + } else { + RETURN_NOT_OK(file->Read(flatbuffer_length, &metadata)); + bytes_read = metadata->size(); + } + if (bytes_read != flatbuffer_length) { + return Status::Invalid("Expected to read ", flatbuffer_length, + " metadata bytes, but ", "only read ", bytes_read); } return Message::ReadFrom(metadata, file, message); } +} // namespace + +Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) { + return ReadMessage(file, default_memory_pool(), /*copy_metadata=*/false, out); +} + +Status ReadMessageCopy(io::InputStream* file, MemoryPool* pool, + std::unique_ptr<Message>* out) { + return ReadMessage(file, pool, /*copy_metadata=*/true, out); +} + +Status WriteMessage(const Buffer& message, const IpcOptions& options, + io::OutputStream* file, int32_t* message_length) { + const int32_t prefix_size = options.write_legacy_ipc_format ? 
4 : 8; + const int32_t flatbuffer_size = static_cast<int32_t>(message.size()); + + int32_t padded_message_length = static_cast<int32_t>( + PaddedLength(flatbuffer_size + prefix_size, options.alignment)); + + int32_t padding = padded_message_length - flatbuffer_size - prefix_size; + + // The returned message size includes the length prefix, the flatbuffer, + // plus padding + *message_length = padded_message_length; + + // ARROW-6314: Write continuation / padding token + if (!options.write_legacy_ipc_format) { + RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t))); + } + + // Write the flatbuffer size prefix including padding + int32_t padded_flatbuffer_size = padded_message_length - prefix_size; + RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t))); + + // Write the flatbuffer + RETURN_NOT_OK(file->Write(message.data(), flatbuffer_size)); + if (padding > 0) { + RETURN_NOT_OK(file->Write(kPaddingBytes, padding)); + } + + return Status::OK(); +} + // ---------------------------------------------------------------------- // Implement InputStream message reader diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 9c152d7..89be45e 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -17,8 +17,7 @@ // C++ object model and user API for interprocess schema messaging -#ifndef ARROW_IPC_MESSAGE_H -#define ARROW_IPC_MESSAGE_H +#pragma once #include <cstdint> #include <memory> @@ -32,6 +31,7 @@ namespace arrow { class Buffer; +class MemoryPool; namespace io { @@ -137,13 +137,10 @@ class ARROW_EXPORT Message { /// \brief Write length-prefixed metadata and body to output stream /// /// \param[in] file output stream to write to - /// \param[in] alignment byte alignment for metadata, usually 8 or - /// 64. 
Whether the body is padded depends on the metadata; if the body - /// buffer is smaller than the size indicated in the metadata, then extra - /// padding bytes will be written + /// \param[in] options IPC writing options including alignment /// \param[out] output_length the number of bytes written /// \return Status - Status SerializeTo(io::OutputStream* file, int32_t alignment, + Status SerializeTo(io::OutputStream* file, const IpcOptions& options, int64_t* output_length) const; /// \brief Return true if the Message metadata passes Flatbuffer validation @@ -223,15 +220,39 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8); ARROW_EXPORT Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8); -/// \brief Read encapsulated RPC message (metadata and body) from InputStream +/// \brief Read encapsulated IPC message (metadata and body) from InputStream /// -/// Read length-prefixed message with as-yet unknown length. Returns null if -/// there are not enough bytes available or the message length is 0 (e.g. EOS -/// in a stream) +/// Returns null if there are not enough bytes available or the +/// message length is 0 (e.g. EOS in a stream) ARROW_EXPORT Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message); +/// \brief Read encapsulated IPC message (metadata and body) from InputStream +/// +/// Like ReadMessage, except that the metadata is copied in a new buffer. +/// This is necessary if the stream returns non-CPU buffers. +ARROW_EXPORT +Status ReadMessageCopy(io::InputStream* stream, MemoryPool* pool, + std::unique_ptr<Message>* message); + +/// Write encapsulated IPC message Does not make assumptions about +/// whether the stream is aligned already. 
Can write legacy (pre +/// version 0.15.0) IPC message if option set +/// +/// continuation: 0xFFFFFFFF +/// message_size: int32 +/// message: const void* +/// padding +/// +/// \param[in] message a buffer containing the metadata to write +/// \param[in] options IPC writing options, including alignment and +/// legacy message support +/// \param[in,out] file the OutputStream to write to +/// \param[out] message_length the total size of the payload written including +/// padding +/// \return Status +Status WriteMessage(const Buffer& message, const IpcOptions& options, + io::OutputStream* file, int32_t* message_length); + } // namespace ipc } // namespace arrow - -#endif // ARROW_IPC_MESSAGE_H diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 6810351..dff3369 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -1282,38 +1282,6 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType> return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type); } -// ---------------------------------------------------------------------- -// Implement message writing - -Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file, - int32_t* message_length) { - // ARROW-3212: We do not make assumptions that the output stream is aligned - int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4; - const int32_t remainder = padded_message_length % alignment; - if (remainder != 0) { - padded_message_length += alignment - remainder; - } - - // The returned message size includes the length prefix, the flatbuffer, - // plus padding - *message_length = padded_message_length; - - // Write the flatbuffer size prefix including padding - int32_t flatbuffer_size = padded_message_length - 4; - RETURN_NOT_OK(file->Write(&flatbuffer_size, sizeof(int32_t))); - - // Write the flatbuffer - 
RETURN_NOT_OK(file->Write(message.data(), message.size())); - - // Write any padding - int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4; - if (padding > 0) { - RETURN_NOT_OK(file->Write(kPaddingBytes, padding)); - } - - return Status::OK(); -} - } // namespace internal } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 828affd..420cfb1 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -128,22 +128,6 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size, return Status::OK(); } -/// Write a serialized message metadata with a length-prefix and padding to an -/// 8-byte offset. Does not make assumptions about whether the stream is -/// aligned already -/// -/// <message_size: int32><message: const void*><padding> -/// -/// \param[in] message a buffer containing the metadata to write -/// \param[in] alignment the size multiple of the total message size including -/// length prefix, metadata, and padding. Usually 8 or 64 -/// \param[in,out] file the OutputStream to write to -/// \param[out] message_length the total size of the payload written including -/// padding -/// \return Status -Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file, - int32_t* message_length); - // Serialize arrow::Schema as a Flatbuffer // // \param[in] schema a Schema instance diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index d380402..3570c06 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -36,6 +36,14 @@ struct ARROW_EXPORT IpcOptions { // The maximum permitted schema nesting depth. int max_recursion_depth = kMaxNestingDepth; + // Write padding after memory buffers to this multiple of + // bytes. 
Generally 8 or 64 + int32_t alignment = 8; + + /// \brief Write the pre-0.15.0 encapsulated IPC message format + /// consisting of a 4-byte prefix instead of 8 byte + bool write_legacy_ipc_format = false; + static IpcOptions Defaults(); }; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 9cbeacf..efd0a88 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -106,27 +106,69 @@ TEST(TestMessage, SerializeTo) { std::shared_ptr<io::BufferOutputStream> stream; - { - const int32_t alignment = 8; - + auto CheckWithAlignment = [&](int32_t alignment) { + IpcOptions options; + options.alignment = alignment; + const int32_t prefix_size = 8; ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream)); - ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length)); + ASSERT_OK(message->SerializeTo(stream.get(), options, &output_length)); ASSERT_OK(stream->Tell(&position)); - ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length, + ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length, output_length); ASSERT_EQ(output_length, position); - } + }; - { - const int32_t alignment = 64; + CheckWithAlignment(8); + CheckWithAlignment(64); +} - ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream)); - ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length)); - ASSERT_OK(stream->Tell(&position)); - ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length, - output_length); - ASSERT_EQ(output_length, position); - } +void BuffersOverlapEquals(const Buffer& left, const Buffer& right) { + ASSERT_GT(left.size(), 0); + ASSERT_GT(right.size(), 0); + ASSERT_TRUE(left.Equals(right, std::min(left.size(), right.size()))); +} + +TEST(TestMessage, LegacyIpcBackwardsCompatibility) { + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(MakeIntBatchSized(36, &batch)); + + auto 
RoundtripWithOptions = [&](const IpcOptions& arg_options, + std::shared_ptr<Buffer>* out_serialized, + std::unique_ptr<Message>* out) { + internal::IpcPayload payload; + ASSERT_OK(internal::GetRecordBatchPayload(*batch, arg_options, default_memory_pool(), + &payload)); + + std::shared_ptr<io::BufferOutputStream> stream; + ASSERT_OK(io::BufferOutputStream::Create(1 << 20, default_memory_pool(), &stream)); + + int32_t metadata_length = -1; + ASSERT_OK( + internal::WriteIpcPayload(payload, arg_options, stream.get(), &metadata_length)); + + ASSERT_OK(stream->Finish(out_serialized)); + io::BufferReader io_reader(*out_serialized); + ASSERT_OK(ReadMessage(&io_reader, out)); + }; + + std::shared_ptr<Buffer> serialized, legacy_serialized; + std::unique_ptr<Message> message, legacy_message; + + IpcOptions options; + RoundtripWithOptions(options, &serialized, &message); + + // First 4 bytes 0xFFFFFFFF Continuation marker + ASSERT_EQ(-1, util::SafeLoadAs<int32_t>(serialized->data())); + + options.write_legacy_ipc_format = true; + RoundtripWithOptions(options, &legacy_serialized, &legacy_message); + + // Check that the continuation marker is not written + ASSERT_NE(-1, util::SafeLoadAs<int32_t>(legacy_serialized->data())); + + // Have to use the smaller size to exclude padding + BuffersOverlapEquals(*legacy_message->metadata(), *message->metadata()); + ASSERT_TRUE(legacy_message->body()->Equals(*message->body())); } TEST(TestMessage, Verify) { @@ -635,13 +677,14 @@ TEST_F(RecursionLimits, StressLimit) { #endif // !defined(_WIN32) || defined(NDEBUG) struct FileWriterHelper { - Status Init(const std::shared_ptr<Schema>& schema) { + Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) { num_batches_written_ = 0; RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_)); sink_.reset(new io::BufferOutputStream(buffer_)); - - return RecordBatchFileWriter::Open(sink_.get(), schema, &writer_); + ARROW_ASSIGN_OR_RAISE(writer_, + 
RecordBatchFileWriter::Open(sink_.get(), schema, options)); + return Status::OK(); } Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) { @@ -680,11 +723,12 @@ struct FileWriterHelper { }; struct StreamWriterHelper { - Status Init(const std::shared_ptr<Schema>& schema) { + Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) { RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_)); sink_.reset(new io::BufferOutputStream(buffer_)); - - return RecordBatchStreamWriter::Open(sink_.get(), schema, &writer_); + ARROW_ASSIGN_OR_RAISE(writer_, + RecordBatchStreamWriter::Open(sink_.get(), schema, options)); + return Status::OK(); } Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) { @@ -718,7 +762,7 @@ class ReaderWriterMixin { // Check simple RecordBatch roundtripping template <typename Param> - void TestRoundTrip(Param&& param) { + void TestRoundTrip(Param&& param, const IpcOptions& options) { std::shared_ptr<RecordBatch> batch1; std::shared_ptr<RecordBatch> batch2; ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue @@ -727,7 +771,7 @@ class ReaderWriterMixin { BatchVector in_batches = {batch1, batch2}; BatchVector out_batches; - ASSERT_OK(RoundTripHelper(in_batches, &out_batches)); + ASSERT_OK(RoundTripHelper(in_batches, options, &out_batches)); ASSERT_EQ(out_batches.size(), in_batches.size()); // Compare batches @@ -741,7 +785,7 @@ class ReaderWriterMixin { ASSERT_OK(MakeDictionary(&batch)); BatchVector out_batches; - ASSERT_OK(RoundTripHelper({batch}, &out_batches)); + ASSERT_OK(RoundTripHelper({batch}, IpcOptions::Defaults(), &out_batches)); ASSERT_EQ(out_batches.size(), 1); // TODO(wesm): This was broken in ARROW-3144. 
I'm not sure how to @@ -764,7 +808,7 @@ class ReaderWriterMixin { schema = schema->WithMetadata(key_value_metadata({"some_key"}, {"some_value"})); WriterHelper writer_helper; - ASSERT_OK(writer_helper.Init(schema)); + ASSERT_OK(writer_helper.Init(schema, IpcOptions::Defaults())); // Writing a record batch with a different schema ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints)); // Writing a record batch with the same schema (except metadata) @@ -781,9 +825,10 @@ class ReaderWriterMixin { } private: - Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) { + Status RoundTripHelper(const BatchVector& in_batches, const IpcOptions& options, + BatchVector* out_batches) { WriterHelper writer_helper; - RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema())); + RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), options)); for (const auto& batch : in_batches) { RETURN_NOT_OK(writer_helper.WriteBatch(batch)); } @@ -813,9 +858,21 @@ class TestFileFormat : public ReaderWriterMixin<FileWriterHelper>, class TestStreamFormat : public ReaderWriterMixin<StreamWriterHelper>, public ::testing::TestWithParam<MakeRecordBatch*> {}; -TEST_P(TestFileFormat, RoundTrip) { TestRoundTrip(*GetParam()); } +TEST_P(TestFileFormat, RoundTrip) { + TestRoundTrip(*GetParam(), IpcOptions::Defaults()); -TEST_P(TestStreamFormat, RoundTrip) { TestRoundTrip(*GetParam()); } + IpcOptions options; + options.write_legacy_ipc_format = true; + TestRoundTrip(*GetParam(), options); +} + +TEST_P(TestStreamFormat, RoundTrip) { + TestRoundTrip(*GetParam(), IpcOptions::Defaults()); + + IpcOptions options; + options.write_legacy_ipc_format = true; + TestRoundTrip(*GetParam(), options); +} INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES()); INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES()); @@ -912,13 +969,15 @@ void SpliceMessages(std::shared_ptr<Buffer> stream, continue; } + IpcOptions options; 
internal::IpcPayload payload; payload.type = msg->type(); payload.metadata = msg->metadata(); payload.body_buffers.push_back(msg->body()); payload.body_length = msg->body()->size(); int32_t unused_metadata_length = -1; - ASSERT_OK(internal::WriteIpcPayload(payload, out.get(), &unused_metadata_length)); + ASSERT_OK( + internal::WriteIpcPayload(payload, options, out.get(), &unused_metadata_length)); } ASSERT_OK(out->Finish(spliced_stream)); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 81332a6..7127300 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -529,10 +529,9 @@ class DictionaryWriter : public RecordBatchSerializer { int64_t dictionary_id_; }; -Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst, - int32_t* metadata_length) { - RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, dst, - metadata_length)); +Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options, + io::OutputStream* dst, int32_t* metadata_length) { + RETURN_NOT_OK(WriteMessage(*payload.metadata, options, dst, metadata_length)); #ifndef NDEBUG RETURN_NOT_OK(CheckAligned(dst)); @@ -604,7 +603,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, // The body size is computed in the payload *body_length = payload.body_length; - return internal::WriteIpcPayload(payload, dst, metadata_length); + return internal::WriteIpcPayload(payload, options, dst, metadata_length); } Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches, @@ -625,7 +624,9 @@ Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length) { std::shared_ptr<Buffer> metadata; RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata)); - return internal::WriteMessage(*metadata, kTensorAlignment, dst, metadata_length); + IpcOptions options; + options.alignment = kTensorAlignment; + return WriteMessage(*metadata, 
options, dst, metadata_length); } Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size, @@ -818,19 +819,7 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds RETURN_NOT_OK(writer.Assemble(sparse_tensor)); *body_length = payload.body_length; - return internal::WriteIpcPayload(payload, dst, metadata_length); -} - -Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, - int64_t buffer_start_offset, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { - auto options = IpcOptions::Defaults(); - internal::IpcPayload payload; - RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, &payload)); - - // The body size is computed in the payload - *body_length = payload.body_length; - return internal::WriteIpcPayload(payload, dst, metadata_length); + return internal::WriteIpcPayload(payload, IpcOptions::Defaults(), dst, metadata_length); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { @@ -1022,20 +1011,24 @@ class StreamBookKeeper { return Status::OK(); } + Status WriteEOS() { + // End of stream marker + constexpr int64_t kEos = 0; + return Write(&kEos, sizeof(kEos)); + } + protected: io::OutputStream* sink_; int64_t position_; }; -// End of stream marker -constexpr int32_t kEos = 0; - /// A IpcPayloadWriter implementation that writes to a IPC stream /// (with an end-of-stream marker) class PayloadStreamWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper { public: - explicit PayloadStreamWriter(io::OutputStream* sink) : StreamBookKeeper(sink) {} + PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink) + : StreamBookKeeper(sink), options_(options) {} ~PayloadStreamWriter() override = default; @@ -1046,23 +1039,24 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter, #endif int32_t metadata_length = 0; // unused - RETURN_NOT_OK(WriteIpcPayload(payload, sink_, 
&metadata_length)); + RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &metadata_length)); RETURN_NOT_OK(UpdatePositionCheckAligned()); return Status::OK(); } - Status Close() override { - // Write 0 EOS message - return Write(&kEos, sizeof(int32_t)); - } + Status Close() override { return WriteEOS(); } + + private: + IpcOptions options_; }; /// A IpcPayloadWriter implementation that writes to a IPC file /// (with a footer as defined in File.fbs) class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper { public: - PayloadFileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) - : StreamBookKeeper(sink), schema_(schema) {} + PayloadFileWriter(const IpcOptions& options, const std::shared_ptr<Schema>& schema, + io::OutputStream* sink) + : StreamBookKeeper(sink), options_(options), schema_(schema) {} ~PayloadFileWriter() override = default; @@ -1074,7 +1068,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo // Metadata length must include padding, it's computed by WriteIpcPayload() FileBlock block = {position_, 0, payload.body_length}; - RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &block.metadata_length)); + RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &block.metadata_length)); RETURN_NOT_OK(UpdatePositionCheckAligned()); // Record position and size of some message types, to list them in the footer @@ -1107,7 +1101,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo Status Close() override { // Write 0 EOS message for compatibility with sequential readers - RETURN_NOT_OK(Write(&kEos, sizeof(int32_t))); + RETURN_NOT_OK(WriteEOS()); // Write file footer RETURN_NOT_OK(UpdatePosition()); @@ -1128,6 +1122,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo } protected: + IpcOptions options_; std::shared_ptr<Schema> schema_; std::vector<FileBlock> dictionaries_; std::vector<FileBlock> record_batches_; @@ 
-1141,9 +1136,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl RecordBatchStreamWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, const IpcOptions& options) - : RecordBatchPayloadWriter( - std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)), - schema, options) {} + : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>( + new PayloadStreamWriter(options, sink)), + schema, options) {} ~RecordBatchStreamWriterImpl() = default; }; @@ -1153,7 +1148,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPaylo RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, const IpcOptions& options) : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>( - new PayloadFileWriter(sink, schema)), + new PayloadFileWriter(options, schema, sink)), schema, options) {} ~RecordBatchFileWriterImpl() = default; @@ -1277,7 +1272,7 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo, RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream)); auto options = IpcOptions::Defaults(); - auto payload_writer = make_unique<PayloadStreamWriter>(stream.get()); + auto payload_writer = make_unique<PayloadStreamWriter>(options, stream.get()); RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options, dictionary_memo); // Write schema and populate fields (but not dictionaries) in dictionary_memo diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 75030c2..c673b0a 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -177,7 +177,9 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { std::unique_ptr<RecordBatchFileWriterImpl> file_impl_; }; -/// \brief Low-level API for writing a record batch (without schema) to an OutputStream +/// \brief Low-level API for writing a record batch (without schema) +/// to an OutputStream as encapsulated 
IPC message. See Arrow format +/// documentation for more detail. /// /// \param[in] batch the record batch to write /// \param[in] buffer_start_offset the start offset to use in the buffer metadata, @@ -189,20 +191,6 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// \param[in] options options for serialization /// \param[in] pool the memory pool to allocate memory from /// \return Status -/// -/// Write the RecordBatch (collection of equal-length Arrow arrays) to the -/// output stream in a contiguous block. The record batch metadata is written as -/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type) -/// prefixed by its size, followed by each of the memory buffers in the batch -/// written end to end (with appropriate alignment and padding): -/// -/// \code -/// <int32: metadata size> <uint8*: metadata> <buffers ...> -/// \endcode -/// -/// Finally, the absolute offsets (relative to the start of the output stream) -/// to the end of the body and end of the metadata / data header (suffixed by -/// the header size) is returned in out-variables ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, @@ -392,8 +380,8 @@ Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options MemoryPool* pool, IpcPayload* out); ARROW_EXPORT -Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst, - int32_t* metadata_length); +Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options, + io::OutputStream* dst, int32_t* metadata_length); } // namespace internal diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 2c05ec5..87393fb 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -964,6 +964,11 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: MessageType_V4" 
arrow::ipc::MetadataVersion::V4" cdef cppclass CIpcOptions" arrow::ipc::IpcOptions": + c_bool allow_64bit + int max_recursion_depth + int32_t alignment + c_bool write_legacy_ipc_format + @staticmethod CIpcOptions Defaults() @@ -989,7 +994,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: MetadataVersion metadata_version() MessageType type() - CStatus SerializeTo(OutputStream* stream, int32_t alignment, + CStatus SerializeTo(OutputStream* stream, const CIpcOptions& options, int64_t* output_length) c_string FormatMessageType(MessageType type) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index b1aca23..6710f63 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -76,13 +76,14 @@ cdef class Message: """ cdef: int64_t output_length = 0 - int32_t c_alignment = alignment OutputStream* out + CIpcOptions options + options.alignment = alignment out = sink.get_output_stream().get() with nogil: check_status(self.message.get() - .SerializeTo(out, c_alignment, &output_length)) + .SerializeTo(out, options, &output_length)) def serialize(self, alignment=8, memory_pool=None): """