This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 9514e2e225cdb84da09476cb5707dbb9443239eb
Author: Wes McKinney <wesm+...@apache.org>
AuthorDate: Wed Aug 28 11:58:52 2019 -0500

    ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
    
    This also moves the alignment multiple to `IpcOptions` and adds the `IpcOptions` argument to more functions.
    
    Closes #5211 from wesm/ARROW-6314 and squashes the following commits:
    
    df3b910bd <Wes McKinney> Fix MSVC narrowing warning
    62758b614 <Wes McKinney> Code review comments. Copy metadata always in prefix_length==4 legacy case
    857a57155 <Antoine Pitrou> Fix CUDA IPC
    71b2fad4f <Wes McKinney> Add tests exercising backwards compatibility write and read path
    4777d2bc8 <Wes McKinney> Implement backwards compatibility and compatibility mode, pass IpcOptions in more APIs
    1a3843215 <Wes McKinney> Revert changes to submodule
    69883cf63 <Micah Kornfield> verify 8 bytes alignment fixes ubsan for ipc
    
    Lead-authored-by: Wes McKinney <wesm+...@apache.org>
    Co-authored-by: Antoine Pitrou <anto...@python.org>
    Co-authored-by: Micah Kornfield <emkornfi...@gmail.com>
    Signed-off-by: Wes McKinney <wesm+...@apache.org>
---
 cpp/src/arrow/gpu/cuda_arrow_ipc.cc    |  26 +-----
 cpp/src/arrow/ipc/message.cc           | 154 ++++++++++++++++++++++++++++-----
 cpp/src/arrow/ipc/message.h            |  47 +++++++---
 cpp/src/arrow/ipc/metadata_internal.cc |  32 -------
 cpp/src/arrow/ipc/metadata_internal.h  |  16 ----
 cpp/src/arrow/ipc/options.h            |   8 ++
 cpp/src/arrow/ipc/read_write_test.cc   | 119 ++++++++++++++++++-------
 cpp/src/arrow/ipc/writer.cc            |  69 +++++++--------
 cpp/src/arrow/ipc/writer.h             |  22 ++---
 python/pyarrow/includes/libarrow.pxd   |   7 +-
 python/pyarrow/ipc.pxi                 |   5 +-
 11 files changed, 309 insertions(+), 196 deletions(-)

diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 34488a1..0fb81bc 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -63,31 +63,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
 
 Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
                    std::unique_ptr<ipc::Message>* out) {
-  int32_t message_length = 0;
-  int64_t bytes_read = 0;
-
-  RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
-                             reinterpret_cast<uint8_t*>(&message_length)));
-  if (bytes_read != sizeof(int32_t)) {
-    *out = nullptr;
-    return Status::OK();
-  }
-
-  if (message_length == 0) {
-    // Optional 0 EOS control message
-    *out = nullptr;
-    return Status::OK();
-  }
-
-  std::shared_ptr<Buffer> metadata;
-  RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
-  RETURN_NOT_OK(reader->Read(message_length, &bytes_read, 
metadata->mutable_data()));
-  if (bytes_read != message_length) {
-    return Status::IOError("Expected ", message_length, " metadata bytes, but 
only got ",
-                           bytes_read);
-  }
-
-  return ipc::Message::ReadFrom(metadata, reader, out);
+  return ipc::ReadMessageCopy(reader, pool, out);
 }
 
 Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index dad1f98..a281b0d 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -32,10 +32,14 @@
 #include "arrow/ipc/util.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
 
 namespace arrow {
 namespace ipc {
 
+// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+constexpr int32_t kIpcContinuationToken = -1;
+
 class Message::MessageImpl {
  public:
   explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
@@ -142,12 +146,19 @@ bool Message::Equals(const Message& other) const {
   }
 }
 
-Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, 
io::InputStream* stream,
-                         std::unique_ptr<Message>* out) {
+Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* 
body_length) {
+  // Check metadata memory alignment in debug builds
+  DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) % 8);
   const flatbuf::Message* fb_message;
-  RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), 
&fb_message));
+  RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), 
&fb_message));
+  *body_length = fb_message->bodyLength();
+  return Status::OK();
+}
 
