paleolimbot commented on code in PR #555:
URL: https://github.com/apache/arrow-nanoarrow/pull/555#discussion_r1702427353
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct
ArrowIpcEncoder* encoder,
FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
}
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+ struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder,
int64_t* offset,
+ int64_t* length, struct ArrowError* error) {
+ struct ArrowBuffer* body_buffer = (struct
ArrowBuffer*)encoder->encode_buffer_state;
+
+ int compressed_buffer_header =
+ encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t)
: 0;
+ int64_t old_size = body_buffer->size_bytes;
+ int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+ int64_t buffer_end = buffer_begin + compressed_buffer_header +
buffer_view.size_bytes;
+ int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+ // reserve all the memory we'll need now
+ NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(body_buffer, new_size -
old_size));
+
+ // zero padding up to the start of the buffer
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin -
old_size));
+
+ // store offset and length of the buffer
+ *offset = buffer_begin;
+ *length = buffer_view.size_bytes;
+
+ if (compressed_buffer_header) {
+ // Signal that the buffer is not compressed; eventually we will set this
to the
+ // decompressed length of an actually compressed buffer.
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt64(body_buffer, -1));
+ }
+ NANOARROW_RETURN_NOT_OK(
+ ArrowBufferAppend(body_buffer, buffer_view.data.data,
buffer_view.size_bytes));
+
+ // zero padding after writing the buffer
+ NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end);
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size -
buffer_end));
+
+ encoder->body_length = body_buffer->size_bytes;
+ return NANOARROW_OK;
+}
+
+void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder,
+ struct ArrowBuffer* body_buffer)
{
+ NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+ body_buffer != NULL);
+ encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback;
+ encoder->encode_buffer_state = body_buffer;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
+ struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+ struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError*
error) {
+ if (array_view->offset != 0) {
+ ArrowErrorSet(error, "Cannot encode arrays with nonzero offset");
+ return ENOTSUP;
+ }
+
+ for (int64_t c = 0; c < array_view->n_children; ++c) {
+ const struct ArrowArrayView* child = array_view->children[c];
+
+ struct ns(FieldNode) node = {child->length, child->null_count};
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(nodes, &node, sizeof(node)));
+
+ for (int64_t b = 0; b < child->array->n_buffers; ++b) {
+ struct ns(Buffer) buffer;
+ NANOARROW_RETURN_NOT_OK(encoder->encode_buffer(
+ child->buffer_views[b], encoder, &buffer.offset, &buffer.length,
error));
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffers, &buffer,
sizeof(buffer)));
+ }
+
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcEncoderEncodeRecordBatchImpl(encoder, child, buffers, nodes,
error));
+ }
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder*
encoder,
+ const struct ArrowArrayView*
array_view,
+ struct ArrowError* error) {
+ NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema
!= NULL);
+
+ if (array_view->null_count != 0 &&
ArrowArrayViewComputeNullCount(array_view) != 0) {
+ ArrowErrorSet(error,
+ "RecordBatches cannot be constructed from arrays with top
level nulls");
+ return EINVAL;
+ }
+
+ if (array_view->storage_type != NANOARROW_TYPE_STRUCT) {
+ ArrowErrorSet(
+ error,
+ "RecordBatches cannot be constructed from arrays of type other than
struct");
+ return EINVAL;
+ }
+
+ if (!encoder->encode_buffer) {
+ ArrowErrorSet(error, "No encode_buffer behavior provided when encoding
RecordBatch");
+ return EINVAL;
+ }
+
+ struct ArrowIpcEncoderPrivate* private =
+ (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+ flatcc_builder_t* builder = &private->builder;
+
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+ encoder->body_length = 0;
+
+ FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder));
+ if (encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+ FLATCC_RETURN_UNLESS_0(RecordBatch_compression_start(builder));
+ FLATCC_RETURN_UNLESS_0(BodyCompression_codec_add(builder, encoder->codec));
+ FLATCC_RETURN_UNLESS_0(RecordBatch_compression_end(builder));
+ }
+ FLATCC_RETURN_UNLESS_0(RecordBatch_length_add(builder, array_view->length));
+
+ ArrowBufferResize(&private->buffers, 0, 0);
+ ArrowBufferResize(&private->nodes, 0, 0);
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
+ encoder, array_view, &private->buffers, &private->nodes, error));
+
+ FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create( //
+ builder, (struct ns(FieldNode)*)private->nodes.data,
+ private->nodes.size_bytes / sizeof(struct ns(FieldNode))));
+ FLATCC_RETURN_UNLESS_0(RecordBatch_buffers_create( //
+ builder, (struct ns(Buffer)*)private->buffers.data,
+ private->buffers.size_bytes / sizeof(struct ns(Buffer))));
Review Comment:
I was wondering if we could avoid creating the new root/fields and copying
our cached buffer into it (updating in place instead), but flatcc is probably
very good at anticipating repeated message building and avoiding allocations
(and the copy is probably not expensive).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]