This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2040e74a feat: Add IPC writer scaffolding (#564)
2040e74a is described below

commit 2040e74add0a3c8a36877bce35c7dc43c27ba0e4
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Jul 25 10:16:08 2024 -0500

    feat: Add IPC writer scaffolding (#564)
    
    Add `ArrowIpcEncoder`, init/reset, and tests. Extracted from
    https://github.com/apache/arrow-nanoarrow/pull/555#pullrequestreview-2186295630
---
 CMakeLists.txt                    | 67 +++++++++++++------------------
 ci/scripts/bundle.py              |  1 +
 src/nanoarrow/ipc/decoder.c       | 11 -----
 src/nanoarrow/ipc/decoder_test.cc | 14 +------
 src/nanoarrow/ipc/encoder.c       | 84 +++++++++++++++++++++++++++++++++++++++
 src/nanoarrow/ipc/encoder_test.cc | 62 +++++++++++++++++++++++++++++
 src/nanoarrow/nanoarrow_ipc.h     | 62 +++++++++++++++++++++++++++++
 src/nanoarrow/nanoarrow_ipc.hpp   | 19 +++++++++
 8 files changed, 256 insertions(+), 64 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index f54fbaef..2cb8ad90 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -223,8 +223,9 @@ if(NANOARROW_IPC)
   endif()
 
   if(NOT NANOARROW_BUNDLE)
-    set(NANOARROW_IPC_BUILD_SOURCES src/nanoarrow/ipc/decoder.c
-                                    src/nanoarrow/ipc/reader.c)
+    set(NANOARROW_IPC_BUILD_SOURCES
+        src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
+        src/nanoarrow/ipc/reader.c)
   endif()
 
   add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
@@ -418,51 +419,37 @@ if(NANOARROW_BUILD_TESTS)
     endif()
 
     enable_testing()
-
-    add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/ipc/decoder_test.cc)
-    add_executable(nanoarrow_ipc_reader_test src/nanoarrow/ipc/reader_test.cc)
-    add_executable(nanoarrow_ipc_files_test src/nanoarrow/ipc/files_test.cc)
-    add_executable(nanoarrow_ipc_hpp_test src/nanoarrow/ipc/ipc_hpp_test.cc)
+    include(GoogleTest)
 
     if(NANOARROW_CODE_COVERAGE)
       target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
       target_link_options(ipc_coverage_config INTERFACE --coverage)
       target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
     endif()
-    target_link_libraries(nanoarrow_ipc_decoder_test
-                          nanoarrow_ipc
-                          nanoarrow
-                          flatccrt
-                          ${NANOARROW_ARROW_TARGET}
-                          gtest_main
-                          ipc_coverage_config)
-    target_link_libraries(nanoarrow_ipc_reader_test
-                          nanoarrow_ipc
-                          nanoarrow
-                          flatccrt
-                          gtest_main
-                          ipc_coverage_config)
-    target_link_libraries(nanoarrow_ipc_files_test
-                          nanoarrow_ipc
-                          nanoarrow
-                          flatccrt
-                          ${NANOARROW_ARROW_TARGET}
-                          nlohmann_json
-                          ZLIB::ZLIB
-                          gtest_main
-                          ipc_coverage_config)
-    target_link_libraries(nanoarrow_ipc_hpp_test
-                          nanoarrow_ipc
-                          nanoarrow
-                          ${NANOARROW_ARROW_TARGET}
-                          gtest_main
-                          ipc_coverage_config)
 
-    include(GoogleTest)
-    gtest_discover_tests(nanoarrow_ipc_decoder_test)
-    gtest_discover_tests(nanoarrow_ipc_reader_test)
-    gtest_discover_tests(nanoarrow_ipc_files_test)
-    gtest_discover_tests(nanoarrow_ipc_hpp_test)
+    foreach(name
+            decoder
+            encoder
+            reader
+            files
+            ipc_hpp)
+      add_executable(nanoarrow_ipc_${name}_test src/nanoarrow/ipc/${name}_test.cc)
+
+      target_link_libraries(nanoarrow_ipc_${name}_test
+                            nanoarrow_ipc
+                            nanoarrow
+                            ${NANOARROW_ARROW_TARGET}
+                            gtest_main
+                            ipc_coverage_config)
+
+      if(NOT (name MATCHES "_hpp_"))
+        target_link_libraries(nanoarrow_ipc_${name}_test flatccrt)
+      endif()
+
+      gtest_discover_tests(nanoarrow_ipc_${name}_test)
+    endforeach()
+
+    target_link_libraries(nanoarrow_ipc_files_test nlohmann_json ZLIB::ZLIB)
   endif()
 
   if(NANOARROW_DEVICE)
diff --git a/ci/scripts/bundle.py b/ci/scripts/bundle.py
index f152389d..7ee09437 100644
--- a/ci/scripts/bundle.py
+++ b/ci/scripts/bundle.py
@@ -203,6 +203,7 @@ def bundle_nanoarrow_ipc(
         [
             src_dir / "ipc" / "flatcc_generated.h",
             src_dir / "ipc" / "decoder.c",
+            src_dir / "ipc" / "encoder.c",
             src_dir / "ipc" / "reader.c",
         ]
     )
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index e092a218..02d34541 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -110,17 +110,6 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
   return NANOARROW_OK;
 }
 
