bkietz commented on code in PR #555:
URL: https://github.com/apache/arrow-nanoarrow/pull/555#discussion_r1702111857
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -80,8 +76,11 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct
ArrowIpcEncoder* encoder,
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
- int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
- int32_t header[] = {-1, ArrowInt32ToLe((int32_t)size)};
+ int32_t size = (int32_t)flatcc_builder_get_buffer_size(&private->builder);
Review Comment:
I'm not sure which error you mean — overflowing `int32_t`?
##########
src/nanoarrow/ipc/decoder_test.cc:
##########
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
ArrowIpcDecoderReset(&decoder);
}
+struct ArrowArrayViewEqualTo {
+ const struct ArrowArrayView* expected;
+
+ using is_gtest_matcher = void;
+
+ bool MatchAndExplain(const struct ArrowArrayView* actual, std::ostream* os)
const {
+ return MatchAndExplain({}, actual, expected, os);
+ }
+
+ static bool MatchAndExplain(std::vector<int> field_path,
+ const struct ArrowArrayView* actual,
+ const struct ArrowArrayView* expected,
std::ostream* os) {
+ auto prefixed = [&]() -> std::ostream& {
+ if (!field_path.empty()) {
+ for (int i : field_path) {
+ *os << "." << i;
+ }
+ *os << ":";
+ }
+ return *os;
+ };
+
+ NANOARROW_DCHECK(actual->offset == 0);
+ NANOARROW_DCHECK(expected->offset == 0);
+
+ if (actual->length != expected->length) {
+ prefixed() << "expected length=" << expected->length << "\n";
+ prefixed() << " actual length=" << actual->length << "\n";
+ return false;
+ }
+
+ auto null_count = [](const struct ArrowArrayView* a) {
+ return a->null_count != -1 ? a->null_count :
ArrowArrayViewComputeNullCount(a);
+ };
+ if (null_count(actual) != null_count(expected)) {
+ prefixed() << "expected null_count=" << null_count(expected) << "\n";
+ prefixed() << " actual null_count=" << null_count(actual) << "\n";
+ return false;
+ }
+
+ for (int64_t i = 0; actual->layout.buffer_type[i] !=
NANOARROW_BUFFER_TYPE_NONE &&
+ i < NANOARROW_MAX_FIXED_BUFFERS;
+ ++i) {
Review Comment:
okay
##########
src/nanoarrow/ipc/decoder_test.cc:
##########
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
ArrowIpcDecoderReset(&decoder);
}
+// gtest matcher that checks an ArrowArrayView is equivalent to `expected`.
+struct ArrowArrayViewEqualTo {
+  // The view that `actual` is compared against; not owned by the matcher.
+  const struct ArrowArrayView* expected;
+
+  using is_gtest_matcher = void;
+
+  bool MatchAndExplain(const struct ArrowArrayView* actual, std::ostream* os) const {
+    return MatchAndExplain({}, actual, expected, os);
+  }
+
+  // Recursively compares two zero-offset array views.
+  // It checks, in order: length, null count, the bytes of each fixed buffer,
+  // and then each child.
+  // On the first mismatch, a description is written to *os and false is
+  // returned.
+  // field_path holds the child indices leading to the current view and is
+  // used to prefix each message.
+  static bool MatchAndExplain(std::vector<int> field_path,
+                              const struct ArrowArrayView* actual,
+                              const struct ArrowArrayView* expected, std::ostream* os) {
+    auto prefixed = [&]() -> std::ostream& {
+      if (!field_path.empty()) {
+        for (int i : field_path) {
+          *os << "." << i;
+        }
+        *os << ":";
+      }
+      return *os;
+    };
+
+    // Buffers are compared starting at byte 0, so sliced views are not supported.
+    NANOARROW_DCHECK(actual->offset == 0);
+    NANOARROW_DCHECK(expected->offset == 0);
+
+    if (actual->length != expected->length) {
+      prefixed() << "expected length=" << expected->length << "\n";
+      prefixed() << " actual length=" << actual->length << "\n";
+      return false;
+    }
+
+    // A null_count of -1 is treated as unknown and computed from the view.
+    auto null_count = [](const struct ArrowArrayView* a) {
+      return a->null_count != -1 ? a->null_count : ArrowArrayViewComputeNullCount(a);
+    };
+    if (null_count(actual) != null_count(expected)) {
+      prefixed() << "expected null_count=" << null_count(expected) << "\n";
+      prefixed() << " actual null_count=" << null_count(actual) << "\n";
+      return false;
+    }
+
+    // Check the index bound BEFORE reading buffer_type[i].
+    // If all NANOARROW_MAX_FIXED_BUFFERS slots are in use, testing
+    // buffer_type[i] first would read one element past the end.
+    for (int64_t i = 0; i < NANOARROW_MAX_FIXED_BUFFERS &&
+                        actual->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+         ++i) {
+      auto a_buf = actual->buffer_views[i];
+      auto e_buf = expected->buffer_views[i];
+      if (a_buf.size_bytes != e_buf.size_bytes) {
+        prefixed() << "expected buffer[" << i << "].size=" << e_buf.size_bytes << "\n";
+        prefixed() << " actual buffer[" << i << "].size=" << a_buf.size_bytes << "\n";
+        return false;
+      }
+      // Skip memcmp for empty buffers.
+      // Their data pointers may be null, and passing null to memcmp is
+      // undefined even for a zero length.
+      if (a_buf.size_bytes > 0 &&
+          memcmp(a_buf.data.data, e_buf.data.data, a_buf.size_bytes) != 0) {
+        prefixed() << "expected buffer[" << i << "]'s data to match\n";
+        return false;
+      }
+    }
+
+    // The child loop below indexes expected->children using actual's count,
+    // so the two counts must agree first.
+    if (actual->n_children != expected->n_children) {
+      prefixed() << "expected n_children=" << expected->n_children << "\n";
+      prefixed() << " actual n_children=" << actual->n_children << "\n";
+      return false;
+    }
+
+    field_path.push_back(0);
+    for (int64_t i = 0; i < actual->n_children; ++i) {
+      field_path.back() = static_cast<int>(i);
+      if (!MatchAndExplain(field_path, actual->children[i], expected->children[i], os)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  void DescribeTo(std::ostream* os) const { *os << "is equivalent to the array view"; }
+  void DescribeNegationTo(std::ostream* os) const {
+    *os << "is not equivalent to the array view";
+  }
+};
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip)
{
+ struct ArrowError error;
+ nanoarrow::UniqueSchema schema;
+ ASSERT_TRUE(
+ arrow::ExportSchema(arrow::Schema({arrow::field("", GetParam())}),
schema.get())
+ .ok());
+
+ // now make one empty struct array with this schema and another with all
zeroes
+ nanoarrow::UniqueArray empty_array, zero_array;
+ for (auto* array : {empty_array.get(), zero_array.get()}) {
+ ASSERT_EQ(ArrowArrayInitFromSchema(array, schema.get(), nullptr),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayStartAppending(array), NANOARROW_OK);
+ if (array == zero_array.get()) {
+ ASSERT_EQ(ArrowArrayAppendEmpty(array, 5), NANOARROW_OK);
+ }
+ ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK);
+
+ nanoarrow::UniqueArrayView array_view;
+ ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(),
&error),
+ NANOARROW_OK);
Review Comment:
I didn't know that; it seemed like `InitFromSchema` always needed to precede
`SetArray`. Thanks!
##########
src/nanoarrow/ipc/decoder_test.cc:
##########
@@ -763,6 +764,132 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
ArrowIpcDecoderReset(&decoder);
}
+struct ArrowArrayViewEqualTo {
Review Comment:
Can we leave extracting an array equality helper for a follow-up?
##########
src/nanoarrow/nanoarrow_ipc.h:
##########
@@ -461,6 +465,21 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct
ArrowIpcEncoder* encoder,
ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder,
const struct ArrowSchema* schema,
struct ArrowError* error);
+/// \brief Set the encoder to concatenate encoded buffers into body_buffer
+///
+/// encoder->encode_buffer_state will point to the provided ArrowBuffer.
+/// The contiguous body buffer will be appended to this during
+/// ArrowIpcEncoderEncodeRecordBatch.
+void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder,
+ struct ArrowBuffer* body_buffer);
+
+/// \brief Encode a struct typed ArrayView to a flatbuffer RecordBatch,
embedded in a
+/// Message.
+///
+/// Returns ENOMEM if allocation fails, NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder*
encoder,
+ const struct ArrowArrayView*
array_view,
+ struct ArrowError* error);
Review Comment:
okay
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct
ArrowIpcEncoder* encoder,
FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
}
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+ struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder,
int64_t* offset,
+ int64_t* length, struct ArrowError* error) {
+ struct ArrowBuffer* body_buffer = (struct
ArrowBuffer*)encoder->encode_buffer_state;
+
+ int compressed_buffer_header =
+ encoder->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE ? sizeof(int64_t)
: 0;
+ int64_t old_size = body_buffer->size_bytes;
+ int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+ int64_t buffer_end = buffer_begin + compressed_buffer_header +
buffer_view.size_bytes;
+ int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+ // reserve all the memory we'll need now
+ NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(body_buffer, new_size -
old_size));
+
+ // zero padding up to the start of the buffer
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin -
old_size));
+
+ // store offset and length of the buffer
+ *offset = buffer_begin;
+ *length = buffer_view.size_bytes;
+
+ if (compressed_buffer_header) {
+ // Signal that the buffer is not compressed; eventually we will set this
to the
+ // decompressed length of an actually compressed buffer.
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt64(body_buffer, -1));
+ }
Review Comment:
Would it really be preferable to raise a "codec not supported" error?
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -424,3 +423,135 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct
ArrowIpcEncoder* encoder,
FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
}
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+ struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder,
int64_t* offset,
+ int64_t* length, struct ArrowError* error) {
+ struct ArrowBuffer* body_buffer = (struct
ArrowBuffer*)encoder->encode_buffer_state;
Review Comment:
okay
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]