lidavidm commented on code in PR #61: URL: https://github.com/apache/arrow-nanoarrow/pull/61#discussion_r1067352883
########## extensions/nanoarrow_ipc/src/nanoarrow_ipc/nanoarrow_ipc.c: ########## @@ -0,0 +1,297 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <string.h> + +#include "nanoarrow/nanoarrow.h" + +#include "File_reader.h" +#include "Message_reader.h" +#include "Schema_reader.h" + +#include "nanoarrow_ipc.h" + +#define ArrowIpcErrorSet(err, ...) ArrowErrorSet((struct ArrowError*)err, __VA_ARGS__) + +void ArrowIpcReaderInit(struct ArrowIpcReader* reader) { + memset(reader, 0, sizeof(struct ArrowIpcReader)); +} + +void ArrowIpcReaderReset(struct ArrowIpcReader* reader) { + if (reader->schema.release != NULL) { + reader->schema.release(&reader->schema); + } + + if (reader->batch_index.release != NULL) { + reader->batch_index.release(&reader->batch_index); + } + + ArrowIpcReaderInit(reader); +} + +static inline uint32_t ArrowIpcReadUint32LE(struct ArrowIpcBufferView* data) { + uint32_t value; + memcpy(&value, data->data, sizeof(uint32_t)); + // bswap32() if big endian + data->data += sizeof(uint32_t); + data->size_bytes -= sizeof(uint32_t); + return value; +} + +static inline int32_t ArrowIpcReadInt32LE(struct ArrowIpcBufferView* data) { + int32_t value; + memcpy(&value, data->data, sizeof(int32_t)); + // bswap32() if big endian + data->data += sizeof(int32_t); + data->size_bytes -= sizeof(int32_t); + return value; +} + +#undef ns +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + +static int ArrowIpcReaderDecodeSchema(struct ArrowIpcReader* reader, + flatbuffers_generic_t message_header, + struct ArrowIpcError* error) { + ns(Schema_table_t) schema = (ns(Schema_table_t))message_header; + int endianness = ns(Schema_endianness(schema)); + switch (endianness) { + case ns(Endianness_Little): + reader->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE; + break; + case ns(Endianness_Big): + reader->endianness = NANOARROW_IPC_ENDIANNESS_BIG; + break; + default: + ArrowIpcErrorSet(error, + "Expected Schema endianness of 0 (little) or 1 (big) but got %d", + (int)endianness); + } + + ns(Feature_vec_t) features = ns(Schema_features(schema)); + int64_t n_features = ns(Feature_vec_len(features)); + reader->features = 0; + + for (int64_t i = 0; i < n_features; i++) { + int feature = ns(Feature_vec_at(features, i)); + switch (feature) { + case ns(Feature_COMPRESSED_BODY): + reader->features |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY; + break; + case ns(Feature_DICTIONARY_REPLACEMENT): + reader->features |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT; + break; + default: + ArrowIpcErrorSet(error, "Unrecognized Schema feature with value %d", + (int)feature); + return EINVAL; + } + } + + ns(Field_vec_t) fields = ns(Schema_fields(schema)); + int64_t n_fields = ns(Schema_vec_len(fields)); + if (reader->schema.release != NULL) { + reader->schema.release(&reader->schema); + } + + ArrowSchemaInit(&reader->schema); + int result = ArrowSchemaSetTypeStruct(&reader->schema, n_fields); + if (result != NANOARROW_OK) { + ArrowIpcErrorSet(error, "Failed to allocate struct schema with %ld children", + (long)n_fields); + return result; + } + + for (int64_t i = 0; i < n_fields; i++) { + ns(Field_table_t) field = ns(Field_vec_at(fields, i)); + struct ArrowSchema* schema = reader->schema.children[i]; + + if (ns(Field_name_is_present(field))) { + result = ArrowSchemaSetName(schema, ns(Field_name_get(field))); + } else { + result = ArrowSchemaSetName(schema, ""); + } + + if (result != NANOARROW_OK) { + ArrowIpcErrorSet(error, "ArrowSchemaSetName() failed for schema field %ld", + (long)i); + return result; + } + + if (ns(Field_nullable_get(field))) { + schema->flags |= ARROW_FLAG_NULLABLE; + } + + int type_type = ns(Field_type_type(field)); + switch (type_type) { + case ns(Type_Null): + result = ArrowSchemaSetType(schema, NANOARROW_TYPE_NA); + break; + case ns(Type_Int): { + ns(Int_table_t) field_int = (ns(Int_table_t))ns(Field_type_get(field)); + + int is_signed = ns(Int_is_signed_get(field_int)); + int bitwidth = ns(Int_bitWidth_get(field_int)); + int nanoarrow_type = NANOARROW_TYPE_UNINITIALIZED; + + if (is_signed) { + switch (bitwidth) { + case 8: + nanoarrow_type = NANOARROW_TYPE_INT8; + break; + case 16: + nanoarrow_type = NANOARROW_TYPE_INT16; + break; + case 32: + nanoarrow_type = NANOARROW_TYPE_INT32; + break; + case 64: + nanoarrow_type = NANOARROW_TYPE_INT64; + break; + default: + break; + } + } else { + switch (bitwidth) { + case 8: + nanoarrow_type = NANOARROW_TYPE_UINT8; + break; + case 16: + nanoarrow_type = NANOARROW_TYPE_UINT16; + break; + case 32: + nanoarrow_type = NANOARROW_TYPE_UINT32; + break; + case 64: + nanoarrow_type = NANOARROW_TYPE_UINT64; + break; + default: + break; + } + } + + if (nanoarrow_type == NANOARROW_TYPE_UNINITIALIZED) { + result = EINVAL; + } else { + result = ArrowSchemaSetType(schema, nanoarrow_type); + } + + break; + } + case ns(Type_Utf8): + result = ArrowSchemaSetType(schema, NANOARROW_TYPE_STRING); + case ns(Type_LargeUtf8): + result = ArrowSchemaSetType(schema, NANOARROW_TYPE_LARGE_STRING); + break; + default: + ArrowIpcErrorSet(error, "Unrecognized Field type with value %d", (int)type_type); + return EINVAL; Review Comment: nit, but is there a reason to mix early-return and single-return error handling here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
