lidavidm commented on code in PR #165:
URL: https://github.com/apache/arrow-nanoarrow/pull/165#discussion_r1147599241
##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c:
##########
@@ -1051,53 +1064,184 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct
ArrowIpcDecoder* decoder,
}
}
-struct ArrowIpcArraySetter {
- ns(FieldNode_vec_t) fields;
- int64_t field_i;
- ns(Buffer_vec_t) buffers;
- int64_t buffer_i;
- struct ArrowBufferView body;
- enum ArrowIpcCompressionType codec;
- int swap_endian;
+#if NANOARROW_IPC_USE_STDATOMIC
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ atomic_long reference_count;
};
-static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter,
int64_t offset,
- int64_t length, struct ArrowBuffer* out,
- struct ArrowError* error) {
- if (length == 0) {
- return NANOARROW_OK;
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta);
+ return old_count + delta;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ atomic_store(&private_data->reference_count, count);
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 1;
+}
+#else
+struct ArrowIpcSharedBuffer {
+ struct ArrowBuffer src;
+ int64_t reference_count;
+};
+
+static int64_t ArrowIpcSharedBufferUpdate(
+ struct ArrowIpcSharedBuffer* private_data, int delta) {
+ private_data->reference_count += delta;
+ return private_data->reference_count;
+}
+
+static void ArrowIpcSharedBufferSet(
+ struct ArrowIpcSharedBuffer* private_data, int64_t count) {
+ private_data->reference_count = count;
+}
+
+int ArrowIpcSharedBufferIsThreadSafe(void) {
+ return 0;
+}
+#endif
+
+static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator,
uint8_t* ptr,
+ int64_t size) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)allocator->private_data;
+
+ if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) {
+ ArrowBufferReset(&private_data->src);
+ ArrowFree(private_data);
}
+}
- // Check that this buffer fits within the body
- if (offset < 0 || (offset + length) > setter->body.size_bytes) {
- ArrowErrorSet(error,
- "Buffer %ld requires body offsets [%ld..%ld) but body has
size %ld",
- (long)setter->buffer_i - 1, (long)offset, (long)offset +
(long)length,
- setter->body.size_bytes);
- return EINVAL;
+static int ArrowIpcSharedBufferInit(struct ArrowBuffer* buffer) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)ArrowMalloc(sizeof(struct
ArrowIpcSharedBuffer));
+ if (private_data == NULL) {
+ return ENOMEM;
}
- struct ArrowBufferView view;
- view.data.as_uint8 = setter->body.data.as_uint8 + offset;
- view.size_bytes = length;
+ ArrowBufferMove(buffer, &private_data->src);
+ ArrowIpcSharedBufferSet(private_data, 1);
+
+ ArrowBufferInit(buffer);
+ buffer->data = private_data->src.data;
+ buffer->size_bytes = private_data->src.size_bytes;
+ // Don't expose any extra capacity from src so that any calls to
ArrowBufferAppend
+ // on this buffer will fail.
+ buffer->capacity_bytes = private_data->src.size_bytes;
+ buffer->allocator = ArrowBufferDeallocator(&ArrowIpcSharedBufferFree,
private_data);
+ return NANOARROW_OK;
+}
+
+static void ArrowIpcSharedBufferClone(struct ArrowBuffer* shared,
+ struct ArrowBuffer* shared_out) {
+ struct ArrowIpcSharedBuffer* private_data =
+ (struct ArrowIpcSharedBuffer*)shared->allocator.private_data;
+ ArrowIpcSharedBufferUpdate(private_data, 1);
+ memcpy(shared_out, shared, sizeof(struct ArrowBuffer));
+}
+
+struct ArrowIpcBufferSource {
+ int64_t body_offset_bytes;
+ int64_t buffer_length_bytes;
+ enum ArrowIpcCompressionType codec;
+ int swap_endian;
+};
+
+struct ArrowIpcBufferFactory {
Review Comment:
We can leave it as is (it's a little Java-y for my tastes) and revisit it
depending on whether we see value in the flexibility.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]