This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 28cd737  feat(extensions/nanoarrow_ipc): Allow shared buffers for 
zero-copy buffer decode (#165)
28cd737 is described below

commit 28cd7377a789f5cb25067a53df29ef37962026b8
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Mar 24 09:53:40 2023 -0400

    feat(extensions/nanoarrow_ipc): Allow shared buffers for zero-copy buffer decode (#165)
    
    This PR makes buffer construction more flexible, with the initial target
    being zero-copy shared references to a single `struct ArrowBuffer`
    containing the read-in message body. Where supported (i.e., C11 or later
    with `<stdatomic.h>` available), the reference counting behind these
    shared buffers is thread safe.
    
    I implemented this by abstracting out a `struct ArrowIpcBufferFactory`.
    This could be exposed to the user, too (but maybe in another PR where I
    also implement endian swapping).
    
    Using the decoder API, you can request this using a new function:
    
    ```c
    ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder* decoder,
                                                        struct ArrowIpcSharedBuffer* shared,
                                                        int64_t i, struct ArrowArray* out,
                                                        struct ArrowError* error);
    ```
    
    When using the `ArrowArrayStream` interface, this is the default *if* shared
    buffers are thread safe (this can be overridden via the `use_shared_buffers`
    member of `ArrowIpcArrayStreamReaderOptions`).
---
 extensions/nanoarrow_ipc/CMakeLists.txt            |   1 +
 .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h    |  62 +++++-
 .../src/nanoarrow/nanoarrow_ipc_decoder.c          | 247 ++++++++++++++++++---
 .../src/nanoarrow/nanoarrow_ipc_decoder_test.cc    |  72 +++++-
 .../src/nanoarrow/nanoarrow_ipc_reader.c           |  24 +-
 .../src/nanoarrow/nanoarrow_ipc_reader_test.cc     |  46 ++++
 6 files changed, 415 insertions(+), 37 deletions(-)

diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt 
b/extensions/nanoarrow_ipc/CMakeLists.txt
index 8830b35..be5a71e 100644
--- a/extensions/nanoarrow_ipc/CMakeLists.txt
+++ b/extensions/nanoarrow_ipc/CMakeLists.txt
@@ -204,6 +204,7 @@ if (NANOARROW_IPC_BUILD_TESTS)
 
   include(GoogleTest)
   gtest_discover_tests(nanoarrow_ipc_decoder_test)
+  gtest_discover_tests(nanoarrow_ipc_reader_test)
 endif()
 
 if (NANOARROW_IPC_BUILD_APPS)
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
index 071b299..3fa4163 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
@@ -23,6 +23,12 @@
 #ifdef NANOARROW_NAMESPACE
 
 #define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcCheckRuntime)
+#define ArrowIpcSharedBufferIsThreadSafe \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferIsThreadSafe)
+#define ArrowIpcSharedBufferInit \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit)
+#define ArrowIpcSharedBufferReset \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset)
 #define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderInit)
 #define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderReset)
 #define ArrowIpcDecoderPeekHeader \
@@ -35,6 +41,8 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
 #define ArrowIpcDecoderDecodeArray \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderDecodeArrayFromShared \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayFromShared)
 #define ArrowIpcDecoderSetSchema \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
 #define ArrowIpcDecoderSetEndianness \
@@ -86,8 +94,36 @@ enum ArrowIpcCompressionType {
 #define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
 #define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
 
+/// \brief Checks the nanoarrow runtime to make sure the run/build versions match
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
 
+/// \brief A structure representing a reference-counted buffer that may be passed to
+/// ArrowIpcDecoderDecodeArrayFromShared().
+struct ArrowIpcSharedBuffer {
+  struct ArrowBuffer private_src;
+};
+
+/// \brief Initialize the contents of an ArrowIpcSharedBuffer struct
+///
+/// If NANOARROW_OK is returned, shared takes ownership of the contents of src;
+/// otherwise (e.g., ENOMEM) src is left untouched and still owned by the caller.
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+                                        struct ArrowBuffer* src);
+
+/// \brief Release the caller's copy of the shared buffer
+///
+/// When finished, the caller must relinquish its own copy of the shared data
+/// using this function. The original buffer will continue to exist until all
+/// ArrowArray objects that refer to it have also been released.
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared);
+
+/// \brief Check for shared buffer thread safety
+///
+/// Returns 1 if thread-safe shared buffers are available (requires C11 and the
+/// stdatomic.h header) and 0 otherwise. If either is unavailable, shared buffers still
+/// work but the resulting arrays must not be passed to other threads to be released.
+int ArrowIpcSharedBufferIsThreadSafe(void);
+
 /// \brief Decoder for Arrow IPC messages
 ///
 /// This structure is intended to be allocated by the caller,
@@ -227,6 +263,18 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
                                           struct ArrowArray* out,
                                           struct ArrowError* error);
 
