paleolimbot commented on code in PR #165:
URL: https://github.com/apache/arrow-nanoarrow/pull/165#discussion_r1147588013
##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c:
##########
@@ -1051,53 +1064,184 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct
ArrowIpcDecoder* decoder,
}
}
-struct ArrowIpcArraySetter {
- ns(FieldNode_vec_t) fields;
- int64_t field_i;
- ns(Buffer_vec_t) buffers;
- int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
};
-static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter,
int64_t offset,
- int64_t length, struct ArrowBuffer* out,
- struct ArrowError* error) {
- if (length == 0) {
- return NANOARROW_OK;
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 1;
+}
+#else
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 0;
+}
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator,
uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
}
+}
- // Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has
size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset +
(long)length,
- setter->body.size_bytes);
- return EINVAL;
+static int ArrowIpcSharedBufferInit(struct ArrowBuffer* buffer) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)ArrowMalloc(sizeof(struct
ArrowIpcSharedBuffer));
+ if (private_data == NULL) {
+ return ENOMEM;
}
- struct ArrowBufferView view;
- view.data.as_uint8 = setter->body.data.as_uint8 + offset;
- view.size_bytes = length;
+ ArrowBufferMove(buffer, &private_data->src);
+ ArrowIpcSharedBufferSet(private_data, 1);
+
+ ArrowBufferInit(buffer);
+ buffer->data = private_data->src.data;
+ buffer->size_bytes = private_data->src.size_bytes;
+ // Don't expose any extra capcity from src so that any calls to
ArrowBufferAppend
+ // on this buffer will fail.
+ buffer->capacity_bytes = private_data->src.size_bytes;
+ buffer->allocator = ArrowBufferDeallocator(&ArrowIpcSharedBufferFree,
private_data);
+ return NANOARROW_OK;
+}
+
+static void ArrowIpcSharedBufferClone(struct ArrowBuffer* shared,
+ struct ArrowBuffer* shared_out) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)shared->allocator.private_data;
+ ArrowIpcSharedBufferUpdate(private_data, 1);
+ memcpy(shared_out, shared, sizeof(struct ArrowBuffer));
+}
+
+struct ArrowIpcBufferSource {
+ int64_t body_offset_bytes;
+ int64_t buffer_length_bytes;
+ enum ArrowIpcCompressionType codec;
+ int swap_endian;
+};
+
+struct ArrowIpcBufferFactory {
Review Comment:
I added full documentation here. The basic idea is that what we get from the
flatbuffers metadata is just an offset and a length (as integers), but there are
many ways to get from there to a `struct ArrowBuffer` (not all of which involve
absolute pointers or a complete message body that already exists in memory).
Because this is internal, it could also be a `switch()` + `enum` if you think the
callback approach is overkill 🤷
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]