This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 16f4306f fix: IPC streams did not include RecordBatch headers (#582)
16f4306f is described below

commit 16f4306f58193eff18013cc75d8b7b49f1ad4108
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Mon Aug 12 16:12:16 2024 -0500

    fix: IPC streams did not include RecordBatch headers (#582)
    
    - As noted in
    <https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1712730976>,
    RecordBatch messages were not being written during IPC streaming. This
    has been corrected.
    - Testing adds one more round-trip case in which Arrow C++ reads a
    stream written by nanoarrow.
    - During testing it also became apparent that encapsulated messages were
    not always stored with the correct size: metadata_size must include the
    padding that follows the flatbuffer, but not the continuation marker or
    the size field itself. See:

    https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
---
 src/nanoarrow/ipc/encoder.c     | 40 +++++++++++---------
 src/nanoarrow/ipc/files_test.cc | 81 ++++++++++++++++++++++++++++-------------
 src/nanoarrow/ipc/writer.c      |  4 ++
 3 files changed, 82 insertions(+), 43 deletions(-)

diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 53e3610e..5ab4140d 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -78,6 +78,18 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
   memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
 }
 
+static ArrowErrorCode ArrowIpcEncoderWriteContinuationAndSize(struct 
ArrowBuffer* out,
+                                                              size_t size) {
+  _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt32(out, -1));
+
+  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
+    return ArrowBufferAppendInt32(out, (int32_t)bswap32((uint32_t)size));
+  } else {
+    return ArrowBufferAppendInt32(out, (int32_t)size);
+  }
+}
+
 ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
                                              char encapsulate, struct 
ArrowBuffer* out) {
   NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != 
NULL);
@@ -85,25 +97,19 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct 
ArrowIpcEncoder* encoder,
       (struct ArrowIpcEncoderPrivate*)encoder->private_data;
 
   size_t size = flatcc_builder_get_buffer_size(&private->builder);
-  _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);
 
-  int32_t header[] = {-1, (int32_t)size};
-  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
-    header[1] = (int32_t)bswap32((uint32_t)size);
+  if (encapsulate) {
+    int64_t padded_size = _ArrowRoundUpToMultipleOf8(size);
+    NANOARROW_RETURN_NOT_OK(
+        ArrowBufferReserve(out, sizeof(int32_t) + sizeof(int32_t) + 
padded_size));
+    NANOARROW_ASSERT_OK(ArrowIpcEncoderWriteContinuationAndSize(out, 
padded_size));
+  } else {
+    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
   }
 
   if (size == 0) {
     // Finalizing an empty flatcc_builder_t triggers an assertion
-    return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) : 
NANOARROW_OK;
-  }
-
-  if (encapsulate) {
-    int64_t encapsulated_size =
-        _ArrowRoundUpToMultipleOf8(sizeof(int32_t) + sizeof(int32_t) + size);
-    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, encapsulated_size));
-    NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, &header, sizeof(header)));
-  } else {
-    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
+    return NANOARROW_OK;
   }
 
   void* data =
@@ -112,11 +118,9 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct 
ArrowIpcEncoder* encoder,
   NANOARROW_UNUSED(data);
   out->size_bytes += size;
 
-  if (encapsulate) {
+  while (encapsulate && out->size_bytes % 8 != 0) {
     // zero padding bytes, if any
-    int64_t padded_size = _ArrowRoundUpToMultipleOf8(out->size_bytes);
-    memset(out->data + out->size_bytes, 0, padded_size - out->size_bytes);
-    out->size_bytes = padded_size;
+    out->data[out->size_bytes++] = 0;
   }
 
   // don't deallocate yet, just wipe the builder's current Message
diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc
index 9f8f65ac..011d5935 100644
--- a/src/nanoarrow/ipc/files_test.cc
+++ b/src/nanoarrow/ipc/files_test.cc
@@ -88,6 +88,16 @@ class TestFile {
     return path_.substr(0, dot_pos) + std::string(".json.gz");
   }
 
+  ArrowErrorCode GetArrowArrayStreamIPC(struct ArrowBuffer* content,
+                                        ArrowArrayStream* out, ArrowError* 
error) {
+    nanoarrow::ipc::UniqueInputStream input;
+    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+        ArrowIpcInputStreamInitBuffer(input.get(), content), error);
+    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+        ArrowIpcArrayStreamReaderInit(out, input.get(), nullptr), error);
+    return NANOARROW_OK;
+  }
+
   ArrowErrorCode GetArrowArrayStreamIPC(const std::string& dir_prefix,
                                         ArrowArrayStream* out, ArrowError* 
error) {
     std::stringstream path_builder;
@@ -96,28 +106,19 @@ class TestFile {
     // Read using nanoarrow_ipc
     nanoarrow::UniqueBuffer content;
     NANOARROW_RETURN_NOT_OK(ReadFileBuffer(path_builder.str(), content.get(), 
error));
-
-    struct ArrowIpcInputStream input;
-    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
-        ArrowIpcInputStreamInitBuffer(&input, content.get()), error);
-    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
-        ArrowIpcArrayStreamReaderInit(out, &input, nullptr), error);
-    return NANOARROW_OK;
+    return GetArrowArrayStreamIPC(content.get(), out, error);
   }
 
