This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 9668b18d feat: Add IPC schema encoding (#568)
9668b18d is described below
commit 9668b18da28618537bc6add298da38b4a9747d90
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Tue Jul 30 20:11:14 2024 -0500
feat: Add IPC schema encoding (#568)
- added ArrowIpcEncoderEncodeSchema
- added a parameter to ArrowIpcEncoderFinalizeBuffer which controls
whether encapsulated/padded message buffers will be produced instead of
raw flatbuffer bytes
- tests reuse the decoder tests, replacing arrow C++'s encoder with
ArrowIpcEncoder
Extracted from
https://github.com/apache/arrow-nanoarrow/pull/555#pullrequestreview-2186295630
---
src/nanoarrow/ipc/decoder_test.cc | 140 ++++++++++++++-
src/nanoarrow/ipc/encoder.c | 348 +++++++++++++++++++++++++++++++++++++-
src/nanoarrow/ipc/encoder_test.cc | 27 ++-
src/nanoarrow/nanoarrow_ipc.h | 17 +-
4 files changed, 517 insertions(+), 15 deletions(-)
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index 1cebc063..45f4f53b 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -26,7 +26,7 @@
// For bswap32()
#include "flatcc/portable/pendian.h"
-#include "nanoarrow/nanoarrow_ipc.h"
+#include "nanoarrow/nanoarrow_ipc.hpp"
using namespace arrow;
@@ -469,6 +469,101 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowTypeRoundtrip) {
ArrowIpcDecoderReset(&decoder);
}
+// Render ArrowSchema metadata as "key=value, key=value" (empty for no metadata).
+//
+// Statuses are checked explicitly instead of through NANOARROW_DCHECK (presumably
+// compiled out of release builds), so malformed metadata ends the rendering in
+// every build configuration instead of continuing with unread key/value views.
+std::string ArrowSchemaMetadataToString(const char* metadata) {
+  std::string out;
+
+  struct ArrowMetadataReader reader;
+  if (ArrowMetadataReaderInit(&reader, metadata) != NANOARROW_OK) {
+    return out;
+  }
+
+  const char* separator = "";
+  while (reader.remaining_keys > 0) {
+    struct ArrowStringView key;
+    struct ArrowStringView value;
+    if (ArrowMetadataReaderRead(&reader, &key, &value) != NANOARROW_OK) {
+      break;
+    }
+
+    out += separator;
+    separator = ", ";
+
+    out.append(key.data, key.size_bytes);
+    out += '=';
+    out.append(value.data, value.size_bytes);
+  }
+  return out;
+}
+
+// Render schema (its type, its metadata and, recursively, its named children) as
+// a single-line string so that two schemas can be compared in test assertions.
+std::string ArrowSchemaToString(const struct ArrowSchema* schema) {
+  // The C ArrowSchemaToString(schema, buf, n, recursive) is assumed to return the
+  // length of the full rendering snprintf-style (the previous loop relied on this
+  // too) while writing at most n - 1 characters plus a NUL. Size the buffer from
+  // that length + 1, then trim to the length, so the result carries neither
+  // trailing NUL padding nor a truncated final character.
+  std::string out(1024, '\0');
+  int64_t n = ArrowSchemaToString(schema, &out[0], static_cast<int64_t>(out.size()),
+                                  /*recursive=*/false);
+  if (n < 0) {
+    n = 0;
+  }
+  if (n >= static_cast<int64_t>(out.size())) {
+    out.resize(static_cast<size_t>(n) + 1);
+    ArrowSchemaToString(schema, &out[0], static_cast<int64_t>(out.size()),
+                        /*recursive=*/false);
+  }
+  out.resize(static_cast<size_t>(n));
+
+  std::string metadata = ArrowSchemaMetadataToString(schema->metadata);
+  if (!metadata.empty()) {
+    out += "{" + metadata + "}";
+  }
+
+  if (schema->format[0] == '+') {
+    out += "<";
+    for (int64_t i = 0; i < schema->n_children; ++i) {
+      if (i > 0) {
+        out += ", ";
+      }
+
+      const struct ArrowSchema* child = schema->children[i];
+      if (child == nullptr) {
+        out += "<null>";
+        continue;
+      }
+      // ArrowSchema.name may be NULL as well as empty
+      if (child->name != nullptr && child->name[0] != '\0') {
+        out += child->name;
+        out += ": ";
+      }
+      out += ArrowSchemaToString(child);
+    }
+    out += ">";
+  }
+
+  return out;
+}
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowTypeRoundtrip) {
+  // Round trip a single unnamed field of the parameterized type through
+  // ArrowIpcEncoder -> encapsulated buffer -> ArrowIpcDecoder and compare the
+  // rendered schemas.
+  nanoarrow::UniqueSchema schema;
+  ASSERT_TRUE(
+      arrow::ExportSchema(arrow::Schema({arrow::field("", GetParam())}), schema.get())
+          .ok());
+
+  // Zero-initialized so the failure messages below always stream a valid C
+  // string, even when a call fails without populating it.
+  struct ArrowError error {};
+
+  nanoarrow::ipc::UniqueEncoder encoder;
+  ASSERT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+  // ASSERT rather than EXPECT: the remaining steps are meaningless once
+  // encoding has failed.
+  ASSERT_EQ(ArrowIpcEncoderEncodeSchema(encoder.get(), schema.get(), &error),
+            NANOARROW_OK)
+      << error.message;
+
+  nanoarrow::UniqueBuffer buffer;
+  ASSERT_EQ(
+      ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/true, buffer.get()),
+      NANOARROW_OK);
+
+  struct ArrowBufferView buffer_view;
+  buffer_view.data.data = buffer->data;
+  buffer_view.size_bytes = buffer->size_bytes;
+
+  nanoarrow::ipc::UniqueDecoder decoder;
+  ASSERT_EQ(ArrowIpcDecoderInit(decoder.get()), NANOARROW_OK);
+  ASSERT_EQ(ArrowIpcDecoderVerifyHeader(decoder.get(), buffer_view, &error),
+            NANOARROW_OK)
+      << error.message;
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(decoder.get(), buffer_view, &error),
+            NANOARROW_OK)
+      << error.message;
+
+  nanoarrow::UniqueSchema roundtripped;
+  ASSERT_EQ(ArrowIpcDecoderDecodeSchema(decoder.get(), roundtripped.get(), &error),
+            NANOARROW_OK)
+      << error.message;
+
+  EXPECT_EQ(ArrowSchemaToString(roundtripped.get()), ArrowSchemaToString(schema.get()));
+}
+
TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) {
struct ArrowIpcDecoder decoder;
struct ArrowError error;
@@ -685,7 +780,8 @@ INSTANTIATE_TEST_SUITE_P(
arrow::duration(arrow::TimeUnit::SECOND),
arrow::duration(arrow::TimeUnit::MILLI),
arrow::duration(arrow::TimeUnit::MICRO),
arrow::duration(arrow::TimeUnit::NANO),
arrow::month_interval(), arrow::day_time_interval(),
- arrow::month_day_nano_interval(),
+ arrow::month_day_nano_interval(), arrow::list(arrow::int32()),
+ arrow::list(arrow::field("", arrow::int32())),
arrow::list(arrow::field("some_custom_name", arrow::int32())),
arrow::large_list(arrow::field("some_custom_name", arrow::int32())),
arrow::fixed_size_list(arrow::field("some_custom_name",
arrow::int32()), 123),
@@ -743,12 +839,50 @@ TEST_P(ArrowSchemaParameterizedTestFixture,
NanoarrowIpcArrowSchemaRoundtrip) {
ASSERT_TRUE(maybe_schema.ok());
// Better failure message if we first check for string equality
- EXPECT_EQ(maybe_schema.ValueUnsafe()->ToString(), arrow_schema->ToString());
+ EXPECT_EQ(maybe_schema.ValueUnsafe()->ToString(/*show_metadata=*/true),
+ arrow_schema->ToString(/*show_metadata=*/true));
EXPECT_TRUE(maybe_schema.ValueUnsafe()->Equals(arrow_schema, true));
ArrowIpcDecoderReset(&decoder);
}
+TEST_P(ArrowSchemaParameterizedTestFixture, NanoarrowIpcNanoarrowSchemaRoundtrip) {
+  // Round trip the parameterized Arrow C++ schema through ArrowIpcEncoder ->
+  // encapsulated buffer -> ArrowIpcDecoder and compare the rendered schemas.
+  const std::shared_ptr<arrow::Schema>& arrow_schema = GetParam();
+
+  nanoarrow::UniqueSchema schema;
+  ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, schema.get()).ok());
+
+  // Zero-initialized so the failure messages below always stream a valid C
+  // string, even when a call fails without populating it.
+  struct ArrowError error {};
+
+  nanoarrow::ipc::UniqueEncoder encoder;
+  ASSERT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+  // ASSERT rather than EXPECT: the remaining steps are meaningless once
+  // encoding has failed.
+  ASSERT_EQ(ArrowIpcEncoderEncodeSchema(encoder.get(), schema.get(), &error),
+            NANOARROW_OK)
+      << error.message;
+
+  nanoarrow::UniqueBuffer buffer;
+  ASSERT_EQ(
+      ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/true, buffer.get()),
+      NANOARROW_OK);
+
+  struct ArrowBufferView buffer_view;
+  buffer_view.data.data = buffer->data;
+  buffer_view.size_bytes = buffer->size_bytes;
+
+  nanoarrow::ipc::UniqueDecoder decoder;
+  ASSERT_EQ(ArrowIpcDecoderInit(decoder.get()), NANOARROW_OK);
+  ASSERT_EQ(ArrowIpcDecoderVerifyHeader(decoder.get(), buffer_view, &error),
+            NANOARROW_OK)
+      << error.message;
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(decoder.get(), buffer_view, &error),
+            NANOARROW_OK)
+      << error.message;
+
+  nanoarrow::UniqueSchema roundtripped;
+  ASSERT_EQ(ArrowIpcDecoderDecodeSchema(decoder.get(), roundtripped.get(), &error),
+            NANOARROW_OK)
+      << error.message;
+
+  EXPECT_EQ(ArrowSchemaToString(roundtripped.get()), ArrowSchemaToString(schema.get()));
+}
+
INSTANTIATE_TEST_SUITE_P(
NanoarrowIpcTest, ArrowSchemaParameterizedTestFixture,
::testing::Values(
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index b973e8aa..6813f61f 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -21,15 +21,28 @@
#include <string.h>
#include "flatcc/flatcc_builder.h"
+#include "nanoarrow/ipc/flatcc_generated.h"
#include "nanoarrow/nanoarrow.h"
#include "nanoarrow/nanoarrow_ipc.h"
+#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
+
+// Evaluate the namespaced flatcc builder call x and return ENOMEM from the
+// enclosing function if it returns non-zero. The do/while (0) wrapper makes each
+// use a single statement, so it composes safely with unbraced if/else.
+#define FLATCC_RETURN_UNLESS_0(x)  \
+  do {                             \
+    if (ns(x) != 0) return ENOMEM; \
+  } while (0)
+
+// State stored behind ArrowIpcEncoder::private_data.
struct ArrowIpcEncoderPrivate {
+  // flatcc builder in which the current Message is constructed; after a message
+  // is finalized it is reset (not deallocated) by ArrowIpcEncoderFinalizeBuffer.
flatcc_builder_t builder;
+  // NOTE(review): buffers/nodes are not used by the schema encoding shown here;
+  // presumably scratch space for record batch buffer/node metadata -- confirm.
struct ArrowBuffer buffers;
struct ArrowBuffer nodes;
};
+// Convert a host-order int32 to little-endian byte order. This is a no-op unless
+// the host reports big-endian; it is used when writing the encapsulation prefix.
+static int32_t ArrowInt32ToLe(int32_t value) {
+  int host_is_big_endian = ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG;
+  return host_is_big_endian ? (int32_t)bswap32(value) : value;
+}
+
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder != NULL);
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
@@ -60,25 +73,352 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder)
{
}
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
-                                             struct ArrowBuffer* out) {
+                                             char encapsulate, struct ArrowBuffer* out) {
  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL);
  struct ArrowIpcEncoderPrivate* private =
      (struct ArrowIpcEncoderPrivate*)encoder->private_data;

  int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