-static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
-  uint32_t check = 1;
-  char first_byte;
-  memcpy(&first_byte, &check, sizeof(char));
-  if (first_byte) {
-    return NANOARROW_IPC_ENDIANNESS_LITTLE;
-  } else {
-    return NANOARROW_IPC_ENDIANNESS_BIG;
-  }
-}
-
 #if NANOARROW_IPC_USE_STDATOMIC
 struct ArrowIpcSharedBufferPrivate {
   struct ArrowBuffer src;
diff --git a/src/nanoarrow/ipc/decoder_test.cc b/src/nanoarrow/ipc/decoder_test.cc
index d17e7ee4..1cebc063 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -30,8 +30,7 @@
 
 using namespace arrow;
 
-// Copied from nanoarrow_ipc.c so we can test the internal state
-// of the decoder
+// Copied from decoder.c so we can test the internal state
 extern "C" {
 struct ArrowIpcField {
   struct ArrowArrayView* array_view;
@@ -51,17 +50,6 @@ struct ArrowIpcDecoderPrivate {
 };
 }
 
-static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
-  uint32_t check = 1;
-  char first_byte;
-  memcpy(&first_byte, &check, sizeof(char));
-  if (first_byte) {
-    return NANOARROW_IPC_ENDIANNESS_LITTLE;
-  } else {
-    return NANOARROW_IPC_ENDIANNESS_BIG;
-  }
-}
-
 TEST(NanoarrowIpcCheckRuntime, CheckRuntime) {
   EXPECT_EQ(ArrowIpcCheckRuntime(nullptr), NANOARROW_OK);
 }
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
new file mode 100644
index 00000000..b973e8aa
--- /dev/null
+++ b/src/nanoarrow/ipc/encoder.c
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "flatcc/flatcc_builder.h"
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+struct ArrowIpcEncoderPrivate {
+  flatcc_builder_t builder;
+  struct ArrowBuffer buffers;
+  struct ArrowBuffer nodes;
+};
+
+ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
+  NANOARROW_DCHECK(encoder != NULL);
+  memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+  encoder->encode_buffer = NULL;
+  encoder->encode_buffer_state = NULL;
+  encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+  encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
+  struct ArrowIpcEncoderPrivate* private =
+      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+  if (flatcc_builder_init(&private->builder) == -1) {
+    ArrowFree(private);
+    return ESPIPE;
+  }
+  ArrowBufferInit(&private->buffers);
+  ArrowBufferInit(&private->nodes);
+  return NANOARROW_OK;
+}
+
+void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL);
+  struct ArrowIpcEncoderPrivate* private =
+      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+  flatcc_builder_clear(&private->builder);
+  ArrowBufferReset(&private->nodes);
+  ArrowBufferReset(&private->buffers);
+  ArrowFree(private);
+  memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
+}
+
+ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
+                                             struct ArrowBuffer* out) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL);
+  struct ArrowIpcEncoderPrivate* private =
+      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+  int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
+  if (size == 0) {
+    // Finalizing an empty flatcc_builder_t triggers an assertion
+    return NANOARROW_OK;
+  }
+
+  void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
+  if (data == NULL) {
+    return ENOMEM;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, data, size));
+
+  // don't deallocate yet, just wipe the builder's current Message
+  flatcc_builder_reset(&private->builder);
+  return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/ipc/encoder_test.cc b/src/nanoarrow/ipc/encoder_test.cc
new file mode 100644
index 00000000..8f78b063
--- /dev/null
+++ b/src/nanoarrow/ipc/encoder_test.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "flatcc/flatcc_builder.h"
+#include "nanoarrow/nanoarrow.hpp"
+#include "nanoarrow/nanoarrow_ipc.hpp"
+
+// Copied from encoder.c so we can test the internal state
+extern "C" {
+struct ArrowIpcEncoderPrivate {
+  flatcc_builder_t builder;
+  struct ArrowBuffer buffers;
+  struct ArrowBuffer nodes;
+};
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
+  nanoarrow::ipc::UniqueEncoder encoder;
+
+  EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+
+  EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
+  EXPECT_EQ(encoder->body_length, 0);
+  EXPECT_EQ(encoder->encode_buffer, nullptr);
+  EXPECT_EQ(encoder->encode_buffer_state, nullptr);
+
+  auto* priv = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
+  ASSERT_NE(priv, nullptr);
+  for (auto* b : {&priv->buffers, &priv->nodes}) {
+    // Buffers are empty but initialized with the default allocator
+    EXPECT_EQ(b->size_bytes, 0);
+
+    auto default_allocator = ArrowBufferAllocatorDefault();
+    EXPECT_EQ(memcmp(&b->allocator, &default_allocator, sizeof(b->allocator)), 0);
+  }
+
+  // Empty buffer works
+  nanoarrow::UniqueBuffer buffer;
+  EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()), NANOARROW_OK);
+  EXPECT_EQ(buffer->size_bytes, 0);
+
+  // Append a string (finalizing an empty buffer is an error for flatcc_builder_t)
+  EXPECT_NE(flatcc_builder_create_string_str(&priv->builder, "hello world"), 0);
+  EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()), NANOARROW_OK);
+  EXPECT_GT(buffer->size_bytes, sizeof("hello world"));
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index 8819ec1c..ab7b9eb9 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -57,6 +57,10 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
 #define ArrowIpcArrayStreamReaderInit \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
