paleolimbot commented on code in PR #143: URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1129429566
########## extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c: ########## @@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader, return EINVAL; } + reader->private_data = message_header; + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader, + struct ArrowSchema* out, + struct ArrowError* error) { + if (reader->schema.release == NULL) { + ArrowErrorSet(error, "reader does not contain a valid schema"); + return EINVAL; + } + + ArrowSchemaMove(&reader->schema, out); + return NANOARROW_OK; +} + +static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) { + *n_fields += 1; + for (int64_t i = 0; i < schema->n_children; i++) { + ArrowIpcReaderCountFields(schema->children[i], n_fields); + } +} + +static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields, + struct ArrowArrayView* view, int64_t* n_fields, + int64_t* n_buffers) { + struct ArrowIpcField* field = fields + (*n_fields); + field->array_view = view; + field->buffer_offset = *n_buffers; + + for (int i = 0; i < 3; i++) { + *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE; + } + + *n_fields += 1; + + for (int64_t i = 0; i < view->n_children; i++) { + ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers); + } +} + +ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader, + struct ArrowSchema* schema, + struct ArrowError* error) { + ArrowArrayViewReset(&reader->array_view); + + if (reader->fields != NULL) { + ArrowFree(reader->fields); + } + + // Allocate Array and ArrayView based on schema without moving the schema + // this will fail if the schema is not valid. 
+ NANOARROW_RETURN_NOT_OK( + ArrowArrayViewInitFromSchema(&reader->array_view, schema, error)); + + // Root must be a struct + if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) { + ArrowErrorSet(error, "schema must be a struct type"); + return EINVAL; + } + + // Walk tree and calculate how many fields we need to allocate + reader->n_fields = 0; + ArrowIpcReaderCountFields(schema, &reader->n_fields); + reader->fields = + (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField)); + if (reader->fields == NULL) { + ArrowErrorSet(error, "Failed to allocate reader->fields"); + return ENOMEM; + } + memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField)); + + // Init field information and calculate starting buffer offset for each + int64_t field_i = 0; + ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i, + &reader->n_buffers); + + return NANOARROW_OK; +} + +struct ArrowIpcArraySetter { + ns(FieldNode_vec_t) fields; + int64_t field_i; + ns(Buffer_vec_t) buffers; + int64_t buffer_i; + struct ArrowBufferView body; + enum ArrowIpcCompressionType codec; + enum ArrowIpcEndianness endianness; + enum ArrowIpcEndianness system_endianness; +}; + +static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset, + int64_t length, struct ArrowBuffer* out, + struct ArrowError* error) { + if (length == 0) { + return NANOARROW_OK; + } + + // Check that this buffer fits within the body + if (offset < 0 || (offset + length) > setter->body.size_bytes) { + ArrowErrorSet(error, + "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld", + (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length, + setter->body.size_bytes); + return EINVAL; + } + + struct ArrowBufferView view; + view.data.as_uint8 = setter->body.data.as_uint8 + offset; + view.size_bytes = length; + + if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) { + ArrowErrorSet(error, "The nanoarrow_ipc 
extension does not support compression"); + return ENOTSUP; + } + + if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED && + setter->endianness != setter->system_endianness) { + ArrowErrorSet(error, + "The nanoarrow_ipc extension does not support non-system endianness"); + return ENOTSUP; + } + + int result = ArrowBufferAppendBufferView(out, view); + if (result != NANOARROW_OK) { + ArrowErrorSet(error, "Failed to copy buffer"); Review Comment: Yeah, this should be extensible somehow...a `struct ArrowIpcBufferDecoder` or something. It also needs a way to inject compression support at runtime. I'll do both of those in a follow-up (this PR is mostly about ensuring type coverage and correctness of the flattening/unflattening). ########## extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc: ########## @@ -168,6 +204,151 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) { EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE); EXPECT_STREQ(reader.schema.children[0]->format, "i"); + EXPECT_EQ(ArrowIpcReaderGetSchema(&reader, &schema, &error), NANOARROW_OK); + EXPECT_EQ(reader.schema.release, nullptr); + EXPECT_NE(schema.release, nullptr); + + schema.release(&schema); + ArrowIpcReaderReset(&reader); +} + +TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) { + struct ArrowIpcReader reader; + struct ArrowError error; + struct ArrowSchema schema; + struct ArrowArray array; + + ArrowSchemaInit(&schema); + ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK); + ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK); + + struct ArrowBufferView data; + data.data.as_uint8 = kSimpleRecordBatch; + data.size_bytes = sizeof(kSimpleRecordBatch); + + ArrowIpcReaderInit(&reader); + + // Attempt to get array should fail nicely here + EXPECT_EQ(ArrowIpcReaderGetArray(&reader, data, 0, nullptr, &error), EINVAL); + EXPECT_STREQ(error.message, "reader did not just decode a RecordBatch message"); + + 
ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK); Review Comment: I think so...what's there now might more accurately be called an `ArrowIpcDecoder`. There are so many ways to mix, match, and parallelize the various steps here based on the tools available to the caller. This library should probably provide a reader for the simple but common "just read it all as an ArrowArrayStream" case (which would at least make sure that it's possible to do). ########## extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c: ########## @@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader, return EINVAL; } + reader->private_data = message_header; + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader, + struct ArrowSchema* out, + struct ArrowError* error) { + if (reader->schema.release == NULL) { + ArrowErrorSet(error, "reader does not contain a valid schema"); + return EINVAL; + } + + ArrowSchemaMove(&reader->schema, out); + return NANOARROW_OK; +} + +static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) { + *n_fields += 1; + for (int64_t i = 0; i < schema->n_children; i++) { + ArrowIpcReaderCountFields(schema->children[i], n_fields); + } +} + +static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields, + struct ArrowArrayView* view, int64_t* n_fields, + int64_t* n_buffers) { + struct ArrowIpcField* field = fields + (*n_fields); + field->array_view = view; + field->buffer_offset = *n_buffers; + + for (int i = 0; i < 3; i++) { + *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE; + } + + *n_fields += 1; + + for (int64_t i = 0; i < view->n_children; i++) { + ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers); + } +} + +ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader, + struct ArrowSchema* schema, + struct ArrowError* error) { + ArrowArrayViewReset(&reader->array_view); + + if 
(reader->fields != NULL) { + ArrowFree(reader->fields); + } + + // Allocate Array and ArrayView based on schema without moving the schema + // this will fail if the schema is not valid. + NANOARROW_RETURN_NOT_OK( + ArrowArrayViewInitFromSchema(&reader->array_view, schema, error)); + + // Root must be a struct + if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) { + ArrowErrorSet(error, "schema must be a struct type"); + return EINVAL; + } + + // Walk tree and calculate how many fields we need to allocate + reader->n_fields = 0; + ArrowIpcReaderCountFields(schema, &reader->n_fields); + reader->fields = + (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField)); + if (reader->fields == NULL) { + ArrowErrorSet(error, "Failed to allocate reader->fields"); + return ENOMEM; + } + memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField)); + + // Init field information and calculate starting buffer offset for each + int64_t field_i = 0; + ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i, + &reader->n_buffers); + + return NANOARROW_OK; +} + +struct ArrowIpcArraySetter { + ns(FieldNode_vec_t) fields; + int64_t field_i; + ns(Buffer_vec_t) buffers; + int64_t buffer_i; + struct ArrowBufferView body; + enum ArrowIpcCompressionType codec; + enum ArrowIpcEndianness endianness; + enum ArrowIpcEndianness system_endianness; +}; + +static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset, + int64_t length, struct ArrowBuffer* out, + struct ArrowError* error) { + if (length == 0) { + return NANOARROW_OK; + } + + // Check that this buffer fits within the body + if (offset < 0 || (offset + length) > setter->body.size_bytes) { + ArrowErrorSet(error, + "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld", + (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length, + setter->body.size_bytes); + return EINVAL; + } + + struct ArrowBufferView view; + 
view.data.as_uint8 = setter->body.data.as_uint8 + offset; + view.size_bytes = length; + + if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) { + ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression"); + return ENOTSUP; + } + + if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED && + setter->endianness != setter->system_endianness) { + ArrowErrorSet(error, + "The nanoarrow_ipc extension does not support non-system endianness"); + return ENOTSUP; + } + + int result = ArrowBufferAppendBufferView(out, view); + if (result != NANOARROW_OK) { + ArrowErrorSet(error, "Failed to copy buffer"); + return result; + } + + return NANOARROW_OK; +} + +static int ArrowIpcReaderWalkGetArray(struct ArrowIpcArraySetter* setter, + struct ArrowArray* array, + struct ArrowError* error) { + ns(FieldNode_struct_t) field = + ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i)); + array->length = ns(FieldNode_length(field)); + array->null_count = ns(FieldNode_null_count(field)); + setter->field_i += 1; + + for (int64_t i = 0; i < array->n_buffers; i++) { + ns(Buffer_struct_t) buffer = + ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i)); + int64_t buffer_offset = ns(Buffer_offset(buffer)); + int64_t buffer_length = ns(Buffer_length(buffer)); + setter->buffer_i += 1; + + struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i); + NANOARROW_RETURN_NOT_OK(ArrowIpcReaderMakeBuffer(setter, buffer_offset, buffer_length, + buffer_dst, error)); + } + + for (int64_t i = 0; i < array->n_children; i++) { + NANOARROW_RETURN_NOT_OK( + ArrowIpcReaderWalkGetArray(setter, array->children[i], error)); + } + + return NANOARROW_OK; +} + +static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array, + struct ArrowArrayView* array_view) { + NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(array, array_view->storage_type)); + NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(array, array_view->n_children)); + for (int64_t i = 0; i < 
array_view->n_children; i++) { + NANOARROW_RETURN_NOT_OK( + ArrowIpcArrayInitFromArrayView(array->children[i], array_view->children[i])); + } + + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcReaderGetArray(struct ArrowIpcReader* reader, + struct ArrowBufferView body, int64_t field_i, + struct ArrowArray* out, struct ArrowError* error) { + if (reader->private_data == NULL || + reader->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) { + ArrowErrorSet(error, "reader did not just decode a RecordBatch message"); + return EINVAL; + } + + ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))reader->private_data; + + // RecordBatch messages don't count the root node but reader->fields does + struct ArrowIpcField* root = reader->fields + field_i + 1; + + struct ArrowArray temp; + temp.release = NULL; + int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view); + if (result != NANOARROW_OK) { + ArrowErrorSet(error, "Failed to initialize output array"); + return result; + } + + struct ArrowIpcArraySetter setter; + setter.fields = ns(RecordBatch_nodes(batch)); + setter.field_i = field_i; + setter.buffers = ns(RecordBatch_buffers(batch)); + setter.buffer_i = root->buffer_offset - 1; + setter.body = body; + setter.codec = reader->codec; + setter.endianness = reader->endianness; + + // This should probably be done at compile time + uint32_t check = 1; + char first_byte; + memcpy(&first_byte, &check, sizeof(char)); + if (first_byte) { + setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE; + } else { + setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG; + } + + // The flatbuffers FieldNode doesn't count the root struct so we have to loop over the + // children ourselves + if (field_i == -1) { + temp.length = ns(RecordBatch_length(batch)); + temp.null_count = 0; + setter.field_i++; + setter.buffer_i++; + + for (int64_t i = 0; i < temp.n_children; i++) { + result = ArrowIpcReaderWalkGetArray(&setter, temp.children[i], error); + if (result != 
NANOARROW_OK) { + temp.release(&temp); + return result; + } + } + } else { + result = ArrowIpcReaderWalkGetArray(&setter, &temp, error); + if (result != NANOARROW_OK) { + temp.release(&temp); + return result; + } + } + + // TODO: this performs some validation but doesn't do everything we need it to do Review Comment: Definitely! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org