+  // Encapsulated framing is the 0xFFFFFFFF continuation marker, then the int32
+  // little-endian metadata size, then the flatbuffer 0-padded to a multiple of 8
+  // bytes. The Arrow IPC format defines metadata_size as the flatbuffer size PLUS
+  // that padding, so that readers find the message body (or the next message)
+  // immediately after the padded metadata. Writing the unpadded size would make
+  // them start reading inside the padding.
+  int64_t padded_size = _ArrowRoundUpToMultipleOf8(size);
+  int32_t header[] = {-1, ArrowInt32ToLe((int32_t)padded_size)};
+
  if (size == 0) {
    // Finalizing an empty flatcc_builder_t triggers an assertion
-    return NANOARROW_OK;
+    // (when encapsulating, only the 8-byte prefix with a size of 0 is emitted)
+    return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) : NANOARROW_OK;
  }
+
+  // The size prefix is a signed 32-bit integer and must not be truncated.
+  if (encapsulate && padded_size > INT32_MAX) {
+    return EINVAL;
+  }

-  void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
+  const void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
  if (data == NULL) {
    return ENOMEM;
  }

-  NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, data, size));
+  int64_t i = out->size_bytes;
+  int64_t n_appended = encapsulate ? (int64_t)sizeof(header) + padded_size : size;
+  NANOARROW_RETURN_NOT_OK(ArrowBufferResize(out, out->size_bytes + n_appended, 0));
+
+  if (encapsulate) {
+    memcpy(out->data + i, &header, sizeof(header));
+    i += sizeof(header);
+  }
+
+  memcpy(out->data + i, data, size);
+  i += size;
+
+  // zero padding bytes, if any
+  memset(out->data + i, 0, out->size_bytes - i);
  // don't deallocate yet, just wipe the builder's current Message
  flatcc_builder_reset(&private->builder);
  return NANOARROW_OK;
}
+
+// Add the Field.type union value describing schema_view's type to the Field table
+// currently open in builder. Children of nested types are encoded separately by
+// the caller.
+//
+// Returns ENOMEM if a flatcc builder call fails, ENOTSUP for dictionary types, and
+// EINVAL (with error set) for unknown types or unparsable union type ids.
+static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
+                                              const struct ArrowSchemaView* schema_view,
+                                              struct ArrowError* error) {
+  switch (schema_view->type) {
+    case NANOARROW_TYPE_NA:
+      FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_BOOL:
+      FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder));
+      return NANOARROW_OK;
+
+    // Int(bitWidth, is_signed)
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_INT8:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_UINT16:
+    case NANOARROW_TYPE_INT16:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_UINT32:
+    case NANOARROW_TYPE_INT32:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_UINT64:
+    case NANOARROW_TYPE_INT64:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_HALF_FLOAT:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_FloatingPoint_create(builder, ns(Precision_HALF)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_FLOAT:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_DOUBLE:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256:
+      FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create(
+          builder, schema_view->decimal_precision, schema_view->decimal_scale,
+          schema_view->decimal_bitwidth));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_STRING:
+      FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_LARGE_STRING:
+      FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_BINARY:
+      FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_LARGE_BINARY:
+      FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_DATE32:
+      FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_DATE64:
+      FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_INTERVAL_MONTHS:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_INTERVAL_DAY_TIME:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Interval_create(builder, ns(IntervalUnit_MONTH_DAY_NANO)));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_TIMESTAMP:
+      FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder));
+      FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit));
+      // The timezone string is an optional flatbuffer field: omit it for
+      // timezone-naive timestamps (and never pass NULL to create_str).
+      if (schema_view->timezone != NULL && schema_view->timezone[0] != '\0') {
+        FLATCC_RETURN_UNLESS_0(
+            Timestamp_timezone_create_str(builder, schema_view->timezone));
+      }
+      FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_TIME32:
+      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_TIME64:
+      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_DURATION:
+      FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_LIST:
+      FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_LARGE_LIST:
+      FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_FIXED_SIZE_LIST:
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_FixedSizeList_create(builder, schema_view->fixed_size));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_RUN_END_ENCODED:
+      FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_STRUCT:
+      FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_SPARSE_UNION:
+    case NANOARROW_TYPE_DENSE_UNION: {
+      FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder));
+
+      FLATCC_RETURN_UNLESS_0(Union_mode_add(
+          builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION ? ns(UnionMode_Dense)
+                                                                   : ns(UnionMode_Sparse)));
+      if (schema_view->union_type_ids != NULL) {
+        int8_t type_ids[128];
+        int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids);
+        // A negative count presumably signals a parse error; never hand it to
+        // _extend() (where it would become a huge size_t).
+        if (n < 0) {
+          ArrowErrorSet(error, "Invalid union type ids '%s'", schema_view->union_type_ids);
+          return EINVAL;
+        }
+        if (n > 0) {
+          FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
+          int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n));
+          if (type_ids_32 == NULL) {
+            return ENOMEM;
+          }
+
+          for (int i = 0; i < n; ++i) {
+            type_ids_32[i] = type_ids[i];
+          }
+          FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder));
+        }
+      }
+
+      FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder));
+      return NANOARROW_OK;
+    }
+
+    case NANOARROW_TYPE_MAP:
+      // Normalize the flag bit (ARROW_FLAG_MAP_KEYS_SORTED) to 0/1 so that the
+      // stored flatbuffer bool byte is canonical.
+      FLATCC_RETURN_UNLESS_0(Field_type_Map_create(
+          builder, (schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED) != 0));
+      return NANOARROW_OK;
+
+    case NANOARROW_TYPE_DICTIONARY:
+      ArrowErrorSet(error, "IPC encoding of dictionary types unsupported");
+      return ENOTSUP;
+
+    default:
+      ArrowErrorSet(error, "Expected a valid enum ArrowType value but found %d",
+                    schema_view->type);
+      return EINVAL;
+  }
+}
+
+// Forward declaration: ArrowIpcEncodeFields() and ArrowIpcEncodeField() are
+// mutually recursive (a field's children are encoded as a nested list of Fields).
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error);
+
+// Append one KeyValue table per key/value pair of schema->metadata, using the
+// push_start/push_end callbacks of the enclosing custom_metadata vector (of a
+// Schema or a Field). The caller starts and ends that vector.
+//
+// Returns ENOMEM if a flatcc builder call fails, or the metadata reader's status
+// (with error set) if schema->metadata cannot be parsed.
+static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
+                                             const struct ArrowSchema* schema,
+                                             int (*push_start)(flatcc_builder_t*),
+                                             ns(KeyValue_ref_t) *
+                                                 (*push_end)(flatcc_builder_t*),
+                                             struct ArrowError* error) {
+  struct ArrowMetadataReader metadata;
+  ArrowErrorCode status = ArrowMetadataReaderInit(&metadata, schema->metadata);
+  if (status != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to initialize a reader for ArrowSchema metadata");
+    return status;
+  }
+
+  while (metadata.remaining_keys > 0) {
+    struct ArrowStringView key;
+    struct ArrowStringView value;
+    status = ArrowMetadataReaderRead(&metadata, &key, &value);
+    if (status != NANOARROW_OK) {
+      ArrowErrorSet(error, "Failed to read a key/value pair from ArrowSchema metadata");
+      return status;
+    }
+
+    if (push_start(builder) != 0) {
+      return ENOMEM;
+    }
+    FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, key.size_bytes));
+    FLATCC_RETURN_UNLESS_0(
+        KeyValue_value_create_strn(builder, value.data, value.size_bytes));
+    if (push_end(builder) == NULL) {
+      return ENOMEM;
+    }
+  }
+  return NANOARROW_OK;
+}
+
+// Encode every child of schema as a Field table, using the push_start/push_end
+// callbacks of the enclosing vector (Schema.fields or Field.children). The caller
+// starts and ends that vector.
+//
+// Returns ENOMEM if a flatcc builder call fails, or the first error reported by
+// ArrowIpcEncodeField() for a child.
+static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder,
+                                           const struct ArrowSchema* schema,
+                                           int (*push_start)(flatcc_builder_t*),
+                                           ns(Field_ref_t) *
+                                               (*push_end)(flatcc_builder_t*),
+                                           struct ArrowError* error) {
+  // int64_t to match the type of ArrowSchema.n_children
+  for (int64_t i = 0; i < schema->n_children; ++i) {
+    if (push_start(builder) != 0) {
+      return ENOMEM;
+    }
+    NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], error));
+    if (push_end(builder) == NULL) {
+      return ENOMEM;
+    }
+  }
+  return NANOARROW_OK;
+}
+
+// Fill in the Field table currently open in builder from schema: its name,
+// nullability, type, children (recursively) and custom metadata.
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+                                          const struct ArrowSchema* schema,
+                                          struct ArrowError* error) {
+  // ArrowSchema.name is optional and may be NULL; Field.name is an optional
+  // flatbuffer string, so simply omit it in that case.
+  if (schema->name != NULL) {
+    FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name));
+  }
+  // Normalize the flag bit (ARROW_FLAG_NULLABLE) to a canonical 0/1 bool.
+  FLATCC_RETURN_UNLESS_0(
+      Field_nullable_add(builder, (schema->flags & ARROW_FLAG_NULLABLE) != 0));
+
+  struct ArrowSchemaView schema_view;
+  NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view, error));
+
+  if (schema->n_children != 0) {
+    FLATCC_RETURN_UNLESS_0(Field_children_start(builder));
+    NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+                                                 &ns(Field_children_push_start),
+                                                 &ns(Field_children_push_end), error));
+    FLATCC_RETURN_UNLESS_0(Field_children_end(builder));
+  }
+
+  if (schema->metadata) {
+    FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder));
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcEncodeMetadata(builder, schema, &ns(Field_custom_metadata_push_start),
+                               &ns(Field_custom_metadata_push_end), error));
+    FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder));
+  }
+  return NANOARROW_OK;
+}
+
+// Encode schema (a struct whose children are the IPC fields) as a Schema Message
+// in the encoder's flatcc builder. Retrieve the bytes afterwards with
+// ArrowIpcEncoderFinalizeBuffer().
+ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder,
+                                           const struct ArrowSchema* schema,
+                                           struct ArrowError* error) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema != NULL);
+
+  // An IPC Schema stores the children of a top-level struct as its fields. Any
+  // other root type would silently lose its own type (and, if childless, every
+  // field), so reject it explicitly.
+  if (schema->format == NULL || strcmp(schema->format, "+s") != 0) {
+    ArrowErrorSet(error, "Expected a struct ArrowSchema (format '+s') but got '%s'",
+                  schema->format == NULL ? "(null)" : schema->format);
+    return EINVAL;
+  }
+
+  struct ArrowIpcEncoderPrivate* private =
+      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+  flatcc_builder_t* builder = &private->builder;
+
+  FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+  FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+  FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder));
+
+  // ArrowIpcSystemEndianness() returns nanoarrow's enum ArrowIpcEndianness, a
+  // different enum from the flatbuffer Endianness (Little/Big). Translate it
+  // explicitly instead of relying on the two enums' numeric values lining up.
+  ns(Endianness_enum_t) endianness =
+      ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG ? ns(Endianness_Big)
+                                                                 : ns(Endianness_Little);
+  FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, endianness));
+
+  FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+                                               &ns(Schema_fields_push_start),
+                                               &ns(Schema_fields_push_end), error));
+  FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder));
+
+  if (schema->metadata) {
+    FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder));
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcEncodeMetadata(builder, schema, &ns(Schema_custom_metadata_push_start),
+                               &ns(Schema_custom_metadata_push_end), error));
+    FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder));
+  }
+
+  // NOTE(review): this advertises Feature COMPRESSED_BODY although nothing in this
+  // encoder compresses message bodies; confirm that this is intended.
+  FLATCC_RETURN_UNLESS_0(Schema_features_start(builder));
+  ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1));
+  if (!features) {
+    return ENOMEM;
+  }
+  features[0] = ns(Feature_COMPRESSED_BODY);
+  FLATCC_RETURN_UNLESS_0(Schema_features_end(builder));
+
+  FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder));
+
+  // A Schema message carries no body.
+  FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
+  return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
+}
diff --git a/src/nanoarrow/ipc/encoder_test.cc
b/src/nanoarrow/ipc/encoder_test.cc
index 8f78b063..f5e6659f 100644
--- a/src/nanoarrow/ipc/encoder_test.cc
+++ b/src/nanoarrow/ipc/encoder_test.cc
@@ -40,9 +40,9 @@ TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
EXPECT_EQ(encoder->encode_buffer, nullptr);
EXPECT_EQ(encoder->encode_buffer_state, nullptr);
- auto* priv = static_cast<struct
ArrowIpcEncoderPrivate*>(encoder->private_data);
- ASSERT_NE(priv, nullptr);
- for (auto* b : {&priv->buffers, &priv->nodes}) {
+ auto* p = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
+ ASSERT_NE(p, nullptr);
+ for (auto* b : {&p->buffers, &p->nodes}) {
// Buffers are empty but initialized with the default allocator
EXPECT_EQ(b->size_bytes, 0);
@@ -52,11 +52,26 @@ TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
// Empty buffer works
nanoarrow::UniqueBuffer buffer;
- EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()),
NANOARROW_OK);
+ EXPECT_EQ(
+ ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/false,
buffer.get()),
+ NANOARROW_OK);
EXPECT_EQ(buffer->size_bytes, 0);
+ EXPECT_EQ(
+ ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/true,
buffer.get()),
+ NANOARROW_OK);
+ EXPECT_EQ(buffer->size_bytes, 8);
// Append a string (finalizing an empty buffer is an error for
flatcc_builder_t)
- EXPECT_NE(flatcc_builder_create_string_str(&priv->builder, "hello world"),
0);
- EXPECT_EQ(ArrowIpcEncoderFinalizeBuffer(encoder.get(), buffer.get()),
NANOARROW_OK);
+ EXPECT_NE(flatcc_builder_create_string_str(&p->builder, "hello world"), 0);
+ EXPECT_EQ(
+ ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/false,
buffer.get()),
+ NANOARROW_OK);
EXPECT_GT(buffer->size_bytes, sizeof("hello world"));
+
+ EXPECT_NE(flatcc_builder_create_string_str(&p->builder, "hello world"), 0);
+ EXPECT_EQ(
+ ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/true,
buffer.get()),
+ NANOARROW_OK);
+ EXPECT_GT(buffer->size_bytes, 8 + sizeof("hello world"));
+ EXPECT_EQ(buffer->size_bytes % 8, 0);
}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index ab7b9eb9..c74e288a 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -61,6 +61,8 @@
#define ArrowIpcEncoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcEncoderReset)
#define ArrowIpcEncoderFinalizeBuffer \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
+#define ArrowIpcEncoderEncodeSchema \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSchema)
#endif
@@ -436,11 +438,22 @@ ArrowErrorCode ArrowIpcEncoderInit(struct
ArrowIpcEncoder* encoder);
/// \brief Release all resources attached to an encoder
void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder);
-/// \brief Finalize the most recently encoded message to a buffer
+/// \brief Finalize the most recently encoded message into a buffer
+///
+/// If encapsulate is non-zero, the message is encapsulated: it is prefixed with
+/// the 0xFFFFFFFF continuation marker and the little-endian int32 metadata size,
+/// and 0-padded to a multiple of 8 bytes. If no message has been encoded, nothing
+/// is appended, unless encapsulating, in which case only the 8-byte prefix (with
+/// a size of 0) is appended.
///
/// The bytes of the encoded message will be appended to the provided buffer.
+/// After a non-empty message is written, the encoder is reset so that the next
+/// message can be encoded.
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
-                                             struct ArrowBuffer* out);
+                                             char encapsulate, struct ArrowBuffer* out);
+
+/// \brief Encode an ArrowSchema
+///
+/// Encodes a Schema message describing the fields of schema into the encoder;
+/// retrieve the encoded bytes with ArrowIpcEncoderFinalizeBuffer().
+///
+/// Returns NANOARROW_OK on success, ENOMEM if allocation fails, ENOTSUP if a
+/// field is dictionary-encoded (not yet supported), or another error code (e.g.
+/// EINVAL) if the schema is invalid; error may be populated with details.
+ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder,
+                                           const struct ArrowSchema* schema,
+                                           struct ArrowError* error);
+
/// @}
#ifdef __cplusplus