This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 59c5b62  ipc: Update dist/ for commit 28cd7377a789f5cb25067a53df29ef37962026b8
59c5b62 is described below

commit 59c5b62efa16ec5a8a2e55d5e41ae84a44bae4f2
Author: GitHub Actions <[email protected]>
AuthorDate: Fri Mar 24 14:00:38 2023 +0000

    ipc: Update dist/ for commit 28cd7377a789f5cb25067a53df29ef37962026b8
---
 dist/nanoarrow_ipc.c | 271 ++++++++++++++++++++++++++++++++++++++++++++-------
 dist/nanoarrow_ipc.h |  62 +++++++++++-
 2 files changed, 299 insertions(+), 34 deletions(-)

diff --git a/dist/nanoarrow_ipc.c b/dist/nanoarrow_ipc.c
index d1d09ae..522c71e 100644
--- a/dist/nanoarrow_ipc.c
+++ b/dist/nanoarrow_ipc.c
@@ -20290,6 +20290,19 @@ static inline int 
org_apache_arrow_flatbuf_Tensor_verify_as_root_with_type_hash(
 #include <stdio.h>
 #include <string.h>
 
+// Thread-safe shared buffers need C11 atomics (stdatomic.h). The choice can be
+// forced by defining NANOARROW_IPC_USE_STDATOMIC to 0 or 1 before this point;
+// otherwise it is detected from __STDC_VERSION__ and __STDC_NO_ATOMICS__.
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && \
+    !defined(__STDC_NO_ATOMICS__)
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#else
+#define NANOARROW_IPC_USE_STDATOMIC 0
+#endif
+#endif
+
+// Include stdatomic.h whenever atomics are in use. This includes the case where
+// the caller forced NANOARROW_IPC_USE_STDATOMIC=1: the detection branch above is
+// skipped then, so the header would otherwise never be included.
+#if NANOARROW_IPC_USE_STDATOMIC
+#include <stdatomic.h>
+#endif
+
 #include "nanoarrow.h"
 #include "nanoarrow_ipc.h"
 
@@ -20333,6 +20346,90 @@ static enum ArrowIpcEndianness 
ArrowIpcSystemEndianness(void) {
   }
 }
 
+#if NANOARROW_IPC_USE_STDATOMIC
+// Heap-allocated state shared by every ArrowBuffer cloned from one
+// ArrowIpcSharedBuffer: the owning buffer plus an atomic reference count.
+struct ArrowIpcSharedBufferPrivate {
+  struct ArrowBuffer src;
+  atomic_long reference_count;
+};
+
+// Atomically adds delta to the reference count and returns the updated count.
+static int64_t ArrowIpcSharedBufferUpdate(
+    struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+  int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+  return old_count + delta;
+}
+
+// Atomically stores count as the current reference count.
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data,
+                                    int64_t count) {
+  atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; }
+#else
+// Fallback without C11 atomics: same layout with a plain counter, so buffers
+// cloned from it must not be released from other threads. The tag must be
+// ArrowIpcSharedBufferPrivate (not the public ArrowIpcSharedBuffer declared in
+// nanoarrow_ipc.h); otherwise the public struct is redefined and the functions
+// below operate on an incomplete type.
+struct ArrowIpcSharedBufferPrivate {
+  struct ArrowBuffer src;
+  int64_t reference_count;
+};
+
+// Adds delta to the (non-atomic) reference count and returns the updated count.
+static int64_t ArrowIpcSharedBufferUpdate(
+    struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+  private_data->reference_count += delta;
+  return private_data->reference_count;
+}
+
+// Stores count as the current reference count.
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data,
+                                    int64_t count) {
+  private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; }
+#endif
+
+// Deallocator installed on every buffer handed out for a shared body. Each call
+// drops one reference; the call that drops the last one releases the owning
+// buffer and the bookkeeping struct. ptr and size are not needed because the
+// owning buffer is kept in the private state.
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr,
+                                     int64_t size) {
+  struct ArrowIpcSharedBufferPrivate* shared_state =
+      (struct ArrowIpcSharedBufferPrivate*)allocator->private_data;
+  (void)ptr;
+  (void)size;
+
+  int64_t remaining = ArrowIpcSharedBufferUpdate(shared_state, -1);
+  if (remaining != 0) {
+    return;
+  }
+
+  ArrowBufferReset(&shared_state->src);
+  ArrowFree(shared_state);
+}
+
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+                                        struct ArrowBuffer* src) {
+  // The reference-counted state lives on the heap so that it can outlive both
+  // this ArrowIpcSharedBuffer and any individual buffer cloned from it.
+  struct ArrowIpcSharedBufferPrivate* state =
+      (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcSharedBufferPrivate));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  // Take ownership of src; the caller's ArrowIpcSharedBuffer is the first reference.
+  ArrowBufferMove(src, &state->src);
+  ArrowIpcSharedBufferSet(state, 1);
+
+  struct ArrowBuffer* view = &shared->private_src;
+  ArrowBufferInit(view);
+  view->data = state->src.data;
+  view->size_bytes = state->src.size_bytes;
+  // Don't expose any extra capacity from src so that any calls to ArrowBufferAppend
+  // on this buffer will fail.
+  view->capacity_bytes = state->src.size_bytes;
+  view->allocator = ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, state);
+  return NANOARROW_OK;
+}
+
+// Adds one reference to the shared state and writes a shallow copy of the
+// caller's buffer (same data pointer, size, capacity and allocator) into
+// shared_out. Releasing shared_out with ArrowBufferReset() drops that reference
+// again through ArrowIpcSharedBufferFree().
+static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared,
+                                      struct ArrowBuffer* shared_out) {
+  struct ArrowBuffer* origin = &shared->private_src;
+  ArrowIpcSharedBufferUpdate(
+      (struct ArrowIpcSharedBufferPrivate*)origin->allocator.private_data, 1);
+  *shared_out = *origin;
+}
+
+// Drops the caller's own reference. The shared data is freed once every buffer
+// cloned from it has also been released.
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) {
+  struct ArrowBuffer* own_reference = &shared->private_src;
+  ArrowBufferReset(own_reference);
+}
+
 static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -21322,14 +21419,97 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct 