-  int64_t body_length = fb_message->bodyLength();
+Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, 
io::InputStream* stream,
+                         std::unique_ptr<Message>* out) {
+  int64_t body_length = -1;
+  RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
 
   std::shared_ptr<Buffer> body;
   RETURN_NOT_OK(stream->Read(body_length, &body));
@@ -161,9 +172,8 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
 
 Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& 
metadata,
                          io::RandomAccessFile* file, std::unique_ptr<Message>* 
out) {
-  const flatbuf::Message* fb_message;
-  RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), 
&fb_message));
-  int64_t body_length = fb_message->bodyLength();
+  int64_t body_length = -1;
+  RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
 
   std::shared_ptr<Buffer> body;
   RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
@@ -184,10 +194,10 @@ Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
   return Status::OK();
 }
 
-Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
+Status Message::SerializeTo(io::OutputStream* stream, const IpcOptions& 
options,
                             int64_t* output_length) const {
   int32_t metadata_length = 0;
-  RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, 
&metadata_length));
+  RETURN_NOT_OK(WriteMessage(*metadata(), options, stream, &metadata_length));
 
   *output_length = metadata_length;
 
@@ -237,15 +247,47 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
                            " metadata bytes but got ", buffer->size());
   }
 
-  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+  const int32_t continuation = util::SafeLoadAs<int32_t>(buffer->data());
+
+  // The size of the Flatbuffer including padding
+  int32_t flatbuffer_length = -1;
+  int32_t prefix_size = -1;
+  if (continuation == kIpcContinuationToken) {
+    if (metadata_length < 8) {
+      return Status::Invalid(
+          "Corrupted IPC message, had continuation token "
+          " but length ",
+          metadata_length);
+    }
 
-  if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
-    return Status::Invalid("flatbuffer size ", metadata_length,
+    // Valid IPC message, parse the message length now
+    flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4);
+    prefix_size = 8;
+  } else if (continuation == 0) {
+    // EOS
+    *message = nullptr;
+    return Status::OK();
+  } else {
+    // ARROW-6314: Backwards compatibility for reading old IPC
+    // messages produced prior to version 0.15.0
+    flatbuffer_length = continuation;
+    prefix_size = 4;
+  }
+
+  if (flatbuffer_length + prefix_size != metadata_length) {
+    return Status::Invalid("flatbuffer size ", flatbuffer_length,
                            " invalid. File offset: ", offset,
                            ", metadata length: ", metadata_length);
   }
 
-  auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
+  std::shared_ptr<Buffer> metadata =
+      SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
+  if (prefix_size == 4) {
+    // ARROW-6314: For old messages we copy the metadata to fix UBSAN
+    // issues with Flatbuffers. For new messages, they are already
+    // aligned
+    RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata));
+  }
   return Message::ReadFrom(offset + metadata_length, metadata, file, message);
 }
 
@@ -269,39 +311,105 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
   int64_t current_position;
   ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
   if (current_position % alignment != 0) {
-    return Status::Invalid("Stream is not aligned");
+    return Status::Invalid("Stream is not aligned pos: ", current_position,
+                           " alignment: ", alignment);
   } else {
     return Status::OK();
   }
 }
 
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
-  int32_t message_length = 0;
+namespace {
+
+Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata,
+                   std::unique_ptr<Message>* message) {
+  int32_t continuation = 0;
   int64_t bytes_read = 0;
   RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
-                           reinterpret_cast<uint8_t*>(&message_length)));
+                           reinterpret_cast<uint8_t*>(&continuation)));
 
   if (bytes_read != sizeof(int32_t)) {
+    // EOS
     *message = nullptr;
     return Status::OK();
   }
 
-  if (message_length == 0) {
-    // Optional 0 EOS control message
+  int32_t flatbuffer_length = -1;
+  bool legacy_format = false;
+  if (continuation == kIpcContinuationToken) {
+    // Valid IPC message, read the message length now
+    RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+                             reinterpret_cast<uint8_t*>(&flatbuffer_length)));
+  } else if (continuation == 0) {
+    // EOS
     *message = nullptr;
     return Status::OK();
+  } else {
+    // ARROW-6314: Backwards compatibility for reading old IPC
+    // messages produced prior to version 0.15.0
+    flatbuffer_length = continuation;
+    legacy_format = true;
   }
 
   std::shared_ptr<Buffer> metadata;
