This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2040e74a feat: Add IPC writer scaffolding (#564)
2040e74a is described below
commit 2040e74add0a3c8a36877bce35c7dc43c27ba0e4
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Jul 25 10:16:08 2024 -0500
feat: Add IPC writer scaffolding (#564)
Add `ArrowIpcEncoder`, init/reset, and tests. Extracted from
https://github.com/apache/arrow-nanoarrow/pull/555#pullrequestreview-2186295630
---
CMakeLists.txt | 67 +++++++++++++------------------
ci/scripts/bundle.py | 1 +
src/nanoarrow/ipc/decoder.c | 11 -----
src/nanoarrow/ipc/decoder_test.cc | 14 +------
src/nanoarrow/ipc/encoder.c | 84 +++++++++++++++++++++++++++++++++++++++
src/nanoarrow/ipc/encoder_test.cc | 62 +++++++++++++++++++++++++++++
src/nanoarrow/nanoarrow_ipc.h | 62 +++++++++++++++++++++++++++++
src/nanoarrow/nanoarrow_ipc.hpp | 19 +++++++++
8 files changed, 256 insertions(+), 64 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f54fbaef..2cb8ad90 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -223,8 +223,9 @@ if(NANOARROW_IPC)
endif()
if(NOT NANOARROW_BUNDLE)
- set(NANOARROW_IPC_BUILD_SOURCES src/nanoarrow/ipc/decoder.c
- src/nanoarrow/ipc/reader.c)
+ set(NANOARROW_IPC_BUILD_SOURCES src/nanoarrow/ipc/decoder.c
+ src/nanoarrow/ipc/encoder.c
+ src/nanoarrow/ipc/reader.c) # per-file sources (non-bundled build)
endif()
add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
@@ -418,51 +419,37 @@ if(NANOARROW_BUILD_TESTS)
endif()
enable_testing()
-
- add_executable(nanoarrow_ipc_decoder_test
src/nanoarrow/ipc/decoder_test.cc)
- add_executable(nanoarrow_ipc_reader_test src/nanoarrow/ipc/reader_test.cc)
- add_executable(nanoarrow_ipc_files_test src/nanoarrow/ipc/files_test.cc)
- add_executable(nanoarrow_ipc_hpp_test src/nanoarrow/ipc/ipc_hpp_test.cc)
+ include(GoogleTest)
if(NANOARROW_CODE_COVERAGE)
target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
target_link_options(ipc_coverage_config INTERFACE --coverage)
target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
endif()
- target_link_libraries(nanoarrow_ipc_decoder_test
- nanoarrow_ipc
- nanoarrow
- flatccrt
- ${NANOARROW_ARROW_TARGET}
- gtest_main
- ipc_coverage_config)
- target_link_libraries(nanoarrow_ipc_reader_test
- nanoarrow_ipc
- nanoarrow
- flatccrt
- gtest_main
- ipc_coverage_config)
- target_link_libraries(nanoarrow_ipc_files_test
- nanoarrow_ipc
- nanoarrow
- flatccrt
- ${NANOARROW_ARROW_TARGET}
- nlohmann_json
- ZLIB::ZLIB
- gtest_main
- ipc_coverage_config)
- target_link_libraries(nanoarrow_ipc_hpp_test
- nanoarrow_ipc
- nanoarrow
- ${NANOARROW_ARROW_TARGET}
- gtest_main
- ipc_coverage_config)
- include(GoogleTest)
- gtest_discover_tests(nanoarrow_ipc_decoder_test)
- gtest_discover_tests(nanoarrow_ipc_reader_test)
- gtest_discover_tests(nanoarrow_ipc_files_test)
- gtest_discover_tests(nanoarrow_ipc_hpp_test)
+ foreach(name
+ decoder
+ encoder
+ reader
+ files
+ ipc_hpp)
+ add_executable(nanoarrow_ipc_${name}_test
src/nanoarrow/ipc/${name}_test.cc)
+
+ target_link_libraries(nanoarrow_ipc_${name}_test
+ nanoarrow_ipc
+ nanoarrow
+ ${NANOARROW_ARROW_TARGET}
+ gtest_main
+ ipc_coverage_config)
+
+ if(NOT "${name}" STREQUAL "ipc_hpp") # the C++ wrapper test links without flatccrt
+ target_link_libraries(nanoarrow_ipc_${name}_test flatccrt)
+ endif()
+
+ gtest_discover_tests(nanoarrow_ipc_${name}_test)
+ endforeach()
+
+ target_link_libraries(nanoarrow_ipc_files_test nlohmann_json ZLIB::ZLIB)
endif()
if(NANOARROW_DEVICE)
diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py
index f152389d..7ee09437 100644
--- a/ci/scripts/bundle.py
+++ b/ci/scripts/bundle.py
@@ -203,6 +203,7 @@ def bundle_nanoarrow_ipc(
[
src_dir / "ipc" / "flatcc_generated.h",
src_dir / "ipc" / "decoder.c",
+ src_dir / "ipc" / "encoder.c",  # ArrowIpcEncoder (IPC writer) implementation
src_dir / "ipc" / "reader.c",
]
)
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index e092a218..02d34541 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -110,17 +110,6 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError*
error) {
return NANOARROW_OK;
}
-static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
- uint32_t check = 1;
- char first_byte;
- memcpy(&first_byte, &check, sizeof(char));
- if (first_byte) {
- return NANOARROW_IPC_ENDIANNESS_LITTLE;
- } else {
- return NANOARROW_IPC_ENDIANNESS_BIG;
- }
-}
-
#if NANOARROW_IPC_USE_STDATOMIC
struct ArrowIpcSharedBufferPrivate {
struct ArrowBuffer src;
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index d17e7ee4..1cebc063 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -30,8 +30,7 @@
using namespace arrow;
-// Copied from nanoarrow_ipc.c so we can test the internal state
-// of the decoder
+// Copied from decoder.c so we can test the internal state (keep in sync)
extern "C" {
struct ArrowIpcField {
struct ArrowArrayView* array_view;
@@ -51,17 +50,6 @@ struct ArrowIpcDecoderPrivate {
};
}
-static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
- uint32_t check = 1;
- char first_byte;
- memcpy(&first_byte, &check, sizeof(char));
- if (first_byte) {
- return NANOARROW_IPC_ENDIANNESS_LITTLE;
- } else {
- return NANOARROW_IPC_ENDIANNESS_BIG;
- }
-}
-
TEST(NanoarrowIpcCheckRuntime, CheckRuntime) {
EXPECT_EQ(ArrowIpcCheckRuntime(nullptr), NANOARROW_OK);
}
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
new file mode 100644
index 00000000..b973e8aa
--- /dev/null
+++ b/src/nanoarrow/ipc/encoder.c
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "flatcc/flatcc_builder.h"
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+struct ArrowIpcEncoderPrivate {
+ flatcc_builder_t builder;
+ struct ArrowBuffer buffers;
+ struct ArrowBuffer nodes;
+};
+
+// Initialize an encoder: zero its public fields and allocate the private
+// flatcc builder and scratch buffers.
+//
+// encoder->private_data is only set once every resource has been initialized,
+// so on failure it stays NULL and a later ArrowIpcEncoderReset() (e.g. via the
+// C++ release_pointer() specialization) never sees freed memory.
+ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
+ NANOARROW_DCHECK(encoder != NULL);
+ memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+ encoder->encode_buffer = NULL;
+ encoder->encode_buffer_state = NULL;
+ encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+
+ struct ArrowIpcEncoderPrivate* private_data =
+ (struct ArrowIpcEncoderPrivate*)ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
+ if (private_data == NULL) {
+ return ENOMEM;
+ }
+
+ if (flatcc_builder_init(&private_data->builder) == -1) {
+ ArrowFree(private_data);
+ return ESPIPE;
+ }
+
+ ArrowBufferInit(&private_data->buffers);
+ ArrowBufferInit(&private_data->nodes);
+ encoder->private_data = private_data;
+ return NANOARROW_OK;
+}
+
+// Release all resources owned by the encoder and zero its public fields.
+//
+// An encoder whose private_data is NULL (never initialized, failed
+// initialization, already reset, or a moved-from C++ wrapper) owns nothing,
+// so this is a no-op for it.
+void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
+ NANOARROW_DCHECK(encoder != NULL);
+ struct ArrowIpcEncoderPrivate* private_data =
+ (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+ if (private_data == NULL) {
+ return;
+ }
+
+ flatcc_builder_clear(&private_data->builder);
+ ArrowBufferReset(&private_data->nodes);
+ ArrowBufferReset(&private_data->buffers);
+ ArrowFree(private_data);
+ memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+}
+
+// Append the bytes of the message currently held by the flatcc builder to
+// out, then reset the builder (keeping its allocations) for the next message.
+//
+// If nothing has been built, out is left untouched and NANOARROW_OK is
+// returned. Returns ENOMEM if the builder cannot provide a direct buffer,
+// or the error code from ArrowBufferAppend().
+ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
+ struct ArrowBuffer* out) {
+ NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out !=
NULL);
+ struct ArrowIpcEncoderPrivate* private_data =
+ (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+ int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private_data->builder);
+ if (size == 0) {
+ // Finalizing an empty flatcc_builder_t triggers an assertion
+ return NANOARROW_OK;
+ }
+
+ void* data = flatcc_builder_get_direct_buffer(&private_data->builder, NULL);
+ if (data == NULL) {
+ return ENOMEM;
+ }
+
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, data, size));
+
+ // don't deallocate yet, just wipe the builder's current Message
+ flatcc_builder_reset(&private_data->builder);
+ return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/encoder_test.cc
b/src/nanoarrow/ipc/encoder_test.cc
new file mode 100644
index 00000000..8f78b063
--- /dev/null
+++ b/src/nanoarrow/ipc/encoder_test.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "flatcc/flatcc_builder.h"
+#include "nanoarrow/nanoarrow.hpp"
+#include "nanoarrow/nanoarrow_ipc.hpp"
+
+// Copied from encoder.c so we can test the internal state (keep in sync)
+extern "C" {
+struct ArrowIpcEncoderPrivate {
+ flatcc_builder_t builder;
+ struct ArrowBuffer buffers;
+ struct ArrowBuffer nodes;
+};
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
+ nanoarrow::ipc::UniqueEncoder encoder;
+ // Later checks dereference private_data, so stop here if Init fails
+ ASSERT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+
+ EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
+ EXPECT_EQ(encoder->body_length, 0);
+ EXPECT_EQ(encoder->encode_buffer, nullptr);
+ EXPECT_EQ(encoder->encode_buffer_state, nullptr);
+
+ auto* priv = static_cast<struct
ArrowIpcEncoderPrivate*>(encoder->private_data);
+ ASSERT_NE(priv, nullptr);
+
+ auto default_allocator = ArrowBufferAllocatorDefault();
+ for (auto* b : {&priv->buffers, &priv->nodes}) {
+ // Buffers are empty but initialized with the default allocator
+ EXPECT_EQ(b->size_bytes, 0);
+ EXPECT_EQ(memcmp(&b->allocator, &default_allocator, sizeof(b->allocator)),
0);
+ }
+
+ // Empty buffer works
+ nanoarrow::UniqueBuffer buffer;
+ EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()),
NANOARROW_OK);
+ EXPECT_EQ(buffer->size_bytes, 0);
+
+ // Append a string (finalizing an empty buffer is an error for
flatcc_builder_t)
+ EXPECT_NE(flatcc_builder_create_string_str(&priv->builder, "hello world"),
0);
+ EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()),
NANOARROW_OK);
+ EXPECT_GT(buffer->size_bytes, static_cast<int64_t>(sizeof("hello world")));
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index 8819ec1c..ab7b9eb9 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -57,6 +57,10 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
#define ArrowIpcArrayStreamReaderInit \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
+#define ArrowIpcEncoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcEncoderInit)
+#define ArrowIpcEncoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcEncoderReset)
+#define ArrowIpcEncoderFinalizeBuffer \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
#endif
@@ -117,6 +121,18 @@ enum ArrowIpcCompressionType {
/// \brief Checks the nanoarrow runtime to make sure the run/build versions
match
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
+/// \brief Get the endianness of the current runtime
+static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
+ uint32_t check = 1;
+ char first_byte;
+ memcpy(&first_byte, &check, sizeof(char));
+ if (first_byte) {
+ return NANOARROW_IPC_ENDIANNESS_LITTLE;
+ } else {
+ return NANOARROW_IPC_ENDIANNESS_BIG;
+ }
+}
+
/// \brief A structure representing a reference-counted buffer that may be
passed to
/// ArrowIpcDecoderDecodeArrayFromShared().
struct ArrowIpcSharedBuffer {
@@ -379,6 +395,52 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
struct ArrowIpcArrayStreamReaderOptions* options);
+/// \brief Encoder for Arrow IPC messages
+///
+/// This structure is intended to be allocated by the caller,
+/// initialized using ArrowIpcEncoderInit(), and released with
+/// ArrowIpcEncoderReset().
+struct ArrowIpcEncoder {
+ /// \brief Compression to encode in the next RecordBatch message.
+ enum ArrowIpcCompressionType codec;
+
+ /// \brief Callback invoked against each buffer to be encoded
+ ///
+ /// Encoding of buffers is left as a callback to accommodate dissociated
data storage.
+ /// One implementation of this callback might copy all buffers into a
contiguous body
+ /// for use in an Arrow IPC stream; another implementation might store
offsets and
+ /// lengths relative to a known arena.
+ ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
+ struct ArrowIpcEncoder* encoder, int64_t*
offset,
+ int64_t* length, struct ArrowError* error);
+
+ /// \brief Pointer to arbitrary data used by encode_buffer()
+ void* encode_buffer_state;
+
+ /// \brief Finalized body length of the most recently encoded RecordBatch
message
+ ///
+ /// (This is initially 0 and encode_buffer() is expected to update it. After
all
+ /// buffers are encoded, this will be written to the RecordBatch's
.bodyLength)
+ int64_t body_length;
+
+ /// \brief Private resources managed by this library
+ void* private_data;
+};
+
+/// \brief Initialize an encoder
+///
+/// If NANOARROW_OK is returned, the caller must call ArrowIpcEncoderReset()
+/// to release resources allocated by this function.
+ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder);
+
+/// \brief Release all resources attached to an encoder
+void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder);
+
+/// \brief Finalize the most recently encoded message to a buffer
+///
+/// Appends the encoded message's bytes to out; if none was encoded, out is unchanged.
+ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
+ struct ArrowBuffer* out);
/// @}
#ifdef __cplusplus
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index 2757d28c..e5e0ee33 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -41,6 +41,26 @@ inline void release_pointer(struct ArrowIpcDecoder* data) {
ArrowIpcDecoderReset(data);
}
+template <>
+inline void init_pointer(struct ArrowIpcEncoder* data) {
+ data->private_data = nullptr;
+}
+
+template <>
+inline void move_pointer(struct ArrowIpcEncoder* src, struct ArrowIpcEncoder*
dst) {
+ memcpy(dst, src, sizeof(struct ArrowIpcEncoder));
+ src->private_data = nullptr;
+}
+
+template <>
+inline void release_pointer(struct ArrowIpcEncoder* data) {
+ // init_pointer() and move_pointer() leave private_data == nullptr; an encoder
+ // in that state owns nothing, so skip ArrowIpcEncoderReset().
+ if (data->private_data != nullptr) {
+ ArrowIpcEncoderReset(data);
+ }
+}
+
template <>
inline void init_pointer(struct ArrowIpcInputStream* data) {
data->release = nullptr;
@@ -77,6 +97,9 @@ namespace ipc {
/// \brief Class wrapping a unique struct ArrowIpcDecoder
using UniqueDecoder = internal::Unique<struct ArrowIpcDecoder>;
+/// \brief Class wrapping a unique struct ArrowIpcEncoder
+using UniqueEncoder = internal::Unique<struct ArrowIpcEncoder>;
+
/// \brief Class wrapping a unique struct ArrowIpcInputStream
using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;