ArrowIpcDecoder* decoder,
   }
 }
 
+/// \brief Information required to read and/or decompress a single buffer
+///
+/// The RecordBatch message header contains a description of each buffer
+/// in the message body. The ArrowIpcBufferSource is the parsed result of
+/// a single buffer with compression and endian information such that the
+/// original buffer can be reconstructed.
+struct ArrowIpcBufferSource {
+  /// \brief Offset of the buffer from the start of the message body
+  int64_t body_offset_bytes;
+  /// \brief Number of bytes the buffer occupies within the message body
+  int64_t buffer_length_bytes;
+  /// \brief Compression codec taken from the decoder for this message
+  enum ArrowIpcCompressionType codec;
+  /// \brief Non-zero when ArrowIpcDecoderNeedsSwapEndian() reported that the
+  /// buffer's byte order differs from the system's
+  int swap_endian;
+};
+
+/// \brief Factory for materializing ArrowBuffer objects
+///
+/// Given a description of where a buffer is located inside the message body, make
+/// the ArrowBuffer that will be placed into the correct ArrowArray. The decoder
+/// does not do any IO and does not make any assumptions about how or if the body
+/// has been read into memory. This abstraction is currently internal and exists
+/// to support the two obvious ways a user might go about this: (1) using a
+/// non-owned view of memory that must be copied slice-wise or (2) adding a reference
+/// to an ArrowIpcSharedBuffer and returning a slice of that memory.
+struct ArrowIpcBufferFactory {
+  /// \brief User-defined callback to create and initialize the desired buffer into dst
+  ///
+  /// At the time that this callback is called, the ArrowIpcBufferSource has been checked
+  /// to ensure that it is within the body size declared by the message header. If
+  /// NANOARROW_OK is returned, the caller is responsible for dst. Otherwise, error must
+  /// contain a null-terminated message.
+  ArrowErrorCode (*make_buffer)(struct ArrowIpcBufferFactory* factory,
+                                struct ArrowIpcBufferSource* src, struct ArrowBuffer* dst,
+                                struct ArrowError* error);
+
+  /// \brief Caller-defined private data to be used in the callback.
+  ///
+  /// Usually this would be a description of where the body has been read into memory or
+  /// information required to do so.
+  void* private_data;
+};
+
+// ArrowIpcBufferFactory callback for a non-owned view of the body: copies the
+// requested slice of the body into a freshly initialized dst. factory->private_data
+// must point to the struct ArrowBufferView describing the whole body.
+static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory* factory,
+                                                 struct ArrowIpcBufferSource* src,
+                                                 struct ArrowBuffer* dst,
+                                                 struct ArrowError* error) {
+  struct ArrowBufferView* body = (struct ArrowBufferView*)factory->private_data;
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
+  view.size_bytes = src->buffer_length_bytes;
+
+  ArrowBufferInit(dst);
+  int result = ArrowBufferAppendBufferView(dst, view);
+  if (result != NANOARROW_OK) {
+    // The factory contract requires error to hold a message on failure, and the
+    // caller only owns dst on success, so release anything dst acquired here.
+    ArrowBufferReset(dst);
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Builds a factory that copies each requested buffer out of buffer_view. Only the
+// pointer is stored, so buffer_view must outlive every use of the factory.
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromView(
+    struct ArrowBufferView* buffer_view) {
+  struct ArrowIpcBufferFactory factory = {&ArrowIpcMakeBufferFromView, buffer_view};
+  return factory;
+}
+
+// ArrowIpcBufferFactory callback that avoids copying: dst becomes a new reference
+// to the shared body, sliced to the requested byte range. factory->private_data
+// must point to the struct ArrowIpcSharedBuffer holding the body.
+static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* factory,
+                                                   struct ArrowIpcBufferSource* src,
+                                                   struct ArrowBuffer* dst,
+                                                   struct ArrowError* error) {
+  struct ArrowIpcSharedBuffer* shared =
+      (struct ArrowIpcSharedBuffer*)factory->private_data;
+  ArrowIpcSharedBufferClone(shared, dst);
+  dst->data += src->body_offset_bytes;
+  dst->size_bytes = src->buffer_length_bytes;
+  // The clone reports the capacity of the whole body. Once data has been moved
+  // forward, that capacity would extend past the end of the allocation, so
+  // shrink it to the slice to keep any append from writing out of bounds.
+  dst->capacity_bytes = src->buffer_length_bytes;
+  return NANOARROW_OK;
+}
+
+// Builds a factory whose buffers are reference-counted slices of shared. Only the
+// pointer is stored, so shared must outlive every use of the factory.
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
+    struct ArrowIpcSharedBuffer* shared) {
+  struct ArrowIpcBufferFactory factory = {&ArrowIpcMakeBufferFromShared, shared};
+  return factory;
+}
+
+// State carried while decoding one RecordBatch into an ArrowArray: the field
+// nodes and buffers from the header, plus how each buffer is materialized.
 struct ArrowIpcArraySetter {
   ns(FieldNode_vec_t) fields;
   int64_t field_i;
   ns(Buffer_vec_t) buffers;
   int64_t buffer_i;
-  struct ArrowBufferView body;
-  enum ArrowIpcCompressionType codec;
-  int swap_endian;
+  int64_t body_size_bytes;               // every buffer range is checked against this
+  struct ArrowIpcBufferSource src;       // refilled per buffer before calling factory
+  struct ArrowIpcBufferFactory factory;  // copies from a view or slices a shared buffer
 };
 
 static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, 