-  RETURN_NOT_OK(file->Read(message_length, &metadata));
-  if (metadata->size() != message_length) {
-    return Status::Invalid("Expected to read ", message_length, " metadata 
bytes, but ",
-                           "only read ", metadata->size());
+  if (legacy_format || copy_metadata) {
+    DCHECK_NE(pool, nullptr);
+    RETURN_NOT_OK(AllocateBuffer(pool, flatbuffer_length, &metadata));
+    RETURN_NOT_OK(file->Read(flatbuffer_length, &bytes_read, 
metadata->mutable_data()));
+  } else {
+    RETURN_NOT_OK(file->Read(flatbuffer_length, &metadata));
+    bytes_read = metadata->size();
+  }
+  if (bytes_read != flatbuffer_length) {
+    return Status::Invalid("Expected to read ", flatbuffer_length,
+                           " metadata bytes, but ", "only read ", bytes_read);
   }
 
   return Message::ReadFrom(metadata, file, message);
 }
 
+}  // namespace
+
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
+  return ReadMessage(file, default_memory_pool(), /*copy_metadata=*/false, 
out);
+}
+
+Status ReadMessageCopy(io::InputStream* file, MemoryPool* pool,
+                       std::unique_ptr<Message>* out) {
+  return ReadMessage(file, pool, /*copy_metadata=*/true, out);
+}
+
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+                    io::OutputStream* file, int32_t* message_length) {
+  const int32_t prefix_size = options.write_legacy_ipc_format ? 4 : 8;
+  const int32_t flatbuffer_size = static_cast<int32_t>(message.size());
+
+  int32_t padded_message_length = static_cast<int32_t>(
+      PaddedLength(flatbuffer_size + prefix_size, options.alignment));
+
+  int32_t padding = padded_message_length - flatbuffer_size - prefix_size;
+
+  // The returned message size includes the length prefix, the flatbuffer,
+  // plus padding
+  *message_length = padded_message_length;
+
+  // ARROW-6314: Write continuation / padding token
+  if (!options.write_legacy_ipc_format) {
+    RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t)));
+  }
+
+  // Write the flatbuffer size prefix including padding
+  int32_t padded_flatbuffer_size = padded_message_length - prefix_size;
+  RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t)));
+
+  // Write the flatbuffer
+  RETURN_NOT_OK(file->Write(message.data(), flatbuffer_size));
+  if (padding > 0) {
+    RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
+  }
+
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // Implement InputStream message reader
 
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c152d7..89be45e 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -17,8 +17,7 @@
 
 // C++ object model and user API for interprocess schema messaging
 
-#ifndef ARROW_IPC_MESSAGE_H
-#define ARROW_IPC_MESSAGE_H
+#pragma once
 
 #include <cstdint>
 #include <memory>
@@ -32,6 +31,7 @@
 namespace arrow {
 
 class Buffer;
+class MemoryPool;
 
 namespace io {
 
@@ -137,13 +137,10 @@ class ARROW_EXPORT Message {
   /// \brief Write length-prefixed metadata and body to output stream
   ///
   /// \param[in] file output stream to write to
-  /// \param[in] alignment byte alignment for metadata, usually 8 or
-  /// 64. Whether the body is padded depends on the metadata; if the body
-  /// buffer is smaller than the size indicated in the metadata, then extra
-  /// padding bytes will be written
+  /// \param[in] options IPC writing options including alignment
   /// \param[out] output_length the number of bytes written
   /// \return Status
-  Status SerializeTo(io::OutputStream* file, int32_t alignment,
+  Status SerializeTo(io::OutputStream* file, const IpcOptions& options,
                      int64_t* output_length) const;
 
   /// \brief Return true if the Message metadata passes Flatbuffer validation
@@ -223,15 +220,39 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
 ARROW_EXPORT
 Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
 
-/// \brief Read encapsulated RPC message (metadata and body) from InputStream
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
 ///
-/// Read length-prefixed message with as-yet unknown length. Returns null if
-/// there are not enough bytes available or the message length is 0 (e.g. EOS
-/// in a stream)
+/// Returns null if there are not enough bytes available or the
+/// message length is 0 (e.g. EOS in a stream)
 ARROW_EXPORT
 Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
 
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
+///
+/// Like ReadMessage, except that the metadata is copied in a new buffer.
+/// This is necessary if the stream returns non-CPU buffers.
+ARROW_EXPORT
+Status ReadMessageCopy(io::InputStream* stream, MemoryPool* pool,
+                       std::unique_ptr<Message>* message);
+
+/// Write encapsulated IPC message Does not make assumptions about
+/// whether the stream is aligned already. Can write legacy (pre
+/// version 0.15.0) IPC message if option set
+///
+/// continuation: 0xFFFFFFFF
+/// message_size: int32
+/// message: const void*
+/// padding
+///
+/// \param[in] message a buffer containing the metadata to write
+/// \param[in] options IPC writing options, including alignment and
+/// legacy message support
+/// \param[in,out] file the OutputStream to write to
+/// \param[out] message_length the total size of the payload written including
+/// padding
+/// \return Status
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+                    io::OutputStream* file, int32_t* message_length);
+
 }  // namespace ipc
 }  // namespace arrow
-
-#endif  // ARROW_IPC_MESSAGE_H
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 6810351..dff3369 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -1282,38 +1282,6 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
   return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, 
type);
 }
 
