This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 59c5b62 ipc: Update dist/ for commit
28cd7377a789f5cb25067a53df29ef37962026b8
59c5b62 is described below
commit 59c5b62efa16ec5a8a2e55d5e41ae84a44bae4f2
Author: GitHub Actions <[email protected]>
AuthorDate: Fri Mar 24 14:00:38 2023 +0000
ipc: Update dist/ for commit 28cd7377a789f5cb25067a53df29ef37962026b8
---
dist/nanoarrow_ipc.c | 271 ++++++++++++++++++++++++++++++++++++++++++++-------
dist/nanoarrow_ipc.h | 62 +++++++++++-
2 files changed, 299 insertions(+), 34 deletions(-)
diff --git a/dist/nanoarrow_ipc.c b/dist/nanoarrow_ipc.c
index d1d09ae..522c71e 100644
--- a/dist/nanoarrow_ipc.c
+++ b/dist/nanoarrow_ipc.c
@@ -20290,6 +20290,19 @@ static inline int
org_apache_arrow_flatbuf_Tensor_verify_as_root_with_type_hash(
#include <stdio.h>
#include <string.h>
+// For thread-safe shared buffers we need C11 + stdatomic.h
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#define NANOARROW_IPC_USE_STDATOMIC 0
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
+#if !defined(__STDC_NO_ATOMICS__)
+#include <stdatomic.h>
+#undef NANOARROW_IPC_USE_STDATOMIC
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#endif
+#endif
+
+#endif
+
#include "nanoarrow.h"
#include "nanoarrow_ipc.h"
@@ -20333,6 +20346,90 @@ static enum ArrowIpcEndianness
ArrowIpcSystemEndianness(void) {
}
}
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBufferPrivate {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate*
private_data,
+ int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; }
+#else
+// Non-atomic fallback for the private state behind a shared buffer: the owned
+// source buffer plus a plain (not thread-safe) reference count. The tag must
+// be ArrowIpcSharedBufferPrivate, matching the stdatomic variant, because the
+// helpers in this branch take struct ArrowIpcSharedBufferPrivate* and the
+// public struct ArrowIpcSharedBuffer is already declared in nanoarrow_ipc.h.
+struct ArrowIpcSharedBufferPrivate {
+ struct ArrowBuffer src;
+ int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate*
private_data,
+ int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; }
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator,
uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBufferPrivate* private_data =
+ (struct ArrowIpcSharedBufferPrivate*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
+ }
+}
+
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+ struct ArrowBuffer* src) {
+ struct ArrowIpcSharedBufferPrivate* private_data =
+ (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc(
+ sizeof(struct ArrowIpcSharedBufferPrivate));
+ if (private_data == NULL) {
+ return ENOMEM;
+ }
+
+ ArrowBufferMove(src, &private_data->src);
+ ArrowIpcSharedBufferSet(private_data, 1);
+
+ ArrowBufferInit(&shared->private_src);
+ shared->private_src.data = private_data->src.data;
+ shared->private_src.size_bytes = private_data->src.size_bytes;
+ // Don't expose any extra capacity from src so that any calls to
ArrowBufferAppend
+ // on this buffer will fail.
+ shared->private_src.capacity_bytes = private_data->src.size_bytes;
+ shared->private_src.allocator =
+ ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data);
+ return NANOARROW_OK;
+}
+
+static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared,
+ struct ArrowBuffer* shared_out) {
+ struct ArrowIpcSharedBufferPrivate* private_data =
+ (struct
ArrowIpcSharedBufferPrivate*)shared->private_src.allocator.private_data;
+ ArrowIpcSharedBufferUpdate(private_data, 1);
+ memcpy(shared_out, shared, sizeof(struct ArrowBuffer));
+}
+
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) {
+ ArrowBufferReset(&shared->private_src);
+}
+
static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -21322,14 +21419,97 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct
ArrowIpcDecoder* decoder,
}
}
+/// \brief Information required to read and/or decompress a single buffer
+///
+/// The RecordBatch message header contains a description of each buffer
+/// in the message body. The ArrowIpcBufferSource is the parsed result of
+/// a single buffer with compression and endian information such that the
+/// original buffer can be reconstructed.
+struct ArrowIpcBufferSource {
+ int64_t body_offset_bytes;
+ int64_t buffer_length_bytes;
+ enum ArrowIpcCompressionType codec;
+ int swap_endian;
+};
+
+/// \brief Materializing ArrowBuffer objects
+///
+/// Given a description of where a buffer is located inside the message body,
make
+/// the ArrowBuffer that will be placed into the correct ArrowArray. The
decoder
+/// does not do any IO and does not make any assumptions about how or if the
body
+/// has been read into memory. This abstraction is currently internal and
exists
+/// to support the two obvious ways a user might go about this: (1) using a
+/// non-owned view of memory that must be copied slice-wise or (2) adding a
reference
+/// to an ArrowIpcSharedBuffer and returning a slice of that memory.
+struct ArrowIpcBufferFactory {
+ /// \brief User-defined callback to create and initialize the desired buffer
in dst
+ ///
+ /// At the time that this callback is called, the ArrowIpcBufferSource has
been checked
+ /// to ensure that it is within the body size declared by the message
header. If
+ /// NANOARROW_OK is returned, the caller is responsible for dst. Otherwise,
error must
+ /// contain a null-terminated message.
+ ArrowErrorCode (*make_buffer)(struct ArrowIpcBufferFactory* factory,
+ struct ArrowIpcBufferSource* src, struct
ArrowBuffer* dst,
+ struct ArrowError* error);
+
+ /// \brief Caller-defined private data to be used in the callback.
+ ///
+ /// Usually this would be a description of where the body has been read into
memory or
+ /// information required to do so.
+ void* private_data;
+};
+
+static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory*
factory,
+ struct ArrowIpcBufferSource*
src,
+ struct ArrowBuffer* dst,
+ struct ArrowError* error) {
+ struct ArrowBufferView* body = (struct
ArrowBufferView*)factory->private_data;
+
+ struct ArrowBufferView view;
+ view.data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
+ view.size_bytes = src->buffer_length_bytes;
+
+ ArrowBufferInit(dst);
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendBufferView(dst, view));
+ return NANOARROW_OK;
+}
+
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromView(
+ struct ArrowBufferView* buffer_view) {
+ struct ArrowIpcBufferFactory out;
+ out.make_buffer = &ArrowIpcMakeBufferFromView;
+ out.private_data = buffer_view;
+ return out;
+}
+
+static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct
ArrowIpcBufferFactory* factory,
+ struct
ArrowIpcBufferSource* src,
+ struct ArrowBuffer* dst,
+ struct ArrowError* error) {
+ struct ArrowIpcSharedBuffer* shared =
+ (struct ArrowIpcSharedBuffer*)factory->private_data;
+ ArrowIpcSharedBufferClone(shared, dst);
+ dst->data += src->body_offset_bytes;
+ dst->size_bytes = src->buffer_length_bytes;
+ return NANOARROW_OK;
+}
+
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
+ struct ArrowIpcSharedBuffer* shared) {
+ struct ArrowIpcBufferFactory out;
+ out.make_buffer = &ArrowIpcMakeBufferFromShared;
+ out.private_data = shared;
+ return out;
+}
+
struct ArrowIpcArraySetter {
ns(FieldNode_vec_t) fields;
int64_t field_i;
ns(Buffer_vec_t) buffers;
int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+ int64_t body_size_bytes;
+ struct ArrowIpcBufferSource src;
+ struct ArrowIpcBufferFactory factory;
};
static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter,
int64_t offset,
@@ -21340,35 +21520,32 @@ static int ArrowIpcDecoderMakeBuffer(struct
ArrowIpcArraySetter* setter, int64_t
}
// Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has
size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset +
(long)length,
- setter->body.size_bytes);
+ int64_t buffer_start = offset;
+ int64_t buffer_end = buffer_start + length;
+ if (buffer_start < 0 || buffer_end > setter->body_size_bytes) {
+ ArrowErrorSet(error, "Buffer requires body offsets [%ld..%ld) but body has
size %ld",
+ (long)buffer_start, (long)buffer_end,
(long)setter->body_size_bytes);
return EINVAL;
}
- struct ArrowBufferView view;
- view.data.as_uint8 = setter->body.data.as_uint8 + offset;
- view.size_bytes = length;
-
- if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+ // If the ArrowIpcBufferFactory is made public, these should get moved
(since then a
+ // user could inject support for either one). More likely, by the time that
happens,
+ // this library will be able to support some of these features.
+ if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
ArrowErrorSet(error, "The nanoarrow_ipc extension does not support
compression");
return ENOTSUP;
}
- if (setter->swap_endian) {
+ if (setter->src.swap_endian) {
ArrowErrorSet(error,
"The nanoarrow_ipc extension does not support non-system
endianness");
return ENOTSUP;
}
- int result = ArrowBufferAppendBufferView(out, view);
- if (result != NANOARROW_OK) {
- ArrowErrorSet(error, "Failed to copy buffer");
- return result;
- }
-
+ setter->src.body_offset_bytes = offset;
+ setter->src.buffer_length_bytes = length;
+ NANOARROW_RETURN_NOT_OK(
+ setter->factory.make_buffer(&setter->factory, &setter->src, out, error));
return NANOARROW_OK;
}
@@ -21413,10 +21590,9 @@ static int ArrowIpcArrayInitFromArrayView(struct
ArrowArray* array,
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
- struct ArrowBufferView body, int64_t
field_i,
- struct ArrowArray* out,
- struct ArrowError* error) {
+static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal(
+ struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
+ int64_t field_i, struct ArrowArray* out, struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -21444,9 +21620,10 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
setter.field_i = field_i;
setter.buffers = ns(RecordBatch_buffers(batch));
setter.buffer_i = root->buffer_offset - 1;
- setter.body = body;
- setter.codec = decoder->codec;
- setter.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
+ setter.body_size_bytes = decoder->body_size_bytes;
+ setter.factory = factory;
+ setter.src.codec = decoder->codec;
+ setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
// The flatbuffers FieldNode doesn't count the root struct so we have to
loop over the
// children ourselves
@@ -21483,6 +21660,22 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
ArrowArrayMove(&temp, out);
return NANOARROW_OK;
}
+
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView body, int64_t
i,
+ struct ArrowArray* out,
+ struct ArrowError* error) {
+ return ArrowIpcDecoderDecodeArrayInternal(decoder,
ArrowIpcBufferFactoryFromView(&body),
+ i, out, error);
+}
+
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder*
decoder,
+ struct
ArrowIpcSharedBuffer* body,
+ int64_t i, struct
ArrowArray* out,
+ struct ArrowError* error) {
+ return ArrowIpcDecoderDecodeArrayInternal(
+ decoder, ArrowIpcBufferFactoryFromShared(body), i, out, error);
+}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@@ -21651,6 +21844,7 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct
ArrowIpcInputStream* stream,
struct ArrowIpcArrayStreamReaderPrivate {
struct ArrowIpcInputStream input;
struct ArrowIpcDecoder decoder;
+ int use_shared_buffers;
struct ArrowSchema out_schema;
int64_t field_index;
struct ArrowBuffer header;
@@ -21855,13 +22049,22 @@ static int ArrowIpcArrayStreamReaderGetNext(struct
ArrowArrayStream* stream,
// Read in the body
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
- struct ArrowBufferView body_view;
- body_view.data.data = private_data->body.data;
- body_view.size_bytes = private_data->body.size_bytes;
+ if (private_data->use_shared_buffers) {
+ struct ArrowIpcSharedBuffer shared;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcSharedBufferInit(&shared,
&private_data->body));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
+ &private_data->decoder, &shared, private_data->field_index, out,
+ &private_data->error));
+ ArrowIpcSharedBufferReset(&shared);
+ } else {
+ struct ArrowBufferView body_view;
+ body_view.data.data = private_data->body.data;
+ body_view.size_bytes = private_data->body.size_bytes;
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder,
body_view,
-
private_data->field_index, out,
- &private_data->error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder,
body_view,
+
private_data->field_index, out,
+ &private_data->error));
+ }
return NANOARROW_OK;
}
@@ -21896,8 +22099,10 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
if (options != NULL) {
private_data->field_index = options->field_index;
+ private_data->use_shared_buffers = options->use_shared_buffers;
} else {
private_data->field_index = -1;
+ private_data->use_shared_buffers = ArrowIpcSharedBufferIsThreadSafe();
}
out->private_data = private_data;
diff --git a/dist/nanoarrow_ipc.h b/dist/nanoarrow_ipc.h
index 071b299..3fa4163 100644
--- a/dist/nanoarrow_ipc.h
+++ b/dist/nanoarrow_ipc.h
@@ -23,6 +23,12 @@
#ifdef NANOARROW_NAMESPACE
#define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcCheckRuntime)
+#define ArrowIpcSharedBufferIsThreadSafe \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferIsThreadSafe)
+#define ArrowIpcSharedBufferInit \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit)
+#define ArrowIpcSharedBufferReset \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset)
#define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderInit)
#define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderReset)
#define ArrowIpcDecoderPeekHeader \
@@ -35,6 +41,8 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
#define ArrowIpcDecoderDecodeArray \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderDecodeArrayFromShared \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayFromShared)
#define ArrowIpcDecoderSetSchema \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#define ArrowIpcDecoderSetEndianness \
@@ -86,8 +94,36 @@ enum ArrowIpcCompressionType {
#define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
#define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
+/// \brief Checks the nanoarrow runtime to make sure the run/build versions
match
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
+/// \brief A structure representing a reference-counted buffer that may be
passed to
+/// ArrowIpcDecoderDecodeArrayFromShared().
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer private_src;
+};
+
+/// \brief Initialize the contents of an ArrowIpcSharedBuffer struct
+///
+/// If NANOARROW_OK is returned, the ArrowIpcSharedBuffer takes ownership of
+/// src.
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+ struct ArrowBuffer* src);
+
+/// \brief Release the caller's copy of the shared buffer
+///
+/// When finished, the caller must relinquish its own copy of the shared data
+/// using this function. The original buffer will continue to exist until all
+/// ArrowArray objects that refer to it have also been released.
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared);
+
+/// \brief Check for shared buffer thread safety
+///
+/// Thread-safe shared buffers require C11 and the stdatomic.h header.
+/// If either is unavailable, shared buffers are still possible but
+/// the resulting arrays must not be passed to other threads to be released.
+int ArrowIpcSharedBufferIsThreadSafe(void);
+
/// \brief Decoder for Arrow IPC messages
///
/// This structure is intended to be allocated by the caller,
@@ -227,6 +263,18 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
struct ArrowArray* out,
struct ArrowError* error);
+/// \brief Decode an ArrowArray from an owned buffer
+///
+/// This implementation takes advantage of the fact that it can avoid copying
individual
+/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body
after one or
+/// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If
+/// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by
another
+/// thread.
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder*
decoder,
+ struct
ArrowIpcSharedBuffer* shared,
+ int64_t i, struct
ArrowArray* out,
+ struct ArrowError* error);
+
/// \brief An user-extensible input data source
struct ArrowIpcInputStream {
/// \brief Read up to buf_size_bytes from stream into buf
@@ -266,8 +314,20 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct
ArrowIpcInputStream* stream,
/// \brief Options for ArrowIpcArrayStreamReaderInit()
struct ArrowIpcArrayStreamReaderOptions {
- /// \brief The field index to extract. Defaults to -1 (i.e., read all
fields).
+ /// \brief The field index to extract.
+ ///
+ /// Defaults to -1 (i.e., read all fields). Note that this field index
refers to
+ /// the flattened tree of children and not necessarily the column index.
int64_t field_index;
+
+ /// \brief Set to a non-zero value to share the message body buffer among
decoded arrays
+ ///
+ /// Sharing buffers is a good choice when (1) using memory-mapped IO
+ /// (since unreferenced portions of the file are often not loaded into
memory) or
+ /// (2) if all data from all columns are about to be referenced anyway. When
loading
+ /// a single field there is probably no advantage to using shared buffers.
+ /// Defaults to the value of ArrowIpcSharedBufferIsThreadSafe().
+ int use_shared_buffers;
};
/// \brief Initialize an ArrowArrayStream from an input stream of bytes