This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6437f59 feat(extensions/nanoarrow_ipc): Decode RecordBatch message to
ArrowArray (#143)
6437f59 is described below
commit 6437f59dc45509c78a1d9403942d6d9d3e85895b
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Mar 8 15:13:33 2023 -0400
feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray
(#143)
Closes #91.
After this PR you should be able to decode any (!!) record batch message
into a `struct ArrowArray`:
```c++
ArrowIpcReaderInit(&reader);
// Set a schema
ArrowIpcReaderSetSchema(&reader, some_schema);
// Decode the header and advance the data pointer
ArrowIpcReaderDecode(&reader, data, &error);
data.data.as_uint8 += reader.header_size_bytes;
data.size_bytes -= reader.header_size_bytes;
// Can decode one field at a time or the whole struct by passing -1
ArrowIpcReaderGetArray(&reader, data, -1, &array, &error);
```
---
.../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c | 703 ++++++++++++++++-----
.../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 178 +++++-
.../src/nanoarrow/nanoarrow_ipc_test.cc | 373 +++++++++--
3 files changed, 1014 insertions(+), 240 deletions(-)
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
index b2d7cbe..3c0f371 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
@@ -23,6 +23,19 @@
#include "nanoarrow_ipc.h"
#include "nanoarrow_ipc_flatcc_generated.h"
+struct ArrowIpcField {
+ struct ArrowArrayView* array_view;
+ int64_t buffer_offset;
+};
+
+struct ArrowIpcDecoderPrivate {
+ struct ArrowArrayView array_view;
+ int64_t n_fields;
+ struct ArrowIpcField* fields;
+ int64_t n_buffers;
+ const void* last_message;
+};
+
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
const char* nanoarrow_runtime_version = ArrowNanoarrowVersion();
const char* nanoarrow_ipc_build_time_version = NANOARROW_VERSION;
@@ -36,16 +49,32 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError*
error) {
return NANOARROW_OK;
}
-void ArrowIpcReaderInit(struct ArrowIpcReader* reader) {
- memset(reader, 0, sizeof(struct ArrowIpcReader));
+ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) {
+ memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)ArrowMalloc(sizeof(struct
ArrowIpcDecoderPrivate));
+ if (private_data == NULL) {
+ return ENOMEM;
+ }
+
+ memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate));
+ decoder->private_data = private_data;
+ return NANOARROW_OK;
}
-void ArrowIpcReaderReset(struct ArrowIpcReader* reader) {
- if (reader->schema.release != NULL) {
- reader->schema.release(&reader->schema);
+void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ ArrowArrayViewReset(&private_data->array_view);
+
+ if (private_data->fields != NULL) {
+ ArrowFree(private_data->fields);
+ private_data->n_fields = 0;
}
- ArrowIpcReaderInit(reader);
+ ArrowFree(private_data);
+ memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
}
static inline uint32_t ArrowIpcReadUint32LE(struct ArrowBufferView* data) {
@@ -68,9 +97,9 @@ static inline int32_t ArrowIpcReadInt32LE(struct
ArrowBufferView* data) {
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
-static int ArrowIpcReaderSetMetadata(struct ArrowSchema* schema,
- ns(KeyValue_vec_t) kv_vec,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetMetadata(struct ArrowSchema* schema,
+ ns(KeyValue_vec_t) kv_vec,
+ struct ArrowError* error) {
int64_t n_pairs = ns(KeyValue_vec_len(kv_vec));
if (n_pairs == 0) {
return NANOARROW_OK;
@@ -121,8 +150,8 @@ static int ArrowIpcReaderSetMetadata(struct ArrowSchema*
schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeSimple(struct ArrowSchema* schema, int
nanoarrow_type,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeSimple(struct ArrowSchema* schema, int
nanoarrow_type,
+ struct ArrowError* error) {
int result = ArrowSchemaSetType(schema, nanoarrow_type);
if (result != NANOARROW_OK) {
ArrowErrorSet(error, "ArrowSchemaSetType() failed for type %s",
@@ -133,9 +162,9 @@ static int ArrowIpcReaderSetTypeSimple(struct ArrowSchema*
schema, int nanoarrow
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeInt(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeInt(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Int_table_t) type = (ns(Int_table_t))type_generic;
int is_signed = ns(Int_is_signed_get(type));
@@ -184,21 +213,21 @@ static int ArrowIpcReaderSetTypeInt(struct ArrowSchema*
schema,
}
}
- return ArrowIpcReaderSetTypeSimple(schema, nanoarrow_type, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, nanoarrow_type, error);
}
-static int ArrowIpcReaderSetTypeFloatingPoint(struct ArrowSchema* schema,
- flatbuffers_generic_t
type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFloatingPoint(struct ArrowSchema* schema,
+ flatbuffers_generic_t
type_generic,
+ struct ArrowError* error) {
ns(FloatingPoint_table_t) type = (ns(FloatingPoint_table_t))type_generic;
int precision = ns(FloatingPoint_precision(type));
switch (precision) {
case ns(Precision_HALF):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT,
error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT,
error);
case ns(Precision_SINGLE):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error);
case ns(Precision_DOUBLE):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE,
error);
default:
ArrowErrorSet(error, "Unexpected FloatingPoint Precision value: %d",
(int)precision);
@@ -206,9 +235,9 @@ static int ArrowIpcReaderSetTypeFloatingPoint(struct
ArrowSchema* schema,
}
}
-static int ArrowIpcReaderSetTypeDecimal(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDecimal(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Decimal_table_t) type = (ns(Decimal_table_t))type_generic;
int scale = ns(Decimal_scale(type));
int precision = ns(Decimal_precision(type));
@@ -237,34 +266,34 @@ static int ArrowIpcReaderSetTypeDecimal(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
- flatbuffers_generic_t
type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
+ flatbuffers_generic_t
type_generic,
+ struct ArrowError* error) {
ns(FixedSizeBinary_table_t) type = (ns(FixedSizeBinary_table_t))type_generic;
int fixed_size = ns(FixedSizeBinary_byteWidth(type));
return ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY,
fixed_size);
}
-static int ArrowIpcReaderSetTypeDate(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDate(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Date_table_t) type = (ns(Date_table_t))type_generic;
int date_unit = ns(Date_unit(type));
switch (date_unit) {
case ns(DateUnit_DAY):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DATE32, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE32,
error);
case ns(DateUnit_MILLISECOND):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DATE64, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE64,
error);
default:
ArrowErrorSet(error, "Unexpected Date DateUnit value: %d",
(int)date_unit);
return EINVAL;
}
}
-static int ArrowIpcReaderSetTypeTime(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Time_table_t) type = (ns(Time_table_t))type_generic;
int time_unit = ns(Time_unit(type));
int bitwidth = ns(Time_bitWidth(type));
@@ -307,9 +336,9 @@ static int ArrowIpcReaderSetTypeTime(struct ArrowSchema*
schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeTimestamp(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeTimestamp(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Timestamp_table_t) type = (ns(Timestamp_table_t))type_generic;
int time_unit = ns(Timestamp_unit(type));
@@ -328,9 +357,9 @@ static int ArrowIpcReaderSetTypeTimestamp(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeDuration(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDuration(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Duration_table_t) type = (ns(Duration_table_t))type_generic;
int time_unit = ns(Duration_unit(type));
@@ -344,20 +373,21 @@ static int ArrowIpcReaderSetTypeDuration(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeInterval(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeInterval(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Interval_table_t) type = (ns(Interval_table_t))type_generic;
int interval_unit = ns(Interval_unit(type));
switch (interval_unit) {
case ns(IntervalUnit_YEAR_MONTH):
- return ArrowIpcReaderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTHS, error);
+ return ArrowIpcDecoderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTHS, error);
case ns(IntervalUnit_DAY_TIME):
- return ArrowIpcReaderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_DAY_TIME, error);
+ return ArrowIpcDecoderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_DAY_TIME,
+ error);
case ns(IntervalUnit_MONTH_DAY_NANO):
- return ArrowIpcReaderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO,
- error);
+ return ArrowIpcDecoderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO,
+ error);
default:
ArrowErrorSet(error, "Unexpected Interval unit value: %d",
(int)interval_unit);
return EINVAL;
@@ -368,9 +398,9 @@ static int ArrowIpcReaderSetTypeInterval(struct
ArrowSchema* schema,
// because the IPC format allows modifying some of the defaults those
functions assume.
// In particular, the allocate + initialize children step is handled outside
these
// setters.
-static int ArrowIpcReaderSetTypeSimpleNested(struct ArrowSchema* schema,
- const char* format,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeSimpleNested(struct ArrowSchema* schema,
+ const char* format,
+ struct ArrowError* error) {
int result = ArrowSchemaSetFormat(schema, format);
if (result != NANOARROW_OK) {
ArrowErrorSet(error, "ArrowSchemaSetFormat('%s') failed", format);
@@ -380,23 +410,23 @@ static int ArrowIpcReaderSetTypeSimpleNested(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeFixedSizeList(struct ArrowSchema* schema,
- flatbuffers_generic_t
type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFixedSizeList(struct ArrowSchema* schema,
+ flatbuffers_generic_t
type_generic,
+ struct ArrowError* error) {
ns(FixedSizeList_table_t) type = (ns(FixedSizeList_table_t))type_generic;
int32_t fixed_size = ns(FixedSizeList_listSize(type));
char fixed_size_str[128];
int n_chars = snprintf(fixed_size_str, 128, "+w:%d", fixed_size);
fixed_size_str[n_chars] = '\0';
- return ArrowIpcReaderSetTypeSimpleNested(schema, fixed_size_str, error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, fixed_size_str, error);
}
-static int ArrowIpcReaderSetTypeMap(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeMap(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Map_table_t) type = (ns(Map_table_t))type_generic;
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetTypeSimpleNested(schema, "+m",
error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetTypeSimpleNested(schema, "+m",
error));
if (ns(Map_keysSorted(type))) {
schema->flags |= ARROW_FLAG_MAP_KEYS_SORTED;
@@ -407,9 +437,9 @@ static int ArrowIpcReaderSetTypeMap(struct ArrowSchema*
schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeUnion(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- int64_t n_children, struct ArrowError*
error) {
+static int ArrowIpcDecoderSetTypeUnion(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ int64_t n_children, struct ArrowError*
error) {
ns(Union_table_t) type = (ns(Union_table_t))type_generic;
int union_mode = ns(Union_mode(type));
@@ -484,68 +514,70 @@ static int ArrowIpcReaderSetTypeUnion(struct ArrowSchema*
schema,
}
}
- return ArrowIpcReaderSetTypeSimpleNested(schema, union_types_str, error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, union_types_str, error);
}
-static int ArrowIpcReaderSetType(struct ArrowSchema* schema, ns(Field_table_t)
field,
- int64_t n_children, struct ArrowError* error)
{
+static int ArrowIpcDecoderSetType(struct ArrowSchema* schema,
ns(Field_table_t) field,
+ int64_t n_children, struct ArrowError*
error) {
int type_type = ns(Field_type_type(field));
switch (type_type) {
case ns(Type_Null):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_NA, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_NA, error);
case ns(Type_Bool):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error);
case ns(Type_Int):
- return ArrowIpcReaderSetTypeInt(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeInt(schema, ns(Field_type_get(field)),
error);
case ns(Type_FloatingPoint):
- return ArrowIpcReaderSetTypeFloatingPoint(schema,
ns(Field_type_get(field)), error);
+ return ArrowIpcDecoderSetTypeFloatingPoint(schema,
ns(Field_type_get(field)),
+ error);
case ns(Type_Decimal):
- return ArrowIpcReaderSetTypeDecimal(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeDecimal(schema, ns(Field_type_get(field)),
error);
case ns(Type_Binary):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_BINARY, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BINARY,
error);
case ns(Type_LargeBinary):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY,
error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY,
error);
case ns(Type_FixedSizeBinary):
- return ArrowIpcReaderSetTypeFixedSizeBinary(schema,
ns(Field_type_get(field)),
- error);
+ return ArrowIpcDecoderSetTypeFixedSizeBinary(schema,
ns(Field_type_get(field)),
+ error);
case ns(Type_Utf8):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_STRING, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_STRING,
error);
case ns(Type_LargeUtf8):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING,
error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING,
error);
case ns(Type_Date):
- return ArrowIpcReaderSetTypeDate(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeDate(schema, ns(Field_type_get(field)),
error);
case ns(Type_Time):
- return ArrowIpcReaderSetTypeTime(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeTime(schema, ns(Field_type_get(field)),
error);
case ns(Type_Timestamp):
- return ArrowIpcReaderSetTypeTimestamp(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeTimestamp(schema,
ns(Field_type_get(field)), error);
case ns(Type_Duration):
- return ArrowIpcReaderSetTypeDuration(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeDuration(schema, ns(Field_type_get(field)),
error);
case ns(Type_Interval):
- return ArrowIpcReaderSetTypeInterval(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeInterval(schema, ns(Field_type_get(field)),
error);
case ns(Type_Struct_):
- return ArrowIpcReaderSetTypeSimpleNested(schema, "+s", error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, "+s", error);
case ns(Type_List):
- return ArrowIpcReaderSetTypeSimpleNested(schema, "+l", error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, "+l", error);
case ns(Type_LargeList):
- return ArrowIpcReaderSetTypeSimpleNested(schema, "+L", error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, "+L", error);
case ns(Type_FixedSizeList):
- return ArrowIpcReaderSetTypeFixedSizeList(schema,
ns(Field_type_get(field)), error);
+ return ArrowIpcDecoderSetTypeFixedSizeList(schema,
ns(Field_type_get(field)),
+ error);
case ns(Type_Map):
- return ArrowIpcReaderSetTypeMap(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeMap(schema, ns(Field_type_get(field)),
error);
case ns(Type_Union):
- return ArrowIpcReaderSetTypeUnion(schema, ns(Field_type_get(field)),
n_children,
- error);
+ return ArrowIpcDecoderSetTypeUnion(schema, ns(Field_type_get(field)),
n_children,
+ error);
default:
ArrowErrorSet(error, "Unrecognized Field type with value %d",
(int)type_type);
return EINVAL;
}
}
-static int ArrowIpcReaderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
- struct ArrowError* error);
+static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
+ struct ArrowError* error);
-static int ArrowIpcReaderSetField(struct ArrowSchema* schema,
ns(Field_table_t) field,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetField(struct ArrowSchema* schema,
ns(Field_table_t) field,
+ struct ArrowError* error) {
// No dictionary support yet
if (ns(Field_dictionary_is_present(field))) {
ArrowErrorSet(error, "Field DictionaryEncoding not supported");
@@ -569,7 +601,7 @@ static int ArrowIpcReaderSetField(struct ArrowSchema*
schema, ns(Field_table_t)
ns(Field_vec_t) children = ns(Field_children(field));
int64_t n_children = ns(Field_vec_len(children));
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetType(schema, field, n_children,
error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetType(schema, field, n_children,
error));
// nanoarrow's type setters set the nullable flag by default, so we might
// have to unset it here.
@@ -591,33 +623,36 @@ static int ArrowIpcReaderSetField(struct ArrowSchema*
schema, ns(Field_table_t)
ArrowSchemaInit(schema->children[i]);
}
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetChildren(schema, children, error));
- return ArrowIpcReaderSetMetadata(schema, ns(Field_custom_metadata(field)),
error);
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(schema, children, error));
+ return ArrowIpcDecoderSetMetadata(schema, ns(Field_custom_metadata(field)),
error);
}
-static int ArrowIpcReaderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
+ struct ArrowError* error) {
int64_t n_fields = ns(Schema_vec_len(fields));
for (int64_t i = 0; i < n_fields; i++) {
ns(Field_table_t) field = ns(Field_vec_at(fields, i));
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetField(schema->children[i], field,
error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetField(schema->children[i],
field, error));
}
return NANOARROW_OK;
}
-static int ArrowIpcReaderDecodeSchema(struct ArrowIpcReader* reader,
- flatbuffers_generic_t message_header,
- struct ArrowError* error) {
+static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder,
+ flatbuffers_generic_t
message_header,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
ns(Schema_table_t) schema = (ns(Schema_table_t))message_header;
int endianness = ns(Schema_endianness(schema));
switch (endianness) {
case ns(Endianness_Little):
- reader->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+ decoder->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
break;
case ns(Endianness_Big):
- reader->endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+ decoder->endianness = NANOARROW_IPC_ENDIANNESS_BIG;
break;
default:
ArrowErrorSet(error,
@@ -628,16 +663,16 @@ static int ArrowIpcReaderDecodeSchema(struct
ArrowIpcReader* reader,
ns(Feature_vec_t) features = ns(Schema_features(schema));
int64_t n_features = ns(Feature_vec_len(features));
- reader->features = 0;
+ decoder->feature_flags = 0;
for (int64_t i = 0; i < n_features; i++) {
ns(Feature_enum_t) feature = ns(Feature_vec_at(features, i));
switch (feature) {
case ns(Feature_COMPRESSED_BODY):
- reader->features |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY;
+ decoder->feature_flags |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY;
break;
case ns(Feature_DICTIONARY_REPLACEMENT):
- reader->features |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT;
+ decoder->feature_flags |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT;
break;
default:
ArrowErrorSet(error, "Unrecognized Schema feature with value %d",
(int)feature);
@@ -645,33 +680,70 @@ static int ArrowIpcReaderDecodeSchema(struct
ArrowIpcReader* reader,
}
}
- ns(Field_vec_t) fields = ns(Schema_fields(schema));
- int64_t n_fields = ns(Schema_vec_len(fields));
- if (reader->schema.release != NULL) {
- reader->schema.release(&reader->schema);
+ return NANOARROW_OK;
+}
+
+static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder*
decoder,
+ flatbuffers_generic_t
message_header,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))message_header;
+
+ ns(FieldNode_vec_t) fields = ns(RecordBatch_nodes(batch));
+ ns(Buffer_vec_t) buffers = ns(RecordBatch_buffers(batch));
+ int64_t n_fields = ns(FieldNode_vec_len(fields));
+ int64_t n_buffers = ns(Buffer_vec_len(buffers));
+
+ // Check field node and buffer count. We have one more field and buffer
+ // because we count the root struct and the flatbuffer message does not.
+ if ((n_fields + 1) != private_data->n_fields) {
+ ArrowErrorSet(error, "Expected %ld field nodes in message but found %ld",
+ (long)private_data->n_fields - 1, (long)n_fields);
+ return EINVAL;
}
- ArrowSchemaInit(&reader->schema);
- int result = ArrowSchemaSetTypeStruct(&reader->schema, n_fields);
- if (result != NANOARROW_OK) {
- ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
- (long)n_fields);
- return result;
+ if ((n_buffers + 1) != private_data->n_buffers) {
+ ArrowErrorSet(error, "Expected %ld buffers in message but found %ld",
+ (long)private_data->n_buffers - 1, (long)n_buffers);
+ return EINVAL;
}
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetChildren(&reader->schema, fields,
error));
- return ArrowIpcReaderSetMetadata(&reader->schema,
ns(Schema_custom_metadata(schema)),
- error);
+ if (ns(RecordBatch_compression_is_present(batch))) {
+ ns(BodyCompression_table_t) compression =
ns(RecordBatch_compression(batch));
+ ns(CompressionType_enum_t) codec = ns(BodyCompression_codec(compression));
+ switch (codec) {
+ case ns(CompressionType_LZ4_FRAME):
+ decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME;
+ break;
+ case ns(CompressionType_ZSTD):
+ decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
+ break;
+ default:
+ ArrowErrorSet(error, "Unrecognized RecordBatch BodyCompression codec
value: %d",
+ (int)codec);
+ return EINVAL;
+ }
+ } else {
+ decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+ }
+
+ // Copying field node and buffer information is separate so as only to pay
for the
+ // nodes that are actually accessed.
+ return NANOARROW_OK;
}
-static inline int ArrowIpcReaderCheckHeader(struct ArrowIpcReader* reader,
- struct ArrowBufferView* data_mut,
- int32_t* message_size_bytes,
- struct ArrowError* error) {
+// Returns NANOARROW_OK if data is large enough to read the message header,
+// ESPIPE if reading more data might help, or EINVAL if the content is not
valid
+static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView* data_mut,
+ int32_t* message_size_bytes,
+ struct ArrowError* error) {
if (data_mut->size_bytes < 8) {
ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes
remain",
(long)data_mut->size_bytes);
- return EINVAL;
+ return ESPIPE;
}
uint32_t continuation = ArrowIpcReadUint32LE(data_mut);
@@ -682,12 +754,17 @@ static inline int ArrowIpcReaderCheckHeader(struct
ArrowIpcReader* reader,
}
*message_size_bytes = ArrowIpcReadInt32LE(data_mut);
- if ((*message_size_bytes) > data_mut->size_bytes || (*message_size_bytes) <
0) {
+ if ((*message_size_bytes) < 0) {
+ ArrowErrorSet(
+ error, "Expected message body size > 0 but found message body size of
%ld bytes",
+ (long)(*message_size_bytes));
+ return EINVAL;
+ } else if ((*message_size_bytes) > data_mut->size_bytes) {
ArrowErrorSet(error,
"Expected 0 <= message body size <= %ld bytes but found
message "
"body size of %ld bytes",
(long)data_mut->size_bytes, (long)(*message_size_bytes));
- return EINVAL;
+ return ESPIPE;
}
if (*message_size_bytes == 0) {
@@ -698,49 +775,64 @@ static inline int ArrowIpcReaderCheckHeader(struct
ArrowIpcReader* reader,
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcReaderPeek(struct ArrowIpcReader* reader,
- struct ArrowBufferView data, struct
ArrowError* error) {
- reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
- reader->body_size_bytes = 0;
+ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+ decoder->body_size_bytes = 0;
+ private_data->last_message = NULL;
NANOARROW_RETURN_NOT_OK(
- ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes,
error));
- reader->header_size_bytes += 2 * sizeof(int32_t);
+ ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
+ decoder->header_size_bytes += 2 * sizeof(int32_t);
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcReaderVerify(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error) {
- reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
- reader->body_size_bytes = 0;
+ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+ decoder->body_size_bytes = 0;
+ private_data->last_message = NULL;
NANOARROW_RETURN_NOT_OK(
- ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes,
error));
+ ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
// Run flatbuffers verification
- if (ns(Message_verify_as_root(data.data.as_uint8,
reader->header_size_bytes)) !=
+ if (ns(Message_verify_as_root(data.data.as_uint8,
decoder->header_size_bytes)) !=
flatcc_verify_ok) {
ArrowErrorSet(error, "Message flatbuffer verification failed");
return EINVAL;
}
// Read some basic information from the message
- reader->header_size_bytes += 2 * sizeof(int32_t);
+ decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
- reader->metadata_version = ns(Message_version(message));
- reader->message_type = ns(Message_header_type(message));
- reader->body_size_bytes = ns(Message_bodyLength(message));
+ decoder->metadata_version = ns(Message_version(message));
+ decoder->message_type = ns(Message_header_type(message));
+ decoder->body_size_bytes = ns(Message_bodyLength(message));
+ private_data->last_message = ns(Message_header_get(message));
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
-                                    struct ArrowBufferView data,
-                                    struct ArrowError* error) {
-  reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
-  reader->body_size_bytes = 0;
+// Decode the header of one encapsulated IPC message held in data.
+//
+// On success decoder->header_size_bytes, body_size_bytes, metadata_version and
+// message_type describe this message, and the message header is remembered so that
+// ArrowIpcDecoderDecodeSchema() / ArrowIpcDecoderDecodeArray() can follow.
+// Unsupported message types return ENOTSUP; malformed content returns EINVAL.
+ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  // Forget the previous message so a failure below cannot leave a stale header
+  // available to the DecodeSchema()/DecodeArray() calls.
+  decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+  decoder->body_size_bytes = 0;
+  private_data->last_message = NULL;
+
   NANOARROW_RETURN_NOT_OK(
-      ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes, error));
-  reader->header_size_bytes += 2 * sizeof(int32_t);
+      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
+  decoder->header_size_bytes += 2 * sizeof(int32_t);
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   if (!message) {
@@ -748,11 +840,11 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
   }
   // Read some basic information from the message
-  reader->metadata_version = ns(Message_version(message));
-  reader->message_type = ns(Message_header_type(message));
-  reader->body_size_bytes = ns(Message_bodyLength(message));
+  // The version must be stored on the decoder (not in a local): the switch below
+  // validates decoder->metadata_version, which would otherwise be stale.
+  decoder->metadata_version = ns(Message_version(message));
+  decoder->message_type = ns(Message_header_type(message));
+  decoder->body_size_bytes = ns(Message_bodyLength(message));
-  switch (reader->metadata_version) {
+  switch (decoder->metadata_version) {
     case ns(MetadataVersion_V4):
     case ns(MetadataVersion_V5):
       break;
@@ -760,30 +852,319 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
     case ns(MetadataVersion_V2):
     case ns(MetadataVersion_V3):
       ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
-                    ns(MetadataVersion_name(reader->metadata_version)));
+                    ns(MetadataVersion_name(decoder->metadata_version)));
+      // NOTE(review): the error text is recorded but decoding continues and
+      // NANOARROW_OK is returned for these versions; confirm this is intended.
       break;
     default:
       ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
-                    reader->metadata_version);
+                    decoder->metadata_version);
       return EINVAL;
   }
   flatbuffers_generic_t message_header = ns(Message_header_get(message));
-  switch (reader->message_type) {
+  switch (decoder->message_type) {
     case ns(MessageHeader_Schema):
-      NANOARROW_RETURN_NOT_OK(ArrowIpcReaderDecodeSchema(reader, message_header, error));
+      NANOARROW_RETURN_NOT_OK(
+          ArrowIpcDecoderDecodeSchemaHeader(decoder, message_header, error));
       break;
-    case ns(MessageHeader_DictionaryBatch):
     case ns(MessageHeader_RecordBatch):
+      NANOARROW_RETURN_NOT_OK(
+          ArrowIpcDecoderDecodeRecordBatchHeader(decoder, message_header, error));
+      break;
+    case ns(MessageHeader_DictionaryBatch):
     case ns(MessageHeader_Tensor):
     case ns(MessageHeader_SparseTensor):
       ArrowErrorSet(error, "Unsupported message type: '%s'",
-                    ns(MessageHeader_type_name(reader->message_type)));
+                    ns(MessageHeader_type_name(decoder->message_type)));
       return ENOTSUP;
     default:
-      ArrowErrorSet(error, "Unnown message type: %d", (int)(reader->message_type));
+      ArrowErrorSet(error, "Unknown message type: %d", (int)(decoder->message_type));
       return EINVAL;
   }
+  // Only remember the header once it decoded successfully
+  private_data->last_message = message_header;
+  return NANOARROW_OK;
+}
+
+// Convert the most recently decoded Schema message header into an ArrowSchema.
+//
+// The result is a struct schema whose children come from the message's fields and
+// whose metadata comes from the message's custom_metadata. *out is only written
+// (moved into) on success; the caller then owns it and must release it.
+// Returns EINVAL if the last decoded message was not a Schema message.
+ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowSchema* out,
+                                           struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (private_data->last_message == NULL ||
+      decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) {
+    ArrowErrorSet(error, "decoder did not just decode a Schema message");
+    return EINVAL;
+  }
+
+  ns(Schema_table_t) schema = (ns(Schema_table_t))private_data->last_message;
+
+  ns(Field_vec_t) fields = ns(Schema_fields(schema));
+  // fields is a vector of Field tables, so measure it with the Field accessor
+  int64_t n_fields = ns(Field_vec_len(fields));
+
+  // Build into a temporary so that *out is untouched on any failure
+  struct ArrowSchema tmp;
+  ArrowSchemaInit(&tmp);
+  int result = ArrowSchemaSetTypeStruct(&tmp, n_fields);
+  if (result != NANOARROW_OK) {
+    tmp.release(&tmp);
+    ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
+                  (long)n_fields);
+    return result;
+  }
+
+  result = ArrowIpcDecoderSetChildren(&tmp, fields, error);
+  if (result != NANOARROW_OK) {
+    tmp.release(&tmp);
+    return result;
+  }
+
+  result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error);
+  if (result != NANOARROW_OK) {
+    tmp.release(&tmp);
+    return result;
+  }
+
+  ArrowSchemaMove(&tmp, out);
+  return NANOARROW_OK;
+}
+
+// Add one to *n_fields for schema itself and one for every descendant, visiting
+// the schema tree depth-first.
+static void ArrowIpcDecoderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  (*n_fields)++;
+  struct ArrowSchema** child = schema->children;
+  for (int64_t remaining = schema->n_children; remaining > 0; remaining--, child++) {
+    ArrowIpcDecoderCountFields(*child, n_fields);
+  }
+}
+
+// Record view at position *n_fields of fields, then recurse into its children in
+// order (pre-order). Each entry's buffer_offset is the number of non-NONE layout
+// buffers seen before it; *n_fields and *n_buffers are advanced past this subtree.
+static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
+                                      struct ArrowArrayView* view, int64_t* n_fields,
+                                      int64_t* n_buffers) {
+  int64_t buffers_used = 0;
+  for (int b = 0; b < 3; b++) {
+    if (view->layout.buffer_type[b] != NANOARROW_BUFFER_TYPE_NONE) {
+      buffers_used++;
+    }
+  }
+
+  fields[*n_fields].array_view = view;
+  fields[*n_fields].buffer_offset = *n_buffers;
+  *n_buffers += buffers_used;
+  (*n_fields)++;
+
+  for (int64_t child = 0; child < view->n_children; child++) {
+    ArrowIpcDecoderInitFields(fields, view->children[child], n_fields, n_buffers);
+  }
+}
+
+// Prepare the decoder to decode RecordBatch messages described by schema.
+//
+// schema is read but not moved: the caller keeps ownership and must release it.
+// Builds an ArrayView for schema plus a flat, pre-order list of every field
+// (root struct included) with each field's starting buffer index. Returns EINVAL
+// if schema is invalid or its root is not a struct, ENOMEM on allocation failure.
+ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
+                                        struct ArrowSchema* schema,
+                                        struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  // Discard state from any previously set schema. The pointer and both counters
+  // are cleared up front so that an early return below cannot leave a dangling
+  // fields pointer (freed again by a later SetSchema()/Reset()) or counts that
+  // belong to the previous schema. n_buffers must start at zero because
+  // ArrowIpcDecoderInitFields() accumulates into it.
+  ArrowArrayViewReset(&private_data->array_view);
+  if (private_data->fields != NULL) {
+    ArrowFree(private_data->fields);
+    private_data->fields = NULL;
+  }
+  private_data->n_fields = 0;
+  private_data->n_buffers = 0;
+
+  // Allocate an ArrayView based on schema without moving the schema;
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&private_data->array_view, schema, error));
+
+  // Root must be a struct
+  if (private_data->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  int64_t n_fields = 0;
+  ArrowIpcDecoderCountFields(schema, &n_fields);
+  private_data->fields =
+      (struct ArrowIpcField*)ArrowMalloc(n_fields * sizeof(struct ArrowIpcField));
+  if (private_data->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate decoder->fields");
+    return ENOMEM;
+  }
+  memset(private_data->fields, 0, n_fields * sizeof(struct ArrowIpcField));
+  // Only publish the count once the array it describes exists
+  private_data->n_fields = n_fields;
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcDecoderInitFields(private_data->fields, &private_data->array_view, &field_i,
+                            &private_data->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+// Cursor state shared by ArrowIpcDecoderWalkGetArray() and
+// ArrowIpcDecoderMakeBuffer() while an ArrowArray is populated from the
+// RecordBatch message most recently decoded.
+struct ArrowIpcArraySetter {
+ // FieldNode entries of the RecordBatch and the index of the next one to read
+ ns(FieldNode_vec_t) fields;
+ int64_t field_i;
+ // Buffer entries of the RecordBatch and the index of the next one to read
+ // (already advanced past the current buffer while MakeBuffer() runs)
+ ns(Buffer_vec_t) buffers;
+ int64_t buffer_i;
+ // Message body; Buffer offsets/lengths are checked against and relative to it
+ struct ArrowBufferView body;
+ // Copied from the decoder; MakeBuffer() rejects anything other than NONE
+ enum ArrowIpcCompressionType codec;
+ // Endianness recorded on the decoder (UNINITIALIZED is treated as acceptable)
+ enum ArrowIpcEndianness endianness;
+ // Endianness of the running machine, detected at runtime by the caller
+ enum ArrowIpcEndianness system_endianness;
+};
+
+// Copy the [offset, offset + length) slice of the message body into out.
+//
+// offset and length come straight from a RecordBatch Buffer entry and must be
+// treated as untrusted: negative values and ranges that extend past the end of
+// the body are rejected with EINVAL, using a range check that cannot overflow.
+// Compressed bodies and bodies whose endianness differs from the system's are
+// rejected with ENOTSUP. A zero-length buffer leaves out untouched.
+static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                     int64_t length, struct ArrowBuffer* out,
+                                     struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body. Written as
+  // "length > size - offset" (with offset already known to be in [0, size]) so
+  // that hostile values cannot overflow offset + length.
+  if (offset < 0 || length < 0 || offset > setter->body.size_bytes ||
+      length > setter->body.size_bytes - offset) {
+    // setter->buffer_i was advanced past this buffer before the call, hence - 1.
+    // The end offset is computed with wrapping unsigned arithmetic so that
+    // formatting the message cannot itself overflow.
+    long long end = (long long)((uint64_t)offset + (uint64_t)length);
+    ArrowErrorSet(error,
+                  "Buffer %lld requires body offsets [%lld..%lld) but body has size %lld",
+                  (long long)setter->buffer_i - 1, (long long)offset, end,
+                  (long long)setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  // Copy (rather than reference) so the result does not alias the body
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Populate array (and, recursively, its children) from the setter's cursor.
+//
+// Consumes one FieldNode for array itself, then one Buffer entry per buffer of
+// array, then recurses into the children in order, so field_i and buffer_i end up
+// just past this subtree. Buffer bytes are produced by ArrowIpcDecoderMakeBuffer().
+static int ArrowIpcDecoderWalkGetArray(struct ArrowIpcArraySetter* setter,
+                                       struct ArrowArray* array,
+                                       struct ArrowError* error) {
+  size_t node_index = (size_t)setter->field_i;
+  setter->field_i++;
+  ns(FieldNode_struct_t) node = ns(FieldNode_vec_at(setter->fields, node_index));
+  array->length = ns(FieldNode_length(node));
+  array->null_count = ns(FieldNode_null_count(node));
+
+  int result;
+  for (int64_t buffer_idx = 0; buffer_idx < array->n_buffers; buffer_idx++) {
+    size_t entry_index = (size_t)setter->buffer_i;
+    // Advance before decoding: MakeBuffer() reports setter->buffer_i - 1 on error
+    setter->buffer_i++;
+    ns(Buffer_struct_t) entry = ns(Buffer_vec_at(setter->buffers, entry_index));
+
+    result = ArrowIpcDecoderMakeBuffer(setter, ns(Buffer_offset(entry)),
+                                       ns(Buffer_length(entry)),
+                                       ArrowArrayBuffer(array, buffer_idx), error);
+    if (result != NANOARROW_OK) {
+      return result;
+    }
+  }
+
+  struct ArrowArray** child = array->children;
+  for (int64_t remaining = array->n_children; remaining > 0; remaining--, child++) {
+    result = ArrowIpcDecoderWalkGetArray(setter, *child, error);
+    if (result != NANOARROW_OK) {
+      return result;
+    }
+  }
+
+  return NANOARROW_OK;
+}
+
+// Allocate an empty ArrowArray whose storage types and child structure mirror
+// array_view. A failure can occur after array was already initialized, so callers
+// should release array when array->release is non-NULL.
+static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array,
+                                          struct ArrowArrayView* array_view) {
+  int result = ArrowArrayInitFromType(array, array_view->storage_type);
+  if (result != NANOARROW_OK) {
+    return result;
+  }
+
+  result = ArrowArrayAllocateChildren(array, array_view->n_children);
+  if (result != NANOARROW_OK) {
+    return result;
+  }
+
+  for (int64_t child = 0; child < array_view->n_children && result == NANOARROW_OK;
+       child++) {
+    result = ArrowIpcArrayInitFromArrayView(array->children[child],
+                                            array_view->children[child]);
+  }
+
+  return result;
+}
+
+// Assemble an ArrowArray for field field_i (or the whole batch as a struct when
+// field_i is -1) from the most recently decoded RecordBatch header and its body.
+//
+// Buffer contents are copied out of body. *out is only written (moved into) on
+// success, in which case the caller owns it and must release it. Returns EINVAL if
+// the last decoded message was not a RecordBatch or field_i is out of range.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t field_i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (private_data->last_message == NULL ||
+      decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(error, "decoder did not just decode a RecordBatch message");
+    return EINVAL;
+  }
+
+  // private_data->fields holds the root struct followed by every (possibly nested)
+  // child in depth-first order, so valid indices are -1 (the root) through
+  // n_fields - 2. Checking here prevents reading past the end of fields.
+  if (field_i < -1 || field_i >= private_data->n_fields - 1) {
+    ArrowErrorSet(error, "Expected field index between -1 and %ld but found %ld",
+                  (long)(private_data->n_fields - 2), (long)field_i);
+    return EINVAL;
+  }
+
+  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))private_data->last_message;
+
+  // RecordBatch messages don't count the root node but decoder->fields does
+  struct ArrowIpcField* root = private_data->fields + field_i + 1;
+
+  struct ArrowArray temp;
+  temp.release = NULL;
+  int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view);
+  if (result != NANOARROW_OK) {
+    // Initialization can fail after temp (and some children) were allocated
+    if (temp.release != NULL) {
+      temp.release(&temp);
+    }
+    ArrowErrorSet(error, "Failed to initialize output array");
+    return result;
+  }
+
+  struct ArrowIpcArraySetter setter;
+  setter.fields = ns(RecordBatch_nodes(batch));
+  setter.field_i = field_i;
+  setter.buffers = ns(RecordBatch_buffers(batch));
+  // fields[].buffer_offset counts the root struct's validity buffer, which the
+  // message does not contain, hence the - 1 to get a message buffer index.
+  setter.buffer_i = root->buffer_offset - 1;
+  setter.body = body;
+  setter.codec = decoder->codec;
+  setter.endianness = decoder->endianness;
+
+  // This should probably be done at compile time
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+
+  // The flatbuffers FieldNode doesn't count the root struct so we have to loop over
+  // the children ourselves
+  if (field_i == -1) {
+    temp.length = ns(RecordBatch_length(batch));
+    temp.null_count = 0;
+    setter.field_i++;
+    setter.buffer_i++;
+
+    for (int64_t i = 0; i < temp.n_children; i++) {
+      result = ArrowIpcDecoderWalkGetArray(&setter, temp.children[i], error);
+      if (result != NANOARROW_OK) {
+        temp.release(&temp);
+        return result;
+      }
+    }
+  } else {
+    result = ArrowIpcDecoderWalkGetArray(&setter, &temp, error);
+    if (result != NANOARROW_OK) {
+      temp.release(&temp);
+      return result;
+    }
+  }
+
+  // TODO: this performs some validation but doesn't do everything we need it to do
+  // notably it doesn't loop over offset buffers to look for values that will cause
+  // out-of-bounds buffer access on the data buffer or child arrays.
+  result = ArrowArrayFinishBuilding(&temp, error);
+  if (result != NANOARROW_OK) {
+    temp.release(&temp);
+    return result;
+  }
+
+  ArrowArrayMove(&temp, out);
   return NANOARROW_OK;
 }
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
index 018f4ed..7573441 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
@@ -23,11 +23,20 @@
#ifdef NANOARROW_NAMESPACE
+// Map each public ArrowIpc* function to NANOARROW_SYMBOL(NANOARROW_NAMESPACE, name).
+// Every function declared in this header needs an entry here.
#define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcCheckRuntime)
-#define ArrowIpcReaderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderInit)
-#define ArrowIpcReaderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderReset)
-#define ArrowIpcReaderPeek NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderPeek)
-#define ArrowIpcReaderVerify NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderVerify)
-#define ArrowIpcReaderDecode NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderDecode)
+#define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderInit)
+#define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcDecoderReset)
+#define ArrowIpcDecoderPeekHeader \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderPeekHeader)
+#define ArrowIpcDecoderVerifyHeader \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderVerifyHeader)
+#define ArrowIpcDecoderDecodeHeader \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeHeader)
+#define ArrowIpcDecoderDecodeSchema \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
+#define ArrowIpcDecoderDecodeArray \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderSetSchema \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#endif
@@ -35,6 +44,14 @@
extern "C" {
#endif
+/// \brief Arrow IPC metadata versions
+///
+/// NOTE(review): decoder->metadata_version is assigned directly from the
+/// flatbuffer Message version, so these enumerators are assumed to be in the
+/// same order as the flatbuffer MetadataVersion enum (V1 first) - confirm.
+enum ArrowIpcMetadataVersion {
+ NANOARROW_IPC_METADATA_VERSION_V1,
+ NANOARROW_IPC_METADATA_VERSION_V2,
+ NANOARROW_IPC_METADATA_VERSION_V3,
+ NANOARROW_IPC_METADATA_VERSION_V4,
+ NANOARROW_IPC_METADATA_VERSION_V5
+};
+
enum ArrowIpcMessageType {
NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED,
NANOARROW_IPC_MESSAGE_TYPE_SCHEMA,
@@ -50,38 +67,145 @@ enum ArrowIpcEndianness {
NANOARROW_IPC_ENDIANNESS_BIG
};
+/// \brief Body compression codec of a RecordBatch message
+///
+/// Only NANOARROW_IPC_COMPRESSION_TYPE_NONE bodies can currently be decoded into
+/// an ArrowArray; other codecs are reported as ENOTSUP.
+enum ArrowIpcCompressionType {
+ NANOARROW_IPC_COMPRESSION_TYPE_NONE,
+ NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME,
+ NANOARROW_IPC_COMPRESSION_TYPE_ZSTD
+};
+
#define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
#define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
-struct ArrowIpcReader {
- int32_t metadata_version;
- int32_t message_type;
- int32_t endianness;
- int32_t features;
+/// \brief Decoder for Arrow IPC messages
+///
+/// This structure is intended to be allocated by the caller,
+/// initialized using ArrowIpcDecoderInit(), and released with
+/// ArrowIpcDecoderReset(). These fields should not be modified
+/// by the caller but can be read following a call to
+/// ArrowIpcDecoderPeekHeader(), ArrowIpcDecoderVerifyHeader(), or
+/// ArrowIpcDecoderDecodeHeader().
+struct ArrowIpcDecoder {
+ /// \brief The last verified or decoded message type
+ enum ArrowIpcMessageType message_type;
+
+ /// \brief The metadata version used by this and forthcoming messages
+ enum ArrowIpcMetadataVersion metadata_version;
+
+ /// \brief Endianness of forthcoming RecordBatch messages, as declared by the
+ /// most recently decoded Schema message
+ enum ArrowIpcEndianness endianness;
+
+ /// \brief Features used by this and forthcoming messages as indicated by the
+ /// current Schema message (a combination of NANOARROW_IPC_FEATURE_* flags)
+ int32_t feature_flags;
+
+ /// \brief Compression used by the current RecordBatch message
+ enum ArrowIpcCompressionType codec;
+
+ /// \brief The number of bytes in the current header message
+ ///
+ /// This value includes the 8 bytes before the start of the header message
+ /// content and any padding bytes required to make the header message size
+ /// be a multiple of 8 bytes. The message body (if any) starts this many
+ /// bytes after the start of the message.
int32_t header_size_bytes;
- int64_t body_size_bytes;
- struct ArrowSchema schema;
-};
-
-void ArrowIpcReaderInit(struct ArrowIpcReader* reader);
-void ArrowIpcReaderReset(struct ArrowIpcReader* reader);
-
-ArrowErrorCode ArrowIpcReaderPeek(struct ArrowIpcReader* reader,
- struct ArrowBufferView data, struct
ArrowError* error);
+ /// \brief The number of bytes in the message body that follows the current
+ /// header (0 for messages without a body, such as Schema messages).
+ int64_t body_size_bytes;
-ArrowErrorCode ArrowIpcReaderVerify(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error);
+ /// \brief Private resources managed by this library; opaque to callers and
+ /// owned by the decoder between ArrowIpcDecoderInit() and ArrowIpcDecoderReset()
+ void* private_data;
+};
-ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error);
-
-#endif
+/// \brief Initialize a decoder
+///
+/// Every initialized decoder must eventually be passed to ArrowIpcDecoderReset().
+ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder);
+
+/// \brief Release all resources attached to a decoder
+void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
+
+/// \brief Peek at a message header
+///
+/// The first 8 bytes of an Arrow IPC message are 0xFFFFFFFF followed by the size
+/// of the header as a little-endian 32-bit integer. ArrowIpcDecoderPeekHeader()
+/// reads these bytes and returns ESPIPE if there are not enough remaining bytes in
+/// data to read the entire header message, EINVAL if the first 8 bytes are not
+/// valid, ENODATA if the Arrow end-of-stream indicator has been reached, or
+/// NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error);
+
+/// \brief Verify a message header
+///
+/// Runs ArrowIpcDecoderPeekHeader() to ensure data is sufficiently large but
+/// additionally runs flatbuffer verification to ensure that decoding the data will
+/// not access memory outside of the buffer specified by data.
+/// ArrowIpcDecoderVerifyHeader() will also set decoder.header_size_bytes,
+/// decoder.body_size_bytes, decoder.metadata_version, and decoder.message_type.
+///
+/// Returns as ArrowIpcDecoderPeekHeader() and additionally will
+/// return EINVAL if flatbuffer verification fails.
+ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error);
+
+/// \brief Decode a message header
+///
+/// Runs ArrowIpcDecoderPeekHeader() to ensure data is sufficiently large and
+/// decodes the content of the message header. If data contains a schema message,
+/// decoder.endianness and decoder.feature_flags are set and
+/// ArrowIpcDecoderDecodeSchema() can be used to obtain the decoded schema. If data
+/// contains a record batch message, decoder.codec is set and a successful call can
+/// be followed by a call to ArrowIpcDecoderDecodeArray().
+///
+/// In almost all cases this should be preceded by a call to
+/// ArrowIpcDecoderVerifyHeader() to ensure decoding does not access data outside
+/// of the specified buffer.
+///
+/// Returns EINVAL if the content of the message cannot be decoded or ENOTSUP if
+/// the content of the message uses features not supported by this library.
+ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error);
+
+/// \brief Decode an ArrowSchema
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader() on a schema message,
+/// retrieve the decoded schema as a struct ArrowSchema. out is only written on
+/// success; the caller is responsible for releasing the schema if NANOARROW_OK is
+/// returned.
+///
+/// Returns EINVAL if the decoder did not just decode a schema message or
+/// NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
+ struct ArrowSchema* out,
+ struct ArrowError* error);
+
+/// \brief Set the ArrowSchema used to decode future record batch messages
+///
+/// Prepares the decoder for future record batch messages of this type. The root
+/// of schema must be a struct. The decoder does not take ownership of schema
+/// (it is read but not moved); the caller remains responsible for releasing it.
+///
+/// Returns EINVAL if schema validation fails or the root is not a struct, ENOMEM
+/// if allocation fails, or NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
+ struct ArrowSchema* schema,
+ struct ArrowError* error);
+
+/// \brief Decode an ArrowArray
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader() on a record batch
+/// message, assemble an ArrowArray given the message body (the
+/// decoder.body_size_bytes bytes that start decoder.header_size_bytes after the
+/// start of the message) and a field index i. Buffer contents are copied out of
+/// body. Note that field index does not equate to column index if any columns
+/// contain nested types. Use a value of -1 to decode the entire array into a
+/// struct. out is only written on success; the caller is responsible for releasing
+/// the array if NANOARROW_OK is returned.
+///
+/// Returns EINVAL if the decoder did not just decode a record batch message,
+/// ENOTSUP if the message uses features not supported by this library, or
+/// NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView body, int64_t
i,
+ struct ArrowArray* out,
+ struct ArrowError* error);
#ifdef __cplusplus
}
#endif
+
+#endif
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
index 55835e8..a8e035f 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
@@ -27,6 +27,23 @@
using namespace arrow;
+// Copied from nanoarrow_ipc.c so we can test the internal state of the decoder
+// through decoder.private_data. These definitions must stay identical to the ones
+// in nanoarrow_ipc.c (same members, same order, same types); if they drift, the
+// reinterpret_cast<struct ArrowIpcDecoderPrivate*> in the tests reads the wrong
+// memory.
+extern "C" {
+struct ArrowIpcField {
+ struct ArrowArrayView* array_view;
+ // Index of this field's first buffer in the flattened, root-inclusive buffer list
+ int64_t buffer_offset;
+};
+
+struct ArrowIpcDecoderPrivate {
+ struct ArrowArrayView array_view;
+ // Number of entries in fields (the root struct plus every nested child)
+ int64_t n_fields;
+ struct ArrowIpcField* fields;
+ // Total number of buffers across all fields, root struct included
+ int64_t n_buffers;
+ const void* last_message;
+};
+}
+
TEST(NanoarrowIpcCheckRuntime, CheckRuntime) {
EXPECT_EQ(ArrowIpcCheckRuntime(nullptr), NANOARROW_OK);
}
@@ -62,76 +79,90 @@ static uint8_t kSimpleSchema[] = {
0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00,
0x08, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00};
+// An encapsulated RecordBatch message for a batch with one int32 column: the
+// 0xFFFFFFFF marker, a 0x88 (136) byte metadata length, the flatbuffer header,
+// then a 16-byte body. The batch has 3 rows, no nulls, and values 1, 2, 3.
+static uint8_t kSimpleRecordBatch[] = {
+ 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00,
0x0c, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00,
0x0c, 0x00,
+ 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
0x10, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x00,
+ 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
0x03, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
- struct ArrowIpcReader reader;
+ struct ArrowIpcDecoder decoder;
struct ArrowError error;
struct ArrowBufferView data;
data.data.as_uint8 = kSimpleSchema;
data.size_bytes = 1;
- ArrowIpcReaderInit(&reader);
+ ArrowIpcDecoderInit(&decoder);
+ // Fewer bytes than the 8-byte prefix: not enough data yet (ESPIPE)
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
EXPECT_STREQ(error.message,
"Expected data of at least 8 bytes but only 1 bytes remain");
uint32_t eight_bad_bytes[] = {0, 0};
data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
data.size_bytes = 8;
+ // Missing 0xFFFFFFFF marker: invalid prefix (EINVAL)
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
"Expected 0xFFFFFFFF at start of message but found 0x00000000");
eight_bad_bytes[0] = 0xFFFFFFFF;
eight_bad_bytes[1] = static_cast<uint32_t>(-1);
+ // Negative size: invalid prefix (EINVAL)
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message,
- "Expected 0 <= message body size <= 0 bytes but found message
body size "
- "of -1 bytes");
+ "Expected message body size > 0 but found message body size of
-1 bytes");
eight_bad_bytes[1] = static_cast<uint32_t>(1);
+ // Valid prefix announcing more bytes than remain: not enough data yet (ESPIPE)
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
EXPECT_STREQ(error.message,
"Expected 0 <= message body size <= 0 bytes but found message
body size "
"of 1 bytes");
eight_bad_bytes[0] = 0xFFFFFFFF;
eight_bad_bytes[1] = 0;
+ // Marker followed by a zero size is the end-of-stream indicator (ENODATA)
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), ENODATA);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
EXPECT_STREQ(error.message, "End of Arrow stream");
- ArrowIpcReaderReset(&reader);
+ ArrowIpcDecoderReset(&decoder);
}
TEST(NanoarrowIpcTest, NanoarrowIpcPeekSimpleSchema) {
- struct ArrowIpcReader reader;
+ struct ArrowIpcDecoder decoder;
struct ArrowError error;
struct ArrowBufferView data;
data.data.as_uint8 = kSimpleSchema;
data.size_bytes = sizeof(kSimpleSchema);
- ArrowIpcReaderInit(&reader);
- EXPECT_EQ(ArrowIpcReaderPeek(&reader, data, &error), NANOARROW_OK);
- EXPECT_EQ(reader.header_size_bytes, sizeof(kSimpleSchema));
- EXPECT_EQ(reader.body_size_bytes, 0);
+ ArrowIpcDecoderInit(&decoder);
+ // header_size_bytes includes the 8-byte prefix, so for this body-less schema
+ // message it spans the entire buffer
+ EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &error), NANOARROW_OK);
+ EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
+ EXPECT_EQ(decoder.body_size_bytes, 0);
- ArrowIpcReaderReset(&reader);
+ ArrowIpcDecoderReset(&decoder);
}
TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) {
- struct ArrowIpcReader reader;
+ struct ArrowIpcDecoder decoder;
struct ArrowError error;
struct ArrowBufferView data;
data.data.as_uint8 = kSimpleSchema;
data.size_bytes = sizeof(kSimpleSchema);
- ArrowIpcReaderInit(&reader);
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), NANOARROW_OK);
- EXPECT_EQ(reader.header_size_bytes, sizeof(kSimpleSchema));
- EXPECT_EQ(reader.body_size_bytes, 0);
+ ArrowIpcDecoderInit(&decoder);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), NANOARROW_OK);
+ EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);
+ EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
+ EXPECT_EQ(decoder.body_size_bytes, 0);
+ // A corrupted copy of the schema message must fail flatbuffer verification.
+ // NOTE(review): the copy is 280 bytes but data.size_bytes below is
+ // sizeof(kSimpleSchema); confirm kSimpleSchema is exactly 280 bytes.
uint8_t simple_schema_invalid[280];
memcpy(simple_schema_invalid, kSimpleSchema, sizeof(simple_schema_invalid));
@@ -139,36 +170,206 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) {
data.data.as_uint8 = simple_schema_invalid;
data.size_bytes = sizeof(kSimpleSchema);
- EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+ EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message, "Message flatbuffer verification failed");
- ArrowIpcReaderReset(&reader);
+ ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleRecordBatch) {
+  // Verifying the header should identify a RecordBatch whose 16-byte body
+  // immediately follows the header.
+  struct ArrowBufferView message;
+  message.size_bytes = sizeof(kSimpleRecordBatch);
+  message.data.as_uint8 = kSimpleRecordBatch;
+
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+  ArrowIpcDecoderInit(&decoder);
+
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, message, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  EXPECT_EQ(decoder.body_size_bytes, 16);
+  EXPECT_EQ(decoder.header_size_bytes,
+            sizeof(kSimpleRecordBatch) - decoder.body_size_bytes);
+
+  ArrowIpcDecoderReset(&decoder);
 }
TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
- struct ArrowIpcReader reader;
+ struct ArrowIpcDecoder decoder;
struct ArrowError error;
+ struct ArrowSchema schema;
struct ArrowBufferView data;
data.data.as_uint8 = kSimpleSchema;
data.size_bytes = sizeof(kSimpleSchema);
- ArrowIpcReaderInit(&reader);
+ ArrowIpcDecoderInit(&decoder);
+
+ // Before any header has been decoded there is no schema to retrieve
+ EXPECT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, &error), EINVAL);
+ EXPECT_STREQ(error.message, "decoder did not just decode a Schema message");
+
+ // Decoding the header fills in the sizes and the schema-level properties
+ EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+ EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
+ EXPECT_EQ(decoder.body_size_bytes, 0);
+
+ EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);
+ EXPECT_EQ(decoder.endianness, NANOARROW_IPC_ENDIANNESS_LITTLE);
+ EXPECT_EQ(decoder.feature_flags, 0);
+
+ // The decoded schema is a struct owned (and released) by the caller
+ ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, &error),
NANOARROW_OK);
+ ASSERT_EQ(schema.n_children, 1);
+ EXPECT_STREQ(schema.children[0]->name, "some_col");
+ EXPECT_EQ(schema.children[0]->flags, ARROW_FLAG_NULLABLE);
+ EXPECT_STREQ(schema.children[0]->format, "i");
+
+ schema.release(&schema);
+ ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcDecoderInit(&decoder);
+  auto decoder_private =
+      reinterpret_cast<struct ArrowIpcDecoderPrivate*>(decoder.private_data);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, data, 0, nullptr, &error), EINVAL);
+  EXPECT_STREQ(error.message, "decoder did not just decode a RecordBatch message");
+
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+
+  // The body below is located from header_size_bytes/body_size_bytes, so there is
+  // no point continuing if the header could not be decoded.
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  EXPECT_EQ(decoder.header_size_bytes,
+            sizeof(kSimpleRecordBatch) - decoder.body_size_bytes);
+  EXPECT_EQ(decoder.body_size_bytes, 16);
+
+  EXPECT_EQ(decoder.codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
+
+  struct ArrowBufferView body;
+  body.data.as_uint8 = kSimpleRecordBatch + decoder.header_size_bytes;
+  body.size_bytes = decoder.body_size_bytes;
+
+  // Check full struct extract. ASSERT rather than EXPECT: on failure array is
+  // never written, so it must not be inspected or released below.
+  ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, -1, &array, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  ASSERT_EQ(array.n_children, 1);
+  ASSERT_EQ(array.children[0]->n_buffers, 2);
+  ASSERT_EQ(array.children[0]->length, 3);
+  EXPECT_EQ(array.children[0]->null_count, 0);
+  const int32_t* out = reinterpret_cast<const int32_t*>(array.children[0]->buffers[1]);
+  EXPECT_EQ(out[0], 1);
+  EXPECT_EQ(out[1], 2);
+  EXPECT_EQ(out[2], 3);
+
+  array.release(&array);
+
+  // Check field extract (same reason for ASSERT as above)
+  ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, nullptr),
+            NANOARROW_OK);
+  ASSERT_EQ(array.n_buffers, 2);
+  ASSERT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  out = reinterpret_cast<const int32_t*>(array.buffers[1]);
+  EXPECT_EQ(out[0], 1);
+  EXPECT_EQ(out[1], 2);
+  EXPECT_EQ(out[2], 3);
+
+  array.release(&array);
+
+  // Field extract should fail if compression was set
+  decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), ENOTSUP);
+  EXPECT_STREQ(error.message, "The nanoarrow_ipc extension does not support compression");
+  decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+
+  // Field extract should fail on non-system endian
+  // This test will have to get updated when we start testing on big endian
+  decoder.endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), ENOTSUP);
+  EXPECT_STREQ(error.message,
+               "The nanoarrow_ipc extension does not support non-system endianness");
+  decoder.endianness = NANOARROW_IPC_ENDIANNESS_UNINITIALIZED;
+
+  // Field extract should fail if body is too small
+  body.size_bytes = 0;
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), EINVAL);
+  EXPECT_STREQ(error.message,
+               "Buffer 1 requires body offsets [0..12) but body has size 0");
+
+  // Should error if the number of buffers or field nodes doesn't match
+  // (different numbers because we count the root struct and the message does not)
+  decoder_private->n_buffers = 1;
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
+  EXPECT_STREQ(error.message, "Expected 0 buffers in message but found 2");
+
+  decoder_private->n_fields = 1;
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
+  EXPECT_STREQ(error.message, "Expected 0 field nodes in message but found 1");
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcSetSchema) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowSchema schema;
-  EXPECT_EQ(ArrowIpcReaderDecode(&reader, data, &error), NANOARROW_OK);
-  EXPECT_EQ(reader.header_size_bytes, sizeof(kSimpleSchema));
-  EXPECT_EQ(reader.body_size_bytes, 0);
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetName(schema.children[0], "col1"), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
-  EXPECT_EQ(reader.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);
-  EXPECT_EQ(reader.endianness, NANOARROW_IPC_ENDIANNESS_LITTLE);
-  EXPECT_EQ(reader.features, 0);
+  ArrowIpcDecoderInit(&decoder);
+  auto decoder_private =
+      reinterpret_cast<struct ArrowIpcDecoderPrivate*>(decoder.private_data);
-  ASSERT_EQ(reader.schema.n_children, 1);
-  EXPECT_STREQ(reader.schema.children[0]->name, "some_col");
-  EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
-  EXPECT_STREQ(reader.schema.children[0]->format, "i");
+  // ASSERT rather than EXPECT: the checks below index into decoder_private->fields,
+  // which is only allocated (with n_fields entries) when SetSchema succeeds.
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+  ASSERT_EQ(decoder_private->n_fields, 2);
+  // struct root contributes one buffer (validity), int32 child contributes two
+  EXPECT_EQ(decoder_private->n_buffers, 3);
-  ArrowIpcReaderReset(&reader);
+  EXPECT_EQ(decoder_private->fields[0].array_view->storage_type, NANOARROW_TYPE_STRUCT);
+  EXPECT_EQ(decoder_private->fields[0].buffer_offset, 0);
+
+  EXPECT_EQ(decoder_private->fields[1].array_view->storage_type, NANOARROW_TYPE_INT32);
+  EXPECT_EQ(decoder_private->fields[1].buffer_offset, 1);
+
+  // The decoder does not take ownership of schema, so the test still releases it
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcSetSchemaErrors) {
+  struct ArrowIpcDecoder decoder;
+  ArrowIpcDecoderInit(&decoder);
+
+  struct ArrowError error;
+  struct ArrowSchema schema;
+
+  // A schema without a format string cannot be turned into an ArrayView
+  ArrowSchemaInit(&schema);
+  EXPECT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, &error), EINVAL);
+  EXPECT_STREQ(
+      error.message,
+      "Error parsing schema->format: Expected a null-terminated string but found NULL");
+
+  // A valid schema is still rejected when its root is not a struct
+  ASSERT_EQ(ArrowSchemaInitFromType(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  EXPECT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, &error), EINVAL);
+  EXPECT_STREQ(error.message, "schema must be a struct type");
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
 }