-// ----------------------------------------------------------------------
-// Implement message writing
-
-Status WriteMessage(const Buffer& message, int32_t alignment, 
io::OutputStream* file,
-                    int32_t* message_length) {
-  // ARROW-3212: We do not make assumptions that the output stream is aligned
-  int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
-  const int32_t remainder = padded_message_length % alignment;
-  if (remainder != 0) {
-    padded_message_length += alignment - remainder;
-  }
-
-  // The returned message size includes the length prefix, the flatbuffer,
-  // plus padding
-  *message_length = padded_message_length;
-
-  // Write the flatbuffer size prefix including padding
-  int32_t flatbuffer_size = padded_message_length - 4;
-  RETURN_NOT_OK(file->Write(&flatbuffer_size, sizeof(int32_t)));
-
-  // Write the flatbuffer
-  RETURN_NOT_OK(file->Write(message.data(), message.size()));
-
-  // Write any padding
-  int32_t padding = padded_message_length - 
static_cast<int32_t>(message.size()) - 4;
-  if (padding > 0) {
-    RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
-  }
-
-  return Status::OK();
-}
-
 }  // namespace internal
 }  // namespace ipc
 }  // namespace arrow
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index 828affd..420cfb1 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -128,22 +128,6 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size,
   return Status::OK();
 }
 
-/// Write a serialized message metadata with a length-prefix and padding to an
-/// 8-byte offset. Does not make assumptions about whether the stream is
-/// aligned already
-///
-/// <message_size: int32><message: const void*><padding>
-///
-/// \param[in] message a buffer containing the metadata to write
-/// \param[in] alignment the size multiple of the total message size including
-/// length prefix, metadata, and padding. Usually 8 or 64
-/// \param[in,out] file the OutputStream to write to
-/// \param[out] message_length the total size of the payload written including
-/// padding
-/// \return Status
-Status WriteMessage(const Buffer& message, int32_t alignment, 
io::OutputStream* file,
-                    int32_t* message_length);
-
 // Serialize arrow::Schema as a Flatbuffer
 //
 // \param[in] schema a Schema instance
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index d380402..3570c06 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -36,6 +36,14 @@ struct ARROW_EXPORT IpcOptions {
   // The maximum permitted schema nesting depth.
   int max_recursion_depth = kMaxNestingDepth;
 
+  // Write padding after memory buffers to this multiple of
+  // bytes. Generally 8 or 64
+  int32_t alignment = 8;
+
+  /// \brief Write the pre-0.15.0 encapsulated IPC message format
+  /// consisting of a 4-byte prefix instead of 8 byte
+  bool write_legacy_ipc_format = false;
+
   static IpcOptions Defaults();
 };
 
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9cbeacf..efd0a88 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -106,27 +106,69 @@ TEST(TestMessage, SerializeTo) {
 
   std::shared_ptr<io::BufferOutputStream> stream;
 
-  {
-    const int32_t alignment = 8;
-
+  auto CheckWithAlignment = [&](int32_t alignment) {
+    IpcOptions options;
+    options.alignment = alignment;
+    const int32_t prefix_size = 8;
     ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), 
&stream));
-    ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+    ASSERT_OK(message->SerializeTo(stream.get(), options, &output_length));
     ASSERT_OK(stream->Tell(&position));