int64_t offset,
@@ -21340,35 +21520,32 @@ static int ArrowIpcDecoderMakeBuffer(struct 
ArrowIpcArraySetter* setter, int64_t
   }
 
   // Check that this buffer fits within the body
-  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
-    ArrowErrorSet(error,
-                  "Buffer %ld requires body offsets [%ld..%ld) but body has 
size %ld",
-                  (long)setter->buffer_i - 1, (long)offset, (long)offset + 
(long)length,
-                  setter->body.size_bytes);
+  int64_t buffer_start = offset;
+  int64_t buffer_end = buffer_start + length;
+  if (buffer_start < 0 || buffer_end > setter->body_size_bytes) {
+    ArrowErrorSet(error, "Buffer requires body offsets [%ld..%ld) but body has 
size %ld",
+                  (long)buffer_start, (long)buffer_end, 
(long)setter->body_size_bytes);
     return EINVAL;
   }
 
-  struct ArrowBufferView view;
-  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
-  view.size_bytes = length;
-
-  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+  // If the ArrowIpcBufferFactory is made public, these should get moved 
(since then a
+  // user could inject support for either one). More likely, by the time that 
happens,
+  // this library will be able to support some of these features.
+  if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
     ArrowErrorSet(error, "The nanoarrow_ipc extension does not support 
compression");
     return ENOTSUP;
   }
 
-  if (setter->swap_endian) {
+  if (setter->src.swap_endian) {
     ArrowErrorSet(error,
                   "The nanoarrow_ipc extension does not support non-system 
endianness");
     return ENOTSUP;
   }
 
-  int result = ArrowBufferAppendBufferView(out, view);
-  if (result != NANOARROW_OK) {
-    ArrowErrorSet(error, "Failed to copy buffer");
-    return result;
-  }
-
+  setter->src.body_offset_bytes = offset;
+  setter->src.buffer_length_bytes = length;
+  NANOARROW_RETURN_NOT_OK(
+      setter->factory.make_buffer(&setter->factory, &setter->src, out, error));
   return NANOARROW_OK;
 }
 
@@ -21413,10 +21590,9 @@ static int ArrowIpcArrayInitFromArrayView(struct 
ArrowArray* array,
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
-                                          struct ArrowBufferView body, int64_t 
field_i,
-                                          struct ArrowArray* out,
-                                          struct ArrowError* error) {
+static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal(
+    struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
+    int64_t field_i, struct ArrowArray* out, struct ArrowError* error) {
   struct ArrowIpcDecoderPrivate* private_data =
       (struct ArrowIpcDecoderPrivate*)decoder->private_data;
 
@@ -21444,9 +21620,10 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
   setter.field_i = field_i;
   setter.buffers = ns(RecordBatch_buffers(batch));
   setter.buffer_i = root->buffer_offset - 1;
-  setter.body = body;
-  setter.codec = decoder->codec;
-  setter.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
+  setter.body_size_bytes = decoder->body_size_bytes;
+  setter.factory = factory;
+  setter.src.codec = decoder->codec;
+  setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
 
   // The flatbuffers FieldNode doesn't count the root struct so we have to 
loop over the
   // children ourselves
@@ -21483,6 +21660,22 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
   ArrowArrayMove(&temp, out);
   return NANOARROW_OK;
 }
+
+// Copying variant: every buffer of the decoded array is copied out of body, so the
+// memory body points to only needs to stay valid for the duration of this call.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error) {
+  struct ArrowIpcBufferFactory copy_factory = ArrowIpcBufferFactoryFromView(&body);
+  return ArrowIpcDecoderDecodeArrayInternal(decoder, copy_factory, i, out, error);
+}
+
+// Zero-copy variant: each decoded buffer holds its own reference to body. The
+// caller still has to release its reference with ArrowIpcSharedBufferReset().
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder* decoder,
+                                                    struct ArrowIpcSharedBuffer* body,
+                                                    int64_t i, struct ArrowArray* out,
+                                                    struct ArrowError* error) {
+  struct ArrowIpcBufferFactory shared_factory = ArrowIpcBufferFactoryFromShared(body);
+  return ArrowIpcDecoderDecodeArrayInternal(decoder, shared_factory, i, out, error);
+}
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -21651,6 +21844,7 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct 
ArrowIpcInputStream* stream,
 struct ArrowIpcArrayStreamReaderPrivate {
   struct ArrowIpcInputStream input;
   struct ArrowIpcDecoder decoder;
+  int use_shared_buffers;
   struct ArrowSchema out_schema;
   int64_t field_index;
   struct ArrowBuffer header;
@@ -21855,13 +22049,22 @@ static int ArrowIpcArrayStreamReaderGetNext(struct 
ArrowArrayStream* stream,
   // Read in the body
   NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
 
-  struct ArrowBufferView body_view;
-  body_view.data.data = private_data->body.data;
-  body_view.size_bytes = private_data->body.size_bytes;
+  if (private_data->use_shared_buffers) {
+    struct ArrowIpcSharedBuffer shared;
+    NANOARROW_RETURN_NOT_OK(ArrowIpcSharedBufferInit(&shared, 
&private_data->body));
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
+        &private_data->decoder, &shared, private_data->field_index, out,
+        &private_data->error));
+    ArrowIpcSharedBufferReset(&shared);
+  } else {
+    struct ArrowBufferView body_view;
+    body_view.data.data = private_data->body.data;
+    body_view.size_bytes = private_data->body.size_bytes;
 
-  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, 
body_view,
-                                                     
private_data->field_index, out,
-                                                     &private_data->error));
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, 
body_view,
+                                                       
private_data->field_index, out,
+                                                       &private_data->error));
+  }
 
   return NANOARROW_OK;
 }