class ArrowTypeParameterizedTestFixture
@@ -188,21 +389,87 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowTypeRoundtrip) {
buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
- struct ArrowIpcReader reader;
- ArrowIpcReaderInit(&reader);
- ASSERT_EQ(ArrowIpcReaderVerify(&reader, buffer_view, nullptr), NANOARROW_OK);
- EXPECT_EQ(reader.header_size_bytes, buffer_view.size_bytes);
- EXPECT_EQ(reader.body_size_bytes, 0);
+ struct ArrowIpcDecoder decoder;
+ ArrowIpcDecoderInit(&decoder);
+ ASSERT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, buffer_view, nullptr),
NANOARROW_OK);
+ EXPECT_EQ(decoder.header_size_bytes, buffer_view.size_bytes);
+ EXPECT_EQ(decoder.body_size_bytes, 0);
- ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
- auto maybe_schema = arrow::ImportSchema(&reader.schema);
+ ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr),
NANOARROW_OK);
+ struct ArrowSchema schema;
+ ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, nullptr),
NANOARROW_OK);
+ auto maybe_schema = arrow::ImportSchema(&schema);
ASSERT_TRUE(maybe_schema.ok());
// Better failure message if we first check for string equality
EXPECT_EQ(maybe_schema.ValueUnsafe()->ToString(), dummy_schema->ToString());
EXPECT_TRUE(maybe_schema.ValueUnsafe()->Equals(dummy_schema, true));
- ArrowIpcReaderReset(&reader);
+ ArrowIpcDecoderReset(&decoder);
+}
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
+ const std::shared_ptr<arrow::DataType>& data_type = GetParam();
+ std::shared_ptr<arrow::Schema> dummy_schema =
+ arrow::schema({arrow::field("dummy_name", data_type)});
+
+ auto maybe_empty = arrow::RecordBatch::MakeEmpty(dummy_schema);
+ ASSERT_TRUE(maybe_empty.ok());
+ auto empty = maybe_empty.ValueUnsafe();
+
+ auto maybe_nulls_array = arrow::MakeArrayOfNull(data_type, 3);
+ ASSERT_TRUE(maybe_nulls_array.ok());
+ auto nulls =
+ arrow::RecordBatch::Make(dummy_schema, 3, {maybe_nulls_array.ValueUnsafe()});
+
+ auto options = arrow::ipc::IpcWriteOptions::Defaults();
+
+ struct ArrowSchema schema;
+ struct ArrowIpcDecoder decoder;
+ struct ArrowBufferView buffer_view;
+ struct ArrowArray array;
+
+ // Initialize the decoder
+ ASSERT_TRUE(arrow::ExportSchema(*dummy_schema, &schema).ok());
+ ArrowIpcDecoderInit(&decoder);
+ ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+
+ // Check the empty array
+ auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*empty, options);
+ ASSERT_TRUE(maybe_serialized.ok());
+ buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+ buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+ ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), NANOARROW_OK);
+ buffer_view.data.as_uint8 += decoder.header_size_bytes;
+ buffer_view.size_bytes -= decoder.header_size_bytes;
+ ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array, nullptr),
+ NANOARROW_OK);
+
+ auto maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+ ASSERT_TRUE(maybe_batch.ok());
+ EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), empty->ToString());
+ EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty));
+
+ // Check the array with 3 null values
+ maybe_serialized = arrow::ipc::SerializeRecordBatch(*nulls, options);
+ ASSERT_TRUE(maybe_serialized.ok());
+ buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+ buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+ ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), NANOARROW_OK);
+ buffer_view.data.as_uint8 += decoder.header_size_bytes;
+ buffer_view.size_bytes -= decoder.header_size_bytes;
+ ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array, nullptr),
+ NANOARROW_OK);
+
+ maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+ ASSERT_TRUE(maybe_batch.ok());
+ EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), nulls->ToString());
+ EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls));
+
+ schema.release(&schema);
+ ArrowIpcDecoderReset(&decoder);
 }