-    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + 
body_length,
               output_length);
     ASSERT_EQ(output_length, position);
-  }
+  };
 
-  {
-    const int32_t alignment = 64;
+  CheckWithAlignment(8);
+  CheckWithAlignment(64);
+}
 
-    ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), 
&stream));
-    ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
-    ASSERT_OK(stream->Tell(&position));
-    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
-              output_length);
-    ASSERT_EQ(output_length, position);
-  }
+void BuffersOverlapEquals(const Buffer& left, const Buffer& right) {
+  ASSERT_GT(left.size(), 0);
+  ASSERT_GT(right.size(), 0);
+  ASSERT_TRUE(left.Equals(right, std::min(left.size(), right.size())));
+}
+
+TEST(TestMessage, LegacyIpcBackwardsCompatibility) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeIntBatchSized(36, &batch));
+
+  auto RoundtripWithOptions = [&](const IpcOptions& arg_options,
+                                  std::shared_ptr<Buffer>* out_serialized,
+                                  std::unique_ptr<Message>* out) {
+    internal::IpcPayload payload;
+    ASSERT_OK(internal::GetRecordBatchPayload(*batch, arg_options, 
default_memory_pool(),
+                                              &payload));
+
+    std::shared_ptr<io::BufferOutputStream> stream;
+    ASSERT_OK(io::BufferOutputStream::Create(1 << 20, default_memory_pool(), 
&stream));
+
+    int32_t metadata_length = -1;
+    ASSERT_OK(
+        internal::WriteIpcPayload(payload, arg_options, stream.get(), 
&metadata_length));
+
+    ASSERT_OK(stream->Finish(out_serialized));
+    io::BufferReader io_reader(*out_serialized);
+    ASSERT_OK(ReadMessage(&io_reader, out));
+  };
+
+  std::shared_ptr<Buffer> serialized, legacy_serialized;
+  std::unique_ptr<Message> message, legacy_message;
+
+  IpcOptions options;
+  RoundtripWithOptions(options, &serialized, &message);
+
+  // First 4 bytes 0xFFFFFFFF Continuation marker
+  ASSERT_EQ(-1, util::SafeLoadAs<int32_t>(serialized->data()));
+
+  options.write_legacy_ipc_format = true;
+  RoundtripWithOptions(options, &legacy_serialized, &legacy_message);
+
+  // Check that the continuation marker is not written
+  ASSERT_NE(-1, util::SafeLoadAs<int32_t>(legacy_serialized->data()));
+
+  // Have to use the smaller size to exclude padding
+  BuffersOverlapEquals(*legacy_message->metadata(), *message->metadata());
+  ASSERT_TRUE(legacy_message->body()->Equals(*message->body()));
 }
 
 TEST(TestMessage, Verify) {
@@ -635,13 +677,14 @@ TEST_F(RecursionLimits, StressLimit) {
 #endif  // !defined(_WIN32) || defined(NDEBUG)
 
 struct FileWriterHelper {
-  Status Init(const std::shared_ptr<Schema>& schema) {
+  Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& 
options) {
     num_batches_written_ = 0;
 
     RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
     sink_.reset(new io::BufferOutputStream(buffer_));
-
-    return RecordBatchFileWriter::Open(sink_.get(), schema, &writer_);
+    ARROW_ASSIGN_OR_RAISE(writer_,
+                          RecordBatchFileWriter::Open(sink_.get(), schema, 
options));
+    return Status::OK();
   }
 
   Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -680,11 +723,12 @@ struct FileWriterHelper {
 };
 
 struct StreamWriterHelper {
-  Status Init(const std::shared_ptr<Schema>& schema) {
+  Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& 
options) {
     RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
     sink_.reset(new io::BufferOutputStream(buffer_));
-
-    return RecordBatchStreamWriter::Open(sink_.get(), schema, &writer_);
+    ARROW_ASSIGN_OR_RAISE(writer_,
+                          RecordBatchStreamWriter::Open(sink_.get(), schema, 
options));
+    return Status::OK();
   }
 
   Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -718,7 +762,7 @@ class ReaderWriterMixin {
 
   // Check simple RecordBatch roundtripping
   template <typename Param>
-  void TestRoundTrip(Param&& param) {
+  void TestRoundTrip(Param&& param, const IpcOptions& options) {
     std::shared_ptr<RecordBatch> batch1;
     std::shared_ptr<RecordBatch> batch2;
     ASSERT_OK(param(&batch1));  // NOLINT clang-tidy gtest issue
@@ -727,7 +771,7 @@ class ReaderWriterMixin {
     BatchVector in_batches = {batch1, batch2};
     BatchVector out_batches;
 
-    ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+    ASSERT_OK(RoundTripHelper(in_batches, options, &out_batches));
     ASSERT_EQ(out_batches.size(), in_batches.size());
 
     // Compare batches
@@ -741,7 +785,7 @@ class ReaderWriterMixin {
     ASSERT_OK(MakeDictionary(&batch));
 
     BatchVector out_batches;
-    ASSERT_OK(RoundTripHelper({batch}, &out_batches));
+    ASSERT_OK(RoundTripHelper({batch}, IpcOptions::Defaults(), &out_batches));
     ASSERT_EQ(out_batches.size(), 1);
 
     // TODO(wesm): This was broken in ARROW-3144. I'm not sure how to
@@ -764,7 +808,7 @@ class ReaderWriterMixin {
     schema = schema->WithMetadata(key_value_metadata({"some_key"}, 
{"some_value"}));
 
     WriterHelper writer_helper;
-    ASSERT_OK(writer_helper.Init(schema));
+    ASSERT_OK(writer_helper.Init(schema, IpcOptions::Defaults()));
     // Writing a record batch with a different schema
     ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints));
     // Writing a record batch with the same schema (except metadata)
@@ -781,9 +825,10 @@ class ReaderWriterMixin {
   }
 
  private:
-  Status RoundTripHelper(const BatchVector& in_batches, BatchVector* 
out_batches) {
+  Status RoundTripHelper(const BatchVector& in_batches, const IpcOptions& 
options,
+                         BatchVector* out_batches) {
     WriterHelper writer_helper;
-    RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema()));
+    RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), options));
     for (const auto& batch : in_batches) {
       RETURN_NOT_OK(writer_helper.WriteBatch(batch));
     }
@@ -813,9 +858,21 @@ class TestFileFormat : public ReaderWriterMixin<FileWriterHelper>,
 class TestStreamFormat : public ReaderWriterMixin<StreamWriterHelper>,
                          public ::testing::TestWithParam<MakeRecordBatch*> {};
 
-TEST_P(TestFileFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+TEST_P(TestFileFormat, RoundTrip) {
+  TestRoundTrip(*GetParam(), IpcOptions::Defaults());
 
-TEST_P(TestStreamFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+  IpcOptions options;
+  options.write_legacy_ipc_format = true;
+  TestRoundTrip(*GetParam(), options);
+}
+
+TEST_P(TestStreamFormat, RoundTrip) {
+  TestRoundTrip(*GetParam(), IpcOptions::Defaults());
+
+  IpcOptions options;
+  options.write_legacy_ipc_format = true;
+  TestRoundTrip(*GetParam(), options);
+}
 
 INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, 
BATCH_CASES());
 INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
@@ -912,13 +969,15 @@ void SpliceMessages(std::shared_ptr<Buffer> stream,
       continue;
     }
 
+    IpcOptions options;
     internal::IpcPayload payload;
     payload.type = msg->type();
     payload.metadata = msg->metadata();
     payload.body_buffers.push_back(msg->body());
     payload.body_length = msg->body()->size();
     int32_t unused_metadata_length = -1;
-    ASSERT_OK(internal::WriteIpcPayload(payload, out.get(), 
&unused_metadata_length));
+    ASSERT_OK(
+        internal::WriteIpcPayload(payload, options, out.get(), 
&unused_metadata_length));
   }
   ASSERT_OK(out->Finish(spliced_stream));
 }
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 81332a6..7127300 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -529,10 +529,9 @@ class DictionaryWriter : public RecordBatchSerializer {
   int64_t dictionary_id_;
 };
 
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
-                       int32_t* metadata_length) {
-  RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, 
dst,
-                                       metadata_length));
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+                       io::OutputStream* dst, int32_t* metadata_length) {
+  RETURN_NOT_OK(WriteMessage(*payload.metadata, options, dst, 
metadata_length));
 
 #ifndef NDEBUG
   RETURN_NOT_OK(CheckAligned(dst));
@@ -604,7 +603,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
   // The body size is computed in the payload
   *body_length = payload.body_length;
 
-  return internal::WriteIpcPayload(payload, dst, metadata_length);
+  return internal::WriteIpcPayload(payload, options, dst, metadata_length);
 }
 
 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& 
batches,
@@ -625,7 +624,9 @@ Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
                          int32_t* metadata_length) {
   std::shared_ptr<Buffer> metadata;
   RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
-  return internal::WriteMessage(*metadata, kTensorAlignment, dst, 
metadata_length);
+  IpcOptions options;
+  options.alignment = kTensorAlignment;
+  return WriteMessage(*metadata, options, dst, metadata_length);
 }
 
 Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