-  ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
+  ArrowErrorCode ReadArrowArrayStreamIPC(struct ArrowArrayStream* stream,
                                          struct ArrowSchema* schema,
                                          std::vector<nanoarrow::UniqueArray>* 
arrays,
                                          ArrowError* error) {
-    nanoarrow::UniqueArrayStream stream;
-    NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(), 
error));
-
-    NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream.get(), schema, 
error));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream, schema, error));
 
     while (true) {
       nanoarrow::UniqueArray array;
 
-      NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream.get(), 
array.get(), error));
+      NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream, array.get(), 
error));
 
       if (array->release == nullptr) {
         break;
@@ -128,6 +129,15 @@ class TestFile {
     return NANOARROW_OK;
   }
 
+  ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
+                                         struct ArrowSchema* schema,
+                                         std::vector<nanoarrow::UniqueArray>* 
arrays,
+                                         ArrowError* error) {
+    nanoarrow::UniqueArrayStream stream;
+    NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(), 
error));
+    return ReadArrowArrayStreamIPC(stream.get(), schema, arrays, error);
+  }
+
   ArrowErrorCode GetArrowArrayStreamCheckJSON(const std::string& dir_prefix,
                                               ArrowArrayStream* out, 
ArrowError* error) {
     std::stringstream path_builder;
@@ -250,20 +260,36 @@ class TestFile {
 
     // Read the same file with Arrow C++
     auto maybe_table_arrow = 
ReadTable(io::ReadableFile::Open(path_builder.str()));
-    FAIL_RESULT_NOT_OK(maybe_table_arrow);
+    {
+      SCOPED_TRACE("Read the same file with Arrow C++");
+      FAIL_RESULT_NOT_OK(maybe_table_arrow);
+      AssertEqualsTable(std::move(schema), std::move(arrays),
+                        maybe_table_arrow.ValueUnsafe());
+    }
 
-    AssertEqualsTable(std::move(schema), std::move(arrays),
-                      maybe_table_arrow.ValueUnsafe());
+    auto maybe_table_roundtripped = 
ReadTable(BufferInputStream(roundtripped.get()));
+    {
+      SCOPED_TRACE("Read the roundtripped buffer using Arrow C++");
+      FAIL_RESULT_NOT_OK(maybe_table_roundtripped);
+
+      AssertEqualsTable(maybe_table_roundtripped.ValueUnsafe(),
+                        maybe_table_arrow.ValueUnsafe());
+    }
 
-    // Read the roundtripped buffer using nanoarrow
     nanoarrow::UniqueSchema roundtripped_schema;
     std::vector<nanoarrow::UniqueArray> roundtripped_arrays;
-    ASSERT_EQ(ReadArrowArrayStreamIPC(dir_prefix, roundtripped_schema.get(),
-                                      &roundtripped_arrays, &error),
-              NANOARROW_OK);
-
-    AssertEqualsTable(std::move(roundtripped_schema), 
std::move(roundtripped_arrays),
-                      maybe_table_arrow.ValueUnsafe());
+    {
+      SCOPED_TRACE("Read the roundtripped buffer using nanoarrow");
+      nanoarrow::UniqueArrayStream array_stream;
+      ASSERT_EQ(GetArrowArrayStreamIPC(roundtripped.get(), array_stream.get(), 
&error),
+                NANOARROW_OK);
+      ASSERT_EQ(ReadArrowArrayStreamIPC(array_stream.get(), 
roundtripped_schema.get(),
+                                        &roundtripped_arrays, &error),
+                NANOARROW_OK);
+
+      AssertEqualsTable(std::move(roundtripped_schema), 
std::move(roundtripped_arrays),
+                        maybe_table_arrow.ValueUnsafe());
+    }
   }
 
   Result<std::shared_ptr<Table>> ReadTable(
@@ -286,13 +312,18 @@ class TestFile {
     return Table::FromRecordBatches(std::move(arrow_schema), 
std::move(batches));
   }
 
+  void AssertEqualsTable(const std::shared_ptr<Table>& actual,
+                         const std::shared_ptr<Table>& expected) {
+    ASSERT_TRUE(actual->schema()->Equals(expected->schema(), true));
+    EXPECT_TRUE(actual->Equals(*expected, true));
+  }
+
   void AssertEqualsTable(nanoarrow::UniqueSchema schema,
                          std::vector<nanoarrow::UniqueArray> arrays,
                          const std::shared_ptr<Table>& expected) {
     auto maybe_table = ToTable(std::move(schema), std::move(arrays));
     FAIL_RESULT_NOT_OK(maybe_table);
-    
ASSERT_TRUE(expected->schema()->Equals(maybe_table.ValueUnsafe()->schema(), 
true));
-    EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(*expected, true));
+    AssertEqualsTable(maybe_table.ValueUnsafe(), expected);
   }
 
   void TestIPCCheckJSON(const std::string& dir_prefix) {
diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c
index 776a22cd..6a5372a4 100644
--- a/src/nanoarrow/ipc/writer.c
+++ b/src/nanoarrow/ipc/writer.c
@@ -270,6 +270,10 @@ ArrowErrorCode ArrowIpcWriterWriteArrayView(struct 
ArrowIpcWriter* writer,
 
   NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
       &private->encoder, in, &private->body_buffer, error));
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+      ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
+                                    &private->buffer),
+      error);
 
   NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
       &private->output_stream, ArrowBufferToBufferView(&private->buffer), 
error));

Reply via email to