@@ -21896,8 +22099,10 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
 
   if (options != NULL) {
     private_data->field_index = options->field_index;
+    private_data->use_shared_buffers = options->use_shared_buffers;
   } else {
     private_data->field_index = -1;
+    private_data->use_shared_buffers = ArrowIpcSharedBufferIsThreadSafe();
   }
 
   out->private_data = private_data;
diff --git a/dist/nanoarrow_ipc.h b/dist/nanoarrow_ipc.h
index 071b299..3fa4163 100644
--- a/dist/nanoarrow_ipc.h
+++ b/dist/nanoarrow_ipc.h
@@ -23,6 +23,12 @@
 #ifdef NANOARROW_NAMESPACE
 
 #define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcCheckRuntime)
+#define ArrowIpcSharedBufferIsThreadSafe \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferIsThreadSafe)
+#define ArrowIpcSharedBufferInit \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit)
+#define ArrowIpcSharedBufferReset \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset)
 #define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderInit)
 #define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderReset)
 #define ArrowIpcDecoderPeekHeader \
@@ -35,6 +41,8 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
 #define ArrowIpcDecoderDecodeArray \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderDecodeArrayFromShared \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayFromShared)
 #define ArrowIpcDecoderSetSchema \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
 #define ArrowIpcDecoderSetEndianness \