+/// \brief Decode an ArrowArray from a shared (reference-counted) body buffer
+///
+/// Unlike ArrowIpcDecoderDecodeArray(), this avoids copying individual buffers: each
+/// buffer of out references a slice of shared. In all cases the caller must still call
+/// ArrowIpcSharedBufferReset() on shared after one or more calls to
+/// ArrowIpcDecoderDecodeArrayFromShared(). If ArrowIpcSharedBufferIsThreadSafe()
+/// returns 0, out must not be released by another thread.
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder* decoder,
+                                                    struct ArrowIpcSharedBuffer* shared,
+                                                    int64_t i, struct ArrowArray* out,
+                                                    struct ArrowError* error);
+
 /// \brief An user-extensible input data source
 struct ArrowIpcInputStream {
   /// \brief Read up to buf_size_bytes from stream into buf
@@ -266,8 +314,20 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct 
ArrowIpcInputStream* stream,
 
 /// \brief Options for ArrowIpcArrayStreamReaderInit()
 struct ArrowIpcArrayStreamReaderOptions {
-  /// \brief The field index to extract. Defaults to -1 (i.e., read all fields).
+  /// \brief The field index to extract.
+  ///
+  /// Use -1 (the value used when options is NULL) to read all fields. This index
+  /// refers to the flattened tree of children and not necessarily the column index.
   int64_t field_index;
+
+  /// \brief Set to a non-zero value to share the message body buffer among decoded arrays
+  ///
+  /// Sharing buffers is a good choice when (1) using memory-mapped IO (since unreferenced
+  /// portions of the file are often not loaded into memory) or (2) when all data from all
+  /// columns is about to be referenced anyway; when loading a single field there is
+  /// probably no advantage. When options is NULL, ArrowIpcSharedBufferIsThreadSafe() is
+  /// used; callers passing options must set this field explicitly.
+  int use_shared_buffers;
 };
 
 /// \brief Initialize an ArrowArrayStream from an input stream of bytes
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index e9c673e..a84419b 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -19,6 +19,19 @@
 #include <stdio.h>
 #include <string.h>
 
+// Thread safe shared buffers need C11 + stdatomic.h unless the build overrides this
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#define NANOARROW_IPC_USE_STDATOMIC 0
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__)
+#undef NANOARROW_IPC_USE_STDATOMIC
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#endif
+#endif
+// Include stdatomic.h whether the flag was detected above or set explicitly by the build
+#if NANOARROW_IPC_USE_STDATOMIC
+#include <stdatomic.h>
+#endif
+
 #include "nanoarrow.h"
 #include "nanoarrow_ipc.h"
 #include "nanoarrow_ipc_flatcc_generated.h"
