This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 16f4306f fix: IPC streams did not include RecordBatch headers (#582)
16f4306f is described below
commit 16f4306f58193eff18013cc75d8b7b49f1ad4108
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Mon Aug 12 16:12:16 2024 -0500
fix: IPC streams did not include RecordBatch headers (#582)
- As noted in
https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1712730976,
RecordBatch messages were not being written during IPC streaming. This
has been corrected.
- Testing now includes an additional round-trip case in which Arrow C++
reads a stream written by nanoarrow.
- Testing also revealed that encapsulated messages were not always
written with the correct size prefix: the metadata size must include the
trailing padding bytes, but not the continuation marker or the size field
itself. See
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
---
src/nanoarrow/ipc/encoder.c | 40 +++++++++++---------
src/nanoarrow/ipc/files_test.cc | 81 ++++++++++++++++++++++++++++-------------
src/nanoarrow/ipc/writer.c | 4 ++
3 files changed, 82 insertions(+), 43 deletions(-)
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 53e3610e..5ab4140d 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -78,6 +78,18 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
}
+static ArrowErrorCode ArrowIpcEncoderWriteContinuationAndSize(struct
ArrowBuffer* out,
+ size_t size) {
+ _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt32(out, -1));
+
+ if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
+ return ArrowBufferAppendInt32(out, (int32_t)bswap32((uint32_t)size));
+ } else {
+ return ArrowBufferAppendInt32(out, (int32_t)size);
+ }
+}
+
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
char encapsulate, struct
ArrowBuffer* out) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out !=
NULL);
@@ -85,25 +97,19 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct
ArrowIpcEncoder* encoder,
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
size_t size = flatcc_builder_get_buffer_size(&private->builder);
- _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);
- int32_t header[] = {-1, (int32_t)size};
- if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
- header[1] = (int32_t)bswap32((uint32_t)size);
+ if (encapsulate) {
+ int64_t padded_size = _ArrowRoundUpToMultipleOf8(size);
+ NANOARROW_RETURN_NOT_OK(
+ ArrowBufferReserve(out, sizeof(int32_t) + sizeof(int32_t) +
padded_size));
+ NANOARROW_ASSERT_OK(ArrowIpcEncoderWriteContinuationAndSize(out,
padded_size));
+ } else {
+ NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
}
if (size == 0) {
// Finalizing an empty flatcc_builder_t triggers an assertion
- return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) :
NANOARROW_OK;
- }
-
- if (encapsulate) {
- int64_t encapsulated_size =
- _ArrowRoundUpToMultipleOf8(sizeof(int32_t) + sizeof(int32_t) + size);
- NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, encapsulated_size));
- NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, &header, sizeof(header)));
- } else {
- NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
+ return NANOARROW_OK;
}
void* data =
@@ -112,11 +118,9 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct
ArrowIpcEncoder* encoder,
NANOARROW_UNUSED(data);
out->size_bytes += size;
- if (encapsulate) {
+ while (encapsulate && out->size_bytes % 8 != 0) {
// zero padding bytes, if any
- int64_t padded_size = _ArrowRoundUpToMultipleOf8(out->size_bytes);
- memset(out->data + out->size_bytes, 0, padded_size - out->size_bytes);
- out->size_bytes = padded_size;
+ out->data[out->size_bytes++] = 0;
}
// don't deallocate yet, just wipe the builder's current Message
diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc
index 9f8f65ac..011d5935 100644
--- a/src/nanoarrow/ipc/files_test.cc
+++ b/src/nanoarrow/ipc/files_test.cc
@@ -88,6 +88,16 @@ class TestFile {
return path_.substr(0, dot_pos) + std::string(".json.gz");
}
+ ArrowErrorCode GetArrowArrayStreamIPC(struct ArrowBuffer* content,
+ ArrowArrayStream* out, ArrowError*
error) {
+ nanoarrow::ipc::UniqueInputStream input;
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowIpcInputStreamInitBuffer(input.get(), content), error);
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowIpcArrayStreamReaderInit(out, input.get(), nullptr), error);
+ return NANOARROW_OK;
+ }
+
ArrowErrorCode GetArrowArrayStreamIPC(const std::string& dir_prefix,
ArrowArrayStream* out, ArrowError*
error) {
std::stringstream path_builder;
@@ -96,28 +106,19 @@ class TestFile {
// Read using nanoarrow_ipc
nanoarrow::UniqueBuffer content;
NANOARROW_RETURN_NOT_OK(ReadFileBuffer(path_builder.str(), content.get(),
error));
-
- struct ArrowIpcInputStream input;
- NANOARROW_RETURN_NOT_OK_WITH_ERROR(
- ArrowIpcInputStreamInitBuffer(&input, content.get()), error);
- NANOARROW_RETURN_NOT_OK_WITH_ERROR(
- ArrowIpcArrayStreamReaderInit(out, &input, nullptr), error);
- return NANOARROW_OK;
+ return GetArrowArrayStreamIPC(content.get(), out, error);
}
- ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
+ ArrowErrorCode ReadArrowArrayStreamIPC(struct ArrowArrayStream* stream,
struct ArrowSchema* schema,
std::vector<nanoarrow::UniqueArray>*
arrays,
ArrowError* error) {
- nanoarrow::UniqueArrayStream stream;
- NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(),
error));
-
- NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream.get(), schema,
error));
+ NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream, schema, error));
while (true) {
nanoarrow::UniqueArray array;
- NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream.get(),
array.get(), error));
+ NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream, array.get(),
error));
if (array->release == nullptr) {
break;
@@ -128,6 +129,15 @@ class TestFile {
return NANOARROW_OK;
}
+ ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
+ struct ArrowSchema* schema,
+ std::vector<nanoarrow::UniqueArray>*
arrays,
+ ArrowError* error) {
+ nanoarrow::UniqueArrayStream stream;
+ NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(),
error));
+ return ReadArrowArrayStreamIPC(stream.get(), schema, arrays, error);
+ }
+
ArrowErrorCode GetArrowArrayStreamCheckJSON(const std::string& dir_prefix,
ArrowArrayStream* out,
ArrowError* error) {
std::stringstream path_builder;
@@ -250,20 +260,36 @@ class TestFile {
// Read the same file with Arrow C++
auto maybe_table_arrow =
ReadTable(io::ReadableFile::Open(path_builder.str()));
- FAIL_RESULT_NOT_OK(maybe_table_arrow);
+ {
+ SCOPED_TRACE("Read the same file with Arrow C++");
+ FAIL_RESULT_NOT_OK(maybe_table_arrow);
+ AssertEqualsTable(std::move(schema), std::move(arrays),
+ maybe_table_arrow.ValueUnsafe());
+ }
- AssertEqualsTable(std::move(schema), std::move(arrays),
- maybe_table_arrow.ValueUnsafe());
+ auto maybe_table_roundtripped =
ReadTable(BufferInputStream(roundtripped.get()));
+ {
+ SCOPED_TRACE("Read the roundtripped buffer using Arrow C++");
+ FAIL_RESULT_NOT_OK(maybe_table_roundtripped);
+
+ AssertEqualsTable(maybe_table_roundtripped.ValueUnsafe(),
+ maybe_table_arrow.ValueUnsafe());
+ }
- // Read the roundtripped buffer using nanoarrow
nanoarrow::UniqueSchema roundtripped_schema;
std::vector<nanoarrow::UniqueArray> roundtripped_arrays;
- ASSERT_EQ(ReadArrowArrayStreamIPC(dir_prefix, roundtripped_schema.get(),
- &roundtripped_arrays, &error),
- NANOARROW_OK);
-
- AssertEqualsTable(std::move(roundtripped_schema),
std::move(roundtripped_arrays),
- maybe_table_arrow.ValueUnsafe());
+ {
+ SCOPED_TRACE("Read the roundtripped buffer using nanoarrow");
+ nanoarrow::UniqueArrayStream array_stream;
+ ASSERT_EQ(GetArrowArrayStreamIPC(roundtripped.get(), array_stream.get(),
&error),
+ NANOARROW_OK);
+ ASSERT_EQ(ReadArrowArrayStreamIPC(array_stream.get(),
roundtripped_schema.get(),
+ &roundtripped_arrays, &error),
+ NANOARROW_OK);
+
+ AssertEqualsTable(std::move(roundtripped_schema),
std::move(roundtripped_arrays),
+ maybe_table_arrow.ValueUnsafe());
+ }
}
Result<std::shared_ptr<Table>> ReadTable(
@@ -286,13 +312,18 @@ class TestFile {
return Table::FromRecordBatches(std::move(arrow_schema),
std::move(batches));
}
+ void AssertEqualsTable(const std::shared_ptr<Table>& actual,
+ const std::shared_ptr<Table>& expected) {
+ ASSERT_TRUE(actual->schema()->Equals(expected->schema(), true));
+ EXPECT_TRUE(actual->Equals(*expected, true));
+ }
+
void AssertEqualsTable(nanoarrow::UniqueSchema schema,
std::vector<nanoarrow::UniqueArray> arrays,
const std::shared_ptr<Table>& expected) {
auto maybe_table = ToTable(std::move(schema), std::move(arrays));
FAIL_RESULT_NOT_OK(maybe_table);
-
ASSERT_TRUE(expected->schema()->Equals(maybe_table.ValueUnsafe()->schema(),
true));
- EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(*expected, true));
+ AssertEqualsTable(maybe_table.ValueUnsafe(), expected);
}
void TestIPCCheckJSON(const std::string& dir_prefix) {
diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c
index 776a22cd..6a5372a4 100644
--- a/src/nanoarrow/ipc/writer.c
+++ b/src/nanoarrow/ipc/writer.c
@@ -270,6 +270,10 @@ ArrowErrorCode ArrowIpcWriterWriteArrayView(struct
ArrowIpcWriter* writer,
NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
&private->encoder, in, &private->body_buffer, error));
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
+ &private->buffer),
+ error);
NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
&private->output_stream, ArrowBufferToBufferView(&private->buffer),
error));