paleolimbot commented on code in PR #555:
URL: https://github.com/apache/arrow-nanoarrow/pull/555#discussion_r1701046812


##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
   FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
   return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
 }
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+    struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, 
int64_t* offset,
+    int64_t* length, struct ArrowError* error) {
+  struct ArrowBuffer* body_buffer = (struct 
ArrowBuffer*)encoder->encode_buffer_state;
+
+  int compressed_buffer_header =
+      encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t) 
: 0;
+  int64_t old_size = body_buffer->size_bytes;
+  int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+  int64_t buffer_end = buffer_begin + compressed_buffer_header + 
buffer_view.size_bytes;
+  int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+  // reserve all the memory we'll need now
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(body_buffer, new_size - 
old_size));
+
+  // zero padding up to the start of the buffer
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin - 
old_size));
+
+  // store offset and length of the buffer
+  *offset = buffer_begin;
+  *length = buffer_view.size_bytes;
+
+  if (compressed_buffer_header) {
+    // Signal that the buffer is not compressed; eventually we will set this 
to the
+    // decompressed length of an actually compressed buffer.
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt64(body_buffer, -1));
+  }

Review Comment:
   I think it might be worth deferring this to the PR where compression support 
is added?



##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
   FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
   return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
 }
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+    struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, 
int64_t* offset,
+    int64_t* length, struct ArrowError* error) {
+  struct ArrowBuffer* body_buffer = (struct 
ArrowBuffer*)encoder->encode_buffer_state;

Review Comment:
   I find that the relationship between this callback and the `ArrowIpcEncoder` 
confusing, and I wonder if this should be internal for now (i.e., not exposed 
as a callback in the `struct ArrowIpcEncoder` yet). Arrow C++ handles this with 
an `IpcPayload` and an extensible writer class, the decoder handles it with a 
callback that is only exposed internally, and most nanoarrow extensible-ness 
involves a `release()` callback of some kind.
   
   The minimum required to for somebody to call 
`ArrowIpcEncoderEncodeRecordBatch(encoder, array_view, output_stream/buffer, 
error)` (which is pretty much exactly what you have here but keeping the 
callback internal) might be a starting place until we need extensibility?



##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -80,8 +76,11 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct 
ArrowIpcEncoder* encoder,
   struct ArrowIpcEncoderPrivate* private =
       (struct ArrowIpcEncoderPrivate*)encoder->private_data;
 
-  int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
-  int32_t header[] = {-1, ArrowInt32ToLe((int32_t)size)};
+  int32_t size = (int32_t)flatcc_builder_get_buffer_size(&private->builder);

Review Comment:
   This cast should probably have a check since we can easily return an error 
here.



##########
src/nanoarrow/ipc/decoder_test.cc:
##########
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture, 
NanoarrowIpcArrowArrayRoundtrip) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+struct ArrowArrayViewEqualTo {
+  const struct ArrowArrayView* expected;
+
+  using is_gtest_matcher = void;
+
+  bool MatchAndExplain(const struct ArrowArrayView* actual, std::ostream* os) 
const {
+    return MatchAndExplain({}, actual, expected, os);
+  }
+
+  static bool MatchAndExplain(std::vector<int> field_path,
+                              const struct ArrowArrayView* actual,
+                              const struct ArrowArrayView* expected, 
std::ostream* os) {
+    auto prefixed = [&]() -> std::ostream& {
+      if (!field_path.empty()) {
+        for (int i : field_path) {
+          *os << "." << i;
+        }
+        *os << ":";
+      }
+      return *os;
+    };
+
+    NANOARROW_DCHECK(actual->offset == 0);
+    NANOARROW_DCHECK(expected->offset == 0);
+
+    if (actual->length != expected->length) {
+      prefixed() << "expected length=" << expected->length << "\n";
+      prefixed() << "  actual length=" << actual->length << "\n";
+      return false;
+    }
+
+    auto null_count = [](const struct ArrowArrayView* a) {
+      return a->null_count != -1 ? a->null_count : 
ArrowArrayViewComputeNullCount(a);
+    };
+    if (null_count(actual) != null_count(expected)) {
+      prefixed() << "expected null_count=" << null_count(expected) << "\n";
+      prefixed() << "  actual null_count=" << null_count(actual) << "\n";
+      return false;
+    }
+
+    for (int64_t i = 0; actual->layout.buffer_type[i] != 
NANOARROW_BUFFER_TYPE_NONE &&
+                        i < NANOARROW_MAX_FIXED_BUFFERS;
+         ++i) {
+      auto a_buf = actual->buffer_views[i];
+      auto e_buf = expected->buffer_views[i];
+      if (a_buf.size_bytes != e_buf.size_bytes) {
+        prefixed() << "expected buffer[" << i << "].size=" << e_buf.size_bytes 
<< "\n";
+        prefixed() << "  actual buffer[" << i << "].size=" << a_buf.size_bytes 
<< "\n";
+        return false;
+      }
+      if (memcmp(a_buf.data.data, e_buf.data.data, a_buf.size_bytes) != 0) {
+        prefixed() << "expected buffer[" << i << "]'s data to match\n";
+        return false;
+      }
+    }
+
+    field_path.push_back(0);
+    for (int64_t i = 0; i < actual->n_children; ++i) {
+      field_path.back() = i;
+      if (!MatchAndExplain(field_path, actual->children[i], 
expected->children[i], os)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  void DescribeTo(std::ostream* os) const { *os << "is equivalent to the array 
view"; }
+  void DescribeNegationTo(std::ostream* os) const {
+    *os << "is not equivalent to the array view";
+  }
+};
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip) 
{
+  struct ArrowError error;
+  nanoarrow::UniqueSchema schema;
+  ASSERT_TRUE(
+      arrow::ExportSchema(arrow::Schema({arrow::field("", GetParam())}), 
schema.get())
+          .ok());
+
+  // now make one empty struct array with this schema and another with all 
zeroes
+  nanoarrow::UniqueArray empty_array, zero_array;
+  for (auto* array : {empty_array.get(), zero_array.get()}) {
+    ASSERT_EQ(ArrowArrayInitFromSchema(array, schema.get(), nullptr), 
NANOARROW_OK);
+    ASSERT_EQ(ArrowArrayStartAppending(array), NANOARROW_OK);
+    if (array == zero_array.get()) {
+      ASSERT_EQ(ArrowArrayAppendEmpty(array, 5), NANOARROW_OK);
+    }
+    ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK);
+
+    nanoarrow::UniqueArrayView array_view;
+    ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), 
&error),
+              NANOARROW_OK);

Review Comment:
   This part can probably be outside the loop?



##########
src/nanoarrow/nanoarrow_ipc.h:
##########
@@ -461,6 +465,21 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct 
ArrowIpcEncoder* encoder,
 ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder,
                                            const struct ArrowSchema* schema,
                                            struct ArrowError* error);
