This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 95bcebb ipc: Update dist/ for commit
6437f59dc45509c78a1d9403942d6d9d3e85895b
95bcebb is described below
commit 95bcebbe70cfe4aa7837642997340c19512ad31b
Author: GitHub Actions <[email protected]>
AuthorDate: Wed Mar 8 19:20:12 2023 +0000
ipc: Update dist/ for commit 6437f59dc45509c78a1d9403942d6d9d3e85895b
---
dist/nanoarrow_ipc.c | 703 +++++++++++++++++++++++++++++++++++++++------------
dist/nanoarrow_ipc.h | 178 +++++++++++--
2 files changed, 693 insertions(+), 188 deletions(-)
diff --git a/dist/nanoarrow_ipc.c b/dist/nanoarrow_ipc.c
index 7300657..bb518a6 100644
--- a/dist/nanoarrow_ipc.c
+++ b/dist/nanoarrow_ipc.c
@@ -20294,6 +20294,19 @@ static inline int
org_apache_arrow_flatbuf_Tensor_verify_as_root_with_type_hash(
#include "nanoarrow_ipc.h"
+struct ArrowIpcField {
+ struct ArrowArrayView* array_view;
+ int64_t buffer_offset;
+};
+
+struct ArrowIpcDecoderPrivate {
+ struct ArrowArrayView array_view;
+ int64_t n_fields;
+ struct ArrowIpcField* fields;
+ int64_t n_buffers;
+ const void* last_message;
+};
+
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
const char* nanoarrow_runtime_version = ArrowNanoarrowVersion();
const char* nanoarrow_ipc_build_time_version = NANOARROW_VERSION;
@@ -20307,16 +20320,32 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct
ArrowError* error) {
return NANOARROW_OK;
}
-void ArrowIpcReaderInit(struct ArrowIpcReader* reader) {
- memset(reader, 0, sizeof(struct ArrowIpcReader));
+ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) {
+ memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)ArrowMalloc(sizeof(struct
ArrowIpcDecoderPrivate));
+ if (private_data == NULL) {
+ return ENOMEM;
+ }
+
+ memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate));
+ decoder->private_data = private_data;
+ return NANOARROW_OK;
}
-void ArrowIpcReaderReset(struct ArrowIpcReader* reader) {
- if (reader->schema.release != NULL) {
- reader->schema.release(&reader->schema);
+void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ ArrowArrayViewReset(&private_data->array_view);
+
+ if (private_data->fields != NULL) {
+ ArrowFree(private_data->fields);
+ private_data->n_fields = 0;
}
- ArrowIpcReaderInit(reader);
+ ArrowFree(private_data);
+ memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
}
static inline uint32_t ArrowIpcReadUint32LE(struct ArrowBufferView* data) {
@@ -20339,9 +20368,9 @@ static inline int32_t ArrowIpcReadInt32LE(struct
ArrowBufferView* data) {
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
-static int ArrowIpcReaderSetMetadata(struct ArrowSchema* schema,
- ns(KeyValue_vec_t) kv_vec,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetMetadata(struct ArrowSchema* schema,
+ ns(KeyValue_vec_t) kv_vec,
+ struct ArrowError* error) {
int64_t n_pairs = ns(KeyValue_vec_len(kv_vec));
if (n_pairs == 0) {
return NANOARROW_OK;
@@ -20392,8 +20421,8 @@ static int ArrowIpcReaderSetMetadata(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeSimple(struct ArrowSchema* schema, int
nanoarrow_type,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeSimple(struct ArrowSchema* schema, int
nanoarrow_type,
+ struct ArrowError* error) {
int result = ArrowSchemaSetType(schema, nanoarrow_type);
if (result != NANOARROW_OK) {
ArrowErrorSet(error, "ArrowSchemaSetType() failed for type %s",
@@ -20404,9 +20433,9 @@ static int ArrowIpcReaderSetTypeSimple(struct
ArrowSchema* schema, int nanoarrow
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeInt(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeInt(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Int_table_t) type = (ns(Int_table_t))type_generic;
int is_signed = ns(Int_is_signed_get(type));
@@ -20455,21 +20484,21 @@ static int ArrowIpcReaderSetTypeInt(struct
ArrowSchema* schema,
}
}
- return ArrowIpcReaderSetTypeSimple(schema, nanoarrow_type, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, nanoarrow_type, error);
}
-static int ArrowIpcReaderSetTypeFloatingPoint(struct ArrowSchema* schema,
- flatbuffers_generic_t
type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFloatingPoint(struct ArrowSchema* schema,
+ flatbuffers_generic_t
type_generic,
+ struct ArrowError* error) {
ns(FloatingPoint_table_t) type = (ns(FloatingPoint_table_t))type_generic;
int precision = ns(FloatingPoint_precision(type));
switch (precision) {
case ns(Precision_HALF):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT,
error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT,
error);
case ns(Precision_SINGLE):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error);
case ns(Precision_DOUBLE):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE,
error);
default:
ArrowErrorSet(error, "Unexpected FloatingPoint Precision value: %d",
(int)precision);
@@ -20477,9 +20506,9 @@ static int ArrowIpcReaderSetTypeFloatingPoint(struct
ArrowSchema* schema,
}
}
-static int ArrowIpcReaderSetTypeDecimal(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDecimal(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Decimal_table_t) type = (ns(Decimal_table_t))type_generic;
int scale = ns(Decimal_scale(type));
int precision = ns(Decimal_precision(type));
@@ -20508,34 +20537,34 @@ static int ArrowIpcReaderSetTypeDecimal(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
- flatbuffers_generic_t
type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
+ flatbuffers_generic_t
type_generic,
+ struct ArrowError* error) {
ns(FixedSizeBinary_table_t) type = (ns(FixedSizeBinary_table_t))type_generic;
int fixed_size = ns(FixedSizeBinary_byteWidth(type));
return ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY,
fixed_size);
}
-static int ArrowIpcReaderSetTypeDate(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDate(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Date_table_t) type = (ns(Date_table_t))type_generic;
int date_unit = ns(Date_unit(type));
switch (date_unit) {
case ns(DateUnit_DAY):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DATE32, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE32,
error);
case ns(DateUnit_MILLISECOND):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DATE64, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE64,
error);
default:
ArrowErrorSet(error, "Unexpected Date DateUnit value: %d",
(int)date_unit);
return EINVAL;
}
}
-static int ArrowIpcReaderSetTypeTime(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Time_table_t) type = (ns(Time_table_t))type_generic;
int time_unit = ns(Time_unit(type));
int bitwidth = ns(Time_bitWidth(type));
@@ -20578,9 +20607,9 @@ static int ArrowIpcReaderSetTypeTime(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeTimestamp(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeTimestamp(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Timestamp_table_t) type = (ns(Timestamp_table_t))type_generic;
int time_unit = ns(Timestamp_unit(type));
@@ -20599,9 +20628,9 @@ static int ArrowIpcReaderSetTypeTimestamp(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeDuration(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDuration(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Duration_table_t) type = (ns(Duration_table_t))type_generic;
int time_unit = ns(Duration_unit(type));
@@ -20615,20 +20644,21 @@ static int ArrowIpcReaderSetTypeDuration(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeInterval(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeInterval(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Interval_table_t) type = (ns(Interval_table_t))type_generic;
int interval_unit = ns(Interval_unit(type));
switch (interval_unit) {
case ns(IntervalUnit_YEAR_MONTH):
- return ArrowIpcReaderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTHS, error);
+ return ArrowIpcDecoderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTHS, error);
case ns(IntervalUnit_DAY_TIME):
- return ArrowIpcReaderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_DAY_TIME, error);
+ return ArrowIpcDecoderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_DAY_TIME,
+ error);
case ns(IntervalUnit_MONTH_DAY_NANO):
- return ArrowIpcReaderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO,
- error);
+ return ArrowIpcDecoderSetTypeSimple(schema,
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO,
+ error);
default:
ArrowErrorSet(error, "Unexpected Interval unit value: %d",
(int)interval_unit);
return EINVAL;
@@ -20639,9 +20669,9 @@ static int ArrowIpcReaderSetTypeInterval(struct
ArrowSchema* schema,
// because the IPC format allows modifying some of the defaults those
functions assume.
// In particular, the allocate + initialize children step is handled outside
these
// setters.
-static int ArrowIpcReaderSetTypeSimpleNested(struct ArrowSchema* schema,
- const char* format,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeSimpleNested(struct ArrowSchema* schema,
+ const char* format,
+ struct ArrowError* error) {
int result = ArrowSchemaSetFormat(schema, format);
if (result != NANOARROW_OK) {
ArrowErrorSet(error, "ArrowSchemaSetFormat('%s') failed", format);
@@ -20651,23 +20681,23 @@ static int ArrowIpcReaderSetTypeSimpleNested(struct
ArrowSchema* schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeFixedSizeList(struct ArrowSchema* schema,
- flatbuffers_generic_t
type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFixedSizeList(struct ArrowSchema* schema,
+ flatbuffers_generic_t
type_generic,
+ struct ArrowError* error) {
ns(FixedSizeList_table_t) type = (ns(FixedSizeList_table_t))type_generic;
int32_t fixed_size = ns(FixedSizeList_listSize(type));
char fixed_size_str[128];
int n_chars = snprintf(fixed_size_str, 128, "+w:%d", fixed_size);
fixed_size_str[n_chars] = '\0';
- return ArrowIpcReaderSetTypeSimpleNested(schema, fixed_size_str, error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, fixed_size_str, error);
}
-static int ArrowIpcReaderSetTypeMap(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeMap(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ struct ArrowError* error) {
ns(Map_table_t) type = (ns(Map_table_t))type_generic;
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetTypeSimpleNested(schema, "+m",
error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetTypeSimpleNested(schema, "+m",
error));
if (ns(Map_keysSorted(type))) {
schema->flags |= ARROW_FLAG_MAP_KEYS_SORTED;
@@ -20678,9 +20708,9 @@ static int ArrowIpcReaderSetTypeMap(struct ArrowSchema*
schema,
return NANOARROW_OK;
}
-static int ArrowIpcReaderSetTypeUnion(struct ArrowSchema* schema,
- flatbuffers_generic_t type_generic,
- int64_t n_children, struct ArrowError*
error) {
+static int ArrowIpcDecoderSetTypeUnion(struct ArrowSchema* schema,
+ flatbuffers_generic_t type_generic,
+ int64_t n_children, struct ArrowError*
error) {
ns(Union_table_t) type = (ns(Union_table_t))type_generic;
int union_mode = ns(Union_mode(type));
@@ -20755,68 +20785,70 @@ static int ArrowIpcReaderSetTypeUnion(struct
ArrowSchema* schema,
}
}
- return ArrowIpcReaderSetTypeSimpleNested(schema, union_types_str, error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, union_types_str, error);
}
-static int ArrowIpcReaderSetType(struct ArrowSchema* schema, ns(Field_table_t)
field,
- int64_t n_children, struct ArrowError* error)
{
+static int ArrowIpcDecoderSetType(struct ArrowSchema* schema,
ns(Field_table_t) field,
+ int64_t n_children, struct ArrowError*
error) {
int type_type = ns(Field_type_type(field));
switch (type_type) {
case ns(Type_Null):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_NA, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_NA, error);
case ns(Type_Bool):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error);
case ns(Type_Int):
- return ArrowIpcReaderSetTypeInt(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeInt(schema, ns(Field_type_get(field)),
error);
case ns(Type_FloatingPoint):
- return ArrowIpcReaderSetTypeFloatingPoint(schema,
ns(Field_type_get(field)), error);
+ return ArrowIpcDecoderSetTypeFloatingPoint(schema,
ns(Field_type_get(field)),
+ error);
case ns(Type_Decimal):
- return ArrowIpcReaderSetTypeDecimal(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeDecimal(schema, ns(Field_type_get(field)),
error);
case ns(Type_Binary):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_BINARY, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BINARY,
error);
case ns(Type_LargeBinary):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY,
error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY,
error);
case ns(Type_FixedSizeBinary):
- return ArrowIpcReaderSetTypeFixedSizeBinary(schema,
ns(Field_type_get(field)),
- error);
+ return ArrowIpcDecoderSetTypeFixedSizeBinary(schema,
ns(Field_type_get(field)),
+ error);
case ns(Type_Utf8):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_STRING, error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_STRING,
error);
case ns(Type_LargeUtf8):
- return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING,
error);
+ return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING,
error);
case ns(Type_Date):
- return ArrowIpcReaderSetTypeDate(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeDate(schema, ns(Field_type_get(field)),
error);
case ns(Type_Time):
- return ArrowIpcReaderSetTypeTime(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeTime(schema, ns(Field_type_get(field)),
error);
case ns(Type_Timestamp):
- return ArrowIpcReaderSetTypeTimestamp(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeTimestamp(schema,
ns(Field_type_get(field)), error);
case ns(Type_Duration):
- return ArrowIpcReaderSetTypeDuration(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeDuration(schema, ns(Field_type_get(field)),
error);
case ns(Type_Interval):
- return ArrowIpcReaderSetTypeInterval(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeInterval(schema, ns(Field_type_get(field)),
error);
case ns(Type_Struct_):
- return ArrowIpcReaderSetTypeSimpleNested(schema, "+s", error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, "+s", error);
case ns(Type_List):
- return ArrowIpcReaderSetTypeSimpleNested(schema, "+l", error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, "+l", error);
case ns(Type_LargeList):
- return ArrowIpcReaderSetTypeSimpleNested(schema, "+L", error);
+ return ArrowIpcDecoderSetTypeSimpleNested(schema, "+L", error);
case ns(Type_FixedSizeList):
- return ArrowIpcReaderSetTypeFixedSizeList(schema,
ns(Field_type_get(field)), error);
+ return ArrowIpcDecoderSetTypeFixedSizeList(schema,
ns(Field_type_get(field)),
+ error);
case ns(Type_Map):
- return ArrowIpcReaderSetTypeMap(schema, ns(Field_type_get(field)),
error);
+ return ArrowIpcDecoderSetTypeMap(schema, ns(Field_type_get(field)),
error);
case ns(Type_Union):
- return ArrowIpcReaderSetTypeUnion(schema, ns(Field_type_get(field)),
n_children,
- error);
+ return ArrowIpcDecoderSetTypeUnion(schema, ns(Field_type_get(field)),
n_children,
+ error);
default:
ArrowErrorSet(error, "Unrecognized Field type with value %d",
(int)type_type);
return EINVAL;
}
}
-static int ArrowIpcReaderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
- struct ArrowError* error);
+static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
+ struct ArrowError* error);
-static int ArrowIpcReaderSetField(struct ArrowSchema* schema,
ns(Field_table_t) field,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetField(struct ArrowSchema* schema,
ns(Field_table_t) field,
+ struct ArrowError* error) {
// No dictionary support yet
if (ns(Field_dictionary_is_present(field))) {
ArrowErrorSet(error, "Field DictionaryEncoding not supported");
@@ -20840,7 +20872,7 @@ static int ArrowIpcReaderSetField(struct ArrowSchema*
schema, ns(Field_table_t)
ns(Field_vec_t) children = ns(Field_children(field));
int64_t n_children = ns(Field_vec_len(children));
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetType(schema, field, n_children,
error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetType(schema, field, n_children,
error));
// nanoarrow's type setters set the nullable flag by default, so we might
// have to unset it here.
@@ -20862,33 +20894,36 @@ static int ArrowIpcReaderSetField(struct ArrowSchema*
schema, ns(Field_table_t)
ArrowSchemaInit(schema->children[i]);
}
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetChildren(schema, children, error));
- return ArrowIpcReaderSetMetadata(schema, ns(Field_custom_metadata(field)),
error);
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(schema, children, error));
+ return ArrowIpcDecoderSetMetadata(schema, ns(Field_custom_metadata(field)),
error);
}
-static int ArrowIpcReaderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
- struct ArrowError* error) {
+static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema,
ns(Field_vec_t) fields,
+ struct ArrowError* error) {
int64_t n_fields = ns(Schema_vec_len(fields));
for (int64_t i = 0; i < n_fields; i++) {
ns(Field_table_t) field = ns(Field_vec_at(fields, i));
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetField(schema->children[i], field,
error));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetField(schema->children[i],
field, error));
}
return NANOARROW_OK;
}
-static int ArrowIpcReaderDecodeSchema(struct ArrowIpcReader* reader,
- flatbuffers_generic_t message_header,
- struct ArrowError* error) {
+static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder,
+ flatbuffers_generic_t
message_header,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
ns(Schema_table_t) schema = (ns(Schema_table_t))message_header;
int endianness = ns(Schema_endianness(schema));
switch (endianness) {
case ns(Endianness_Little):
- reader->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+ decoder->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
break;
case ns(Endianness_Big):
- reader->endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+ decoder->endianness = NANOARROW_IPC_ENDIANNESS_BIG;
break;
default:
ArrowErrorSet(error,
@@ -20899,16 +20934,16 @@ static int ArrowIpcReaderDecodeSchema(struct
ArrowIpcReader* reader,
ns(Feature_vec_t) features = ns(Schema_features(schema));
int64_t n_features = ns(Feature_vec_len(features));
- reader->features = 0;
+ decoder->feature_flags = 0;
for (int64_t i = 0; i < n_features; i++) {
ns(Feature_enum_t) feature = ns(Feature_vec_at(features, i));
switch (feature) {
case ns(Feature_COMPRESSED_BODY):
- reader->features |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY;
+ decoder->feature_flags |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY;
break;
case ns(Feature_DICTIONARY_REPLACEMENT):
- reader->features |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT;
+ decoder->feature_flags |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT;
break;
default:
ArrowErrorSet(error, "Unrecognized Schema feature with value %d",
(int)feature);
@@ -20916,33 +20951,70 @@ static int ArrowIpcReaderDecodeSchema(struct
ArrowIpcReader* reader,
}
}
- ns(Field_vec_t) fields = ns(Schema_fields(schema));
- int64_t n_fields = ns(Schema_vec_len(fields));
- if (reader->schema.release != NULL) {
- reader->schema.release(&reader->schema);
+ return NANOARROW_OK;
+}
+
+static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder*
decoder,
+ flatbuffers_generic_t
message_header,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))message_header;
+
+ ns(FieldNode_vec_t) fields = ns(RecordBatch_nodes(batch));
+ ns(Buffer_vec_t) buffers = ns(RecordBatch_buffers(batch));
+ int64_t n_fields = ns(FieldNode_vec_len(fields));
+ int64_t n_buffers = ns(Buffer_vec_len(buffers));
+
+ // Check field node and buffer count. We have one more field and buffer
+ // because we count the root struct and the flatbuffer message does not.
+ if ((n_fields + 1) != private_data->n_fields) {
+ ArrowErrorSet(error, "Expected %ld field nodes in message but found %ld",
+ (long)private_data->n_fields - 1, (long)n_fields);
+ return EINVAL;
}
- ArrowSchemaInit(&reader->schema);
- int result = ArrowSchemaSetTypeStruct(&reader->schema, n_fields);
- if (result != NANOARROW_OK) {
- ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
- (long)n_fields);
- return result;
+ if ((n_buffers + 1) != private_data->n_buffers) {
+ ArrowErrorSet(error, "Expected %ld buffers in message but found %ld",
+ (long)private_data->n_buffers - 1, (long)n_buffers);
+ return EINVAL;
}
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetChildren(&reader->schema, fields,
error));
- return ArrowIpcReaderSetMetadata(&reader->schema,
ns(Schema_custom_metadata(schema)),
- error);
+ if (ns(RecordBatch_compression_is_present(batch))) {
+ ns(BodyCompression_table_t) compression =
ns(RecordBatch_compression(batch));
+ ns(CompressionType_enum_t) codec = ns(BodyCompression_codec(compression));
+ switch (codec) {
+ case ns(CompressionType_LZ4_FRAME):
+ decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME;
+ break;
+ case ns(CompressionType_ZSTD):
+ decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
+ break;
+ default:
+ ArrowErrorSet(error, "Unrecognized RecordBatch BodyCompression codec
value: %d",
+ (int)codec);
+ return EINVAL;
+ }
+ } else {
+ decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+ }
+
+ // Copying field node and buffer information is separate so as only to pay
for the
+ // nodes that are actually accessed.
+ return NANOARROW_OK;
}
-static inline int ArrowIpcReaderCheckHeader(struct ArrowIpcReader* reader,
- struct ArrowBufferView* data_mut,
- int32_t* message_size_bytes,
- struct ArrowError* error) {
+// Returns NANOARROW_OK if data is large enough to read the message header,
+// ESPIPE if reading more data might help, or EINVAL if the content is not
valid
+static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView* data_mut,
+ int32_t* message_size_bytes,
+ struct ArrowError* error) {
if (data_mut->size_bytes < 8) {
ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes
remain",
(long)data_mut->size_bytes);
- return EINVAL;
+ return ESPIPE;
}
uint32_t continuation = ArrowIpcReadUint32LE(data_mut);
@@ -20953,12 +21025,17 @@ static inline int ArrowIpcReaderCheckHeader(struct
ArrowIpcReader* reader,
}
*message_size_bytes = ArrowIpcReadInt32LE(data_mut);
- if ((*message_size_bytes) > data_mut->size_bytes || (*message_size_bytes) <
0) {
+ if ((*message_size_bytes) < 0) {
+ ArrowErrorSet(
+ error, "Expected message body size > 0 but found message body size of
%ld bytes",
+ (long)(*message_size_bytes));
+ return EINVAL;
+ } else if ((*message_size_bytes) > data_mut->size_bytes) {
ArrowErrorSet(error,
"Expected 0 <= message body size <= %ld bytes but found
message "
"body size of %ld bytes",
(long)data_mut->size_bytes, (long)(*message_size_bytes));
- return EINVAL;
+ return ESPIPE;
}
if (*message_size_bytes == 0) {
@@ -20969,49 +21046,64 @@ static inline int ArrowIpcReaderCheckHeader(struct
ArrowIpcReader* reader,
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcReaderPeek(struct ArrowIpcReader* reader,
- struct ArrowBufferView data, struct
ArrowError* error) {
- reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
- reader->body_size_bytes = 0;
+ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+ decoder->body_size_bytes = 0;
+ private_data->last_message = NULL;
NANOARROW_RETURN_NOT_OK(
- ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes,
error));
- reader->header_size_bytes += 2 * sizeof(int32_t);
+ ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
+ decoder->header_size_bytes += 2 * sizeof(int32_t);
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcReaderVerify(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error) {
- reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
- reader->body_size_bytes = 0;
+ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+ decoder->body_size_bytes = 0;
+ private_data->last_message = NULL;
NANOARROW_RETURN_NOT_OK(
- ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes,
error));
+ ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
// Run flatbuffers verification
- if (ns(Message_verify_as_root(data.data.as_uint8,
reader->header_size_bytes)) !=
+ if (ns(Message_verify_as_root(data.data.as_uint8,
decoder->header_size_bytes)) !=
flatcc_verify_ok) {
ArrowErrorSet(error, "Message flatbuffer verification failed");
return EINVAL;
}
// Read some basic information from the message
- reader->header_size_bytes += 2 * sizeof(int32_t);
+ decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
- reader->metadata_version = ns(Message_version(message));
- reader->message_type = ns(Message_header_type(message));
- reader->body_size_bytes = ns(Message_bodyLength(message));
+ decoder->metadata_version = ns(Message_version(message));
+ decoder->message_type = ns(Message_header_type(message));
+ decoder->body_size_bytes = ns(Message_bodyLength(message));
+ private_data->last_message = ns(Message_header_get(message));
return NANOARROW_OK;
}
-ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error) {
- reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
- reader->body_size_bytes = 0;
+// Decode the header of the IPC message at the start of data.
+//
+// On success decoder->metadata_version, decoder->message_type and
+// decoder->body_size_bytes describe the message, and the private last_message
+// pointer is set so that ArrowIpcDecoderDecodeSchema() (Schema messages) or
+// ArrowIpcDecoderDecodeArray() (RecordBatch messages) can follow. DictionaryBatch,
+// Tensor and SparseTensor messages return ENOTSUP; unknown types return EINVAL.
+ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
+ struct ArrowBufferView data,
+ struct ArrowError* error) {
+ struct ArrowIpcDecoderPrivate* private_data =
+ (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+ decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+ decoder->body_size_bytes = 0;
+ private_data->last_message = NULL;
+
NANOARROW_RETURN_NOT_OK(
- ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes,
error));
- reader->header_size_bytes += 2 * sizeof(int32_t);
+ ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes,
error));
+ decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
if (!message) {
@@ -21019,11 +21111,11 @@ ArrowErrorCode ArrowIpcReaderDecode(struct
ArrowIpcReader* reader,
}
// Read some basic information from the message
- reader->metadata_version = ns(Message_version(message));
- reader->message_type = ns(Message_header_type(message));
- reader->body_size_bytes = ns(Message_bodyLength(message));
+ // Store the version on the decoder rather than in a local: the switch below
+ // reads decoder->metadata_version, and callers are not required to have run
+ // ArrowIpcDecoderVerifyHeader() (which also sets it) on this message first.
+ decoder->metadata_version = ns(Message_version(message));
+ decoder->message_type = ns(Message_header_type(message));
+ decoder->body_size_bytes = ns(Message_bodyLength(message));
- switch (reader->metadata_version) {
+ // NOTE(review): the V2/V3 cases record an error message but `break` instead of
+ // returning, so decoding continues and NANOARROW_OK may be returned with a
+ // populated error; confirm this is intended.
+ switch (decoder->metadata_version) {
case ns(MetadataVersion_V4):
case ns(MetadataVersion_V5):
break;
@@ -21031,30 +21123,319 @@ ArrowErrorCode ArrowIpcReaderDecode(struct
ArrowIpcReader* reader,
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
- ns(MetadataVersion_name(reader->metadata_version)));
+ ns(MetadataVersion_name(decoder->metadata_version)));
break;
default:
ArrowErrorSet(error, "Unexpected value for Message metadata version
(%d)",
- reader->metadata_version);
+ (int)decoder->metadata_version);
return EINVAL;
}
flatbuffers_generic_t message_header = ns(Message_header_get(message));
- switch (reader->message_type) {
+ switch (decoder->message_type) {
case ns(MessageHeader_Schema):
- NANOARROW_RETURN_NOT_OK(ArrowIpcReaderDecodeSchema(reader,
message_header, error));
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDecoderDecodeSchemaHeader(decoder, message_header, error));
break;
- case ns(MessageHeader_DictionaryBatch):
case ns(MessageHeader_RecordBatch):
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDecoderDecodeRecordBatchHeader(decoder, message_header, error));
+ break;
+ case ns(MessageHeader_DictionaryBatch):
case ns(MessageHeader_Tensor):
case ns(MessageHeader_SparseTensor):
ArrowErrorSet(error, "Unsupported message type: '%s'",
- ns(MessageHeader_type_name(reader->message_type)));
+ ns(MessageHeader_type_name(decoder->message_type)));
return ENOTSUP;
default:
- ArrowErrorSet(error, "Unnown message type: %d",
(int)(reader->message_type));
+ ArrowErrorSet(error, "Unknown message type: %d", (int)(decoder->message_type));
return EINVAL;
}
+ // Only now that the header decoded cleanly may DecodeSchema/DecodeArray use it
+ private_data->last_message = message_header;
+ return NANOARROW_OK;
+}
+
+// Build an ArrowSchema from the Schema message most recently decoded by
+// ArrowIpcDecoderDecodeHeader(). The root is a struct whose children are the
+// message's fields; custom metadata is attached to the root. The result is
+// assembled in a temporary and only moved into out on success, so out is left
+// untouched on any error.
+ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowSchema* out,
+                                           struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  int have_schema_message = decoder->message_type == NANOARROW_IPC_MESSAGE_TYPE_SCHEMA;
+  if (!have_schema_message || private_data->last_message == NULL) {
+    ArrowErrorSet(error, "decoder did not just decode a Schema message");
+    return EINVAL;
+  }
+
+  // last_message holds the Schema table of the header just decoded
+  ns(Schema_table_t) schema_table = (ns(Schema_table_t))private_data->last_message;
+  ns(Field_vec_t) field_vec = ns(Schema_fields(schema_table));
+  int64_t child_count = ns(Schema_vec_len(field_vec));
+
+  struct ArrowSchema assembled;
+  ArrowSchemaInit(&assembled);
+
+  int status = ArrowSchemaSetTypeStruct(&assembled, child_count);
+  if (status != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
+                  (long)child_count);
+    goto release_and_fail;
+  }
+
+  status = ArrowIpcDecoderSetChildren(&assembled, field_vec, error);
+  if (status != NANOARROW_OK) {
+    goto release_and_fail;
+  }
+
+  status = ArrowIpcDecoderSetMetadata(&assembled,
+                                      ns(Schema_custom_metadata(schema_table)), error);
+  if (status != NANOARROW_OK) {
+    goto release_and_fail;
+  }
+
+  ArrowSchemaMove(&assembled, out);
+  return NANOARROW_OK;
+
+release_and_fail:
+  // assembled always has a release callback after ArrowSchemaInit()
+  assembled.release(&assembled);
+  return status;
+}
+
+// Add the number of nodes in the schema tree rooted at schema (the root itself
+// plus every descendant) to *n_fields.
+static void ArrowIpcDecoderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  int64_t child_count = schema->n_children;
+  ++(*n_fields);
+  for (int64_t child_i = 0; child_i < child_count; ++child_i) {
+    ArrowIpcDecoderCountFields(schema->children[child_i], n_fields);
+  }
+}
+
+// Fill fields[*n_fields ...] with one entry per node of the array view tree in
+// pre-order (node first, then its children). Each entry remembers its view and the
+// index of its first buffer in the flattened buffer list. *n_fields and *n_buffers
+// are advanced past the nodes and used buffers visited. The caller must have sized
+// fields for the whole tree.
+static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
+                                      struct ArrowArrayView* view, int64_t* n_fields,
+                                      int64_t* n_buffers) {
+  struct ArrowIpcField* slot = &fields[*n_fields];
+  slot->array_view = view;
+  slot->buffer_offset = *n_buffers;
+  ++(*n_fields);
+
+  // Only buffers that this layout actually uses occupy a slot in the message
+  for (int buffer_i = 0; buffer_i < 3; ++buffer_i) {
+    if (view->layout.buffer_type[buffer_i] != NANOARROW_BUFFER_TYPE_NONE) {
+      ++(*n_buffers);
+    }
+  }
+
+  for (int64_t child_i = 0; child_i < view->n_children; ++child_i) {
+    ArrowIpcDecoderInitFields(fields, view->children[child_i], n_fields, n_buffers);
+  }
+}
+
+// Prepare the decoder to decode record batches described by schema, whose root
+// must be a struct. The schema is read but not moved, so the caller keeps ownership.
+//
+// On success private_data->array_view mirrors schema, and private_data->fields
+// holds one entry per schema node (root included, pre-order) with the index of that
+// node's first buffer. On failure the decoder is left with no field table rather
+// than with stale or freed state.
+ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
+                                        struct ArrowSchema* schema,
+                                        struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  // Drop everything derived from a previous schema. The field table must be
+  // NULLed after freeing: an early return below would otherwise leave a dangling
+  // pointer for the next call to free again. n_buffers must restart at zero
+  // because ArrowIpcDecoderInitFields() accumulates into it. Without the reset,
+  // every buffer_offset would be shifted on a second call.
+  ArrowArrayViewReset(&private_data->array_view);
+  if (private_data->fields != NULL) {
+    ArrowFree(private_data->fields);
+    private_data->fields = NULL;
+  }
+  private_data->n_fields = 0;
+  private_data->n_buffers = 0;
+
+  // Allocate the ArrayView based on schema without moving the schema;
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&private_data->array_view, schema, error));
+
+  // Root must be a struct
+  if (private_data->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowArrayViewReset(&private_data->array_view);
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  int64_t n_fields = 0;
+  ArrowIpcDecoderCountFields(schema, &n_fields);
+  struct ArrowIpcField* fields =
+      (struct ArrowIpcField*)ArrowMalloc(n_fields * sizeof(struct ArrowIpcField));
+  if (fields == NULL) {
+    ArrowArrayViewReset(&private_data->array_view);
+    ArrowErrorSet(error, "Failed to allocate decoder->fields");
+    return ENOMEM;
+  }
+  memset(fields, 0, n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcDecoderInitFields(fields, &private_data->array_view, &field_i,
+                            &private_data->n_buffers);
+
+  // Publish the table only once it is complete
+  private_data->fields = fields;
+  private_data->n_fields = n_fields;
+  return NANOARROW_OK;
+}
+
+// Cursor state for copying one RecordBatch message into an ArrowArray tree.
+// ArrowIpcDecoderWalkGetArray() consumes FieldNodes and Buffers in order,
+// advancing field_i and buffer_i. ArrowIpcDecoderMakeBuffer() copies each
+// buffer's bytes out of body.
+struct ArrowIpcArraySetter {
+ // The RecordBatch's FieldNode vector and the index of the next node to read
+ ns(FieldNode_vec_t) fields;
+ int64_t field_i;
+ // The RecordBatch's Buffer vector and the index of the next buffer to read
+ ns(Buffer_vec_t) buffers;
+ int64_t buffer_i;
+ // Message body; Buffer offsets and lengths are relative to its start
+ struct ArrowBufferView body;
+ // Copied from the decoder; buffers are only copied when this is
+ // NANOARROW_IPC_COMPRESSION_TYPE_NONE
+ enum ArrowIpcCompressionType codec;
+ // Endianness copied from the decoder, and the endianness detected for the
+ // running system. Buffers are only copied when these agree or when endianness
+ // is NANOARROW_IPC_ENDIANNESS_UNINITIALIZED.
+ enum ArrowIpcEndianness endianness;
+ enum ArrowIpcEndianness system_endianness;
+};
+
+// Append the body bytes [offset, offset + length) described by one IPC Buffer to out.
+//
+// offset and length come straight from the (untrusted) RecordBatch message, so
+// they are validated against the body before any pointer arithmetic. Zero-length
+// buffers are accepted without looking at offset. Compressed bodies, and bodies
+// whose declared endianness differs from the system's, are rejected with ENOTSUP.
+static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                     int64_t length, struct ArrowBuffer* out,
+                                     struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // A negative length would otherwise reach the copy below as a negative size
+  if (offset < 0 || length < 0) {
+    ArrowErrorSet(error, "Buffer %ld has invalid body offset %ld or length %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)length);
+    return EINVAL;
+  }
+
+  // Check that this buffer fits within the body. The comparison is
+  // length > size - offset, evaluated only once offset <= size is known, so that a
+  // huge offset + length cannot overflow int64_t.
+  int64_t body_size = setter->body.size_bytes;
+  if (offset > body_size || length > (body_size - offset)) {
+    // offset and length are both non-negative here; saturate the reported end
+    int64_t end = (length > INT64_MAX - offset) ? INT64_MAX : offset + length;
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)end, (long)body_size);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Populate array, then recursively its children, from the next FieldNode and
+// Buffers of the RecordBatch message tracked by setter.
+//
+// Nodes and buffers are consumed in message order, which matches the pre-order
+// walk performed here. The message is untrusted, so the node and buffer indices
+// are checked against the vector lengths before each element is accessed. The
+// flatbuffer *_vec_at() accessors are not relied on for bounds checking.
+static int ArrowIpcDecoderWalkGetArray(struct ArrowIpcArraySetter* setter,
+                                       struct ArrowArray* array,
+                                       struct ArrowError* error) {
+  int64_t n_nodes = (int64_t)ns(FieldNode_vec_len(setter->fields));
+  if (setter->field_i < 0 || setter->field_i >= n_nodes) {
+    ArrowErrorSet(error, "Field node index %ld is out of range: message has %ld nodes",
+                  (long)setter->field_i, (long)n_nodes);
+    return EINVAL;
+  }
+
+  ns(FieldNode_struct_t) field =
+      ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i));
+  array->length = ns(FieldNode_length(field));
+  array->null_count = ns(FieldNode_null_count(field));
+  setter->field_i += 1;
+
+  int64_t n_message_buffers = (int64_t)ns(Buffer_vec_len(setter->buffers));
+  for (int64_t i = 0; i < array->n_buffers; i++) {
+    if (setter->buffer_i < 0 || setter->buffer_i >= n_message_buffers) {
+      ArrowErrorSet(error, "Buffer index %ld is out of range: message has %ld buffers",
+                    (long)setter->buffer_i, (long)n_message_buffers);
+      return EINVAL;
+    }
+
+    ns(Buffer_struct_t) buffer =
+        ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
+    int64_t buffer_offset = ns(Buffer_offset(buffer));
+    int64_t buffer_length = ns(Buffer_length(buffer));
+    setter->buffer_i += 1;
+
+    struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i);
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderMakeBuffer(setter, buffer_offset,
+                                                      buffer_length, buffer_dst, error));
+  }
+
+  for (int64_t i = 0; i < array->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcDecoderWalkGetArray(setter, array->children[i], error));
+  }
+
+  return NANOARROW_OK;
+}
+
+// Give array the same type tree as array_view: the same storage type and the same
+// number of children, recursively. This function does not write buffer contents.
+// NOTE(review): on failure array may already be partially initialized; callers
+// should release it if its release callback is set.
+static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array,
+                                          struct ArrowArrayView* array_view) {
+  int status = ArrowArrayInitFromType(array, array_view->storage_type);
+  if (status == NANOARROW_OK) {
+    status = ArrowArrayAllocateChildren(array, array_view->n_children);
+  }
+
+  for (int64_t child_i = 0; status == NANOARROW_OK && child_i < array_view->n_children;
+       child_i++) {
+    status = ArrowIpcArrayInitFromArrayView(array->children[child_i],
+                                            array_view->children[child_i]);
+  }
+
+  return status;
+}
+
+// Decode field field_i, or the whole record batch as a struct when field_i == -1,
+// from the RecordBatch message most recently decoded by
+// ArrowIpcDecoderDecodeHeader(). Buffer contents are copied out of body. A schema
+// must have been set with ArrowIpcDecoderSetSchema(). out is only written on success.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t field_i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (private_data->last_message == NULL ||
+      decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(error, "decoder did not just decode a RecordBatch message");
+    return EINVAL;
+  }
+
+  // The field table stays NULL until ArrowIpcDecoderSetSchema() allocates it
+  if (private_data->fields == NULL) {
+    ArrowErrorSet(error, "decoder has no schema; call ArrowIpcDecoderSetSchema() first");
+    return EINVAL;
+  }
+
+  // RecordBatch messages don't count the root node but decoder->fields does
+  // (index 0 is the root struct). Valid indices are therefore -1, meaning the
+  // whole batch, through n_fields - 2.
+  if (field_i < -1 || field_i >= (private_data->n_fields - 1)) {
+    ArrowErrorSet(error, "Field index %ld is out of range [-1, %ld)", (long)field_i,
+                  (long)(private_data->n_fields - 1));
+    return EINVAL;
+  }
+
+  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))private_data->last_message;
+  struct ArrowIpcField* root = private_data->fields + field_i + 1;
+
+  struct ArrowArray temp;
+  temp.release = NULL;
+  int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view);
+  if (result != NANOARROW_OK) {
+    // Initialization can fail part way through (e.g., while allocating children);
+    // release whatever was already set up instead of leaking it.
+    if (temp.release != NULL) {
+      temp.release(&temp);
+    }
+    ArrowErrorSet(error, "Failed to initialize output array");
+    return result;
+  }
+
+  struct ArrowIpcArraySetter setter;
+  setter.fields = ns(RecordBatch_nodes(batch));
+  setter.field_i = field_i;
+  setter.buffers = ns(RecordBatch_buffers(batch));
+  setter.buffer_i = root->buffer_offset - 1;
+  setter.body = body;
+  setter.codec = decoder->codec;
+  setter.endianness = decoder->endianness;
+
+  // This should probably be done at compile time
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+
+  // The flatbuffers FieldNode doesn't count the root struct so we have to loop over
+  // the children ourselves
+  if (field_i == -1) {
+    temp.length = ns(RecordBatch_length(batch));
+    temp.null_count = 0;
+    setter.field_i++;
+    setter.buffer_i++;
+
+    for (int64_t i = 0; i < temp.n_children; i++) {
+      result = ArrowIpcDecoderWalkGetArray(&setter, temp.children[i], error);
+      if (result != NANOARROW_OK) {
+        temp.release(&temp);
+        return result;
+      }
+    }
+  } else {
+    result = ArrowIpcDecoderWalkGetArray(&setter, &temp, error);
+    if (result != NANOARROW_OK) {
+      temp.release(&temp);
+      return result;
+    }
+  }
+
+  // TODO: this performs some validation but doesn't do everything we need it to do
+  // notably it doesn't loop over offset buffers to look for values that will cause
+  // out-of-bounds buffer access on the data buffer or child arrays.
+  result = ArrowArrayFinishBuilding(&temp, error);
+  if (result != NANOARROW_OK) {
+    temp.release(&temp);
+    return result;
+  }
+
+  ArrowArrayMove(&temp, out);
return NANOARROW_OK;
}
diff --git a/dist/nanoarrow_ipc.h b/dist/nanoarrow_ipc.h
index 018f4ed..7573441 100644
--- a/dist/nanoarrow_ipc.h
+++ b/dist/nanoarrow_ipc.h
@@ -23,11 +23,20 @@
#ifdef NANOARROW_NAMESPACE
#define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcCheckRuntime)
-#define ArrowIpcReaderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderInit)
-#define ArrowIpcReaderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderReset)
-#define ArrowIpcReaderPeek NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderPeek)
-#define ArrowIpcReaderVerify NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderVerify)
-#define ArrowIpcReaderDecode NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowIpcReaderDecode)
+// Prefix each public decoder symbol with NANOARROW_NAMESPACE (same order as the
+// declarations below)
+#define ArrowIpcDecoderInit \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderInit)
+#define ArrowIpcDecoderReset \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderReset)
+#define ArrowIpcDecoderPeekHeader \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderPeekHeader)
+#define ArrowIpcDecoderVerifyHeader \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderVerifyHeader)
+#define ArrowIpcDecoderDecodeHeader \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeHeader)
+#define ArrowIpcDecoderDecodeSchema \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
+#define ArrowIpcDecoderSetSchema \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
+#define ArrowIpcDecoderDecodeArray \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
#endif
@@ -35,6 +44,14 @@
extern "C" {
#endif
+/// \brief Arrow IPC metadata (format) versions, oldest first
+///
+/// Values are spelled out because ArrowIpcDecoder.metadata_version is assigned
+/// directly from the flatbuffer Message.version field. They are intended to match the
+/// flatbuffer MetadataVersion enum (V1 = 0 through V5 = 4).
+enum ArrowIpcMetadataVersion {
+  NANOARROW_IPC_METADATA_VERSION_V1 = 0,
+  NANOARROW_IPC_METADATA_VERSION_V2 = 1,
+  NANOARROW_IPC_METADATA_VERSION_V3 = 2,
+  NANOARROW_IPC_METADATA_VERSION_V4 = 3,
+  NANOARROW_IPC_METADATA_VERSION_V5 = 4
+};
+
enum ArrowIpcMessageType {
NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED,
NANOARROW_IPC_MESSAGE_TYPE_SCHEMA,
@@ -50,38 +67,145 @@ enum ArrowIpcEndianness {
NANOARROW_IPC_ENDIANNESS_BIG
};
+/// \brief Record batch body compression codecs
+///
+/// NOTE(review): NONE occupies 0 here, whereas the flatbuffer CompressionType enum
+/// starts at LZ4_FRAME = 0, so flatbuffer values cannot be copied in without an
+/// offset; confirm the conversion where ArrowIpcDecoder.codec is set.
+enum ArrowIpcCompressionType {
+  NANOARROW_IPC_COMPRESSION_TYPE_NONE = 0,
+  NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME = 1,
+  NANOARROW_IPC_COMPRESSION_TYPE_ZSTD = 2
+};
+
#define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
#define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
-struct ArrowIpcReader {
- int32_t metadata_version;
- int32_t message_type;
- int32_t endianness;
- int32_t features;
+/// \brief Decoder for Arrow IPC messages
+///
+/// This structure is intended to be allocated by the caller,
+/// initialized using ArrowIpcDecoderInit(), and released with
+/// ArrowIpcDecoderReset(). These fields should not be modified
+/// by the caller but can be read following a call to
+/// ArrowIpcDecoderPeekHeader(), ArrowIpcDecoderVerifyHeader(), or
+/// ArrowIpcDecoderDecodeHeader().
+struct ArrowIpcDecoder {
+ /// \brief The last verified or decoded message type
+ ///
+ /// Reset to NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED at the start of
+ /// ArrowIpcDecoderDecodeHeader().
+ enum ArrowIpcMessageType message_type;
+
+ /// \brief The metadata version used by this and forthcoming messages
+ enum ArrowIpcMetadataVersion metadata_version;
+
+ /// \brief Endianness of forthcoming RecordBatch messages
+ ///
+ /// NANOARROW_IPC_ENDIANNESS_UNINITIALIZED is treated as "same as the system" when
+ /// record batch buffers are copied.
+ enum ArrowIpcEndianness endianness;
+
+ /// \brief Features used by this and forthcoming messages as indicated by the
+ /// current Schema message
+ ///
+ /// Presumably a bitwise OR of the NANOARROW_IPC_FEATURE_* values (TODO confirm).
+ int32_t feature_flags;
+
+ /// \brief Compression used by the current RecordBatch message
+ ///
+ /// Only NANOARROW_IPC_COMPRESSION_TYPE_NONE can currently be decoded; other values
+ /// make ArrowIpcDecoderDecodeArray() fail with ENOTSUP when it copies a buffer.
+ enum ArrowIpcCompressionType codec;
+
+ /// \brief The number of bytes in the current header message
+ ///
+ /// This value includes the 8 bytes before the start of the header message
+ /// content and any padding bytes required to make the header message size
+ /// be a multiple of 8 bytes.
int32_t header_size_bytes;
- int64_t body_size_bytes;
- struct ArrowSchema schema;
-};
-
-void ArrowIpcReaderInit(struct ArrowIpcReader* reader);
-void ArrowIpcReaderReset(struct ArrowIpcReader* reader);
-
-ArrowErrorCode ArrowIpcReaderPeek(struct ArrowIpcReader* reader,
- struct ArrowBufferView data, struct
ArrowError* error);
+ /// \brief The number of bytes in the forthcoming body message.
+ int64_t body_size_bytes;
-ArrowErrorCode ArrowIpcReaderVerify(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error);
+ /// \brief Private resources managed by this library; callers must not access or
+ /// free these directly
+ void* private_data;
+};
-ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
- struct ArrowBufferView data,
- struct ArrowError* error);
-
-#endif
+/// \brief Initialize a decoder
+///
+/// Must be called before any other ArrowIpcDecoder*() function. Returns ENOMEM if
+/// the decoder's private resources cannot be allocated, or NANOARROW_OK otherwise,
+/// after which the decoder must eventually be released with ArrowIpcDecoderReset().
+ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder);
+
+/// \brief Release all resources attached to a decoder
+void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
+
+/// \brief Peek at a message header
+///
+/// The first 8 bytes of an Arrow IPC message are the continuation marker 0xFFFFFFFF
+/// followed by the size of the header as a little-endian 32-bit integer.
+/// ArrowIpcDecoderPeekHeader() reads these bytes and returns ESPIPE if there are not
+/// enough remaining bytes in data to read the entire header message, EINVAL if the
+/// first 8 bytes are not valid, ENODATA if the Arrow end-of-stream indicator has been
+/// reached, or NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
+                                         struct ArrowBufferView data,
+                                         struct ArrowError* error);
+
+/// \brief Verify a message header
+///
+/// Runs ArrowIpcDecoderPeekHeader() to ensure data is sufficiently large but
+/// additionally runs flatbuffer verification to ensure that decoding the data will
+/// not access memory outside of the buffer specified by data.
+/// ArrowIpcDecoderVerifyHeader() will also set decoder.header_size_bytes,
+/// decoder.body_size_bytes, decoder.metadata_version, and decoder.message_type.
+///
+/// Returns as ArrowIpcDecoderPeekHeader() and additionally will
+/// return EINVAL if flatbuffer verification fails.
+ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error);
+
+/// \brief Decode a message header
+///
+/// Runs ArrowIpcDecoderPeekHeader() to ensure data is sufficiently large and decodes
+/// the content of the message header. If data contains a schema message,
+/// decoder.endianness and decoder.feature_flags are set and
+/// ArrowIpcDecoderDecodeSchema() can be used to obtain the decoded schema. If data
+/// contains a record batch message, decoder.codec is set and a successful call can be
+/// followed by a call to ArrowIpcDecoderDecodeArray().
+///
+/// In almost all cases this should be preceded by a call to
+/// ArrowIpcDecoderVerifyHeader() to ensure decoding does not access data outside of
+/// the specified buffer.
+///
+/// Returns EINVAL if the content of the message cannot be decoded or ENOTSUP if the
+/// content of the message uses features not supported by this library.
+ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error);
+
+/// \brief Decode an ArrowSchema
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader() on a schema message,
+/// retrieve an ArrowSchema whose root is a struct containing the message's fields.
+/// The caller is responsible for releasing the schema if NANOARROW_OK is returned;
+/// out is not modified otherwise.
+///
+/// Returns EINVAL if the decoder did not just decode a schema message, another error
+/// code if the schema could not be constructed, or NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowSchema* out,
+                                           struct ArrowError* error);
+
+/// \brief Set the ArrowSchema used to decode future record batch messages
+///
+/// Prepares the decoder for future record batch messages of this type. The schema
+/// must be a struct. The decoder does not take ownership of schema (it is not moved):
+/// the caller remains responsible for releasing it.
+///
+/// Returns EINVAL if schema validation fails or the schema is not a struct, ENOMEM if
+/// allocation fails, or NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
+                                        struct ArrowSchema* schema,
+                                        struct ArrowError* error);
+
+/// \brief Decode an ArrowArray
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader() on a record batch
+/// message, assemble an ArrowArray given a message body and a field index. The
+/// schema must previously have been set with ArrowIpcDecoderSetSchema(). Fields are
+/// numbered depth-first (a field before its children), excluding the root struct, so
+/// a field index does not equate to a column index if any columns contain nested
+/// types. Use a value of -1 to decode the entire record batch into a struct. The
+/// caller is responsible for releasing the array if NANOARROW_OK is returned.
+///
+/// Returns EINVAL if the decoder did not just decode a record batch message or the
+/// message is invalid, ENOTSUP if the message uses features not supported by this
+/// library, or NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t field_i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error);
#ifdef __cplusplus
}
#endif
+
+#endif