@@ -86,8 +94,36 @@ enum ArrowIpcCompressionType {
 #define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
 #define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
 
+/// \brief Checks the nanoarrow runtime to make sure the run/build versions match
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);

+/// \brief A structure representing a reference-counted buffer that may be passed to
+/// ArrowIpcDecoderDecodeArrayFromShared().
+///
+/// The single member is an implementation detail: its allocator carries the shared
+/// reference count, so callers should not modify it directly.
+struct ArrowIpcSharedBuffer {
+  struct ArrowBuffer private_src;
+};
+
+/// \brief Initialize the contents of an ArrowIpcSharedBuffer struct
+///
+/// If NANOARROW_OK is returned, the ArrowIpcSharedBuffer takes ownership of
+/// src.
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+                                        struct ArrowBuffer* src);
+
+/// \brief Release the caller's copy of the shared buffer
+///
+/// When finished, the caller must relinquish its own copy of the shared data
+/// using this function. The original buffer will continue to exist until all
+/// ArrowArray objects that refer to it have also been released.
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared);
+
+/// \brief Check for shared buffer thread safety
+///
+/// Thread-safe shared buffers require C11 and the stdatomic.h header.
+/// If either is unavailable, shared buffers are still possible but
+/// the resulting arrays must not be passed to other threads to be released.
+///
+/// \return 1 if the reference count is maintained atomically, 0 otherwise.
+int ArrowIpcSharedBufferIsThreadSafe(void);
+
 /// \brief Decoder for Arrow IPC messages
 ///
 /// This structure is intended to be allocated by the caller,
@@ -227,6 +263,18 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
                                           struct ArrowArray* out,
                                           struct ArrowError* error);
 
+/// \brief Decode an ArrowArray from a reference-counted shared buffer
+///
+/// This implementation takes advantage of the fact that it can avoid copying individual
+/// buffers: each buffer in out is a slice of shared that holds its own reference to it.
+/// In all cases the caller must call ArrowIpcSharedBufferReset() on shared after one or
+/// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If
+/// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by another
+/// thread.
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder* decoder,
+                                                    struct ArrowIpcSharedBuffer* shared,
+                                                    int64_t i, struct ArrowArray* out,
+                                                    struct ArrowError* error);
+
 /// \brief An user-extensible input data source
 struct ArrowIpcInputStream {
   /// \brief Read up to buf_size_bytes from stream into buf
@@ -266,8 +314,20 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct 
ArrowIpcInputStream* stream,
 
 /// \brief Options for ArrowIpcArrayStreamReaderInit()
 struct ArrowIpcArrayStreamReaderOptions {
-  /// \brief The field index to extract. Defaults to -1 (i.e., read all fields).
+  /// \brief The field index to extract.
+  ///
+  /// Defaults to -1 (i.e., read all fields). Note that this field index refers to
+  /// the flattened tree of children and not necessarily the column index.
   int64_t field_index;
+
+  /// \brief Set to a non-zero value to share the message body buffer among decoded arrays
+  ///
+  /// Sharing buffers is a good choice when (1) using memory-mapped IO
+  /// (since unreferenced portions of the file are often not loaded into memory) or
+  /// (2) all data from all columns is about to be referenced anyway. When loading
+  /// a single field there is probably no advantage to using shared buffers.
+  /// When no options are passed to ArrowIpcArrayStreamReaderInit(), this defaults to
+  /// the value of ArrowIpcSharedBufferIsThreadSafe(). If that function returns 0,
+  /// arrays decoded with shared buffers must not be released from another thread.
+  int use_shared_buffers;
 };
 
 /// \brief Initialize an ArrowArrayStream from an input stream of bytes

Reply via email to