bkietz commented on code in PR #571: URL: https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1714001445
########## src/nanoarrow/ipc/writer.c: ########## @@ -150,3 +155,173 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, stream->private_data = private_data; return NANOARROW_OK; } + +struct ArrowIpcWriterPrivate { + struct ArrowIpcEncoder encoder; + struct ArrowIpcOutputStream output_stream; + struct ArrowBuffer buffer; + struct ArrowBuffer body_buffer; +}; + +ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, + struct ArrowBufferView data, + struct ArrowError* error) { + while (data.size_bytes != 0) { + int64_t bytes_written = 0; + NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8, data.size_bytes, + &bytes_written, error)); + data.size_bytes -= bytes_written; + data.data.as_uint8 += bytes_written; + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, + struct ArrowIpcOutputStream* output_stream) { + NANOARROW_DCHECK(writer != NULL && output_stream != NULL); + + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate)); + + if (private == NULL) { + return ENOMEM; + } + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder)); + ArrowIpcOutputStreamMove(output_stream, &private->output_stream); + + ArrowBufferInit(&private->buffer); + ArrowBufferInit(&private->body_buffer); + + writer->private_data = private; + return NANOARROW_OK; +} + +void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) { + NANOARROW_DCHECK(writer != NULL); + + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (private != NULL) { + ArrowIpcEncoderReset(&private->encoder); + private->output_stream.release(&private->output_stream); + ArrowBufferReset(&private->buffer); + ArrowBufferReset(&private->body_buffer); + + ArrowFree(private); + } + memset(writer, 0, sizeof(struct ArrowIpcWriter)); +} + +static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) { + struct ArrowBufferView buffer_view = { + .data.as_uint8 = buffer->data, + .size_bytes = buffer->size_bytes, + }; + return buffer_view; +} + +// Eventually, it may be necessary to construct an ArrowIpcWriter which doesn't rely on +// blocking writes (ArrowIpcOutputStreamWrite). For example an ArrowIpcOutputStream +// might wrap a socket which is not always able to transmit all bytes of a Message. In +// that case users of ArrowIpcWriter might prefer to do other work until a socket is +// ready rather than blocking, or timeout, or otherwise respond to partial transmission. +// +// This could be handled by: +// - keeping partially sent buffers internal and signalling incomplete transmission by +// raising EAGAIN, returning "bytes actually written", ... +// - when the caller is ready to try again, call ArrowIpcWriterWriteSome() +// - exposing internal buffers which have not been completely sent, deferring +// follow-up transmission to the caller + +ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer, + const struct ArrowSchema* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in, error)); + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, + &private->buffer), + error); + + return ArrowIpcOutputStreamWrite(&private->output_stream, + ArrowBufferToBufferView(&private->buffer), error); +} + +ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, + const struct ArrowArrayView* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (in == NULL) { + int32_t eos[] = {-1, 0}; + struct ArrowBufferView data = {.data.as_int32 = eos, .size_bytes = sizeof(eos)}; + return ArrowIpcOutputStreamWrite(&private->output_stream, data, error); + } + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( + &private->encoder, in, &private->body_buffer, error)); + Review Comment: Okay... I'll get a fix as soon as I write a failing test. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org