INSTANTIATE_TEST_SUITE_P(
@@ -267,21 +534,23 @@ TEST_P(ArrowSchemaParameterizedTestFixture,
NanoarrowIpcArrowSchemaRoundtrip) {
buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
- struct ArrowIpcReader reader;
- ArrowIpcReaderInit(&reader);
- ASSERT_EQ(ArrowIpcReaderVerify(&reader, buffer_view, nullptr), NANOARROW_OK);
- EXPECT_EQ(reader.header_size_bytes, buffer_view.size_bytes);
- EXPECT_EQ(reader.body_size_bytes, 0);
+ struct ArrowIpcDecoder decoder;
+ ArrowIpcDecoderInit(&decoder);
+ ASSERT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, buffer_view, nullptr),
NANOARROW_OK);
+ EXPECT_EQ(decoder.header_size_bytes, buffer_view.size_bytes);
+ EXPECT_EQ(decoder.body_size_bytes, 0);
- ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
- auto maybe_schema = arrow::ImportSchema(&reader.schema);
+ ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr),
NANOARROW_OK);
+ struct ArrowSchema schema;
+ ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, nullptr),
NANOARROW_OK);
+ auto maybe_schema = arrow::ImportSchema(&schema);
ASSERT_TRUE(maybe_schema.ok());
// Better failure message if we first check for string equality
EXPECT_EQ(maybe_schema.ValueUnsafe()->ToString(), arrow_schema->ToString());
EXPECT_TRUE(maybe_schema.ValueUnsafe()->Equals(arrow_schema, true));
- ArrowIpcReaderReset(&reader);
+ ArrowIpcDecoderReset(&decoder);
}
INSTANTIATE_TEST_SUITE_P(