This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 28cd737 feat(extensions/nanoarrow_ipc): Allow shared buffers for
zero-copy buffer decode (#165)
28cd737 is described below
commit 28cd7377a789f5cb25067a53df29ef37962026b8
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Mar 24 09:53:40 2023 -0400
feat(extensions/nanoarrow_ipc): Allow shared buffers for zero-copy buffer
decode (#165)
This PR makes buffer construction more flexible, with the initial target
being zero-copy shared references to a single `struct ArrowBuffer`
containing the read-in message body. Where supported (i.e., C11 and up),
these shared buffer references are thread safe.
I implemented this by abstracting out a `struct ArrowIpcBufferFactory`.
This could be exposed to the user, too (but maybe in another PR where I
also implement endian swapping).
Using the decoder API, you can request this using a new function:
```c
ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder* decoder,
                                                    struct ArrowIpcSharedBuffer* body,
                                                    int64_t i, struct ArrowArray* out,
                                                    struct ArrowError* error);
```
Using the `ArrowArrayStream` interface this is the default *if* shared
buffers are thread safe (this can be overridden).
---
extensions/nanoarrow_ipc/CMakeLists.txt | 1 +
.../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 62 +++++-
.../src/nanoarrow/nanoarrow_ipc_decoder.c | 247 ++++++++++++++++++---
.../src/nanoarrow/nanoarrow_ipc_decoder_test.cc | 72 +++++-
.../src/nanoarrow/nanoarrow_ipc_reader.c | 24 +-
.../src/nanoarrow/nanoarrow_ipc_reader_test.cc | 46 ++++
6 files changed, 415 insertions(+), 37 deletions(-)
diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt
b/extensions/nanoarrow_ipc/CMakeLists.txt
index 8830b35..be5a71e 100644
--- a/extensions/nanoarrow_ipc/CMakeLists.txt
+++ b/extensions/nanoarrow_ipc/CMakeLists.txt
@@ -204,6 +204,7 @@ if (NANOARROW_IPC_BUILD_TESTS)
include(GoogleTest)
gtest_discover_tests(nanoarrow_ipc_decoder_test)
+ gtest_discover_tests(nanoarrow_ipc_reader_test)
endif()
if (NANOARROW_IPC_BUILD_APPS)
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
index 071b299..3fa4163 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
@@ -23,6 +23,12 @@
#ifdef NANOARROW_NAMESPACE
#define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcCheckRuntime)
+#define ArrowIpcSharedBufferIsThreadSafe \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferIsThreadSafe)
+#define ArrowIpcSharedBufferInit \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferInit)
+#define ArrowIpcSharedBufferReset \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcSharedBufferReset)
#define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderInit)
#define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderReset)
#define ArrowIpcDecoderPeekHeader \
@@ -35,6 +41,8 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
#define ArrowIpcDecoderDecodeArray \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderDecodeArrayFromShared \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArrayFromShared)
#define ArrowIpcDecoderSetSchema \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#define ArrowIpcDecoderSetEndianness \
@@ -86,8 +94,36 @@ enum ArrowIpcCompressionType {
#define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
#define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
+/// \brief Checks the nanoarrow runtime to make sure the run/build versions
match
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
+/// \brief A structure representing a reference-counted buffer that may be
passed to
+/// ArrowIpcDecoderDecodeArrayFromShared().
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer private_src;
+};
+
+/// \brief Initialize the contents of an ArrowIpcSharedBuffer struct
+///
+/// If NANOARROW_OK is returned, the ArrowIpcSharedBuffer takes ownership of
+/// src.
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+ struct ArrowBuffer* src);
+
+/// \brief Release the caller's copy of the shared buffer
+///
+/// When finished, the caller must relinquish its own copy of the shared data
+/// using this function. The original buffer will continue to exist until all
+/// ArrowArray objects that refer to it have also been released.
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared);
+
+/// \brief Check for shared buffer thread safety
+///
+/// Thread-safe shared buffers require C11 and the stdatomic.h header.
+/// If either is unavailable, shared buffers are still possible but
+/// the resulting arrays must not be passed to other threads to be released.
+int ArrowIpcSharedBufferIsThreadSafe(void);
+
/// \brief Decoder for Arrow IPC messages
///
/// This structure is intended to be allocated by the caller,
@@ -227,6 +263,18 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
struct ArrowArray* out,
struct ArrowError* error);
+/// \brief Decode an ArrowArray from an owned buffer
+///
+/// This implementation takes advantage of the fact that it can avoid copying
individual
+/// buffers. In all cases the caller must ArrowIpcSharedBufferReset() body
after one or
+/// more calls to ArrowIpcDecoderDecodeArrayFromShared(). If
+/// ArrowIpcSharedBufferIsThreadSafe() returns 0, out must not be released by
another
+/// thread.
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder*
decoder,
+ struct
ArrowIpcSharedBuffer* shared,
+ int64_t i, struct
ArrowArray* out,
+ struct ArrowError* error);
+
/// \brief An user-extensible input data source
struct ArrowIpcInputStream {
/// \brief Read up to buf_size_bytes from stream into buf
@@ -266,8 +314,20 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct
ArrowIpcInputStream* stream,
/// \brief Options for ArrowIpcArrayStreamReaderInit()
struct ArrowIpcArrayStreamReaderOptions {
- /// \brief The field index to extract. Defaults to -1 (i.e., read all
fields).
+ /// \brief The field index to extract.
+ ///
+ /// Defaults to -1 (i.e., read all fields). Note that this field index
refers to
+ /// the flattened tree of children and not necessarily the column index.
int64_t field_index;
+
+ /// \brief Set to a non-zero value to share the message body buffer among
decoded arrays
+ ///
+ /// Sharing buffers is a good choice when (1) using memory-mapped IO
+ /// (since unreferenced portions of the file are often not loaded into
memory) or
+ /// (2) if all data from all columns are about to be referenced anyway. When
loading
+ /// a single field there is probably no advantage to using shared buffers.
+ /// Defaults to the value of ArrowIpcSharedBufferIsThreadSafe().
+ int use_shared_buffers;
};
/// \brief Initialize an ArrowArrayStream from an input stream of bytes
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index e9c673e..a84419b 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -19,6 +19,19 @@
#include <stdio.h>
#include <string.h>
+// For thread safe shared buffers we need C11 + stdatomic.h
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#define NANOARROW_IPC_USE_STDATOMIC 0
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
+#if !defined(__STDC_NO_ATOMICS__)
+#include <stdatomic.h>
+#undef NANOARROW_IPC_USE_STDATOMIC
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#endif
+#endif
+
+#endif
+
#include "nanoarrow.h"
#include "nanoarrow_ipc.h"
#include "nanoarrow_ipc_flatcc_generated.h"
@@ -62,6 +75,90 @@ static enum ArrowIpcEndianness
ArrowIpcSystemEndianness(void) {
}
}
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBufferPrivate {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate*
private_data,
+ int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; }
+#else
+struct ArrowIpcSharedBufferPrivate {  // fallback state used when <stdatomic.h> is unavailable
+ struct ArrowBuffer src;  // the buffer moved in by ArrowIpcSharedBufferInit()
+ int64_t reference_count;  // plain (non-atomic) count of outstanding references
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBufferPrivate* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate*
private_data,
+ int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; }
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator,
uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBufferPrivate* private_data =
+ (struct ArrowIpcSharedBufferPrivate*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
+ }
+}
+
+ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared,
+ struct ArrowBuffer* src) {
+ struct ArrowIpcSharedBufferPrivate* private_data =
+ (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc(
+ sizeof(struct ArrowIpcSharedBufferPrivate));
+ if (private_data == NULL) {
+ return ENOMEM;
+ }
+
+ ArrowBufferMove(src, &private_data->src);
+ ArrowIpcSharedBufferSet(private_data, 1);
+
+ ArrowBufferInit(&shared->private_src);
+ shared->private_src.data = private_data->src.data;
+ shared->private_src.size_bytes = private_data->src.size_bytes;
+ // Don't expose any extra capacity from src so that any calls to
ArrowBufferAppend
+ // on this buffer will fail.
+ shared->private_src.capacity_bytes = private_data->src.size_bytes;
+ shared->private_src.allocator =
+ ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data);
+ return NANOARROW_OK;
+}
+
+static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared,
+ struct ArrowBuffer* shared_out) {
+ struct ArrowIpcSharedBufferPrivate* private_data =
+ (struct
ArrowIpcSharedBufferPrivate*)shared->private_src.allocator.private_data;
+ ArrowIpcSharedBufferUpdate(private_data, 1);
+ memcpy(shared_out, shared, sizeof(struct ArrowBuffer));
+}
+
+void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) {
+ ArrowBufferReset(&shared->private_src);
+}
+
static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -1051,14 +1148,97 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct
ArrowIpcDecoder* decoder,
}
}
+/// \brief Information required to read and/or decompress a single buffer
+///
+/// The RecordBatch message header contains a description of each buffer
+/// in the message body. The ArrowIpcBufferSource is the parsed result of
+/// a single buffer with compression and endian information such that the
+/// original buffer can be reconstructed.
+struct ArrowIpcBufferSource {
+ int64_t body_offset_bytes;
+ int64_t buffer_length_bytes;
+ enum ArrowIpcCompressionType codec;
+ int swap_endian;
+};
+
+/// \brief Materializing ArrowBuffer objects
+///
+/// Given a description of where a buffer is located inside the message body,
make
+/// the ArrowBuffer that will be placed into the correct ArrowArray. The
decoder
+/// does not do any IO and does not make any assumptions about how or if the
body
+/// has been read into memory. This abstraction is currently internal and
exists
+/// to support the two obvious ways a user might go about this: (1) using a
+/// non-owned view of memory that must be copied slice-wise or (2) adding a
reference
+/// to an ArrowIpcSharedBuffer and returning a slice of that memory.
+struct ArrowIpcBufferFactory {
+ /// \brief User-defined callback to initialize the desired buffer
into dst
+ ///
+ /// At the time that this callback is called, the ArrowIpcBufferSource has
been checked
+ /// to ensure that it is within the body size declared by the message
header. If
+ /// NANOARROW_OK is returned, the caller is responsible for dst. Otherwise,
error must
+ /// contain a null-terminated message.
+ ArrowErrorCode (*make_buffer)(struct ArrowIpcBufferFactory* factory,
+ struct ArrowIpcBufferSource* src, struct
ArrowBuffer* dst,
+ struct ArrowError* error);
+
+ /// \brief Caller-defined private data to be used in the callback.
+ ///
+ /// Usually this would be a description of where the body has been read into
memory or
+ /// information required to do so.
+ void* private_data;
+};
+
+static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory*
factory,
+ struct ArrowIpcBufferSource*
src,
+ struct ArrowBuffer* dst,
+ struct ArrowError* error) {
+ struct ArrowBufferView* body = (struct
ArrowBufferView*)factory->private_data;
+
+ struct ArrowBufferView view;
+ view.data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
+ view.size_bytes = src->buffer_length_bytes;
+
+ ArrowBufferInit(dst);
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendBufferView(dst, view));
+ return NANOARROW_OK;
+}
+
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromView(
+ struct ArrowBufferView* buffer_view) {
+ struct ArrowIpcBufferFactory out;
+ out.make_buffer = &ArrowIpcMakeBufferFromView;
+ out.private_data = buffer_view;
+ return out;
+}
+
+static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct
ArrowIpcBufferFactory* factory,
+ struct
ArrowIpcBufferSource* src,
+ struct ArrowBuffer* dst,
+ struct ArrowError* error) {
+ struct ArrowIpcSharedBuffer* shared =
+ (struct ArrowIpcSharedBuffer*)factory->private_data;
+ ArrowIpcSharedBufferClone(shared, dst);
+ dst->data += src->body_offset_bytes;
+ dst->size_bytes = src->buffer_length_bytes;
+ return NANOARROW_OK;
+}
+
+static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
+ struct ArrowIpcSharedBuffer* shared) {
+ struct ArrowIpcBufferFactory out;
+ out.make_buffer = &ArrowIpcMakeBufferFromShared;
+ out.private_data = shared;
+ return out;
+}
+
struct ArrowIpcArraySetter {
ns(FieldNode_vec_t) fields;
int64_t field_i;
ns(Buffer_vec_t) buffers;
int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+ int64_t body_size_bytes;
+ struct ArrowIpcBufferSource src;
+ struct ArrowIpcBufferFactory factory;
};
static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter,
int64_t offset,
@@ -1069,35 +1249,32 @@ static int ArrowIpcDecoderMakeBuffer(struct
ArrowIpcArraySetter* setter, int64_t
}
// Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has
size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset +
(long)length,
- setter->body.size_bytes);
+ int64_t buffer_start = offset;
+ int64_t buffer_end = buffer_start + length;
+ if (buffer_start < 0 || buffer_end > setter->body_size_bytes) {
+ ArrowErrorSet(error, "Buffer requires body offsets [%ld..%ld) but body has
size %ld",
+ (long)buffer_start, (long)buffer_end,
(long)setter->body_size_bytes);
return EINVAL;
}
- struct ArrowBufferView view;
- view.data.as_uint8 = setter->body.data.as_uint8 + offset;
- view.size_bytes = length;
-
- if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+ // If the ArrowIpcBufferFactory is made public, these should get moved
(since then a
+ // user could inject support for either one). More likely, by the time that
happens,
+ // this library will be able to support some of these features.
+ if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
ArrowErrorSet(error, "The nanoarrow_ipc extension does not support
compression");
return ENOTSUP;
}
- if (setter->swap_endian) {
+ if (setter->src.swap_endian) {
ArrowErrorSet(error,
"The nanoarrow_ipc extension does not support non-system
endianness");
return ENOTSUP;
}
- int result = ArrowBufferAppendBufferView(out, view);
- if (result != NANOARROW_OK) {
- ArrowErrorSet(error, "Failed to copy buffer");
- return result;
- }
-
+ setter->src.body_offset_bytes = offset;
+ setter->src.buffer_length_bytes = length;
+ NANOARROW_RETURN_NOT_OK(
+ setter->factory.make_buffer(&setter->factory, &setter->src, out, error));
return NANOARROW_OK;
}
@@ -1142,10 +1319,9 @@ static int ArrowIpcArrayInitFromArrayView(struct
ArrowArray* array,
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
- struct ArrowBufferView body, int64_t
field_i,
- struct ArrowArray* out,
- struct ArrowError* error) {
+static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal(
+ struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
+ int64_t field_i, struct ArrowArray* out, struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;
@@ -1173,9 +1349,10 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
setter.field_i = field_i;
setter.buffers = ns(RecordBatch_buffers(batch));
setter.buffer_i = root->buffer_offset - 1;
- setter.body = body;
- setter.codec = decoder->codec;
- setter.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
+ setter.body_size_bytes = decoder->body_size_bytes;
+ setter.factory = factory;
+ setter.src.codec = decoder->codec;
+ setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);
// The flatbuffers FieldNode doesn't count the root struct so we have to
loop over the
// children ourselves
@@ -1212,3 +1389,19 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct
ArrowIpcDecoder* decoder,
ArrowArrayMove(&temp, out);
return NANOARROW_OK;
}
+
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView body, int64_t
i,
+ struct ArrowArray* out,
+ struct ArrowError* error) {
+ return ArrowIpcDecoderDecodeArrayInternal(decoder,
ArrowIpcBufferFactoryFromView(&body),
+ i, out, error);
+}
+
+ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(struct ArrowIpcDecoder*
decoder,
+ struct
ArrowIpcSharedBuffer* body,
+ int64_t i, struct
ArrowArray* out,
+ struct ArrowError* error) {
+ return ArrowIpcDecoderDecodeArrayInternal(
+ decoder, ArrowIpcBufferFactoryFromShared(body), i, out, error);
+}
diff --git
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
index caf7ebd..7c24b67 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
@@ -333,10 +333,9 @@ TEST(NanoarrowIpcTest,
NanoarrowIpcDecodeSimpleRecordBatch) {
ArrowIpcDecoderSetEndianness(&decoder,
NANOARROW_IPC_ENDIANNESS_UNINITIALIZED);
// Field extract should fail if body is too small
- body.size_bytes = 0;
+ decoder.body_size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error),
EINVAL);
- EXPECT_STREQ(error.message,
- "Buffer 1 requires body offsets [0..12) but body has size 0");
+ EXPECT_STREQ(error.message, "Buffer requires body offsets [0..12) but body
has size 0");
// Should error if the number of buffers or field nodes doesn't match
// (different numbers because we count the root struct and the message does
not)
@@ -436,6 +435,73 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowTypeRoundtrip) {
ArrowIpcDecoderReset(&decoder);
}
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchOwned) {
+ struct ArrowIpcDecoder decoder;
+ struct ArrowError error;
+ struct ArrowSchema schema;
+ struct ArrowArray array;
+
+ // Data buffer content of the hard-coded record batch message
+ uint8_t one_two_three_le[] = {0x01, 0x00, 0x00, 0x00, 0x02, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00};
+
+ ArrowSchemaInit(&schema);
+ ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+ ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32),
NANOARROW_OK);
+
+ struct ArrowBufferView data;
+ data.data.as_uint8 = kSimpleRecordBatch;
+ data.size_bytes = sizeof(kSimpleRecordBatch);
+
+ ArrowIpcDecoderInit(&decoder);
+ auto decoder_private =
+ reinterpret_cast<struct ArrowIpcDecoderPrivate*>(decoder.private_data);
+
+ ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr),
NANOARROW_OK);
+ EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+
+ struct ArrowBuffer body;
+ ArrowBufferInit(&body);
+ ASSERT_EQ(ArrowBufferAppend(&body, kSimpleRecordBatch +
decoder.header_size_bytes,
+ decoder.body_size_bytes),
+ NANOARROW_OK);
+
+ struct ArrowIpcSharedBuffer shared;
+ ASSERT_EQ(ArrowIpcSharedBufferInit(&shared, &body), NANOARROW_OK);
+
+ // Check full struct extract
+ EXPECT_EQ(ArrowIpcDecoderDecodeArrayFromShared(&decoder, &shared, -1,
&array, nullptr),
+ NANOARROW_OK);
+
+ EXPECT_EQ(array.length, 3);
+ EXPECT_EQ(array.null_count, 0);
+ ASSERT_EQ(array.n_children, 1);
+ ASSERT_EQ(array.children[0]->n_buffers, 2);
+ ASSERT_EQ(array.children[0]->length, 3);
+ EXPECT_EQ(array.children[0]->null_count, 0);
+ EXPECT_EQ(
+ memcmp(array.children[0]->buffers[1], one_two_three_le,
sizeof(one_two_three_le)),
+ 0);
+
+ array.release(&array);
+
+ // Check field extract
+ EXPECT_EQ(ArrowIpcDecoderDecodeArrayFromShared(&decoder, &shared, 0, &array,
nullptr),
+ NANOARROW_OK);
+ // Release the original shared (forthcoming array buffers should still be
valid)
+ ArrowIpcSharedBufferReset(&shared);
+
+ ASSERT_EQ(array.n_buffers, 2);
+ ASSERT_EQ(array.length, 3);
+ EXPECT_EQ(array.null_count, 0);
+ EXPECT_EQ(memcmp(array.buffers[1], one_two_three_le,
sizeof(one_two_three_le)), 0);
+
+ array.release(&array);
+ schema.release(&schema);
+ ArrowBufferReset(&body);
+ ArrowIpcDecoderReset(&decoder);
+}
+
TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
const std::shared_ptr<arrow::DataType>& data_type = GetParam();
std::shared_ptr<arrow::Schema> dummy_schema =
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
index f813cab..58d7da9 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
@@ -166,6 +166,7 @@ ArrowErrorCode ArrowIpcInputStreamInitFile(struct
ArrowIpcInputStream* stream,
struct ArrowIpcArrayStreamReaderPrivate {
struct ArrowIpcInputStream input;
struct ArrowIpcDecoder decoder;
+ int use_shared_buffers;
struct ArrowSchema out_schema;
int64_t field_index;
struct ArrowBuffer header;
@@ -370,13 +371,22 @@ static int ArrowIpcArrayStreamReaderGetNext(struct
ArrowArrayStream* stream,
// Read in the body
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
- struct ArrowBufferView body_view;
- body_view.data.data = private_data->body.data;
- body_view.size_bytes = private_data->body.size_bytes;
+ if (private_data->use_shared_buffers) {
+ struct ArrowIpcSharedBuffer shared;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcSharedBufferInit(&shared,
&private_data->body));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayFromShared(
+ &private_data->decoder, &shared, private_data->field_index, out,
+ &private_data->error));
+ ArrowIpcSharedBufferReset(&shared);
+ } else {
+ struct ArrowBufferView body_view;
+ body_view.data.data = private_data->body.data;
+ body_view.size_bytes = private_data->body.size_bytes;
- NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder,
body_view,
-
private_data->field_index, out,
- &private_data->error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder,
body_view,
+
private_data->field_index, out,
+ &private_data->error));
+ }
return NANOARROW_OK;
}
@@ -411,8 +421,10 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
if (options != NULL) {
private_data->field_index = options->field_index;
+ private_data->use_shared_buffers = options->use_shared_buffers;
} else {
private_data->field_index = -1;
+ private_data->use_shared_buffers = ArrowIpcSharedBufferIsThreadSafe();
}
out->private_data = private_data;
diff --git
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
index 126aaf3..03e4048 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -59,6 +59,14 @@ static uint8_t kSimpleRecordBatch[] = {
static uint8_t kEndOfStream[] = {0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00,
0x00};
+// If we have an explicit compile with or without atomics, test the return
value of
+// ArrowIpcSharedBufferIsThreadSafe()
+#if defined(NANOARROW_IPC_USE_STDATOMIC)
+TEST(NanoarrowIpcReader, ArrowIpcSharedBufferIsThreadSafe) {
+ EXPECT_EQ(ArrowIpcSharedBufferIsThreadSafe(), NANOARROW_IPC_USE_STDATOMIC !=
0);
+}
+#endif
+
TEST(NanoarrowIpcReader, InputStreamBuffer) {
uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05};
struct ArrowBuffer input;
@@ -173,6 +181,43 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) {
stream.release(&stream);
}
+TEST(NanoarrowIpcReader, StreamReaderBasicNoSharedBuffers) {
+ struct ArrowBuffer input_buffer;
+ ArrowBufferInit(&input_buffer);
+ ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema,
sizeof(kSimpleSchema)),
+ NANOARROW_OK);
+ ASSERT_EQ(
+ ArrowBufferAppend(&input_buffer, kSimpleRecordBatch,
sizeof(kSimpleRecordBatch)),
+ NANOARROW_OK);
+
+ struct ArrowIpcInputStream input;
+ ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer),
NANOARROW_OK);
+
+ struct ArrowArrayStream stream;
+ struct ArrowIpcArrayStreamReaderOptions options;
+ options.field_index = -1;
+ options.use_shared_buffers = 0;
+ ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &options),
NANOARROW_OK);
+
+ struct ArrowSchema schema;
+ ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
+ EXPECT_STREQ(schema.format, "+s");
+ schema.release(&schema);
+
+ struct ArrowArray array;
+ ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
+ EXPECT_EQ(array.length, 3);
+ array.release(&array);
+
+ ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
+ EXPECT_EQ(array.release, nullptr);
+
+ ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
+ EXPECT_EQ(array.release, nullptr);
+
+ stream.release(&stream);
+}
+
TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) {
struct ArrowBuffer input_buffer;
ArrowBufferInit(&input_buffer);
@@ -268,6 +313,7 @@ TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex)
{
struct ArrowArrayStream stream;
struct ArrowIpcArrayStreamReaderOptions options;
options.field_index = 0;
+ options.use_shared_buffers = 0;
ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &options),
NANOARROW_OK);
struct ArrowSchema schema;