paleolimbot commented on code in PR #555:
URL: https://github.com/apache/arrow-nanoarrow/pull/555#discussion_r1673226278
##########
src/nanoarrow/nanoarrow_ipc.h:
##########
@@ -379,6 +381,49 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
struct ArrowIpcArrayStreamReaderOptions* options);
+/// \brief Encode an ArrowSchema
+///
+/// The buffer will be allocated by flatcc.
+///
+/// Note: the resulting Message is not encapsulated; it does not include a
continuation indicator,
+/// inline metadata size, or padding bytes.
+///
+/// Returns ENOMEM if allocation fails, NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcEncodeSchema(
+ struct ArrowBuffer* out,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error);
+
+/// \brief Callback invoked against each buffer to be encoded.
+///
+/// Encoding of buffers is left as a callback to accommodate dissociated data
storage.
+/// One implementation of this callback might copy all buffers into a
contiguous body
+/// for use in an arrow IPC stream, another implementation might store offsets
and lengths
+/// relative to a known arena.
+typedef ArrowErrorCode (*ArrowIpcBufferEncodeCallback)(struct ArrowBufferView
to_be_encoded,
+ int64_t* offset,
int64_t* body_length,
+ void *user_data, struct
ArrowError* error);
+
+/// \brief Encode a struct typed ArrayView to a flatbuffer RecordBatch,
embedded in a Message.
+///
+/// The buffer will be allocated by flatcc.
+///
+/// Note: the resulting Message is not encapsulated; it does not include a
continuation indicator,
+/// inline metadata size, or padding bytes.
+///
+/// This function may be called with null out buffer, in which case the
callback will be invoked
+/// but no message will be encoded.
+///
+/// Returns ENOMEM if allocation fails, NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcEncodeRecordBatch(
+ struct ArrowBuffer* out,
+ struct ArrowArray* to_be_encoded,
+ ArrowIpcBufferEncodeCallback encode_buffer,
+ void *user_data, struct ArrowError* error);
+
+/// \brief Encapsulate a Message stored in a buffer by prepending the
continuation indicator and size.
+ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer);
Review Comment:
This is great for now, although I wonder whether `ArrowErrorCode
ArrowIpcEncapsulateMessage(struct ArrowBuffer* buffer, struct
ArrowIpcOutputStream* output_stream);` would be closer to what we'll actually
need for the higher-level API that writes out an entire `ArrowArrayStream`.
##########
src/nanoarrow/ipc/encoder_test.cc:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/io/memory.h>
+#include <arrow/ipc/api.h>
+#include <arrow/util/key_value_metadata.h>
+#include <gtest/gtest.h>
+
+#include "nanoarrow/nanoarrow.hpp"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+using namespace arrow;
+
+static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) {
+ uint32_t check = 1;
+ char first_byte;
+ memcpy(&first_byte, &check, sizeof(char));
+ if (first_byte) {
+ return NANOARROW_IPC_ENDIANNESS_LITTLE;
+ } else {
+ return NANOARROW_IPC_ENDIANNESS_BIG;
+ }
+}
+
+nanoarrow::UniqueSchema to_unique(const Schema& schema) {
+ nanoarrow::UniqueSchema exported;
+ auto st = ExportSchema(schema, exported.get());
+ if (!st.ok()) {
+ exported.reset();
+ }
+ EXPECT_TRUE(st.ok()) << st;
+ return exported;
+}
+
+auto read_schema(const struct ArrowBuffer* buffer) {
+ io::BufferReader reader{std::make_shared<Buffer>(buffer->data,
buffer->size_bytes)};
+ auto maybe_reader = ipc::ReadSchema(&reader, nullptr);
+ EXPECT_TRUE(maybe_reader.ok()) << maybe_reader.status();
+ return std::move(maybe_reader).ValueOr(nullptr);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcEncodeSimpleSchemas) {
+ struct ArrowError error;
+ nanoarrow::UniqueBuffer buffer;
+
+ // null buffer, schema don't segfault
+ EXPECT_EQ(ArrowIpcEncodeSchema(nullptr, to_unique(Schema({})).get(),
&error), EINVAL);
+ EXPECT_EQ(ArrowIpcEncodeSchema(buffer.get(), nullptr, &error), EINVAL);
+
+ FieldVector boring_fields{
Review Comment:
A potentially low-effort way to get a lot of coverage would be to add your
encoder to the existing decoder tests and check that each schema round-trips
(this would be easier if our testing library had a more straightforward way to
compare schemas).
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+// For thread safe shared buffers we need C11 + stdatomic.h
+// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override
+// automatic detection
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#define NANOARROW_IPC_USE_STDATOMIC 0
+
+// Check for C11
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
+
+// Check for GCC 4.8, which doesn't include stdatomic.h but does
+// not define __STDC_NO_ATOMICS__
+#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5
+
+#if !defined(__STDC_NO_ATOMICS__)
+#include <stdatomic.h>
+#undef NANOARROW_IPC_USE_STDATOMIC
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#endif
+#endif
+#endif
+
+#endif
+
+#include "nanoarrow/ipc/flatcc_generated.h"
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA
+#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA)
+#define ENODATA 120
+#endif
+
+#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
+
+#define FLATCC_RETURN_UNLESS_0(x) \
+ if (ns(x) != 0) return ENOMEM;
+
+static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
+ const struct ArrowSchemaView*
schema_view,
+ struct ArrowError* error) {
+ switch (schema_view->type) {
+ case NANOARROW_TYPE_NA:
+ FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_BOOL:
+ FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT8:
+ case NANOARROW_TYPE_INT8:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 8, schema_view->type ==
NANOARROW_TYPE_INT8));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT16:
+ case NANOARROW_TYPE_INT16:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 16, schema_view->type ==
NANOARROW_TYPE_INT16));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT32:
+ case NANOARROW_TYPE_INT32:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 32, schema_view->type ==
NANOARROW_TYPE_INT32));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT64:
+ case NANOARROW_TYPE_INT64:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 64, schema_view->type ==
NANOARROW_TYPE_INT64));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_HALF_FLOAT:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_HALF)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FLOAT:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DOUBLE:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DECIMAL128:
+ case NANOARROW_TYPE_DECIMAL256:
+ FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create(
+ builder, schema_view->decimal_precision, schema_view->decimal_scale,
+ schema_view->decimal_bitwidth));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_STRING:
+ FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_STRING:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_BINARY:
+ FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_BINARY:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DATE32:
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_DAY)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DATE64:
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_MILLISECOND)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_MONTHS:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_DAY_TIME:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder,
ns(IntervalUnit_MONTH_DAY_NANO)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIMESTAMP:
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder));
+ FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder,
schema_view->time_unit));
+ FLATCC_RETURN_UNLESS_0(
+ Timestamp_timezone_create_str(builder, schema_view->timezone));
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIME32:
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 32));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIME64:
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 64));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DURATION:
+ FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder,
schema_view->time_unit));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LIST:
+ FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_LIST:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FIXED_SIZE_LIST:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FixedSizeList_create(builder, schema_view->fixed_size));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_RUN_END_ENCODED:
+ FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_STRUCT:
+ FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_SPARSE_UNION:
+ case NANOARROW_TYPE_DENSE_UNION: {
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder));
+
+ FLATCC_RETURN_UNLESS_0(
+ Union_mode_add(builder, schema_view->type ==
NANOARROW_TYPE_DENSE_UNION));
+ if (schema_view->union_type_ids) {
+ int8_t type_ids[128];
+ int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids);
+ if (n != 0) {
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
+ int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder,
n));
+ if (!type_ids_32) return ENOMEM;
+
+ for (int i = 0; i < n; ++i) {
+ type_ids_32[i] = type_ids[i];
+ }
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder));
+ }
+ }
+
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder));
+ return NANOARROW_OK;
+ }
+
+ case NANOARROW_TYPE_MAP:
+ FLATCC_RETURN_UNLESS_0(Field_type_Map_create(
+ builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DICTIONARY:
+ ArrowErrorSet(error, "IPC encoding of dictionary types unsupported");
+ return ENOTSUP;
+
+ default:
+ ArrowErrorSet(error, "Expected a valid enum ArrowType value but found
%d",
+ schema_view->type);
+ return EINVAL;
+ }
+}
+
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error);
+
+static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ int
(*push_start)(flatcc_builder_t*),
+ ns(KeyValue_ref_t) *
+
(*push_end)(flatcc_builder_t*),
+ struct ArrowError* error) {
+ struct ArrowMetadataReader metadata;
+ NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata,
schema->metadata));
+ while (metadata.remaining_keys > 0) {
+ struct ArrowStringView key, value;
+ NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value));
+ if (push_start(builder) != 0) return ENOMEM;
+ FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data,
key.size_bytes));
+ FLATCC_RETURN_UNLESS_0(
+ KeyValue_value_create_strn(builder, value.data, value.size_bytes));
+ if (!push_end(builder)) return ENOMEM;
+ }
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ int
(*push_start)(flatcc_builder_t*),
+ ns(Field_ref_t) *
+ (*push_end)(flatcc_builder_t*),
+ struct ArrowError* error) {
+ for (int i = 0; i < schema->n_children; ++i) {
+ if (push_start(builder) != 0) return ENOMEM;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i],
error));
+ if (!push_end(builder)) return ENOMEM;
+ }
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name));
+ FLATCC_RETURN_UNLESS_0(
+ Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE));
+
+ struct ArrowSchemaView schema_view;
+ NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view,
error));
+
+ if (schema->n_children != 0) {
+ FLATCC_RETURN_UNLESS_0(Field_children_start(builder));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+
&ns(Field_children_push_start),
+ &ns(Field_children_push_end),
error));
+ FLATCC_RETURN_UNLESS_0(Field_children_end(builder));
+ }
+
+ if (schema->metadata) {
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder));
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcEncodeMetadata(builder, schema,
&ns(Field_custom_metadata_push_start),
+ &ns(Field_custom_metadata_push_end), error));
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder));
+ }
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncodeSchemaImpl(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+ FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder));
+
+ FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+ &ns(Schema_fields_push_start),
+ &ns(Schema_fields_push_end),
error));
+ FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder));
+
+ if (schema->metadata) {
+ FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder));
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcEncodeMetadata(builder, schema,
&ns(Schema_custom_metadata_push_start),
+ &ns(Schema_custom_metadata_push_end), error));
+ FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder));
+ }
+
+ FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder));
+
+ FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
+ if (ns(Message_end_as_root(builder)) == 0) return ENOMEM;
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcFinalizeFlatccBuffer(struct ArrowBuffer* out,
+ flatcc_builder_t* builder) {
+ ArrowBufferReset(out);
+ ArrowBufferInit(out);
+ size_t size = 0;
+ out->data = (uint8_t*)flatcc_builder_finalize_buffer(builder, &size);
+ out->size_bytes = out->capacity_bytes = (int64_t)size;
+ return out->data ? NANOARROW_OK : ENOMEM;
+}
+
+ArrowErrorCode ArrowIpcEncodeSchema(struct ArrowBuffer* out,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ if (!out || !schema || !error) return EINVAL;
+
+ flatcc_builder_t builder;
+ if (flatcc_builder_init(&builder) == -1) {
+ ArrowErrorSet(error, "Failed to initialize flatcc builder");
+ return ESPIPE;
+ }
+
+ ArrowErrorCode status = ArrowIpcEncodeSchemaImpl(&builder, schema, error);
+ if (status == NANOARROW_OK) {
+ status = ArrowIpcFinalizeFlatccBuffer(out, &builder);
+ }
+ flatcc_builder_clear(&builder);
+ return status;
+}
+
+ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) {
+ if (!buffer) return EINVAL;
+
+ int32_t message_size = (int32_t)buffer->size_bytes;
+ NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, message_size + 16, 0));
+
+ int32_t continuation = -1;
+ memmove(buffer->data + 8, buffer->data, message_size);
+ memcpy(buffer->data + 4, &message_size, sizeof(message_size));
+ memcpy(buffer->data, &continuation, sizeof(continuation));
+ memset(buffer->data + 8 + message_size, 0, 8);
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncodeRecordBatchImpl(flatcc_builder_t* builder,
+ struct ArrowArray* to_be_encoded,
+ ArrowIpcBufferEncodeCallback
encode_buffer,
+ void* user_data, struct
ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+ int64_t body_length = 0;
+
+ FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder));
+
+ // TODO add compression support; should be a parameter to EncodeRecordBatch
since
+ // encoding of buffers is handled by the callback
+ FLATCC_RETURN_UNLESS_0(RecordBatch_length_add(builder,
to_be_encoded->length));
+
+ struct ArrowBuffer buffers, nodes;
+ ArrowBufferInit(&buffers);
+ ArrowBufferInit(&nodes);
+ for (int i = 0; i < to_be_encoded->n_children; ++i) {
+ // FIXME we need the schema too so that we can get array lengths
Review Comment:
This should probably accept a `const struct ArrowArrayView*` (which already
knows the length of each buffer) instead of a `struct ArrowArray*`.
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+// For thread safe shared buffers we need C11 + stdatomic.h
+// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override
+// automatic detection
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#define NANOARROW_IPC_USE_STDATOMIC 0
+
+// Check for C11
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
+
+// Check for GCC 4.8, which doesn't include stdatomic.h but does
+// not define __STDC_NO_ATOMICS__
+#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5
+
+#if !defined(__STDC_NO_ATOMICS__)
+#include <stdatomic.h>
+#undef NANOARROW_IPC_USE_STDATOMIC
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#endif
+#endif
+#endif
+
+#endif
+
+#include "nanoarrow/ipc/flatcc_generated.h"
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
+// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA
+#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA)
+#define ENODATA 120
+#endif
+
+#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
+
+#define FLATCC_RETURN_UNLESS_0(x) \
+ if (ns(x) != 0) return ENOMEM;
+
+static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
+ const struct ArrowSchemaView*
schema_view,
+ struct ArrowError* error) {
+ switch (schema_view->type) {
+ case NANOARROW_TYPE_NA:
+ FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_BOOL:
+ FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT8:
+ case NANOARROW_TYPE_INT8:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 8, schema_view->type ==
NANOARROW_TYPE_INT8));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT16:
+ case NANOARROW_TYPE_INT16:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 16, schema_view->type ==
NANOARROW_TYPE_INT16));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT32:
+ case NANOARROW_TYPE_INT32:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 32, schema_view->type ==
NANOARROW_TYPE_INT32));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT64:
+ case NANOARROW_TYPE_INT64:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 64, schema_view->type ==
NANOARROW_TYPE_INT64));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_HALF_FLOAT:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_HALF)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FLOAT:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DOUBLE:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DECIMAL128:
+ case NANOARROW_TYPE_DECIMAL256:
+ FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create(
+ builder, schema_view->decimal_precision, schema_view->decimal_scale,
+ schema_view->decimal_bitwidth));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_STRING:
+ FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_STRING:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_BINARY:
+ FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_BINARY:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DATE32:
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_DAY)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DATE64:
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_MILLISECOND)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_MONTHS:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_DAY_TIME:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder,
ns(IntervalUnit_MONTH_DAY_NANO)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIMESTAMP:
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder));
+ FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder,
schema_view->time_unit));
+ FLATCC_RETURN_UNLESS_0(
+ Timestamp_timezone_create_str(builder, schema_view->timezone));
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIME32:
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 32));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIME64:
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 64));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DURATION:
+ FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder,
schema_view->time_unit));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LIST:
+ FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_LIST:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FIXED_SIZE_LIST:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FixedSizeList_create(builder, schema_view->fixed_size));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_RUN_END_ENCODED:
+ FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_STRUCT:
+ FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_SPARSE_UNION:
+ case NANOARROW_TYPE_DENSE_UNION: {
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder));
+
+ FLATCC_RETURN_UNLESS_0(
+ Union_mode_add(builder, schema_view->type ==
NANOARROW_TYPE_DENSE_UNION));
+ if (schema_view->union_type_ids) {
+ int8_t type_ids[128];
+ int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids);
+ if (n != 0) {
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
+ int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder,
n));
+ if (!type_ids_32) return ENOMEM;
+
+ for (int i = 0; i < n; ++i) {
+ type_ids_32[i] = type_ids[i];
+ }
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder));
+ }
+ }
+
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder));
+ return NANOARROW_OK;
+ }
+
+ case NANOARROW_TYPE_MAP:
+ FLATCC_RETURN_UNLESS_0(Field_type_Map_create(
+ builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DICTIONARY:
+ ArrowErrorSet(error, "IPC encoding of dictionary types unsupported");
+ return ENOTSUP;
+
+ default:
+ ArrowErrorSet(error, "Expected a valid enum ArrowType value but found
%d",
+ schema_view->type);
+ return EINVAL;
+ }
+}
+
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error);
+
+static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ int
(*push_start)(flatcc_builder_t*),
+ ns(KeyValue_ref_t) *
+
(*push_end)(flatcc_builder_t*),
+ struct ArrowError* error) {
+ struct ArrowMetadataReader metadata;
+ NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata,
schema->metadata));
+ while (metadata.remaining_keys > 0) {
+ struct ArrowStringView key, value;
+ NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value));
+ if (push_start(builder) != 0) return ENOMEM;
+ FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data,
key.size_bytes));
+ FLATCC_RETURN_UNLESS_0(
+ KeyValue_value_create_strn(builder, value.data, value.size_bytes));
+ if (!push_end(builder)) return ENOMEM;
+ }
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ int
(*push_start)(flatcc_builder_t*),
+ ns(Field_ref_t) *
+ (*push_end)(flatcc_builder_t*),
+ struct ArrowError* error) {
+ for (int i = 0; i < schema->n_children; ++i) {
+ if (push_start(builder) != 0) return ENOMEM;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i],
error));
+ if (!push_end(builder)) return ENOMEM;
+ }
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name));
+ FLATCC_RETURN_UNLESS_0(
+ Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE));
+
+ struct ArrowSchemaView schema_view;
+ NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view,
error));
+
+ if (schema->n_children != 0) {
+ FLATCC_RETURN_UNLESS_0(Field_children_start(builder));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+
&ns(Field_children_push_start),
+ &ns(Field_children_push_end),
error));
+ FLATCC_RETURN_UNLESS_0(Field_children_end(builder));
+ }
+
+ if (schema->metadata) {
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder));
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcEncodeMetadata(builder, schema,
&ns(Field_custom_metadata_push_start),
+ &ns(Field_custom_metadata_push_end), error));
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder));
+ }
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncodeSchemaImpl(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+ FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder));
+
+ FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+ &ns(Schema_fields_push_start),
+ &ns(Schema_fields_push_end),
error));
+ FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder));
+
+ if (schema->metadata) {
+ FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder));
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcEncodeMetadata(builder, schema,
&ns(Schema_custom_metadata_push_start),
+ &ns(Schema_custom_metadata_push_end), error));
+ FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder));
+ }
+
+ FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder));
+
+ FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
+ if (ns(Message_end_as_root(builder)) == 0) return ENOMEM;
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcFinalizeFlatccBuffer(struct ArrowBuffer* out,
+ flatcc_builder_t* builder) {
+ ArrowBufferReset(out);
+ ArrowBufferInit(out);
+ size_t size = 0;
+ out->data = (uint8_t*)flatcc_builder_finalize_buffer(builder, &size);
+ out->size_bytes = out->capacity_bytes = (int64_t)size;
+ return out->data ? NANOARROW_OK : ENOMEM;
+}
+
+ArrowErrorCode ArrowIpcEncodeSchema(struct ArrowBuffer* out,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ if (!out || !schema || !error) return EINVAL;
+
+ flatcc_builder_t builder;
+ if (flatcc_builder_init(&builder) == -1) {
+ ArrowErrorSet(error, "Failed to initialize flatcc builder");
+ return ESPIPE;
+ }
+
+ ArrowErrorCode status = ArrowIpcEncodeSchemaImpl(&builder, schema, error);
+ if (status == NANOARROW_OK) {
+ status = ArrowIpcFinalizeFlatccBuffer(out, &builder);
+ }
+ flatcc_builder_clear(&builder);
+ return status;
+}
+
+ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) {
+ if (!buffer) return EINVAL;
+
+ int32_t message_size = (int32_t)buffer->size_bytes;
+ NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, message_size + 16, 0));
+
+ int32_t continuation = -1;
+ memmove(buffer->data + 8, buffer->data, message_size);
+ memcpy(buffer->data + 4, &message_size, sizeof(message_size));
+ memcpy(buffer->data, &continuation, sizeof(continuation));
+ memset(buffer->data + 8 + message_size, 0, 8);
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncodeRecordBatchImpl(flatcc_builder_t* builder,
+ struct ArrowArray* to_be_encoded,
+ ArrowIpcBufferEncodeCallback
encode_buffer,
+ void* user_data, struct
ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+
+ int64_t body_length = 0;
+
+ FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder));
+
+ // TODO add compression support; should be a parameter to EncodeRecordBatch
since
+ // encoding of buffers is handled by the callback
Review Comment:
I think this is a good argument for an `ArrowIpcEncoder` struct: compression
and dictionary support will both require extra arguments here, and
potentially some cached state between calls.
##########
src/nanoarrow/ipc/encoder.c:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <string.h>
+
+// For thread safe shared buffers we need C11 + stdatomic.h
+// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override
+// automatic detection
+#if !defined(NANOARROW_IPC_USE_STDATOMIC)
+#define NANOARROW_IPC_USE_STDATOMIC 0
+
+// Check for C11
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
+
+// Check for GCC 4.8, which doesn't include stdatomic.h but does
+// not define __STDC_NO_ATOMICS__
+#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5
+
+#if !defined(__STDC_NO_ATOMICS__)
+#include <stdatomic.h>
+#undef NANOARROW_IPC_USE_STDATOMIC
+#define NANOARROW_IPC_USE_STDATOMIC 1
+#endif
+#endif
+#endif
+
+#endif
+
+#include "nanoarrow/ipc/flatcc_generated.h"
+#include "nanoarrow/nanoarrow.h"
+#include "nanoarrow/nanoarrow_ipc.h"
+
// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA
#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA)
#define ENODATA 120
#endif