@@ -62,6 +75,90 @@ static enum ArrowIpcEndianness 
ArrowIpcSystemEndianness(void) {
   }
 }
 
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBufferPrivate {
+  struct ArrowBuffer src;
+  atomic_long reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+    struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+  int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+  return old_count + delta;  // report the count after applying delta
+}
+
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data,
+                                    int64_t count) {
+  atomic_init(&private_data->reference_count, count);  // object is freshly allocated
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; }
+#else  // without C11 atomics the reference count is a plain (non-thread-safe) integer
+struct ArrowIpcSharedBufferPrivate {
+  struct ArrowBuffer src;
+  int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+    struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+  private_data->reference_count += delta;
+  return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data,
+                                    int64_t count) {
+  private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; }
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr,
+                                     int64_t size) {
+  struct ArrowIpcSharedBufferPrivate* private_data =
+      (struct ArrowIpcSharedBufferPrivate*)allocator->private_data;
+
+  if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {  // last reference released
+    ArrowBufferReset(&private_data->src);
+    ArrowFree(private_data);
+  }
+}
+
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+                                        struct ArrowBuffer* src) {
+  struct ArrowIpcSharedBufferPrivate* private_data =
+      (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcSharedBufferPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  ArrowBufferMove(src, &private_data->src);
+  ArrowIpcSharedBufferSet(private_data, 1);
+
+  ArrowBufferInit(&shared->private_src);
+  shared->private_src.data = private_data->src.data;
+  shared->private_src.size_bytes = private_data->src.size_bytes;
+  // Don't expose any extra capacity from src so that any calls to ArrowBufferAppend
+  // on this buffer will fail.
+  shared->private_src.capacity_bytes = private_data->src.size_bytes;
+  shared->private_src.allocator =
+      ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared,
+                                      struct ArrowBuffer* shared_out) {
+  struct ArrowIpcSharedBufferPrivate* private_data =
+      (struct ArrowIpcSharedBufferPrivate*)shared->private_src.allocator.private_data;
+  ArrowIpcSharedBufferUpdate(private_data, 1);  // the clone owns one new reference
+  memcpy(shared_out, &shared->private_src, sizeof(struct ArrowBuffer));
+}
+
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) {
+  ArrowBufferReset(&shared->private_src);  // NOTE(review): verify this frees an empty body
+}
+
 static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -1051,14 +1148,104 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder,
   }
 }
 
+/// \brief Information required to read and/or decompress a single buffer
+///
+/// The RecordBatch message header contains a description of each buffer
+/// in the message body. The ArrowIpcBufferSource is the parsed result of
+/// a single buffer with compression and endian information such that the
+/// original buffer can be reconstructed.
+struct ArrowIpcBufferSource {
+  int64_t body_offset_bytes;
+  int64_t buffer_length_bytes;
+  enum ArrowIpcCompressionType codec;
+  int swap_endian;
+};
+
+/// \brief Materializing ArrowBuffer objects
+///
+/// Given a description of where a buffer is located inside the message body, make
+/// the ArrowBuffer that will be placed into the correct ArrowArray. The decoder
+/// does not do any IO and does not make any assumptions about how or if the body
+/// has been read into memory. This abstraction is currently internal and exists
+/// to support the two obvious ways a user might go about this: (1) using a
+/// non-owned view of memory that must be copied slice-wise or (2) adding a reference
+/// to an ArrowIpcSharedBuffer and returning a slice of that memory.
+struct ArrowIpcBufferFactory {
+  /// \brief User-defined callback to create and initialize the desired buffer in dst
+  ///
+  /// At the time that this callback is called, the ArrowIpcBufferSource has been checked
+  /// to ensure that it is within the body size declared by the message header. If
+  /// NANOARROW_OK is returned, the caller is responsible for dst. Otherwise, error must
+  /// contain a null-terminated message.
+  ArrowErrorCode (*make_buffer)(struct ArrowIpcBufferFactory* factory,
+                                struct ArrowIpcBufferSource* src, struct ArrowBuffer* dst,
+                                struct ArrowError* error);
+
+  /// \brief Caller-defined private data to be used in the callback.
+  ///
+  /// Usually this would be a description of where the body has been read into memory or
+  /// information required to do so.
+  void* private_data;
+};
+
+static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory* factory,
+                                                 struct ArrowIpcBufferSource* src,
+                                                 struct ArrowBuffer* dst,
+                                                 struct ArrowError* error) {
+  struct ArrowBufferView* body = (struct ArrowBufferView*)factory->private_data;
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
+  view.size_bytes = src->buffer_length_bytes;
+
+  ArrowBufferInit(dst);
+  int result = ArrowBufferAppendBufferView(dst, view);
+  if (result != NANOARROW_OK) {
+    // The make_buffer contract requires a message in error whenever a failure is returned
+    ArrowErrorSet(error, "Failed to copy buffer");
+  }
+  return result;
+}
+
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromView(
+    struct ArrowBufferView* buffer_view) {
+  struct ArrowIpcBufferFactory out;
+  out.make_buffer = &ArrowIpcMakeBufferFromView;
+  out.private_data = buffer_view;
+  return out;
+}
+
+static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* factory,
+                                                   struct ArrowIpcBufferSource* src,
+                                                   struct ArrowBuffer* dst,
+                                                   struct ArrowError* error) {
+  struct ArrowIpcSharedBuffer* shared =
+      (struct ArrowIpcSharedBuffer*)factory->private_data;
+  ArrowIpcSharedBufferClone(shared, dst);
+  dst->data += src->body_offset_bytes;
+  dst->size_bytes = src->buffer_length_bytes;
+  // The clone carries the capacity of the whole body; clamp it to this slice so that an
+  // append cannot write past the slice into other buffers or beyond the end of the body.
+  dst->capacity_bytes = src->buffer_length_bytes;
+  return NANOARROW_OK;
+}
+
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
+    struct ArrowIpcSharedBuffer* shared) {
+  struct ArrowIpcBufferFactory out;
+  out.make_buffer = &ArrowIpcMakeBufferFromShared;
+  out.private_data = shared;
+  return out;
+}
+
 struct ArrowIpcArraySetter {
   ns(FieldNode_vec_t) fields;
   int64_t field_i;
   ns(Buffer_vec_t) buffers;
   int64_t buffer_i;
-  struct ArrowBufferView body;
-  enum ArrowIpcCompressionType codec;
-  int swap_endian;
+  int64_t body_size_bytes;
+  struct ArrowIpcBufferSource src;
+  struct ArrowIpcBufferFactory factory;
 };
 
 static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, 