+#define ArrowIpcEncoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderInit)
+#define ArrowIpcEncoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderReset)
+#define ArrowIpcEncoderFinalizeBuffer \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
 
 #endif
 
@@ -117,6 +121,18 @@ enum ArrowIpcCompressionType {
 /// \brief Checks the nanoarrow runtime to make sure the run/build versions match
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
 
+/// \brief Get the endianness of the current runtime
+static inline enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    return NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    return NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+}
+
 /// \brief A structure representing a reference-counted buffer that may be passed to
 /// ArrowIpcDecoderDecodeArrayFromShared().
 struct ArrowIpcSharedBuffer {
@@ -379,6 +395,52 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
     struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
     struct ArrowIpcArrayStreamReaderOptions* options);
 
+/// \brief Encoder for Arrow IPC messages
+///
+/// This structure is intended to be allocated by the caller,
+/// initialized using ArrowIpcEncoderInit(), and released with
+/// ArrowIpcEncoderReset().
+struct ArrowIpcEncoder {
+  /// \brief Compression to encode in the next RecordBatch message.
+  enum ArrowIpcCompressionType codec;
+
+  /// \brief Callback invoked against each buffer to be encoded
+  ///
+  /// Encoding of buffers is left as a callback to accommodate dissociated data storage.
+  /// One implementation of this callback might copy all buffers into a contiguous body
+  /// for use in an arrow IPC stream, another implementation might store offsets and
+  /// lengths relative to a known arena.
+  ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
+                                  struct ArrowIpcEncoder* encoder, int64_t* offset,
+                                  int64_t* length, struct ArrowError* error);
+
+  /// \brief Pointer to arbitrary data used by encode_buffer()
+  void* encode_buffer_state;
+
+  /// \brief Finalized body length of the most recently encoded RecordBatch message
+  ///
+  /// (This is initially 0 and encode_buffer() is expected to update it. After all
+  /// buffers are encoded, this will be written to the RecordBatch's .bodyLength)
+  int64_t body_length;
+
+  /// \brief Private resources managed by this library
+  void* private_data;
+};
+
+/// \brief Initialize an encoder
+///
+/// If NANOARROW_OK is returned, the caller must call ArrowIpcEncoderReset()
+/// to release resources allocated by this function.
+ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder);
+
+/// \brief Release all resources attached to an encoder
+void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder);
+
+/// \brief Finalize the most recently encoded message to a buffer
+///
+/// The bytes of the encoded message will be appended to the provided buffer.
+ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
+                                             struct ArrowBuffer* out);
 /// @}
 
 #ifdef __cplusplus
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index 2757d28c..e5e0ee33 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -41,6 +41,22 @@ inline void release_pointer(struct ArrowIpcDecoder* data) {
   ArrowIpcDecoderReset(data);
 }
 
+template <>
+inline void init_pointer(struct ArrowIpcEncoder* data) {
+  data->private_data = nullptr;
+}
+
+template <>
+inline void move_pointer(struct ArrowIpcEncoder* src, struct ArrowIpcEncoder* dst) {
+  memcpy(dst, src, sizeof(struct ArrowIpcEncoder));
+  src->private_data = nullptr;
+}
+
+template <>
+inline void release_pointer(struct ArrowIpcEncoder* data) {
+  ArrowIpcEncoderReset(data);
+}
+
 template <>
 inline void init_pointer(struct ArrowIpcInputStream* data) {
   data->release = nullptr;
@@ -77,6 +93,9 @@ namespace ipc {
 /// \brief Class wrapping a unique struct ArrowIpcDecoder
 using UniqueDecoder = internal::Unique<struct ArrowIpcDecoder>;
 
+/// \brief Class wrapping a unique struct ArrowIpcEncoder
+using UniqueEncoder = internal::Unique<struct ArrowIpcEncoder>;
+
 /// \brief Class wrapping a unique struct ArrowIpcInputStream
 using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;
 

Reply via email to