// Prefix a flatcc-generated identifier with the Arrow flatbuffer namespace.
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)

// Evaluate the namespaced flatcc builder call `x` and return ENOMEM from the
// enclosing function if it reports failure (a non-zero result). The
// do/while(0) wrapper makes the macro a single statement, so it is safe in an
// unbraced if/else and requires the usual trailing semicolon.
#define FLATCC_RETURN_UNLESS_0(x)  \
  do {                             \
    if (ns(x) != 0) return ENOMEM; \
  } while (0)
+
+static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
+ const struct ArrowSchemaView*
schema_view,
+ struct ArrowError* error) {
+ switch (schema_view->type) {
+ case NANOARROW_TYPE_NA:
+ FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_BOOL:
+ FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT8:
+ case NANOARROW_TYPE_INT8:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 8, schema_view->type ==
NANOARROW_TYPE_INT8));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT16:
+ case NANOARROW_TYPE_INT16:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 16, schema_view->type ==
NANOARROW_TYPE_INT16));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT32:
+ case NANOARROW_TYPE_INT32:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 32, schema_view->type ==
NANOARROW_TYPE_INT32));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_UINT64:
+ case NANOARROW_TYPE_INT64:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Int_create(builder, 64, schema_view->type ==
NANOARROW_TYPE_INT64));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_HALF_FLOAT:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_HALF)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FLOAT:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DOUBLE:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DECIMAL128:
+ case NANOARROW_TYPE_DECIMAL256:
+ FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create(
+ builder, schema_view->decimal_precision, schema_view->decimal_scale,
+ schema_view->decimal_bitwidth));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_STRING:
+ FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_STRING:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_BINARY:
+ FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_BINARY:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DATE32:
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_DAY)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DATE64:
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_MILLISECOND)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_MONTHS:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_DAY_TIME:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Interval_create(builder,
ns(IntervalUnit_MONTH_DAY_NANO)));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIMESTAMP:
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder));
+ FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder,
schema_view->time_unit));
+ FLATCC_RETURN_UNLESS_0(
+ Timestamp_timezone_create_str(builder, schema_view->timezone));
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIME32:
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 32));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_TIME64:
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 64));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DURATION:
+ FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder,
schema_view->time_unit));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LIST:
+ FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_LARGE_LIST:
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_FIXED_SIZE_LIST:
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_FixedSizeList_create(builder, schema_view->fixed_size));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_RUN_END_ENCODED:
+ FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_STRUCT:
+ FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_SPARSE_UNION:
+ case NANOARROW_TYPE_DENSE_UNION: {
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder));
+
+ FLATCC_RETURN_UNLESS_0(
+ Union_mode_add(builder, schema_view->type ==
NANOARROW_TYPE_DENSE_UNION));
+ if (schema_view->union_type_ids) {
+ int8_t type_ids[128];
+ int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids);
+ if (n != 0) {
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
+ int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder,
n));
+ if (!type_ids_32) return ENOMEM;
+
+ for (int i = 0; i < n; ++i) {
+ type_ids_32[i] = type_ids[i];
+ }
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder));
+ }
+ }
+
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder));
+ return NANOARROW_OK;
+ }
+
+ case NANOARROW_TYPE_MAP:
+ FLATCC_RETURN_UNLESS_0(Field_type_Map_create(
+ builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED));
+ return NANOARROW_OK;
+
+ case NANOARROW_TYPE_DICTIONARY:
+ ArrowErrorSet(error, "IPC encoding of dictionary types unsupported");
+ return ENOTSUP;
+
+ default:
+ ArrowErrorSet(error, "Expected a valid enum ArrowType value but found
%d",
+ schema_view->type);
+ return EINVAL;
+ }
+}
+
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error);
+
+static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ int
(*push_start)(flatcc_builder_t*),
+ ns(KeyValue_ref_t) *
+
(*push_end)(flatcc_builder_t*),
+ struct ArrowError* error) {
+ struct ArrowMetadataReader metadata;
+ NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata,
schema->metadata));
+ while (metadata.remaining_keys > 0) {
+ struct ArrowStringView key, value;
+ NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value));
+ if (push_start(builder) != 0) return ENOMEM;
+ FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data,
key.size_bytes));
+ FLATCC_RETURN_UNLESS_0(
+ KeyValue_value_create_strn(builder, value.data, value.size_bytes));
+ if (!push_end(builder)) return ENOMEM;
+ }
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ int
(*push_start)(flatcc_builder_t*),
+ ns(Field_ref_t) *
+ (*push_end)(flatcc_builder_t*),
+ struct ArrowError* error) {
+ for (int i = 0; i < schema->n_children; ++i) {
+ if (push_start(builder) != 0) return ENOMEM;
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i],
error));
+ if (!push_end(builder)) return ENOMEM;
+ }
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
+ const struct ArrowSchema* schema,
+ struct ArrowError* error) {
+ FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name));
+ FLATCC_RETURN_UNLESS_0(
+ Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE));
+
+ struct ArrowSchemaView schema_view;
+ NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view,
error));
+
+ if (schema->n_children != 0) {
+ FLATCC_RETURN_UNLESS_0(Field_children_start(builder));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
+
&ns(Field_children_push_start),
+ &ns(Field_children_push_end),
error));
+ FLATCC_RETURN_UNLESS_0(Field_children_end(builder));
+ }
+
+ if (schema->metadata) {
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder));
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcEncodeMetadata(builder, schema,
&ns(Field_custom_metadata_push_start),
+ &ns(Field_custom_metadata_push_end), error));
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder));
+ }
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncodeSchemaImpl(flatcc_builder_t* builder,
Review Comment:
```suggestion
static ArrowErrorCode ArrowIpcEncodeSchemaImpl(flatcc_builder_t* builder,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]