int64_t offset,
@@ -1069,35 +1249,32 @@ static int ArrowIpcDecoderMakeBuffer(struct 
ArrowIpcArraySetter* setter, int64_t
   }
 
   // Check that this buffer fits within the body
-  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
-    ArrowErrorSet(error,
-                  "Buffer %ld requires body offsets [%ld..%ld) but body has 
size %ld",
-                  (long)setter->buffer_i - 1, (long)offset, (long)offset + 
(long)length,
-                  setter->body.size_bytes);
+  int64_t buffer_start = offset;
+  int64_t buffer_end = buffer_start + length;
+  if (buffer_start < 0 || buffer_end > setter->body_size_bytes) {
+    ArrowErrorSet(error, "Buffer requires body offsets [%ld..%ld) but body has 
size %ld",
+                  (long)buffer_start, (long)buffer_end, 
(long)setter->body_size_bytes);
     return EINVAL;
   }
 
-  struct ArrowBufferView view;
-  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
-  view.size_bytes = length;
-
-  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+  // If the ArrowIpcBufferFactory is made public, these should get moved 
(since then a
+  // user could inject support for either one). More likely, by the time that 
happens,
+  // this library will be able to support some of these features.
+  if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
     ArrowErrorSet(error, "The nanoarrow_ipc extension does not support 
compression");
     return ENOTSUP;
   }
 
-  if (setter->swap_endian) {
+  if (setter->src.swap_endian) {
     ArrowErrorSet(error,
                   "The nanoarrow_ipc extension does not support non-system 
endianness");
     return ENOTSUP;
   }
 
-  int result = ArrowBufferAppendBufferView(out, view);
-  if (result != NANOARROW_OK) {
-    ArrowErrorSet(error, "Failed to copy buffer");
-    return result;
-  }
-
+  setter->src.body_offset_bytes = offset;
+  setter->src.buffer_length_bytes = length;
+  NANOARROW_RETURN_NOT_OK(
+      setter->factory.make_buffer(&setter->factory, &setter->src, out, error));
   return NANOARROW_OK;
 }
 
@@ -1142,10 +1319,9 @@ static int ArrowIpcArrayInitFromArrayView(struct 
ArrowArray* array,
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
-                                          struct ArrowBufferView body, int64_t 
field_i,
-                                          struct ArrowArray* out,
-                                          struct ArrowError* error) {
+static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal(
+    struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
+    int64_t field_i, struct ArrowArray* out, struct ArrowError* error) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
@@ -1173,9 +1349,10 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
   setter.field_i = field_i;
   setter.buffers = ns(RecordBatch_buffers(batch));
   setter.buffer_i = root->buffer_offset - 1;
-  setter.body = body;
-  setter.codec = decoder->codec;
-  setter.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
+  setter.body_size_bytes = decoder->body_size_bytes;
+  setter.factory = factory;
+  setter.src.codec = decoder->codec;
+  setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
 
   // The flatbuffers FieldNode doesn't count the root struct so we have to 
loop over the
   // children ourselves
@@ -1212,3 +1389,19 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
   ArrowArrayMove(&temp, out);
   return NANOARROW_OK;
 }
+
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error) {
+  return ArrowIpcDecoderDecodeArrayInternal(decoder, ArrowIpcBufferFactoryFromView(&body),
+                                            i, out, error);
+}
+
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder* decoder,
+                                                    struct ArrowIpcSharedBuffer* shared,
+                                                    int64_t i, struct ArrowArray* out,
+                                                    struct ArrowError* error) {
+  return ArrowIpcDecoderDecodeArrayInternal(
+      decoder, ArrowIpcBufferFactoryFromShared(shared), i, out, error);
+}
diff --git 
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
index caf7ebd..7c24b67 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
@@ -333,10 +333,9 @@ TEST(NanoarrowIpcTest, 
NanoarrowIpcDecodeSimpleRecordBatch) {
   ArrowIpcDecoderSetEndianness(&decoder, 
NANOARROW_IPC_ENDIANNESS_UNINITIALIZED);
 
   // Field extract should fail if body is too small
-  body.size_bytes = 0;
+  decoder.body_size_bytes = 0;
   EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), 
