lidavidm commented on code in PR #165:
URL: https://github.com/apache/arrow-nanoarrow/pull/165#discussion_r1146734552
##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c:
##########
@@ -1051,53 +1064,184 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder,
}
}
-struct ArrowIpcArraySetter {
- ns(FieldNode_vec_t) fields;
- int64_t field_i;
- ns(Buffer_vec_t) buffers;
- int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
};
-static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
- int64_t length, struct ArrowBuffer* out,
- struct ArrowError* error) {
- if (length == 0) {
- return NANOARROW_OK;
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 1;
+}
+#else
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 0;
+}
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
}
+}
- // Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
- setter->body.size_bytes);
- return EINVAL;
+static int ArrowIpcSharedBufferInit(struct ArrowBuffer* buffer) {
Review Comment:
Would it be possible to check if buffer is already a shared buffer, and if
so just reuse the existing reference count?
##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c:
##########
@@ -1051,53 +1064,184 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder,
}
}
-struct ArrowIpcArraySetter {
- ns(FieldNode_vec_t) fields;
- int64_t field_i;
- ns(Buffer_vec_t) buffers;
- int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
};
-static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
- int64_t length, struct ArrowBuffer* out,
- struct ArrowError* error) {
- if (length == 0) {
- return NANOARROW_OK;
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 1;
+}
+#else
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 0;
+}
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
}
+}
- // Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
- setter->body.size_bytes);
- return EINVAL;
+static int ArrowIpcSharedBufferInit(struct ArrowBuffer* buffer) {
Review Comment:
Especially if `DecodeArrayFromOwned` intends to replace `body` with a shared
buffer so you can reuse it, then it sounds like we need to be careful here to
avoid multiple reference counts pointing to the same source data.
##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c:
##########
@@ -1051,53 +1064,184 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder,
}
}
-struct ArrowIpcArraySetter {
- ns(FieldNode_vec_t) fields;
- int64_t field_i;
- ns(Buffer_vec_t) buffers;
- int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
};
-static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
- int64_t length, struct ArrowBuffer* out,
- struct ArrowError* error) {
- if (length == 0) {
- return NANOARROW_OK;
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 1;
+}
+#else
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 0;
+}
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
}
+}
- // Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
- setter->body.size_bytes);
- return EINVAL;
+static int ArrowIpcSharedBufferInit(struct ArrowBuffer* buffer) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)ArrowMalloc(sizeof(struct ArrowIpcSharedBuffer));
+ if (private_data == NULL) {
+ return ENOMEM;
}
- struct ArrowBufferView view;
- view.data.as_uint8 = setter->body.data.as_uint8 + offset;
- view.size_bytes = length;
+ ArrowBufferMove(buffer, &private_data->src);
+ ArrowIpcSharedBufferSet(private_data, 1);
+
+ ArrowBufferInit(buffer);
+ buffer->data = private_data->src.data;
+ buffer->size_bytes = private_data->src.size_bytes;
+ // Don't expose any extra capcity from src so that any calls to ArrowBufferAppend
+ // on this buffer will fail.
+ buffer->capacity_bytes = private_data->src.size_bytes;
+ buffer->allocator = ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data);
+ return NANOARROW_OK;
+}
+
+static void ArrowIpcSharedBufferClone(struct ArrowBuffer* shared,
+ struct ArrowBuffer* shared_out) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)shared->allocator.private_data;
+ ArrowIpcSharedBufferUpdate(private_data, 1);
+ memcpy(shared_out, shared, sizeof(struct ArrowBuffer));
+}
+
+struct ArrowIpcBufferSource {
+ int64_t body_offset_bytes;
+ int64_t buffer_length_bytes;
+ enum ArrowIpcCompressionType codec;
+ int swap_endian;
+};
+
+struct ArrowIpcBufferFactory {
Review Comment:
This might be worth some docs? Basically, it's handing out slices/copies of
some internal source buffer based on the `src` parameter?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]