bkietz commented on code in PR #571:
URL: https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1714001445


##########
src/nanoarrow/ipc/writer.c:
##########
@@ -150,3 +155,173 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct 
ArrowIpcOutputStream* stream,
   stream->private_data = private_data;
   return NANOARROW_OK;
 }
+
+struct ArrowIpcWriterPrivate {  // heap-allocated state stored in ArrowIpcWriter.private_data
+  struct ArrowIpcEncoder encoder;  // initialized by ArrowIpcEncoderInit() in ArrowIpcWriterInit()
+  struct ArrowIpcOutputStream output_stream;  // moved in from the caller at init; released on reset
+  struct ArrowBuffer buffer;  // emptied before each message, then filled by ArrowIpcEncoderFinalizeBuffer()
+  struct ArrowBuffer body_buffer;  // emptied, then handed to ArrowIpcEncoderEncodeSimpleRecordBatch()
+};
+
+// Write every byte of `data` to `stream`.
+//
+// stream->write() may accept fewer bytes than requested, so it is called repeatedly
+// on the unwritten remainder until nothing is left. The first non-OK code reported by
+// stream->write() is returned immediately; NANOARROW_OK is returned once all bytes
+// have been accepted (or if `data` was empty to begin with).
+ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream,
+                                         struct ArrowBufferView data,
+                                         struct ArrowError* error) {
+  // `data` is a by-value copy, so advancing it in place does not affect the caller.
+  for (int64_t n_written = 0; data.size_bytes != 0;
+       data.size_bytes -= n_written, data.data.as_uint8 += n_written) {
+    n_written = 0;
+    NANOARROW_RETURN_NOT_OK(
+        stream->write(stream, data.data.as_uint8, data.size_bytes, &n_written, error));
+  }
+
+  return NANOARROW_OK;
+}
+
+// Initialize `writer` so that it writes to `output_stream`.
+//
+// Allocates the writer's private state, initializes its encoder and its two scratch
+// buffers, and moves `output_stream` into that state via ArrowIpcOutputStreamMove().
+//
+// Returns ENOMEM if the private state cannot be allocated, the code reported by
+// ArrowIpcEncoderInit() if encoder setup fails, and NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer,
+                                  struct ArrowIpcOutputStream* output_stream) {
+  NANOARROW_DCHECK(writer != NULL && output_stream != NULL);
+
+  struct ArrowIpcWriterPrivate* private =
+      (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate));
+
+  if (private == NULL) {
+    return ENOMEM;
+  }
+
+  // Nothing else references `private` yet, so it has to be freed here when encoder
+  // setup fails; returning directly would leak the allocation.
+  ArrowErrorCode result = ArrowIpcEncoderInit(&private->encoder);
+  if (result != NANOARROW_OK) {
+    ArrowFree(private);
+    return result;
+  }
+
+  // The caller's stream is only taken after the last step that can report failure.
+  ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
+
+  ArrowBufferInit(&private->buffer);
+  ArrowBufferInit(&private->body_buffer);
+
+  writer->private_data = private;
+  return NANOARROW_OK;
+}
+
+// Release everything owned by `writer` and zero the struct.
+//
+// When private state is present, the encoder is reset, the output stream's release
+// callback is invoked, both scratch buffers are reset, and the state itself is freed.
+// The ArrowIpcWriter struct is zeroed whether or not private state was present.
+void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) {
+  NANOARROW_DCHECK(writer != NULL);
+
+  struct ArrowIpcWriterPrivate* impl = (struct ArrowIpcWriterPrivate*)writer->private_data;
+  if (impl != NULL) {
+    struct ArrowIpcOutputStream* out = &impl->output_stream;
+
+    ArrowIpcEncoderReset(&impl->encoder);
+    out->release(out);
+    ArrowBufferReset(&impl->buffer);
+    ArrowBufferReset(&impl->body_buffer);
+    ArrowFree(impl);
+  }
+
+  memset(writer, 0, sizeof(*writer));
+}
+
+// Build an ArrowBufferView describing the bytes currently held by `buffer`.
+// Only the data pointer and size are copied; the bytes themselves are not.
+static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) {
+  return (struct ArrowBufferView){
+      .data.as_uint8 = buffer->data,
+      .size_bytes = buffer->size_bytes,
+  };
+}
+
+// Eventually, it may be necessary to construct an ArrowIpcWriter which doesn't rely on
+// blocking writes (ArrowIpcOutputStreamWrite). For example, an ArrowIpcOutputStream
+// might wrap a socket which is not always able to transmit all bytes of a Message. In
+// that case users of ArrowIpcWriter might prefer to do other work until a socket is
+// ready rather than blocking, or timeout, or otherwise respond to partial transmission.
+//
+// This could be handled by:
+// - keeping partially sent buffers internal and signalling incomplete transmission by
+//   raising EAGAIN, returning "bytes actually written", ...
+//   - when the caller is ready to try again, call ArrowIpcWriterWriteSome()
+// - exposing internal buffers which have not been completely sent, deferring
+//   follow-up transmission to the caller
+
+// Encode the schema `in` as an encapsulated IPC message and write it to the writer's
+// output stream.
+//
+// The writer's message buffer is emptied, the schema is encoded and finalized into
+// that buffer, and the buffer's bytes are then written with
+// ArrowIpcOutputStreamWrite(). The first non-OK code from encoding, finalizing, or
+// writing is returned; `error` is passed through to those calls.
+ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer,
+                                         const struct ArrowSchema* in,
+                                         struct ArrowError* error) {
+  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL);
+  struct ArrowIpcWriterPrivate* private = (struct ArrowIpcWriterPrivate*)writer->private_data;
+
+  // Drop whatever the previous message left behind before encoding a new one.
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in, error));
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+      ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, &private->buffer),
+      error);
+
+  struct ArrowBufferView encoded_message = ArrowBufferToBufferView(&private->buffer);
+  return ArrowIpcOutputStreamWrite(&private->output_stream, encoded_message, error);
+}
+
+ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,
+                                            const struct ArrowArrayView* in,
+                                            struct ArrowError* error) {
+  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
+  struct ArrowIpcWriterPrivate* private =
+      (struct ArrowIpcWriterPrivate*)writer->private_data;
+
+  if (in == NULL) {
+    int32_t eos[] = {-1, 0};
+    struct ArrowBufferView data = {.data.as_int32 = eos, .size_bytes = 
sizeof(eos)};
+    return ArrowIpcOutputStreamWrite(&private->output_stream, data, error);
+  }
+
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0));
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
+      &private->encoder, in, &private->body_buffer, error));
+

Review Comment:
   Okay... I'll get a fix as soon as I write a failing test. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to