EINVAL);
-  EXPECT_STREQ(error.message,
-               "Buffer 1 requires body offsets [0..12) but body has size 0");
+  EXPECT_STREQ(error.message, "Buffer requires body offsets [0..12) but body 
has size 0");
 
   // Should error if the number of buffers or field nodes doesn't match
   // (different numbers because we count the root struct and the message does 
not)
@@ -436,6 +435,71 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowTypeRoundtrip) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchOwned) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  // Data buffer content of the hard-coded record batch message
+  uint8_t one_two_three_le[] = {0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
+                                0x00, 0x00, 0x03, 0x00, 0x00, 0x00};
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcDecoderInit(&decoder);
+
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+
+  struct ArrowBuffer body;
+  ArrowBufferInit(&body);
+  ASSERT_EQ(ArrowBufferAppend(&body, kSimpleRecordBatch + decoder.header_size_bytes,
+                              decoder.body_size_bytes),
+            NANOARROW_OK);
+
+  struct ArrowIpcSharedBuffer shared;
+  ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK);
+
+  // Check full struct extract
+  ASSERT_EQ(ArrowIpcDecoderDecodeArrayFromShared(&decoder, &shared, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  EXPECT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  ASSERT_EQ(array.n_children, 1);
+  ASSERT_EQ(array.children[0]->n_buffers, 2);
+  ASSERT_EQ(array.children[0]->length, 3);
+  EXPECT_EQ(array.children[0]->null_count, 0);
+  EXPECT_EQ(
+      memcmp(array.children[0]->buffers[1], one_two_three_le, sizeof(one_two_three_le)),
+      0);
+
+  array.release(&array);
+
+  // Check field extract
+  ASSERT_EQ(ArrowIpcDecoderDecodeArrayFromShared(&decoder, &shared, 0, &array, nullptr),
+            NANOARROW_OK);
+  // Release our reference to shared; the array's buffers keep the body alive
+  ArrowIpcSharedBufferReset(&shared);
+
+  ASSERT_EQ(array.n_buffers, 2);
+  ASSERT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  EXPECT_EQ(memcmp(array.buffers[1], one_two_three_le, sizeof(one_two_three_le)), 0);
+
+  array.release(&array);
+  schema.release(&schema);
+  ArrowBufferReset(&body);
+  ArrowIpcDecoderReset(&decoder);
+}
+
 TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
   const std::shared_ptr<arrow::DataType>& data_type = GetParam();
   std::shared_ptr<arrow::Schema> dummy_schema =
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
index f813cab..58d7da9 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
@@ -166,6 +166,7 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct 
ArrowIpcInputStream* stream,
 struct ArrowIpcArrayStreamReaderPrivate {
   struct ArrowIpcInputStream input;
   struct ArrowIpcDecoder decoder;
+  int use_shared_buffers;
   struct ArrowSchema out_schema;
   int64_t field_index;
   struct ArrowBuffer header;
@@ -370,13 +371,24 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
   // Read in the body
   NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
 
-  struct ArrowBufferView body_view;
-  body_view.data.data = private_data->body.data;
-  body_view.size_bytes = private_data->body.size_bytes;
+  if (private_data->use_shared_buffers) {
+    struct ArrowIpcSharedBuffer shared;
+    NANOARROW_RETURN_NOT_OK(ArrowIpcSharedBufferInit(&shared, &private_data->body));
+    int result = ArrowIpcDecoderDecodeArrayFromShared(
+        &private_data->decoder, &shared, private_data->field_index, out,
+        &private_data->error);
+    // Drop our reference even if decoding failed; otherwise the body would leak
+    ArrowIpcSharedBufferReset(&shared);
+    NANOARROW_RETURN_NOT_OK(result);
+  } else {
+    struct ArrowBufferView body_view;
+    body_view.data.data = private_data->body.data;
+    body_view.size_bytes = private_data->body.size_bytes;
 
-  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view,
-                                                     private_data->field_index, out,
-                                                     &private_data->error));
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view,
+                                                       private_data->field_index, out,
+                                                       &private_data->error));
+  }
 
   return NANOARROW_OK;
 }