+/// \brief Set the encoder to concatenate encoded buffers into body_buffer
+///
+/// encoder->encode_buffer_state will point to the provided ArrowBuffer.
+/// The contiguous body buffer will be appended to this during
+/// ArrowIpcEncoderEncodeRecordBatch.
+void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder,
+                                              struct ArrowBuffer* body_buffer);
+
+/// \brief Encode a struct typed ArrayView to a flatbuffer RecordBatch, 
embedded in a
+/// Message.
+///
+/// Returns ENOMEM if allocation fails, NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* 
encoder,
+                                                const struct ArrowArrayView* 
array_view,
+                                                struct ArrowError* error);

Review Comment:
   I wonder if this would be simpler as:
   
   ```c
   ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch(struct 
ArrowIpcEncoder* encoder,
                                                   const struct ArrowArrayView* 
array_view,
                                                   struct ArrowBuffer* out,
                                                   struct ArrowError* error);
   ```
   
   ...with a potential future
   
   ```c
   ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* 
encoder,
                                                   const struct ArrowArrayView* 
array_view,
                                                   struct 
ArrowIpcBufferEncoder* encoder,
                                                   struct ArrowError* error);
   ```
   
   ...to handle advanced usage of the dissociated stream stuff where the 
offsets aren't local.



##########
src/nanoarrow/ipc/decoder_test.cc:
##########
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture, 
NanoarrowIpcArrowArrayRoundtrip) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+struct ArrowArrayViewEqualTo {
+  const struct ArrowArrayView* expected;
+
+  using is_gtest_matcher = void;
+
+  bool MatchAndExplain(const struct ArrowArrayView* actual, std::ostream* os) 
const {
+    return MatchAndExplain({}, actual, expected, os);
+  }
+
+  static bool MatchAndExplain(std::vector<int> field_path,
+                              const struct ArrowArrayView* actual,
+                              const struct ArrowArrayView* expected, 
std::ostream* os) {
+    auto prefixed = [&]() -> std::ostream& {
+      if (!field_path.empty()) {
+        for (int i : field_path) {
+          *os << "." << i;
+        }
+        *os << ":";
+      }
+      return *os;
+    };
+
+    NANOARROW_DCHECK(actual->offset == 0);
+    NANOARROW_DCHECK(expected->offset == 0);
+
+    if (actual->length != expected->length) {
+      prefixed() << "expected length=" << expected->length << "\n";
+      prefixed() << "  actual length=" << actual->length << "\n";
+      return false;
+    }
+
+    auto null_count = [](const struct ArrowArrayView* a) {
+      return a->null_count != -1 ? a->null_count : 
ArrowArrayViewComputeNullCount(a);
+    };
+    if (null_count(actual) != null_count(expected)) {
+      prefixed() << "expected null_count=" << null_count(expected) << "\n";
+      prefixed() << "  actual null_count=" << null_count(actual) << "\n";
+      return false;
+    }
+
+    for (int64_t i = 0; actual->layout.buffer_type[i] != 
NANOARROW_BUFFER_TYPE_NONE &&
+                        i < NANOARROW_MAX_FIXED_BUFFERS;
+         ++i) {

Review Comment:
   nit: all other nanoarrow code (for better or worse) uses `i++`



##########
src/nanoarrow/ipc/decoder_test.cc:
##########
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture, 
NanoarrowIpcArrowArrayRoundtrip) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+struct ArrowArrayViewEqualTo {

Review Comment:
   This is awesome! The negative matches and messages here are also not tested 
and I am not sure anybody looking to see if we had a utility to help with array 
equality would look in `decoder_test.cc` to find it. Something like:
   
   ```c
   void AssertArrayViewIdentical(actual, expected) {
     NANOARROW_DCHECK(actual->dictionary != nullptr);
     NANOARROW_DCHECK(expected->dictionary != nullptr);
   
     ASSERT_EQ(actual->storage_type, expected->storage_type);
     ASSERT_EQ(actual->offset, expected->offset);
     ASSERT_EQ(actual->length, expected->length);
     for (int i = 0; i < 3; i++) {
       auto a_buf = actual->buffer_views[i];
       auto e_buf = expected->buffer_views[i];
       ASSERT_EQ(a_buf.size_bytes,  e_buf->size_bytes);
       if (a_buf.size_bytes != 0) {
         ASSERT_EQ(memcmp(a_buf.data.data, e_buf.data.data, a_buf.size_bytes), 
0);
       }
     }
   
     ASSERT_EQ(actual->n_children, expected->n_children);
     for (int i = 0; i < actual->n_children; i++) {
       AssertArrayViewIdentical(actual->children[i], expected->children[i]);
     }
   }
   ```
   
   ...will give terrible error messages but can be followed up with a tested 
version of this helper (or a version that partly lives in the C library since 
this comes up in C situations as well).



##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
   FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
   return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
 }
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+    struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, 
int64_t* offset,
+    int64_t* length, struct ArrowError* error) {
+  struct ArrowBuffer* body_buffer = (struct 
ArrowBuffer*)encoder->encode_buffer_state;
+
+  int compressed_buffer_header =
+      encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t) 
: 0;

Review Comment:
   Does it make sense to have the `codec` member of `ArrowIpcEncoder`? It seems 
like that is a very simple way of handling compression that will have to change 
at some point (either to handle more complex ways of choosing which buffers to 
compress or adding options unrelated to compression).



##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
   FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
   return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
 }
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+    struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, 
int64_t* offset,
+    int64_t* length, struct ArrowError* error) {
+  struct ArrowBuffer* body_buffer = (struct 
ArrowBuffer*)encoder->encode_buffer_state;
+
+  int compressed_buffer_header =
+      encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t) 
: 0;
+  int64_t old_size = body_buffer->size_bytes;
+  int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+  int64_t buffer_end = buffer_begin + compressed_buffer_header + 
buffer_view.size_bytes;
+  int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+  // reserve all the memory we'll need now
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(body_buffer, new_size - 
old_size));
+
+  // zero padding up to the start of the buffer
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin - 
old_size));
+
+  // store offset and length of the buffer
+  *offset = buffer_begin;
+  *length = buffer_view.size_bytes;
+
+  if (compressed_buffer_header) {
+    // Signal that the buffer is not compressed; eventually we will set this 
to the
+    // decompressed length of an actually compressed buffer.
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt64(body_buffer, -1));
+  }
+  NANOARROW_RETURN_NOT_OK(
+      ArrowBufferAppend(body_buffer, buffer_view.data.data, 
buffer_view.size_bytes));
+
+  // zero padding after writing the buffer
+  NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end);
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size - 
buffer_end));
+
+  encoder->body_length = body_buffer->size_bytes;
+  return NANOARROW_OK;
+}
+
+void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder,
+                                              struct ArrowBuffer* body_buffer) 
{
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+                   body_buffer != NULL);
+  encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback;
+  encoder->encode_buffer_state = body_buffer;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
+    struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+    struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError* 
error) {
+  if (array_view->offset != 0) {
+    ArrowErrorSet(error, "Cannot encode arrays with nonzero offset");
+    return ENOTSUP;
+  }
+
+  for (int64_t c = 0; c < array_view->n_children; ++c) {
+    const struct ArrowArrayView* child = array_view->children[c];
+
+    struct ns(FieldNode) node = {child->length, child->null_count};
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(nodes, &node, sizeof(node)));
+
+    for (int64_t b = 0; b < child->array->n_buffers; ++b) {
+      struct ns(Buffer) buffer;
+      NANOARROW_RETURN_NOT_OK(encoder->encode_buffer(
+          child->buffer_views[b], encoder, &buffer.offset, &buffer.length, 
error));
+      NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffers, &buffer, 
sizeof(buffer)));
+    }
+
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcEncoderEncodeRecordBatchImpl(encoder, child, buffers, nodes, 
error));
+  }
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* 
encoder,
+                                                const struct ArrowArrayView* 
array_view,
+                                                struct ArrowError* error) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema 
!= NULL);

