paleolimbot commented on code in PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1129429566


##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct 
ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* 
n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* 
n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct 
ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, 
int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has 
size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + 
(long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support 
compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system 
endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");

Review Comment:
   Yeah, this should be extensible somehow...a `struct ArrowIpcBufferDecoder` 
or something. It also needs a way to inject compression support at runtime. 
I'll do both of those in a follow-up (this PR is mostly about ensuring type 
coverage and correctness of the flattening/unflattening).



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -168,6 +204,151 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
   EXPECT_STREQ(reader.schema.children[0]->format, "i");
 
+  EXPECT_EQ(ArrowIpcReaderGetSchema(&reader, &schema, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.schema.release, nullptr);
+  EXPECT_NE(schema.release, nullptr);
+
+  schema.release(&schema);
+  ArrowIpcReaderReset(&reader);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcReader reader;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), 
NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcReaderInit(&reader);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, data, 0, nullptr, &error), EINVAL);
+  EXPECT_STREQ(error.message, "reader did not just decode a RecordBatch 
message");
+
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);

Review Comment:
   I think so...what's there now might be more accurately be called an 
`ArrowIpcDecoder`. There are so many ways to mix, match, and parallelize the 
various steps here based on the tools available to the caller. This library 
should probably provide a reader for the simple but common "just read it all as 
an ArrowArrayStream" (which would at least make sure that it's possible to do).



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct 
ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* 
n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* 
n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct 
ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, 
int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has 
size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + 
(long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support 
compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system 
endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcReaderWalkGetArray(struct ArrowIpcArraySetter* setter,
+                                      struct ArrowArray* array,
+                                      struct ArrowError* error) {
+  ns(FieldNode_struct_t) field =
+      ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i));
+  array->length = ns(FieldNode_length(field));
+  array->null_count = ns(FieldNode_null_count(field));
+  setter->field_i += 1;
+
+  for (int64_t i = 0; i < array->n_buffers; i++) {
+    ns(Buffer_struct_t) buffer =
+        ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
+    int64_t buffer_offset = ns(Buffer_offset(buffer));
+    int64_t buffer_length = ns(Buffer_length(buffer));
+    setter->buffer_i += 1;
+
+    struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i);
+    NANOARROW_RETURN_NOT_OK(ArrowIpcReaderMakeBuffer(setter, buffer_offset, 
buffer_length,
+                                                     buffer_dst, error));
+  }
+
+  for (int64_t i = 0; i < array->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcReaderWalkGetArray(setter, array->children[i], error));
+  }
+
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array,
+                                          struct ArrowArrayView* array_view) {
+  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(array, 
array_view->storage_type));
+  NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(array, 
array_view->n_children));
+  for (int64_t i = 0; i < array_view->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcArrayInitFromArrayView(array->children[i], 
array_view->children[i]));
+  }
+
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetArray(struct ArrowIpcReader* reader,
+                                      struct ArrowBufferView body, int64_t 
field_i,
+                                      struct ArrowArray* out, struct 
ArrowError* error) {
+  if (reader->private_data == NULL ||
+      reader->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(error, "reader did not just decode a RecordBatch message");
+    return EINVAL;
+  }
+
+  ns(RecordBatch_table_t) batch = 
(ns(RecordBatch_table_t))reader->private_data;
+
+  // RecordBatch messages don't count the root node but reader->fields does
+  struct ArrowIpcField* root = reader->fields + field_i + 1;
+
+  struct ArrowArray temp;
+  temp.release = NULL;
+  int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to initialize output array");
+    return result;
+  }
+
+  struct ArrowIpcArraySetter setter;
+  setter.fields = ns(RecordBatch_nodes(batch));
+  setter.field_i = field_i;
+  setter.buffers = ns(RecordBatch_buffers(batch));
+  setter.buffer_i = root->buffer_offset - 1;
+  setter.body = body;
+  setter.codec = reader->codec;
+  setter.endianness = reader->endianness;
+
+  // This should probably be done at compile time
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+
+  // The flatbuffers FieldNode doesn't count the root struct so we have to 
loop over the
+  // children ourselves
+  if (field_i == -1) {
+    temp.length = ns(RecordBatch_length(batch));
+    temp.null_count = 0;
+    setter.field_i++;
+    setter.buffer_i++;
+
+    for (int64_t i = 0; i < temp.n_children; i++) {
+      result = ArrowIpcReaderWalkGetArray(&setter, temp.children[i], error);
+      if (result != NANOARROW_OK) {
+        temp.release(&temp);
+        return result;
+      }
+    }
+  } else {
+    result = ArrowIpcReaderWalkGetArray(&setter, &temp, error);
+    if (result != NANOARROW_OK) {
+      temp.release(&temp);
+      return result;
+    }
+  }
+
+  // TODO: this performs some validation but doesn't do everything we need it 
to do

Review Comment:
   Definitely!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to