@@ -411,8 +421,10 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
 
   if (options != NULL) {
     private_data->field_index = options->field_index;
+    private_data->use_shared_buffers = options->use_shared_buffers;
   } else {
     private_data->field_index = -1;
+    private_data->use_shared_buffers = ArrowIpcSharedBufferIsThreadSafe();
   }
 
   out->private_data = private_data;
diff --git 
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
index 126aaf3..03e4048 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -59,6 +59,14 @@ static uint8_t kSimpleRecordBatch[] = {
 
 static uint8_t kEndOfStream[] = {0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 
0x00};
 
+// When the build explicitly defines NANOARROW_IPC_USE_STDATOMIC (zero or non-zero),
+// check that ArrowIpcSharedBufferIsThreadSafe() reports the same choice
+#if defined(NANOARROW_IPC_USE_STDATOMIC)
+TEST(NanoarrowIpcReader, ArrowIpcSharedBufferIsThreadSafe) {
+  EXPECT_EQ(ArrowIpcSharedBufferIsThreadSafe(), NANOARROW_IPC_USE_STDATOMIC != 0);
+}
+#endif  // defined(NANOARROW_IPC_USE_STDATOMIC)
+
 TEST(NanoarrowIpcReader, InputStreamBuffer) {
   uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05};
   struct ArrowBuffer input;
@@ -173,6 +181,43 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) {
   stream.release(&stream);
 }
 
+TEST(NanoarrowIpcReader, StreamReaderBasicNoSharedBuffers) {  // schema + one batch, copied
+  struct ArrowBuffer input_buffer;
+  ArrowBufferInit(&input_buffer);
+  ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)),
+            NANOARROW_OK);
+  ASSERT_EQ(
+      ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+      NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  struct ArrowIpcArrayStreamReaderOptions options;
+  options.field_index = -1;  // read all fields
+  options.use_shared_buffers = 0;  // force the copying ArrowIpcDecoderDecodeArray() path
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &options), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  struct ArrowArray array;
+  ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
+  EXPECT_EQ(array.length, 3);
+  array.release(&array);
+
+  ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
+  EXPECT_EQ(array.release, nullptr);  // input exhausted: no more batches
+
+  ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
+  EXPECT_EQ(array.release, nullptr);  // and it stays exhausted on repeated calls
+
+  stream.release(&stream);
+}
+
 TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) {
   struct ArrowBuffer input_buffer;
   ArrowBufferInit(&input_buffer);
@@ -268,6 +313,7 @@ TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) 
{
   struct ArrowArrayStream stream;
   struct ArrowIpcArrayStreamReaderOptions options;
   options.field_index = 0;
+  options.use_shared_buffers = 0;
   ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &options), 
NANOARROW_OK);
 
   struct ArrowSchema schema;

Reply via email to