@@ -818,19 +819,7 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds
   RETURN_NOT_OK(writer.Assemble(sparse_tensor));
 
   *body_length = payload.body_length;
-  return internal::WriteIpcPayload(payload, dst, metadata_length);
-}
-
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& 
dictionary,
-                       int64_t buffer_start_offset, io::OutputStream* dst,
-                       int32_t* metadata_length, int64_t* body_length, 
MemoryPool* pool) {
-  auto options = IpcOptions::Defaults();
-  internal::IpcPayload payload;
-  RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, 
&payload));
-
-  // The body size is computed in the payload
-  *body_length = payload.body_length;
-  return internal::WriteIpcPayload(payload, dst, metadata_length);
+  return internal::WriteIpcPayload(payload, IpcOptions::Defaults(), dst, 
metadata_length);
 }
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
@@ -1022,20 +1011,24 @@ class StreamBookKeeper {
     return Status::OK();
   }
 
+  Status WriteEOS() {
+    // End of stream marker
+    constexpr int64_t kEos = 0;
+    return Write(&kEos, sizeof(kEos));
+  }
+
  protected:
   io::OutputStream* sink_;
   int64_t position_;
 };
 
-// End of stream marker
-constexpr int32_t kEos = 0;
-
 /// A IpcPayloadWriter implementation that writes to a IPC stream
 /// (with an end-of-stream marker)
 class PayloadStreamWriter : public internal::IpcPayloadWriter,
                             protected StreamBookKeeper {
  public:
-  explicit PayloadStreamWriter(io::OutputStream* sink) : 
StreamBookKeeper(sink) {}
+  PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink)
+      : StreamBookKeeper(sink), options_(options) {}
 
   ~PayloadStreamWriter() override = default;
 