Review Comment:
   Does `schema` exist here?



##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
   FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
   return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
 }
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+    struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, 
int64_t* offset,
+    int64_t* length, struct ArrowError* error) {
+  struct ArrowBuffer* body_buffer = (struct 
ArrowBuffer*)encoder->encode_buffer_state;
+
+  int compressed_buffer_header =
+      encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t) 
: 0;
+  int64_t old_size = body_buffer->size_bytes;
+  int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+  int64_t buffer_end = buffer_begin + compressed_buffer_header + 
buffer_view.size_bytes;
+  int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+  // reserve all the memory we'll need now
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(body_buffer, new_size - 
old_size));
+
+  // zero padding up to the start of the buffer
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin - 
old_size));
+
+  // store offset and length of the buffer
+  *offset = buffer_begin;
+  *length = buffer_view.size_bytes;
+
+  if (compressed_buffer_header) {
+    // Signal that the buffer is not compressed; eventually we will set this 
to the
+    // decompressed length of an actually compressed buffer.
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt64(body_buffer, -1));
+  }
+  NANOARROW_RETURN_NOT_OK(
+      ArrowBufferAppend(body_buffer, buffer_view.data.data, 
buffer_view.size_bytes));
+
+  // zero padding after writing the buffer
+  NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end);
+  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size - 
buffer_end));
+
+  encoder->body_length = body_buffer->size_bytes;
+  return NANOARROW_OK;
+}
+
+void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder,
+                                              struct ArrowBuffer* body_buffer) 
{
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+                   body_buffer != NULL);
+  encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback;
+  encoder->encode_buffer_state = body_buffer;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
+    struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+    struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError* 
error) {
+  if (array_view->offset != 0) {
+    ArrowErrorSet(error, "Cannot encode arrays with nonzero offset");
+    return ENOTSUP;
+  }
+
+  for (int64_t c = 0; c < array_view->n_children; ++c) {
+    const struct ArrowArrayView* child = array_view->children[c];
+
+    struct ns(FieldNode) node = {child->length, child->null_count};
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(nodes, &node, sizeof(node)));
+
+    for (int64_t b = 0; b < child->array->n_buffers; ++b) {
+      struct ns(Buffer) buffer;
+      NANOARROW_RETURN_NOT_OK(encoder->encode_buffer(
+          child->buffer_views[b], encoder, &buffer.offset, &buffer.length, 
error));
+      NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffers, &buffer, 
sizeof(buffer)));
+    }
+
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcEncoderEncodeRecordBatchImpl(encoder, child, buffers, nodes, 
error));
+  }
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* 
encoder,
+                                                const struct ArrowArrayView* 
array_view,
+                                                struct ArrowError* error) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema 
!= NULL);
+
+  if (array_view->null_count != 0 && 
ArrowArrayViewComputeNullCount(array_view) != 0) {
+    ArrowErrorSet(error,
+                  "RecordBatches cannot be constructed from arrays with top 
level nulls");
+    return EINVAL;
+  }
+
+  if (array_view->storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(
+        error,
+        "RecordBatches cannot be constructed from arrays of type other than 
struct");
+    return EINVAL;
+  }
+
+  if (!encoder->encode_buffer) {
+    ArrowErrorSet(error, "No encode_buffer behavior provided when encoding 
RecordBatch");
+    return EINVAL;
+  }
+
+  struct ArrowIpcEncoderPrivate* private =
+      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+  flatcc_builder_t* builder = &private->builder;
+
+  FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+  FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+  encoder->body_length = 0;
+
+  FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder));
+  if (encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    FLATCC_RETURN_UNLESS_0(RecordBatch_compression_start(builder));
+    FLATCC_RETURN_UNLESS_0(BodyCompression_codec_add(builder, encoder->codec));
+    FLATCC_RETURN_UNLESS_0(RecordBatch_compression_end(builder));
+  }
+  FLATCC_RETURN_UNLESS_0(RecordBatch_length_add(builder, array_view->length));
+
+  ArrowBufferResize(&private->buffers, 0, 0);
+  ArrowBufferResize(&private->nodes, 0, 0);
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
+      encoder, array_view, &private->buffers, &private->nodes, error));
+
+  FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create(  //
+      builder, (struct ns(FieldNode)*)private->nodes.data,
+      private->nodes.size_bytes / sizeof(struct ns(FieldNode))));
+  FLATCC_RETURN_UNLESS_0(RecordBatch_buffers_create(  //
+      builder, (struct ns(Buffer)*)private->buffers.data,
+      private->buffers.size_bytes / sizeof(struct ns(Buffer))));

Review Comment:
   Is it worth just caching the `builder` for the RecordBatch message and the 
`int64_t*` or `ns(FieldNode)*` to the start of each (to avoid allocating a 
builder for every batch?). The decoder requires initializing with a schema to 
make some allocations that only need to happen once (here an `ArrowArrayView` 
would do).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to