@@ -1046,23 +1039,24 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
 #endif
 
     int32_t metadata_length = 0;  // unused
-    RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &metadata_length));
+    RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &metadata_length));
     RETURN_NOT_OK(UpdatePositionCheckAligned());
     return Status::OK();
   }
 
-  Status Close() override {
-    // Write 0 EOS message
-    return Write(&kEos, sizeof(int32_t));
-  }
+  Status Close() override { return WriteEOS(); }
+
+ private:
+  IpcOptions options_;
 };
 
 /// A IpcPayloadWriter implementation that writes to a IPC file
 /// (with a footer as defined in File.fbs)
 class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper {
  public:
-  PayloadFileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
-      : StreamBookKeeper(sink), schema_(schema) {}
+  PayloadFileWriter(const IpcOptions& options, const std::shared_ptr<Schema>& schema,
+                    io::OutputStream* sink)
+      : StreamBookKeeper(sink), options_(options), schema_(schema) {}
 
   ~PayloadFileWriter() override = default;
 
@@ -1074,7 +1068,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
 
     // Metadata length must include padding, it's computed by WriteIpcPayload()
     FileBlock block = {position_, 0, payload.body_length};
-    RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &block.metadata_length));
+    RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &block.metadata_length));
     RETURN_NOT_OK(UpdatePositionCheckAligned());
 
     // Record position and size of some message types, to list them in the footer
@@ -1107,7 +1101,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
 
   Status Close() override {
     // Write 0 EOS message for compatibility with sequential readers
-    RETURN_NOT_OK(Write(&kEos, sizeof(int32_t)));
+    RETURN_NOT_OK(WriteEOS());
 
     // Write file footer
     RETURN_NOT_OK(UpdatePosition());
@@ -1128,6 +1122,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
   }
 
  protected:
+  IpcOptions options_;
   std::shared_ptr<Schema> schema_;
   std::vector<FileBlock> dictionaries_;
   std::vector<FileBlock> record_batches_;
@@ -1141,9 +1136,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl
   RecordBatchStreamWriterImpl(io::OutputStream* sink,
                               const std::shared_ptr<Schema>& schema,
                               const IpcOptions& options)
-      : RecordBatchPayloadWriter(
-            std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)),
-            schema, options) {}
+      : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
+                                     new PayloadStreamWriter(options, sink)),
+                                 schema, options) {}
 
   ~RecordBatchStreamWriterImpl() = default;
 };
@@ -1153,7 +1148,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPaylo
   RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
                             const IpcOptions& options)
       : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
-                                     new PayloadFileWriter(sink, schema)),
+                                     new PayloadFileWriter(options, schema, sink)),
                                  schema, options) {}
 
   ~RecordBatchFileWriterImpl() = default;
@@ -1277,7 +1272,7 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
   RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream));
 
   auto options = IpcOptions::Defaults();
-  auto payload_writer = make_unique<PayloadStreamWriter>(stream.get());
+  auto payload_writer = make_unique<PayloadStreamWriter>(options, stream.get());
   RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options,
                                   dictionary_memo);
   // Write schema and populate fields (but not dictionaries) in dictionary_memo
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 75030c2..c673b0a 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,7 +177,9 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
   std::unique_ptr<RecordBatchFileWriterImpl> file_impl_;
 };
 
-/// \brief Low-level API for writing a record batch (without schema) to an OutputStream
+/// \brief Low-level API for writing a record batch (without schema)
+/// to an OutputStream as encapsulated IPC message. See Arrow format
+/// documentation for more detail.
 ///
 /// \param[in] batch the record batch to write
 /// \param[in] buffer_start_offset the start offset to use in the buffer metadata,
@@ -189,20 +191,6 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
 /// \param[in] options options for serialization
 /// \param[in] pool the memory pool to allocate memory from
 /// \return Status
-///
-/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-/// output stream in a contiguous block. The record batch metadata is written as
-/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-/// prefixed by its size, followed by each of the memory buffers in the batch
-/// written end to end (with appropriate alignment and padding):
-///
-/// \code
-/// <int32: metadata size> <uint8*: metadata> <buffers ...>
-/// \endcode
-///
-/// Finally, the absolute offsets (relative to the start of the output stream)
-/// to the end of the body and end of the metadata / data header (suffixed by
-/// the header size) is returned in out-variables
 ARROW_EXPORT
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                         io::OutputStream* dst, int32_t* metadata_length,
@@ -392,8 +380,8 @@ Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options
                              MemoryPool* pool, IpcPayload* out);
 
 ARROW_EXPORT
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
-                       int32_t* metadata_length);
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+                       io::OutputStream* dst, int32_t* metadata_length);
 
 }  // namespace internal
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2c05ec5..87393fb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -964,6 +964,11 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MessageType_V4" arrow::ipc::MetadataVersion::V4"
 
     cdef cppclass CIpcOptions" arrow::ipc::IpcOptions":
+        c_bool allow_64bit
+        int max_recursion_depth
+        int32_t alignment
+        c_bool write_legacy_ipc_format
+
         @staticmethod
         CIpcOptions Defaults()
 
@@ -989,7 +994,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MetadataVersion metadata_version()
         MessageType type()
 
-        CStatus SerializeTo(OutputStream* stream, int32_t alignment,
+        CStatus SerializeTo(OutputStream* stream, const CIpcOptions& options,
                             int64_t* output_length)
 
     c_string FormatMessageType(MessageType type)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index b1aca23..6710f63 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -76,13 +76,14 @@ cdef class Message:
         """
         cdef:
             int64_t output_length = 0
-            int32_t c_alignment = alignment
             OutputStream* out
+            CIpcOptions options
 
+        options.alignment = alignment
         out = sink.get_output_stream().get()
         with nogil:
             check_status(self.message.get()
-                         .SerializeTo(out, c_alignment, &output_length))
+                         .SerializeTo(out, options, &output_length))
 
     def serialize(self, alignment=8, memory_pool=None):
         """

Reply via email to