This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6437f59  feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray (#143)
6437f59 is described below

commit 6437f59dc45509c78a1d9403942d6d9d3e85895b
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Mar 8 15:13:33 2023 -0400

    feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray (#143)
    
    Closes #91.
    
    After this PR you should be able to decode any (!!) record batch message
    into a `struct ArrowArray`:
    
    ```c++
    ArrowIpcReaderInit(&reader);
    
    // Set a schema
    ArrowIpcReaderSetSchema(&reader, some_schema);
    
    // Decode the header and advance the data pointer
    ArrowIpcReaderDecode(&reader, data, &error);
    data.data.as_uint8 += reader.header_size_bytes;
    data.size_bytes -= reader.header_size_bytes;
    
    // Can decode one field at a time or the whole struct by passing -1
    ArrowIpcReaderGetArray(&reader, data, -1, &array, &error);
    ```
---
 .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c    | 703 ++++++++++++++++-----
 .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h    | 178 +++++-
 .../src/nanoarrow/nanoarrow_ipc_test.cc            | 373 +++++++++--
 3 files changed, 1014 insertions(+), 240 deletions(-)

diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
index b2d7cbe..3c0f371 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
@@ -23,6 +23,19 @@
 #include "nanoarrow_ipc.h"
 #include "nanoarrow_ipc_flatcc_generated.h"
 
+struct ArrowIpcField {
+  struct ArrowArrayView* array_view;
+  int64_t buffer_offset;
+};
+
+struct ArrowIpcDecoderPrivate {
+  struct ArrowArrayView array_view;
+  int64_t n_fields;
+  struct ArrowIpcField* fields;
+  int64_t n_buffers;
+  const void* last_message;
+};
+
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
   const char* nanoarrow_runtime_version = ArrowNanoarrowVersion();
   const char* nanoarrow_ipc_build_time_version = NANOARROW_VERSION;
@@ -36,16 +49,32 @@ ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* 
error) {
   return NANOARROW_OK;
 }
 
-void ArrowIpcReaderInit(struct ArrowIpcReader* reader) {
-  memset(reader, 0, sizeof(struct ArrowIpcReader));
+ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) {
+  memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)ArrowMalloc(sizeof(struct 
ArrowIpcDecoderPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate));
+  decoder->private_data = private_data;
+  return NANOARROW_OK;
 }
 
-void ArrowIpcReaderReset(struct ArrowIpcReader* reader) {
-  if (reader->schema.release != NULL) {
-    reader->schema.release(&reader->schema);
+void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  ArrowArrayViewReset(&private_data->array_view);
+
+  if (private_data->fields != NULL) {
+    ArrowFree(private_data->fields);
+    private_data->n_fields = 0;
   }
 
-  ArrowIpcReaderInit(reader);
+  ArrowFree(private_data);
+  memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
 }
 
 static inline uint32_t ArrowIpcReadUint32LE(struct ArrowBufferView* data) {
@@ -68,9 +97,9 @@ static inline int32_t ArrowIpcReadInt32LE(struct 
ArrowBufferView* data) {
 
 #define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
 
-static int ArrowIpcReaderSetMetadata(struct ArrowSchema* schema,
-                                     ns(KeyValue_vec_t) kv_vec,
-                                     struct ArrowError* error) {
+static int ArrowIpcDecoderSetMetadata(struct ArrowSchema* schema,
+                                      ns(KeyValue_vec_t) kv_vec,
+                                      struct ArrowError* error) {
   int64_t n_pairs = ns(KeyValue_vec_len(kv_vec));
   if (n_pairs == 0) {
     return NANOARROW_OK;
@@ -121,8 +150,8 @@ static int ArrowIpcReaderSetMetadata(struct ArrowSchema* 
schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeSimple(struct ArrowSchema* schema, int 
nanoarrow_type,
-                                       struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeSimple(struct ArrowSchema* schema, int 
nanoarrow_type,
+                                        struct ArrowError* error) {
   int result = ArrowSchemaSetType(schema, nanoarrow_type);
   if (result != NANOARROW_OK) {
     ArrowErrorSet(error, "ArrowSchemaSetType() failed for type %s",
@@ -133,9 +162,9 @@ static int ArrowIpcReaderSetTypeSimple(struct ArrowSchema* 
schema, int nanoarrow
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeInt(struct ArrowSchema* schema,
-                                    flatbuffers_generic_t type_generic,
-                                    struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeInt(struct ArrowSchema* schema,
+                                     flatbuffers_generic_t type_generic,
+                                     struct ArrowError* error) {
   ns(Int_table_t) type = (ns(Int_table_t))type_generic;
 
   int is_signed = ns(Int_is_signed_get(type));
@@ -184,21 +213,21 @@ static int ArrowIpcReaderSetTypeInt(struct ArrowSchema* 
schema,
     }
   }
 
-  return ArrowIpcReaderSetTypeSimple(schema, nanoarrow_type, error);
+  return ArrowIpcDecoderSetTypeSimple(schema, nanoarrow_type, error);
 }
 
-static int ArrowIpcReaderSetTypeFloatingPoint(struct ArrowSchema* schema,
-                                              flatbuffers_generic_t 
type_generic,
-                                              struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFloatingPoint(struct ArrowSchema* schema,
+                                               flatbuffers_generic_t 
type_generic,
+                                               struct ArrowError* error) {
   ns(FloatingPoint_table_t) type = (ns(FloatingPoint_table_t))type_generic;
   int precision = ns(FloatingPoint_precision(type));
   switch (precision) {
     case ns(Precision_HALF):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT, 
error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT, 
error);
     case ns(Precision_SINGLE):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error);
     case ns(Precision_DOUBLE):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE, 
error);
     default:
       ArrowErrorSet(error, "Unexpected FloatingPoint Precision value: %d",
                     (int)precision);
@@ -206,9 +235,9 @@ static int ArrowIpcReaderSetTypeFloatingPoint(struct 
ArrowSchema* schema,
   }
 }
 
-static int ArrowIpcReaderSetTypeDecimal(struct ArrowSchema* schema,
-                                        flatbuffers_generic_t type_generic,
-                                        struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDecimal(struct ArrowSchema* schema,
+                                         flatbuffers_generic_t type_generic,
+                                         struct ArrowError* error) {
   ns(Decimal_table_t) type = (ns(Decimal_table_t))type_generic;
   int scale = ns(Decimal_scale(type));
   int precision = ns(Decimal_precision(type));
@@ -237,34 +266,34 @@ static int ArrowIpcReaderSetTypeDecimal(struct 
ArrowSchema* schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
-                                                flatbuffers_generic_t 
type_generic,
-                                                struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
+                                                 flatbuffers_generic_t 
type_generic,
+                                                 struct ArrowError* error) {
   ns(FixedSizeBinary_table_t) type = (ns(FixedSizeBinary_table_t))type_generic;
   int fixed_size = ns(FixedSizeBinary_byteWidth(type));
   return ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY,
                                      fixed_size);
 }
 
-static int ArrowIpcReaderSetTypeDate(struct ArrowSchema* schema,
-                                     flatbuffers_generic_t type_generic,
-                                     struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDate(struct ArrowSchema* schema,
+                                      flatbuffers_generic_t type_generic,
+                                      struct ArrowError* error) {
   ns(Date_table_t) type = (ns(Date_table_t))type_generic;
   int date_unit = ns(Date_unit(type));
   switch (date_unit) {
     case ns(DateUnit_DAY):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DATE32, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE32, 
error);
     case ns(DateUnit_MILLISECOND):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_DATE64, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE64, 
error);
     default:
       ArrowErrorSet(error, "Unexpected Date DateUnit value: %d", 
(int)date_unit);
       return EINVAL;
   }
 }
 
-static int ArrowIpcReaderSetTypeTime(struct ArrowSchema* schema,
-                                     flatbuffers_generic_t type_generic,
-                                     struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema,
+                                      flatbuffers_generic_t type_generic,
+                                      struct ArrowError* error) {
   ns(Time_table_t) type = (ns(Time_table_t))type_generic;
   int time_unit = ns(Time_unit(type));
   int bitwidth = ns(Time_bitWidth(type));
@@ -307,9 +336,9 @@ static int ArrowIpcReaderSetTypeTime(struct ArrowSchema* 
schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeTimestamp(struct ArrowSchema* schema,
-                                          flatbuffers_generic_t type_generic,
-                                          struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeTimestamp(struct ArrowSchema* schema,
+                                           flatbuffers_generic_t type_generic,
+                                           struct ArrowError* error) {
   ns(Timestamp_table_t) type = (ns(Timestamp_table_t))type_generic;
   int time_unit = ns(Timestamp_unit(type));
 
@@ -328,9 +357,9 @@ static int ArrowIpcReaderSetTypeTimestamp(struct 
ArrowSchema* schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeDuration(struct ArrowSchema* schema,
-                                         flatbuffers_generic_t type_generic,
-                                         struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeDuration(struct ArrowSchema* schema,
+                                          flatbuffers_generic_t type_generic,
+                                          struct ArrowError* error) {
   ns(Duration_table_t) type = (ns(Duration_table_t))type_generic;
   int time_unit = ns(Duration_unit(type));
 
@@ -344,20 +373,21 @@ static int ArrowIpcReaderSetTypeDuration(struct 
ArrowSchema* schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeInterval(struct ArrowSchema* schema,
-                                         flatbuffers_generic_t type_generic,
-                                         struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeInterval(struct ArrowSchema* schema,
+                                          flatbuffers_generic_t type_generic,
+                                          struct ArrowError* error) {
   ns(Interval_table_t) type = (ns(Interval_table_t))type_generic;
   int interval_unit = ns(Interval_unit(type));
 
   switch (interval_unit) {
     case ns(IntervalUnit_YEAR_MONTH):
-      return ArrowIpcReaderSetTypeSimple(schema, 
NANOARROW_TYPE_INTERVAL_MONTHS, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, 
NANOARROW_TYPE_INTERVAL_MONTHS, error);
     case ns(IntervalUnit_DAY_TIME):
-      return ArrowIpcReaderSetTypeSimple(schema, 
NANOARROW_TYPE_INTERVAL_DAY_TIME, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, 
NANOARROW_TYPE_INTERVAL_DAY_TIME,
+                                          error);
     case ns(IntervalUnit_MONTH_DAY_NANO):
-      return ArrowIpcReaderSetTypeSimple(schema, 
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO,
-                                         error);
+      return ArrowIpcDecoderSetTypeSimple(schema, 
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO,
+                                          error);
     default:
       ArrowErrorSet(error, "Unexpected Interval unit value: %d", 
(int)interval_unit);
       return EINVAL;
@@ -368,9 +398,9 @@ static int ArrowIpcReaderSetTypeInterval(struct 
ArrowSchema* schema,
 // because the IPC format allows modifying some of the defaults those 
functions assume.
 // In particular, the allocate + initialize children step is handled outside 
these
 // setters.
-static int ArrowIpcReaderSetTypeSimpleNested(struct ArrowSchema* schema,
-                                             const char* format,
-                                             struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeSimpleNested(struct ArrowSchema* schema,
+                                              const char* format,
+                                              struct ArrowError* error) {
   int result = ArrowSchemaSetFormat(schema, format);
   if (result != NANOARROW_OK) {
     ArrowErrorSet(error, "ArrowSchemaSetFormat('%s') failed", format);
@@ -380,23 +410,23 @@ static int ArrowIpcReaderSetTypeSimpleNested(struct 
ArrowSchema* schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeFixedSizeList(struct ArrowSchema* schema,
-                                              flatbuffers_generic_t 
type_generic,
-                                              struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeFixedSizeList(struct ArrowSchema* schema,
+                                               flatbuffers_generic_t 
type_generic,
+                                               struct ArrowError* error) {
   ns(FixedSizeList_table_t) type = (ns(FixedSizeList_table_t))type_generic;
   int32_t fixed_size = ns(FixedSizeList_listSize(type));
 
   char fixed_size_str[128];
   int n_chars = snprintf(fixed_size_str, 128, "+w:%d", fixed_size);
   fixed_size_str[n_chars] = '\0';
-  return ArrowIpcReaderSetTypeSimpleNested(schema, fixed_size_str, error);
+  return ArrowIpcDecoderSetTypeSimpleNested(schema, fixed_size_str, error);
 }
 
-static int ArrowIpcReaderSetTypeMap(struct ArrowSchema* schema,
-                                    flatbuffers_generic_t type_generic,
-                                    struct ArrowError* error) {
+static int ArrowIpcDecoderSetTypeMap(struct ArrowSchema* schema,
+                                     flatbuffers_generic_t type_generic,
+                                     struct ArrowError* error) {
   ns(Map_table_t) type = (ns(Map_table_t))type_generic;
-  NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetTypeSimpleNested(schema, "+m", 
error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetTypeSimpleNested(schema, "+m", 
error));
 
   if (ns(Map_keysSorted(type))) {
     schema->flags |= ARROW_FLAG_MAP_KEYS_SORTED;
@@ -407,9 +437,9 @@ static int ArrowIpcReaderSetTypeMap(struct ArrowSchema* 
schema,
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderSetTypeUnion(struct ArrowSchema* schema,
-                                      flatbuffers_generic_t type_generic,
-                                      int64_t n_children, struct ArrowError* 
error) {
+static int ArrowIpcDecoderSetTypeUnion(struct ArrowSchema* schema,
+                                       flatbuffers_generic_t type_generic,
+                                       int64_t n_children, struct ArrowError* 
error) {
   ns(Union_table_t) type = (ns(Union_table_t))type_generic;
   int union_mode = ns(Union_mode(type));
 
@@ -484,68 +514,70 @@ static int ArrowIpcReaderSetTypeUnion(struct ArrowSchema* 
schema,
     }
   }
 
-  return ArrowIpcReaderSetTypeSimpleNested(schema, union_types_str, error);
+  return ArrowIpcDecoderSetTypeSimpleNested(schema, union_types_str, error);
 }
 
-static int ArrowIpcReaderSetType(struct ArrowSchema* schema, ns(Field_table_t) 
field,
-                                 int64_t n_children, struct ArrowError* error) 
{
+static int ArrowIpcDecoderSetType(struct ArrowSchema* schema, 
ns(Field_table_t) field,
+                                  int64_t n_children, struct ArrowError* 
error) {
   int type_type = ns(Field_type_type(field));
   switch (type_type) {
     case ns(Type_Null):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_NA, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_NA, error);
     case ns(Type_Bool):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error);
     case ns(Type_Int):
-      return ArrowIpcReaderSetTypeInt(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeInt(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_FloatingPoint):
-      return ArrowIpcReaderSetTypeFloatingPoint(schema, 
ns(Field_type_get(field)), error);
+      return ArrowIpcDecoderSetTypeFloatingPoint(schema, 
ns(Field_type_get(field)),
+                                                 error);
     case ns(Type_Decimal):
-      return ArrowIpcReaderSetTypeDecimal(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeDecimal(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_Binary):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_BINARY, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BINARY, 
error);
     case ns(Type_LargeBinary):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY, 
error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY, 
error);
     case ns(Type_FixedSizeBinary):
-      return ArrowIpcReaderSetTypeFixedSizeBinary(schema, 
ns(Field_type_get(field)),
-                                                  error);
+      return ArrowIpcDecoderSetTypeFixedSizeBinary(schema, 
ns(Field_type_get(field)),
+                                                   error);
     case ns(Type_Utf8):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_STRING, error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_STRING, 
error);
     case ns(Type_LargeUtf8):
-      return ArrowIpcReaderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING, 
error);
+      return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING, 
error);
     case ns(Type_Date):
-      return ArrowIpcReaderSetTypeDate(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeDate(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_Time):
-      return ArrowIpcReaderSetTypeTime(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeTime(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_Timestamp):
-      return ArrowIpcReaderSetTypeTimestamp(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeTimestamp(schema, 
ns(Field_type_get(field)), error);
     case ns(Type_Duration):
-      return ArrowIpcReaderSetTypeDuration(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeDuration(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_Interval):
-      return ArrowIpcReaderSetTypeInterval(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeInterval(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_Struct_):
-      return ArrowIpcReaderSetTypeSimpleNested(schema, "+s", error);
+      return ArrowIpcDecoderSetTypeSimpleNested(schema, "+s", error);
     case ns(Type_List):
-      return ArrowIpcReaderSetTypeSimpleNested(schema, "+l", error);
+      return ArrowIpcDecoderSetTypeSimpleNested(schema, "+l", error);
     case ns(Type_LargeList):
-      return ArrowIpcReaderSetTypeSimpleNested(schema, "+L", error);
+      return ArrowIpcDecoderSetTypeSimpleNested(schema, "+L", error);
     case ns(Type_FixedSizeList):
-      return ArrowIpcReaderSetTypeFixedSizeList(schema, 
ns(Field_type_get(field)), error);
+      return ArrowIpcDecoderSetTypeFixedSizeList(schema, 
ns(Field_type_get(field)),
+                                                 error);
     case ns(Type_Map):
-      return ArrowIpcReaderSetTypeMap(schema, ns(Field_type_get(field)), 
error);
+      return ArrowIpcDecoderSetTypeMap(schema, ns(Field_type_get(field)), 
error);
     case ns(Type_Union):
-      return ArrowIpcReaderSetTypeUnion(schema, ns(Field_type_get(field)), 
n_children,
-                                        error);
+      return ArrowIpcDecoderSetTypeUnion(schema, ns(Field_type_get(field)), 
n_children,
+                                         error);
     default:
       ArrowErrorSet(error, "Unrecognized Field type with value %d", 
(int)type_type);
       return EINVAL;
   }
 }
 
-static int ArrowIpcReaderSetChildren(struct ArrowSchema* schema, 
ns(Field_vec_t) fields,
-                                     struct ArrowError* error);
+static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, 
ns(Field_vec_t) fields,
+                                      struct ArrowError* error);
 
-static int ArrowIpcReaderSetField(struct ArrowSchema* schema, 
ns(Field_table_t) field,
-                                  struct ArrowError* error) {
+static int ArrowIpcDecoderSetField(struct ArrowSchema* schema, 
ns(Field_table_t) field,
+                                   struct ArrowError* error) {
   // No dictionary support yet
   if (ns(Field_dictionary_is_present(field))) {
     ArrowErrorSet(error, "Field DictionaryEncoding not supported");
@@ -569,7 +601,7 @@ static int ArrowIpcReaderSetField(struct ArrowSchema* 
schema, ns(Field_table_t)
   ns(Field_vec_t) children = ns(Field_children(field));
   int64_t n_children = ns(Field_vec_len(children));
 
-  NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetType(schema, field, n_children, 
error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetType(schema, field, n_children, 
error));
 
   // nanoarrow's type setters set the nullable flag by default, so we might
   // have to unset it here.
@@ -591,33 +623,36 @@ static int ArrowIpcReaderSetField(struct ArrowSchema* 
schema, ns(Field_table_t)
     ArrowSchemaInit(schema->children[i]);
   }
 
-  NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetChildren(schema, children, error));
-  return ArrowIpcReaderSetMetadata(schema, ns(Field_custom_metadata(field)), 
error);
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(schema, children, error));
+  return ArrowIpcDecoderSetMetadata(schema, ns(Field_custom_metadata(field)), 
error);
 }
 
-static int ArrowIpcReaderSetChildren(struct ArrowSchema* schema, 
ns(Field_vec_t) fields,
-                                     struct ArrowError* error) {
+static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, 
ns(Field_vec_t) fields,
+                                      struct ArrowError* error) {
   int64_t n_fields = ns(Schema_vec_len(fields));
 
   for (int64_t i = 0; i < n_fields; i++) {
     ns(Field_table_t) field = ns(Field_vec_at(fields, i));
-    NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetField(schema->children[i], field, 
error));
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetField(schema->children[i], 
field, error));
   }
 
   return NANOARROW_OK;
 }
 
-static int ArrowIpcReaderDecodeSchema(struct ArrowIpcReader* reader,
-                                      flatbuffers_generic_t message_header,
-                                      struct ArrowError* error) {
+static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder,
+                                             flatbuffers_generic_t 
message_header,
+                                             struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
   ns(Schema_table_t) schema = (ns(Schema_table_t))message_header;
   int endianness = ns(Schema_endianness(schema));
   switch (endianness) {
     case ns(Endianness_Little):
-      reader->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+      decoder->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
       break;
     case ns(Endianness_Big):
-      reader->endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+      decoder->endianness = NANOARROW_IPC_ENDIANNESS_BIG;
       break;
     default:
       ArrowErrorSet(error,
@@ -628,16 +663,16 @@ static int ArrowIpcReaderDecodeSchema(struct 
ArrowIpcReader* reader,
 
   ns(Feature_vec_t) features = ns(Schema_features(schema));
   int64_t n_features = ns(Feature_vec_len(features));
-  reader->features = 0;
+  decoder->feature_flags = 0;
 
   for (int64_t i = 0; i < n_features; i++) {
     ns(Feature_enum_t) feature = ns(Feature_vec_at(features, i));
     switch (feature) {
       case ns(Feature_COMPRESSED_BODY):
-        reader->features |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY;
+        decoder->feature_flags |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY;
         break;
       case ns(Feature_DICTIONARY_REPLACEMENT):
-        reader->features |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT;
+        decoder->feature_flags |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT;
         break;
       default:
         ArrowErrorSet(error, "Unrecognized Schema feature with value %d", 
(int)feature);
@@ -645,33 +680,70 @@ static int ArrowIpcReaderDecodeSchema(struct 
ArrowIpcReader* reader,
     }
   }
 
-  ns(Field_vec_t) fields = ns(Schema_fields(schema));
-  int64_t n_fields = ns(Schema_vec_len(fields));
-  if (reader->schema.release != NULL) {
-    reader->schema.release(&reader->schema);
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder* 
decoder,
+                                                  flatbuffers_generic_t 
message_header,
+                                                  struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))message_header;
+
+  ns(FieldNode_vec_t) fields = ns(RecordBatch_nodes(batch));
+  ns(Buffer_vec_t) buffers = ns(RecordBatch_buffers(batch));
+  int64_t n_fields = ns(FieldNode_vec_len(fields));
+  int64_t n_buffers = ns(Buffer_vec_len(buffers));
+
+  // Check field node and buffer count. We have one more field and buffer
+  // because we count the root struct and the flatbuffer message does not.
+  if ((n_fields + 1) != private_data->n_fields) {
+    ArrowErrorSet(error, "Expected %ld field nodes in message but found %ld",
+                  (long)private_data->n_fields - 1, (long)n_fields);
+    return EINVAL;
   }
 
-  ArrowSchemaInit(&reader->schema);
-  int result = ArrowSchemaSetTypeStruct(&reader->schema, n_fields);
-  if (result != NANOARROW_OK) {
-    ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
-                  (long)n_fields);
-    return result;
+  if ((n_buffers + 1) != private_data->n_buffers) {
+    ArrowErrorSet(error, "Expected %ld buffers in message but found %ld",
+                  (long)private_data->n_buffers - 1, (long)n_buffers);
+    return EINVAL;
   }
 
-  NANOARROW_RETURN_NOT_OK(ArrowIpcReaderSetChildren(&reader->schema, fields, 
error));
-  return ArrowIpcReaderSetMetadata(&reader->schema, 
ns(Schema_custom_metadata(schema)),
-                                   error);
+  if (ns(RecordBatch_compression_is_present(batch))) {
+    ns(BodyCompression_table_t) compression = 
ns(RecordBatch_compression(batch));
+    ns(CompressionType_enum_t) codec = ns(BodyCompression_codec(compression));
+    switch (codec) {
+      case ns(CompressionType_LZ4_FRAME):
+        decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME;
+        break;
+      case ns(CompressionType_ZSTD):
+        decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
+        break;
+      default:
+        ArrowErrorSet(error, "Unrecognized RecordBatch BodyCompression codec 
value: %d",
+                      (int)codec);
+        return EINVAL;
+    }
+  } else {
+    decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+  }
+
+  // Copying field node and buffer information is separate so as only to pay 
for the
+  // nodes that are actually accessed.
+  return NANOARROW_OK;
 }
 
-static inline int ArrowIpcReaderCheckHeader(struct ArrowIpcReader* reader,
-                                            struct ArrowBufferView* data_mut,
-                                            int32_t* message_size_bytes,
-                                            struct ArrowError* error) {
+// Validate the 8-byte prefix of an IPC message: the 0xFFFFFFFF continuation
+// marker followed by the little-endian int32 size of the header flatbuffer.
+// The prefix is consumed from *data_mut (callers then treat data_mut->data as
+// the start of the header flatbuffer) and the size is stored in
+// *message_size_bytes.
+//
+// Returns NANOARROW_OK if data is large enough to read the message header,
+// ESPIPE if reading more data might help, ENODATA if the size is zero (the
+// end-of-stream marker), or EINVAL if the content is not valid.
+static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,
+                                             struct ArrowBufferView* data_mut,
+                                             int32_t* message_size_bytes,
+                                             struct ArrowError* error) {
   if (data_mut->size_bytes < 8) {
     ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes 
remain",
                   (long)data_mut->size_bytes);
-    return EINVAL;
+    return ESPIPE;
   }
 
   uint32_t continuation = ArrowIpcReadUint32LE(data_mut);
@@ -682,12 +754,17 @@ static inline int ArrowIpcReaderCheckHeader(struct 
ArrowIpcReader* reader,
   }
 
   *message_size_bytes = ArrowIpcReadInt32LE(data_mut);
-  if ((*message_size_bytes) > data_mut->size_bytes || (*message_size_bytes) < 
0) {
+  if ((*message_size_bytes) < 0) {
+    // NOTE(review): the condition is `< 0` but the message says "> 0" and calls
+    // the header size a "body" size; the tests match this exact text.
+    ArrowErrorSet(
+        error, "Expected message body size > 0 but found message body size of 
%ld bytes",
+        (long)(*message_size_bytes));
+    return EINVAL;
+  } else if ((*message_size_bytes) > data_mut->size_bytes) {
+    // Not necessarily invalid: the rest of the header may not have arrived yet
     ArrowErrorSet(error,
                   "Expected 0 <= message body size <= %ld bytes but found 
message "
                   "body size of %ld bytes",
                   (long)data_mut->size_bytes, (long)(*message_size_bytes));
-    return EINVAL;
+    return ESPIPE;
   }
 
   if (*message_size_bytes == 0) {
@@ -698,49 +775,64 @@ static inline int ArrowIpcReaderCheckHeader(struct 
ArrowIpcReader* reader,
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowIpcReaderPeek(struct ArrowIpcReader* reader,
-                                  struct ArrowBufferView data, struct 
ArrowError* error) {
-  reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
-  reader->body_size_bytes = 0;
+// Check only the 8-byte message prefix; the header flatbuffer is not read.
+// On success decoder->header_size_bytes is the header size including the
+// prefix, while message_type and body_size_bytes stay reset (UNINITIALIZED /
+// 0) and no message is recorded for later decode calls.
+ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
+                                         struct ArrowBufferView data,
+                                         struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+  decoder->body_size_bytes = 0;
+  private_data->last_message = NULL;
   NANOARROW_RETURN_NOT_OK(
-      ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes, 
error));
-  reader->header_size_bytes += 2 * sizeof(int32_t);
+      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
+  // Count the continuation marker and size prefix as part of the header
+  decoder->header_size_bytes += 2 * sizeof(int32_t);
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowIpcReaderVerify(struct ArrowIpcReader* reader,
-                                    struct ArrowBufferView data,
-                                    struct ArrowError* error) {
-  reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
-  reader->body_size_bytes = 0;
+// Check the message prefix, then run flatcc verification over the header
+// flatbuffer before reading anything from it. On success sets
+// metadata_version, message_type, body_size_bytes and header_size_bytes
+// (prefix included) and records the message header table in
+// private_data->last_message.
+// NOTE(review): decoder->codec / decoder->endianness are not updated here even
+// though last_message is set, so a decode call that reads them may see values
+// left by an earlier ArrowIpcDecoderDecodeHeader() call — confirm intended.
+ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+  decoder->body_size_bytes = 0;
+  private_data->last_message = NULL;
   NANOARROW_RETURN_NOT_OK(
-      ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes, 
error));
+      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
 
   // Run flatbuffers verification
-  if (ns(Message_verify_as_root(data.data.as_uint8, 
reader->header_size_bytes)) !=
+  // (header_size_bytes does not include the prefix yet at this point)
+  if (ns(Message_verify_as_root(data.data.as_uint8, 
decoder->header_size_bytes)) !=
       flatcc_verify_ok) {
     ArrowErrorSet(error, "Message flatbuffer verification failed");
     return EINVAL;
   }
 
   // Read some basic information from the message
-  reader->header_size_bytes += 2 * sizeof(int32_t);
+  decoder->header_size_bytes += 2 * sizeof(int32_t);
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
-  reader->metadata_version = ns(Message_version(message));
-  reader->message_type = ns(Message_header_type(message));
-  reader->body_size_bytes = ns(Message_bodyLength(message));
+  decoder->metadata_version = ns(Message_version(message));
+  decoder->message_type = ns(Message_header_type(message));
+  decoder->body_size_bytes = ns(Message_bodyLength(message));
 
+  private_data->last_message = ns(Message_header_get(message));
   return NANOARROW_OK;
 }
 
-ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
-                                    struct ArrowBufferView data,
-                                    struct ArrowError* error) {
-  reader->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
-  reader->body_size_bytes = 0;
+// Decode the message header in `data` (8-byte prefix included) without
+// verifying the flatbuffer; untrusted input should normally go through
+// ArrowIpcDecoderVerifyHeader() first. On success sets metadata_version,
+// message_type, body_size_bytes and header_size_bytes (prefix included) and
+// remembers the header table so that ArrowIpcDecoderDecodeSchema() or
+// ArrowIpcDecoderDecodeArray() can use it. Returns ENOTSUP for message types
+// this library cannot decode and EINVAL for unknown types or versions.
+ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  decoder->message_type = NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED;
+  decoder->body_size_bytes = 0;
+  private_data->last_message = NULL;
+
   NANOARROW_RETURN_NOT_OK(
-      ArrowIpcReaderCheckHeader(reader, &data, &reader->header_size_bytes, error));
-  reader->header_size_bytes += 2 * sizeof(int32_t);
+      ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
+  decoder->header_size_bytes += 2 * sizeof(int32_t);
 
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   if (!message) {
@@ -748,11 +840,11 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
   }
 
   // Read some basic information from the message
-  reader->metadata_version = ns(Message_version(message));
-  reader->message_type = ns(Message_header_type(message));
-  reader->body_size_bytes = ns(Message_bodyLength(message));
+  // The version must be stored on the decoder: the switch below reads
+  // decoder->metadata_version, which would otherwise be stale.
+  decoder->metadata_version = ns(Message_version(message));
+  decoder->message_type = ns(Message_header_type(message));
+  decoder->body_size_bytes = ns(Message_bodyLength(message));
 
-  switch (reader->metadata_version) {
+  switch (decoder->metadata_version) {
     case ns(MetadataVersion_V4):
     case ns(MetadataVersion_V5):
       break;
@@ -760,30 +852,319 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
     case ns(MetadataVersion_V2):
     case ns(MetadataVersion_V3):
+      // NOTE(review): older versions only record an error message and decoding
+      // continues; confirm this is meant as a warning rather than a failure.
       ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
-                    ns(MetadataVersion_name(reader->metadata_version)));
+                    ns(MetadataVersion_name(decoder->metadata_version)));
       break;
     default:
       ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
-                    reader->metadata_version);
+                    (int)decoder->metadata_version);
       return EINVAL;
   }
 
   flatbuffers_generic_t message_header = ns(Message_header_get(message));
-  switch (reader->message_type) {
+  switch (decoder->message_type) {
     case ns(MessageHeader_Schema):
-      NANOARROW_RETURN_NOT_OK(ArrowIpcReaderDecodeSchema(reader, message_header, error));
+      NANOARROW_RETURN_NOT_OK(
+          ArrowIpcDecoderDecodeSchemaHeader(decoder, message_header, error));
       break;
-    case ns(MessageHeader_DictionaryBatch):
     case ns(MessageHeader_RecordBatch):
+      NANOARROW_RETURN_NOT_OK(
+          ArrowIpcDecoderDecodeRecordBatchHeader(decoder, message_header, error));
+      break;
+    case ns(MessageHeader_DictionaryBatch):
     case ns(MessageHeader_Tensor):
     case ns(MessageHeader_SparseTensor):
       ArrowErrorSet(error, "Unsupported message type: '%s'",
-                    ns(MessageHeader_type_name(reader->message_type)));
+                    ns(MessageHeader_type_name(decoder->message_type)));
       return ENOTSUP;
     default:
-      ArrowErrorSet(error, "Unnown message type: %d", (int)(reader->message_type));
+      ArrowErrorSet(error, "Unknown message type: %d", (int)(decoder->message_type));
       return EINVAL;
   }
 
+  // Only a fully decoded header may be used by the Decode{Schema,Array} calls
+  private_data->last_message = message_header;
+  return NANOARROW_OK;
+}
+
+// Build an ArrowSchema from the Schema message most recently recorded by the
+// decoder (private_data->last_message). The result is a struct with one child
+// per field plus the schema's custom metadata. On success it is moved into
+// `out` (the caller releases it); on failure everything allocated here is
+// released and `out` is untouched.
+ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowSchema* out,
+                                           struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (private_data->last_message == NULL ||
+      decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) {
+    ArrowErrorSet(error, "decoder did not just decode a Schema message");
+    return EINVAL;
+  }
+
+  ns(Schema_table_t) schema = (ns(Schema_table_t))private_data->last_message;
+
+  // `fields` is a vector of Field tables, so use the Field vector accessor
+  ns(Field_vec_t) fields = ns(Schema_fields(schema));
+  int64_t n_fields = (int64_t)ns(Field_vec_len(fields));
+
+  struct ArrowSchema tmp;
+  ArrowSchemaInit(&tmp);
+  int result = ArrowSchemaSetTypeStruct(&tmp, n_fields);
+  if (result != NANOARROW_OK) {
+    tmp.release(&tmp);
+    ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
+                  (long)n_fields);
+    return result;
+  }
+
+  result = ArrowIpcDecoderSetChildren(&tmp, fields, error);
+  if (result != NANOARROW_OK) {
+    tmp.release(&tmp);
+    return result;
+  }
+
+  result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error);
+  if (result != NANOARROW_OK) {
+    tmp.release(&tmp);
+    return result;
+  }
+
+  ArrowSchemaMove(&tmp, out);
+  return NANOARROW_OK;
+}
+
+// Add one to *n_fields for `schema` itself and for every descendant (visited
+// depth-first). The caller initializes *n_fields.
+static void ArrowIpcDecoderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  (*n_fields)++;
+  struct ArrowSchema** child = schema->children;
+  for (int64_t remaining = schema->n_children; remaining > 0; remaining--) {
+    ArrowIpcDecoderCountFields(*child++, n_fields);
+  }
+}
+
+// Fill fields[*n_fields] for `view`, then recurse into its children, so that
+// `fields` ends up in pre-order with the root first. Each entry records the
+// running buffer count at the point it was visited (its first buffer index);
+// *n_buffers accumulates the layout buffers that are not
+// NANOARROW_BUFFER_TYPE_NONE.
+static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
+                                      struct ArrowArrayView* view, int64_t* n_fields,
+                                      int64_t* n_buffers) {
+  struct ArrowIpcField* slot = &fields[*n_fields];
+  slot->array_view = view;
+  slot->buffer_offset = *n_buffers;
+  (*n_fields)++;
+
+  for (int buffer_i = 0; buffer_i < 3; buffer_i++) {
+    if (view->layout.buffer_type[buffer_i] != NANOARROW_BUFFER_TYPE_NONE) {
+      (*n_buffers)++;
+    }
+  }
+
+  for (int64_t child_i = 0; child_i < view->n_children; child_i++) {
+    ArrowIpcDecoderInitFields(fields, view->children[child_i], n_fields, n_buffers);
+  }
+}
+
+// Prepare the decoder to decode record batches described by `schema`. The
+// schema is only read (never moved or released), so the caller keeps
+// ownership. Any previously set schema state is discarded first; if this
+// call fails the decoder is left with no field information.
+ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
+                                        struct ArrowSchema* schema,
+                                        struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  // Discard state from a previous schema. Clearing the pointer and counters
+  // here means an early return below cannot leave a dangling `fields` pointer
+  // (which a later reset or SetSchema call would free again). n_buffers in
+  // particular must restart at 0 because InitFields accumulates into it.
+  ArrowArrayViewReset(&private_data->array_view);
+  if (private_data->fields != NULL) {
+    ArrowFree(private_data->fields);
+  }
+  private_data->fields = NULL;
+  private_data->n_fields = 0;
+  private_data->n_buffers = 0;
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&private_data->array_view, schema, error));
+
+  // Root must be a struct
+  if (private_data->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  int64_t n_fields = 0;
+  ArrowIpcDecoderCountFields(schema, &n_fields);
+  private_data->fields =
+      (struct ArrowIpcField*)ArrowMalloc(n_fields * sizeof(struct ArrowIpcField));
+  if (private_data->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate decoder->fields");
+    return ENOMEM;
+  }
+  memset(private_data->fields, 0, n_fields * sizeof(struct ArrowIpcField));
+  private_data->n_fields = n_fields;
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcDecoderInitFields(private_data->fields, &private_data->array_view, &field_i,
+                            &private_data->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+// Cursor state for the recursive walk that assembles an ArrowArray from a
+// RecordBatch message.
+struct ArrowIpcArraySetter {
+  // FieldNode (length / null_count) entries of the RecordBatch, next to read
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  // Buffer (offset / length) entries of the RecordBatch, next to read
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  // Message body that buffer offsets are relative to
+  struct ArrowBufferView body;
+  // Compression and endianness of the message, and this machine's endianness
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+// Copy `length` bytes at `offset` within setter->body into `out`. The offset
+// and length come from the message header, so they are range-checked against
+// the body before any memory is read. Returns EINVAL for an out-of-range
+// buffer and ENOTSUP for compressed bodies or non-system endianness.
+static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                     int64_t length, struct ArrowBuffer* out,
+                                     struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body. Comparing against
+  // `size - offset` (evaluated only once offset is known to be in range)
+  // avoids the signed overflow that `offset + length` can hit for hostile
+  // values, and a negative length is rejected outright.
+  if (offset < 0 || length < 0 || offset > setter->body.size_bytes ||
+      length > (setter->body.size_bytes - offset)) {
+    // The end offset is computed unsigned so reporting cannot overflow either
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset,
+                  (long)((uint64_t)offset + (uint64_t)length),
+                  (long)setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Fill `array` (then, recursively, its children) from the setter's cursors:
+// the next FieldNode supplies length and null_count and the next
+// array->n_buffers Buffer entries supply its buffers. The header is untrusted,
+// so both cursors are checked against the vector lengths before use; a header
+// with too few nodes or buffers yields EINVAL instead of an out-of-bounds read.
+static int ArrowIpcDecoderWalkGetArray(struct ArrowIpcArraySetter* setter,
+                                       struct ArrowArray* array,
+                                       struct ArrowError* error) {
+  int64_t n_nodes = (int64_t)ns(FieldNode_vec_len(setter->fields));
+  if (setter->field_i < 0 || setter->field_i >= n_nodes) {
+    ArrowErrorSet(error,
+                  "Expected at least %ld field nodes in message header but found %ld",
+                  (long)setter->field_i + 1, (long)n_nodes);
+    return EINVAL;
+  }
+
+  ns(FieldNode_struct_t) field =
+      ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i));
+  array->length = ns(FieldNode_length(field));
+  array->null_count = ns(FieldNode_null_count(field));
+  setter->field_i += 1;
+
+  int64_t n_buffers = (int64_t)ns(Buffer_vec_len(setter->buffers));
+  for (int64_t i = 0; i < array->n_buffers; i++) {
+    if (setter->buffer_i < 0 || setter->buffer_i >= n_buffers) {
+      ArrowErrorSet(error,
+                    "Expected at least %ld buffers in message header but found %ld",
+                    (long)setter->buffer_i + 1, (long)n_buffers);
+      return EINVAL;
+    }
+
+    ns(Buffer_struct_t) buffer =
+        ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
+    int64_t buffer_offset = ns(Buffer_offset(buffer));
+    int64_t buffer_length = ns(Buffer_length(buffer));
+    setter->buffer_i += 1;
+
+    struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i);
+    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderMakeBuffer(setter, buffer_offset,
+                                                      buffer_length, buffer_dst, error));
+  }
+
+  for (int64_t i = 0; i < array->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcDecoderWalkGetArray(setter, array->children[i], error));
+  }
+
+  return NANOARROW_OK;
+}
+
+// Allocate `array` (and, recursively, its children) with the storage types
+// and child structure of `array_view`. If a later step fails, the array is
+// released before returning so a partially built array is not leaked
+// (assumes ArrowArrayInitFromType() cleans up after its own failures).
+static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array,
+                                          struct ArrowArrayView* array_view) {
+  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(array, array_view->storage_type));
+
+  // From here on `array` owns resources, so failures release it instead of
+  // returning directly.
+  int result = ArrowArrayAllocateChildren(array, array_view->n_children);
+  for (int64_t i = 0; result == NANOARROW_OK && i < array_view->n_children; i++) {
+    result = ArrowIpcArrayInitFromArrayView(array->children[i], array_view->children[i]);
+  }
+
+  if (result != NANOARROW_OK) {
+    array->release(array);
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+// Assemble an ArrowArray for field `field_i` (-1 for the whole record batch as
+// a struct) from the RecordBatch message most recently recorded by the decoder
+// and its `body`. Requires a schema set with ArrowIpcDecoderSetSchema(). On
+// success the caller owns `out`; on failure nothing is left allocated.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t field_i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error) {
+  struct ArrowIpcDecoderPrivate* private_data =
+      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
+
+  if (private_data->last_message == NULL ||
+      decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(error, "decoder did not just decode a RecordBatch message");
+    return EINVAL;
+  }
+
+  // field_i indexes private_data->fields below, so validate it first. fields
+  // holds the root struct followed by n_fields - 1 descendants, which makes
+  // the valid range [-1, n_fields - 1).
+  if (private_data->fields == NULL) {
+    ArrowErrorSet(error, "decoder has no schema: call ArrowIpcDecoderSetSchema() first");
+    return EINVAL;
+  }
+
+  if (field_i < -1 || field_i >= (private_data->n_fields - 1)) {
+    ArrowErrorSet(error, "Expected field index between -1 and %ld but found %ld",
+                  (long)(private_data->n_fields - 2), (long)field_i);
+    return EINVAL;
+  }
+
+  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))private_data->last_message;
+
+  // RecordBatch messages don't count the root node but decoder->fields does
+  struct ArrowIpcField* root = private_data->fields + field_i + 1;
+
+  struct ArrowArray temp;
+  temp.release = NULL;
+  int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view);
+  if (result != NANOARROW_OK) {
+    // Release anything left partially allocated by the failed initialization
+    if (temp.release != NULL) {
+      temp.release(&temp);
+    }
+    ArrowErrorSet(error, "Failed to initialize output array");
+    return result;
+  }
+
+  struct ArrowIpcArraySetter setter;
+  setter.fields = ns(RecordBatch_nodes(batch));
+  setter.field_i = field_i;
+  setter.buffers = ns(RecordBatch_buffers(batch));
+  setter.buffer_i = root->buffer_offset - 1;
+  setter.body = body;
+  setter.codec = decoder->codec;
+  setter.endianness = decoder->endianness;
+
+  // This should probably be done at compile time
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+
+  // The flatbuffers FieldNode doesn't count the root struct so we have to loop over the
+  // children ourselves
+  if (field_i == -1) {
+    temp.length = ns(RecordBatch_length(batch));
+    temp.null_count = 0;
+    setter.field_i++;
+    setter.buffer_i++;
+
+    for (int64_t i = 0; i < temp.n_children; i++) {
+      result = ArrowIpcDecoderWalkGetArray(&setter, temp.children[i], error);
+      if (result != NANOARROW_OK) {
+        temp.release(&temp);
+        return result;
+      }
+    }
+  } else {
+    result = ArrowIpcDecoderWalkGetArray(&setter, &temp, error);
+    if (result != NANOARROW_OK) {
+      temp.release(&temp);
+      return result;
+    }
+  }
+
+  // TODO: this performs some validation but doesn't do everything we need it to do
+  // notably it doesn't loop over offset buffers to look for values that will cause
+  // out-of-bounds buffer access on the data buffer or child arrays.
+  result = ArrowArrayFinishBuilding(&temp, error);
+  if (result != NANOARROW_OK) {
+    temp.release(&temp);
+    return result;
+  }
+
+  ArrowArrayMove(&temp, out);
   return NANOARROW_OK;
 }
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
index 018f4ed..7573441 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
@@ -23,11 +23,20 @@
 #ifdef NANOARROW_NAMESPACE
+// Each public function is renamed via NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ...)
+// (presumably to avoid clashes between separately vendored copies); every
+// exported function declared below needs an entry here.
 
 #define ArrowIpcCheckRuntime NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcCheckRuntime)
-#define ArrowIpcReaderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcReaderInit)
-#define ArrowIpcReaderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcReaderReset)
-#define ArrowIpcReaderPeek NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcReaderPeek)
-#define ArrowIpcReaderVerify NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcReaderVerify)
-#define ArrowIpcReaderDecode NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcReaderDecode)
+#define ArrowIpcDecoderInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderInit)
+#define ArrowIpcDecoderReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowIpcDecoderReset)
+#define ArrowIpcDecoderPeekHeader \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderPeekHeader)
+#define ArrowIpcDecoderVerifyHeader \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderVerifyHeader)
+#define ArrowIpcDecoderDecodeHeader \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeHeader)
+#define ArrowIpcDecoderDecodeSchema \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeSchema)
+#define ArrowIpcDecoderDecodeArray \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderDecodeArray)
+#define ArrowIpcDecoderSetSchema \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
 
 #endif
 
@@ -35,6 +44,14 @@
 extern "C" {
 #endif
 
+/// \brief Metadata version of an Arrow IPC message
+///
+/// The decoder assigns the flatbuffers Message.version value directly to a
+/// field of this type, so the values are spelled out explicitly; they are
+/// expected to match the MetadataVersion enum of the Arrow IPC schema.
+enum ArrowIpcMetadataVersion {
+  NANOARROW_IPC_METADATA_VERSION_V1 = 0,
+  NANOARROW_IPC_METADATA_VERSION_V2 = 1,
+  NANOARROW_IPC_METADATA_VERSION_V3 = 2,
+  NANOARROW_IPC_METADATA_VERSION_V4 = 3,
+  NANOARROW_IPC_METADATA_VERSION_V5 = 4
+};
+
 enum ArrowIpcMessageType {
   NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED,
   NANOARROW_IPC_MESSAGE_TYPE_SCHEMA,
@@ -50,38 +67,145 @@ enum ArrowIpcEndianness {
   NANOARROW_IPC_ENDIANNESS_BIG
 };
 
+/// \brief Compression codec of a RecordBatch message body
+///
+/// NANOARROW_IPC_COMPRESSION_TYPE_NONE means the body is not compressed.
+enum ArrowIpcCompressionType {
+  NANOARROW_IPC_COMPRESSION_TYPE_NONE = 0,
+  NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME = 1,
+  NANOARROW_IPC_COMPRESSION_TYPE_ZSTD = 2
+};
+
+/// \brief Feature flags (presumably OR-ed together in ArrowIpcDecoder.feature_flags)
 #define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
 #define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
 
+/// \brief Runtime check for this library; returns NANOARROW_OK on success
+/// (error may be NULL). NOTE(review): document what exactly is checked.
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
 
-struct ArrowIpcReader {
-  int32_t metadata_version;
-  int32_t message_type;
-  int32_t endianness;
-  int32_t features;
+/// \brief Decoder for Arrow IPC messages
+///
+/// This structure is intended to be allocated by the caller,
+/// initialized using ArrowIpcDecoderInit(), and released with
+/// ArrowIpcDecoderReset(). These fields should not be modified
+/// by the caller but can be read following a call to
+/// ArrowIpcDecoderPeekHeader(), ArrowIpcDecoderVerifyHeader(), or
+/// ArrowIpcDecoderDecodeHeader().
+struct ArrowIpcDecoder {
+  /// \brief The last verified or decoded message type
+  ///
+  /// Reset to NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED at the start of every
+  /// peek/verify/decode call; ArrowIpcDecoderPeekHeader() leaves it that way.
+  enum ArrowIpcMessageType message_type;
+
+  /// \brief The metadata version used by this and forthcoming messages
+  enum ArrowIpcMetadataVersion metadata_version;
+
+  /// \brief Endianness of forthcoming RecordBatch messages
+  enum ArrowIpcEndianness endianness;
+
+  /// \brief Features used by this and forthcoming messages as indicated by the
+  /// current Schema message (presumably NANOARROW_IPC_FEATURE_* bit flags)
+  int32_t feature_flags;
+
+  /// \brief Compression used by the current RecordBatch message
+  enum ArrowIpcCompressionType codec;
+
+  /// \brief The number of bytes in the current header message
+  ///
+  /// This value includes the 8 bytes before the start of the header message
+  /// content and any padding bytes required to make the header message size
+  /// be a multiple of 8 bytes, so the message body starts this many bytes
+  /// after the start of the message.
   int32_t header_size_bytes;
-  int64_t body_size_bytes;
-  struct ArrowSchema schema;
-};
-
-void ArrowIpcReaderInit(struct ArrowIpcReader* reader);
 
-void ArrowIpcReaderReset(struct ArrowIpcReader* reader);
-
-ArrowErrorCode ArrowIpcReaderPeek(struct ArrowIpcReader* reader,
-                                  struct ArrowBufferView data, struct 
ArrowError* error);
+  /// \brief The number of bytes in the body that follows the current header
+  /// message (0 after ArrowIpcDecoderPeekHeader(), which does not parse it).
+  int64_t body_size_bytes;
 
-ArrowErrorCode ArrowIpcReaderVerify(struct ArrowIpcReader* reader,
-                                    struct ArrowBufferView data,
-                                    struct ArrowError* error);
+  /// \brief Private resources managed by this library; do not access directly
+  void* private_data;
+};
 
-ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
-                                    struct ArrowBufferView data,
-                                    struct ArrowError* error);
-
-#endif
+/// \brief Initialize a decoder
+///
+/// Returns NANOARROW_OK on success. The other decoder functions dereference
+/// decoder->private_data, so do not use the decoder if this call fails.
+ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder);
+
+/// \brief Release all resources attached to a decoder
+///
+/// Pairs with a successful ArrowIpcDecoderInit().
+void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
+
+/// \brief Peek at a message header
+///
+/// The first 8 bytes of an Arrow IPC message are 0xFFFFFFFF followed by the
+/// size of the header as a little-endian 32-bit integer.
+/// ArrowIpcDecoderPeekHeader() reads only these bytes and returns ESPIPE if
+/// there are not enough remaining bytes in data to read the entire header
+/// message, EINVAL if the first 8 bytes are not valid, ENODATA if the Arrow
+/// end-of-stream indicator has been reached, or NANOARROW_OK otherwise.
+///
+/// On success, decoder.header_size_bytes is set (including the 8 prefix bytes);
+/// decoder.message_type remains NANOARROW_IPC_MESSAGE_TYPE_UNINITIALIZED and
+/// decoder.body_size_bytes remains 0 because the header is not parsed.
+ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
+                                         struct ArrowBufferView data,
+                                         struct ArrowError* error);
+
+/// \brief Verify a message header
+///
+/// Performs the same checks as ArrowIpcDecoderPeekHeader() to ensure data is
+/// sufficiently large and additionally runs flatbuffer verification to ensure
+/// that decoding the data will not access memory outside of the buffer
+/// specified by data. ArrowIpcDecoderVerifyHeader() will also set
+/// decoder.header_size_bytes, decoder.body_size_bytes, decoder.metadata_version,
+/// and decoder.message_type.
+///
+/// Returns as ArrowIpcDecoderPeekHeader() and additionally will
+/// return EINVAL if flatbuffer verification fails.
+ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error);
+
+/// \brief Decode a message header
+///
+/// Performs the same checks as ArrowIpcDecoderPeekHeader() to ensure data is
+/// sufficiently large and decodes the content of the message header. If data
+/// contains a schema message, decoder.endianness and decoder.feature_flags are
+/// set and ArrowIpcDecoderDecodeSchema() can be used to obtain the decoded
+/// schema. If data contains a record batch message, decoder.codec is set and a
+/// successful call can be followed by a call to ArrowIpcDecoderDecodeArray().
+///
+/// In almost all cases this should be preceded by a call to
+/// ArrowIpcDecoderVerifyHeader() to ensure decoding does not access data
+/// outside of the specified buffer.
+///
+/// Returns EINVAL if the content of the message cannot be decoded or ENOTSUP if
+/// the content of the message uses features not supported by this library.
+ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowBufferView data,
+                                           struct ArrowError* error);
+
+/// \brief Decode an ArrowSchema
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader(), retrieve an
+/// ArrowSchema. The caller is responsible for releasing the schema if
+/// NANOARROW_OK is returned.
+///
+/// Returns EINVAL if the decoder did not just decode a schema message, another
+/// error code if the schema could not be built (e.g. allocation failure), or
+/// NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
+                                           struct ArrowSchema* out,
+                                           struct ArrowError* error);
+
+/// \brief Set the ArrowSchema used to decode future record batch messages
+///
+/// Prepares the decoder for future record batch messages of this type. The
+/// schema is only read: the decoder does not take ownership of it, so the
+/// caller remains responsible for releasing it.
+///
+/// Returns EINVAL if schema validation fails or the schema is not a struct,
+/// ENOMEM if allocation fails, or NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
+                                        struct ArrowSchema* schema,
+                                        struct ArrowError* error);
+
+/// \brief Decode an ArrowArray
+///
+/// After a successful call to ArrowIpcDecoderDecodeHeader(), assemble an
+/// ArrowArray given a message body and a field index i. A schema must have
+/// been provided with ArrowIpcDecoderSetSchema(). Note that field index does
+/// not equate to column index if any columns contain nested types. Use a value
+/// of -1 to decode the entire array into a struct. The caller is responsible
+/// for releasing the array if NANOARROW_OK is returned.
+///
+/// Returns EINVAL if the decoder did not just decode a record batch message,
+/// ENOTSUP if the message uses features not supported by this library, or
+/// NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
+                                          struct ArrowBufferView body, int64_t 
i,
+                                          struct ArrowArray* out,
+                                          struct ArrowError* error);
 
 #ifdef __cplusplus
 }
 #endif
+
+#endif
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
index 55835e8..a8e035f 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
@@ -27,6 +27,23 @@
 
 using namespace arrow;
 
+// Copied from nanoarrow_ipc.c so we can test the internal state
+// of the decoder. These must stay identical to the definitions there (same
+// members, order and types), otherwise reading decoder.private_data through
+// them is invalid.
+extern "C" {
+// One entry per field of the schema tree (root included, pre-order)
+struct ArrowIpcField {
+  struct ArrowArrayView* array_view;
+  // Index of this field's first buffer among all fields' buffers
+  int64_t buffer_offset;
+};
+
+struct ArrowIpcDecoderPrivate {
+  struct ArrowArrayView array_view;
+  int64_t n_fields;
+  struct ArrowIpcField* fields;
+  int64_t n_buffers;
+  // Header table of the last verified/decoded message, or NULL
+  const void* last_message;
+};
+}
+
+// ArrowIpcCheckRuntime() must succeed in the test environment when called
+// without an ArrowError (NULL is accepted).
 TEST(NanoarrowIpcCheckRuntime, CheckRuntime) {
   EXPECT_EQ(ArrowIpcCheckRuntime(nullptr), NANOARROW_OK);
 }
@@ -62,76 +79,90 @@ static uint8_t kSimpleSchema[] = {
     0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 
0x08, 0x00,
     0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00};
 
+// An encapsulated RecordBatch message: 0xFFFFFFFF marker, header size 0x88
+// (136 bytes), the header flatbuffer, then a 16-byte body (the tests expect
+// body_size_bytes == 16). The body bytes appear to hold int32 values 1, 2, 3
+// plus padding — presumably one int32 column matching kSimpleSchema.
+static uint8_t kSimpleRecordBatch[] = {
+    0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 
0x0c, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 
0x10, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 
0x0c, 0x00,
+    0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 
0x10, 0x00,
+    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x01, 0x00,
+    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 
0x03, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
 TEST(NanoarrowIpcTest, NanoarrowIpcCheckHeader) {
-  struct ArrowIpcReader reader;
+  struct ArrowIpcDecoder decoder;
   struct ArrowError error;
 
   struct ArrowBufferView data;
   data.data.as_uint8 = kSimpleSchema;
   data.size_bytes = 1;
 
-  ArrowIpcReaderInit(&reader);
+  // Init now returns an error code (the decoder owns private data), so stop
+  // the test rather than continue with a half-initialized decoder.
+  ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);
 
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
   EXPECT_STREQ(error.message,
                "Expected data of at least 8 bytes but only 1 bytes remain");
 
   uint32_t eight_bad_bytes[] = {0, 0};
   data.data.as_uint8 = reinterpret_cast<uint8_t*>(eight_bad_bytes);
   data.size_bytes = 8;
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
                "Expected 0xFFFFFFFF at start of message but found 0x00000000");
 
   eight_bad_bytes[0] = 0xFFFFFFFF;
   eight_bad_bytes[1] = static_cast<uint32_t>(-1);
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message,
-               "Expected 0 <= message body size <= 0 bytes but found message body size "
-               "of -1 bytes");
+               "Expected message body size > 0 but found message body size of -1 bytes");
 
   eight_bad_bytes[1] = static_cast<uint32_t>(1);
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ESPIPE);
   EXPECT_STREQ(error.message,
                "Expected 0 <= message body size <= 0 bytes but found message body size "
                "of 1 bytes");
 
   eight_bad_bytes[0] = 0xFFFFFFFF;
   eight_bad_bytes[1] = 0;
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), ENODATA);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), ENODATA);
   EXPECT_STREQ(error.message, "End of Arrow stream");
 
-  ArrowIpcReaderReset(&reader);
+  ArrowIpcDecoderReset(&decoder);
 }
 
 TEST(NanoarrowIpcTest, NanoarrowIpcPeekSimpleSchema) {
-  struct ArrowIpcReader reader;
+  struct ArrowIpcDecoder decoder;
   struct ArrowError error;
 
   struct ArrowBufferView data;
   data.data.as_uint8 = kSimpleSchema;
   data.size_bytes = sizeof(kSimpleSchema);
 
-  ArrowIpcReaderInit(&reader);
-  EXPECT_EQ(ArrowIpcReaderPeek(&reader, data, &error), NANOARROW_OK);
-  EXPECT_EQ(reader.header_size_bytes, sizeof(kSimpleSchema));
-  EXPECT_EQ(reader.body_size_bytes, 0);
+  ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);
+  // Peeking reads only the prefix; the whole fixture is one header message
+  EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
+  EXPECT_EQ(decoder.body_size_bytes, 0);
 
-  ArrowIpcReaderReset(&reader);
+  ArrowIpcDecoderReset(&decoder);
 }
 
 TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) {
-  struct ArrowIpcReader reader;
+  struct ArrowIpcDecoder decoder;
   struct ArrowError error;
 
   struct ArrowBufferView data;
   data.data.as_uint8 = kSimpleSchema;
   data.size_bytes = sizeof(kSimpleSchema);
 
-  ArrowIpcReaderInit(&reader);
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), NANOARROW_OK);
-  EXPECT_EQ(reader.header_size_bytes, sizeof(kSimpleSchema));
-  EXPECT_EQ(reader.body_size_bytes, 0);
+  ASSERT_EQ(ArrowIpcDecoderInit(&decoder), NANOARROW_OK);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);
+  EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
+  EXPECT_EQ(decoder.body_size_bytes, 0);
 
-  uint8_t simple_schema_invalid[280];
+  // Sized from the fixture so the copy and the size_bytes below always agree
+  uint8_t simple_schema_invalid[sizeof(kSimpleSchema)];
   memcpy(simple_schema_invalid, kSimpleSchema, sizeof(simple_schema_invalid));
@@ -139,36 +170,206 @@ TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleSchema) {
 
   data.data.as_uint8 = simple_schema_invalid;
   data.size_bytes = sizeof(kSimpleSchema);
-  EXPECT_EQ(ArrowIpcReaderVerify(&reader, data, &error), EINVAL);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), EINVAL);
   EXPECT_STREQ(error.message, "Message flatbuffer verification failed");
 
-  ArrowIpcReaderReset(&reader);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcVerifySimpleRecordBatch) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcDecoderInit(&decoder);
+  EXPECT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, data, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  EXPECT_EQ(decoder.header_size_bytes,  // header is everything before the body
+            sizeof(kSimpleRecordBatch) - decoder.body_size_bytes);
+  EXPECT_EQ(decoder.body_size_bytes, 16);  // the fixture's body is 16 bytes
+
+  ArrowIpcDecoderReset(&decoder);
 }
 
 TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
-  struct ArrowIpcReader reader;
+  struct ArrowIpcDecoder decoder;
   struct ArrowError error;
+  struct ArrowSchema schema;
 
   struct ArrowBufferView data;
   data.data.as_uint8 = kSimpleSchema;
   data.size_bytes = sizeof(kSimpleSchema);
 
-  ArrowIpcReaderInit(&reader);
+  ArrowIpcDecoderInit(&decoder);
+
+  EXPECT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, &error), EINVAL);  // no header decoded yet
+  EXPECT_STREQ(error.message, "decoder did not just decode a Schema message");
+
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
+  EXPECT_EQ(decoder.body_size_bytes, 0);
+
+  EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);
+  EXPECT_EQ(decoder.endianness, NANOARROW_IPC_ENDIANNESS_LITTLE);
+  EXPECT_EQ(decoder.feature_flags, 0);
+
+  ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, &error), NANOARROW_OK);
+  ASSERT_EQ(schema.n_children, 1);
+  EXPECT_STREQ(schema.children[0]->name, "some_col");
+  EXPECT_EQ(schema.children[0]->flags, ARROW_FLAG_NULLABLE);
+  EXPECT_STREQ(schema.children[0]->format, "i");
+
+  schema.release(&schema);  // the test releases the decoded schema itself
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcDecoderInit(&decoder);
+  auto decoder_private =
+      reinterpret_cast<struct ArrowIpcDecoderPrivate*>(decoder.private_data);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, data, 0, nullptr, &error), EINVAL);  // no header decoded yet
+  EXPECT_STREQ(error.message, "decoder did not just decode a RecordBatch message");
+
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), NANOARROW_OK);
+  EXPECT_EQ(decoder.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  EXPECT_EQ(decoder.header_size_bytes,
+            sizeof(kSimpleRecordBatch) - decoder.body_size_bytes);
+  EXPECT_EQ(decoder.body_size_bytes, 16);
+
+  EXPECT_EQ(decoder.codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
+
+  struct ArrowBufferView body;
+  body.data.as_uint8 = kSimpleRecordBatch + decoder.header_size_bytes;  // body follows the header
+  body.size_bytes = decoder.body_size_bytes;
+
+  // Check full struct extract (field index -1 selects the root struct)
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, -1, &array, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  ASSERT_EQ(array.n_children, 1);
+  ASSERT_EQ(array.children[0]->n_buffers, 2);
+  ASSERT_EQ(array.children[0]->length, 3);
+  EXPECT_EQ(array.children[0]->null_count, 0);
+  const int32_t* out = reinterpret_cast<const int32_t*>(array.children[0]->buffers[1]);
+  EXPECT_EQ(out[0], 1);
+  EXPECT_EQ(out[1], 2);
+  EXPECT_EQ(out[2], 3);
+
+  array.release(&array);
+
+  // Check field extract
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, nullptr), NANOARROW_OK);
+  ASSERT_EQ(array.n_buffers, 2);
+  ASSERT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  out = reinterpret_cast<const int32_t*>(array.buffers[1]);
+  EXPECT_EQ(out[0], 1);
+  EXPECT_EQ(out[1], 2);
+  EXPECT_EQ(out[2], 3);
+
+  array.release(&array);
+
+  // Field extract should fail if compression was set
+  decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), ENOTSUP);
+  EXPECT_STREQ(error.message, "The nanoarrow_ipc extension does not support compression");
+  decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+
+  // Field extract should fail on non-system endian
+  // This test will have to get updated when we start testing on big endian
+  decoder.endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), ENOTSUP);
+  EXPECT_STREQ(error.message,
+               "The nanoarrow_ipc extension does not support non-system endianness");
+  decoder.endianness = NANOARROW_IPC_ENDIANNESS_UNINITIALIZED;  // must pass the endianness check: next call expects EINVAL, not ENOTSUP
+
+  // Field extract should fail if body is too small
+  body.size_bytes = 0;
+  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array, &error), EINVAL);
+  EXPECT_STREQ(error.message,
+               "Buffer 1 requires body offsets [0..12) but body has size 0");
+
+  // Should error if the number of buffers or field nodes doesn't match
+  // (different numbers because we count the root struct and the message does not)
+  decoder_private->n_buffers = 1;
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
+  EXPECT_STREQ(error.message, "Expected 0 buffers in message but found 2");
+
+  decoder_private->n_fields = 1;
+  EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
+  EXPECT_STREQ(error.message, "Expected 0 field nodes in message but found 1");
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcSetSchema) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowSchema schema;
 
-  EXPECT_EQ(ArrowIpcReaderDecode(&reader, data, &error), NANOARROW_OK);
-  EXPECT_EQ(reader.header_size_bytes, sizeof(kSimpleSchema));
-  EXPECT_EQ(reader.body_size_bytes, 0);
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetName(schema.children[0], "col1"), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
 
-  EXPECT_EQ(reader.message_type, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA);
-  EXPECT_EQ(reader.endianness, NANOARROW_IPC_ENDIANNESS_LITTLE);
-  EXPECT_EQ(reader.features, 0);
+  ArrowIpcDecoderInit(&decoder);
+  auto decoder_private =
+      reinterpret_cast<struct ArrowIpcDecoderPrivate*>(decoder.private_data);
 
-  ASSERT_EQ(reader.schema.n_children, 1);
-  EXPECT_STREQ(reader.schema.children[0]->name, "some_col");
-  EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
-  EXPECT_STREQ(reader.schema.children[0]->format, "i");
+  EXPECT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+  EXPECT_EQ(decoder_private->n_fields, 2);  // root struct + one int32 child
+  EXPECT_EQ(decoder_private->n_buffers, 3);
 
-  ArrowIpcReaderReset(&reader);
+  EXPECT_EQ(decoder_private->fields[0].array_view->storage_type, NANOARROW_TYPE_STRUCT);
+  EXPECT_EQ(decoder_private->fields[0].buffer_offset, 0);
+
+  EXPECT_EQ(decoder_private->fields[1].array_view->storage_type, NANOARROW_TYPE_INT32);
+  EXPECT_EQ(decoder_private->fields[1].buffer_offset, 1);  // the root struct occupies buffer 0
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcSetSchemaErrors) {
+  struct ArrowIpcDecoder decoder;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+
+  ArrowIpcDecoderInit(&decoder);
+  ArrowSchemaInit(&schema);
+
+  EXPECT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, &error), EINVAL);  // format not set yet
+  EXPECT_STREQ(
+      error.message,
+      "Error parsing schema->format: Expected a null-terminated string but found NULL");
+
+  ASSERT_EQ(ArrowSchemaInitFromType(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  EXPECT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, &error), EINVAL);  // top level must be a struct
+  EXPECT_STREQ(error.message, "schema must be a struct type");
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
 }
 
 class ArrowTypeParameterizedTestFixture
@@ -188,21 +389,87 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowTypeRoundtrip) {
   buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
   buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
 
-  struct ArrowIpcReader reader;
-  ArrowIpcReaderInit(&reader);
-  ASSERT_EQ(ArrowIpcReaderVerify(&reader, buffer_view, nullptr), NANOARROW_OK);
-  EXPECT_EQ(reader.header_size_bytes, buffer_view.size_bytes);
-  EXPECT_EQ(reader.body_size_bytes, 0);
+  struct ArrowIpcDecoder decoder;
+  ArrowIpcDecoderInit(&decoder);
+  ASSERT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, buffer_view, nullptr), 
NANOARROW_OK);
+  EXPECT_EQ(decoder.header_size_bytes, buffer_view.size_bytes);
+  EXPECT_EQ(decoder.body_size_bytes, 0);
 
-  ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
-  auto maybe_schema = arrow::ImportSchema(&reader.schema);
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), 
NANOARROW_OK);
+  struct ArrowSchema schema;
+  ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, nullptr), 
NANOARROW_OK);
+  auto maybe_schema = arrow::ImportSchema(&schema);
   ASSERT_TRUE(maybe_schema.ok());
 
   // Better failure message if we first check for string equality
   EXPECT_EQ(maybe_schema.ValueUnsafe()->ToString(), dummy_schema->ToString());
   EXPECT_TRUE(maybe_schema.ValueUnsafe()->Equals(dummy_schema, true));
 
-  ArrowIpcReaderReset(&reader);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
+  const std::shared_ptr<arrow::DataType>& data_type = GetParam();
+  std::shared_ptr<arrow::Schema> dummy_schema =
+      arrow::schema({arrow::field("dummy_name", data_type)});
+
+  auto maybe_empty = arrow::RecordBatch::MakeEmpty(dummy_schema);
+  ASSERT_TRUE(maybe_empty.ok());
+  auto empty = maybe_empty.ValueUnsafe();
+
+  auto maybe_nulls_array = arrow::MakeArrayOfNull(data_type, 3);
+  ASSERT_TRUE(maybe_nulls_array.ok());
+  auto nulls =
+      arrow::RecordBatch::Make(dummy_schema, 3, {maybe_nulls_array.ValueUnsafe()});
+
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+
+  struct ArrowSchema schema;
+  struct ArrowIpcDecoder decoder;
+  struct ArrowBufferView buffer_view;
+  struct ArrowArray array;
+
+  // Initialize the decoder
+  ASSERT_TRUE(arrow::ExportSchema(*dummy_schema, &schema).ok());
+  ArrowIpcDecoderInit(&decoder);
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+
+  // Check the empty array
+  auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*empty, options);
+  ASSERT_TRUE(maybe_serialized.ok());
+  buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+  buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), NANOARROW_OK);
+  buffer_view.data.as_uint8 += decoder.header_size_bytes;  // advance to the body
+  buffer_view.size_bytes -= decoder.header_size_bytes;
+  ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  auto maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);  // import consumes array
+  ASSERT_TRUE(maybe_batch.ok());
+  EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), empty->ToString());
+  EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty));
+
+  // Check the array with 3 null values
+  maybe_serialized = arrow::ipc::SerializeRecordBatch(*nulls, options);
+  ASSERT_TRUE(maybe_serialized.ok());
+  buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+  buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), NANOARROW_OK);
+  buffer_view.data.as_uint8 += decoder.header_size_bytes;  // advance to the body
+  buffer_view.size_bytes -= decoder.header_size_bytes;
+  ASSERT_EQ(ArrowIpcDecoderDecodeArray(&decoder, buffer_view, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+  ASSERT_TRUE(maybe_batch.ok());
+  EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), nulls->ToString());
+  EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls));
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
 }
 
 INSTANTIATE_TEST_SUITE_P(
@@ -267,21 +534,23 @@ TEST_P(ArrowSchemaParameterizedTestFixture, NanoarrowIpcArrowSchemaRoundtrip) {
   buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
   buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
 
-  struct ArrowIpcReader reader;
-  ArrowIpcReaderInit(&reader);
-  ASSERT_EQ(ArrowIpcReaderVerify(&reader, buffer_view, nullptr), NANOARROW_OK);
-  EXPECT_EQ(reader.header_size_bytes, buffer_view.size_bytes);
-  EXPECT_EQ(reader.body_size_bytes, 0);
+  struct ArrowIpcDecoder decoder;
+  ArrowIpcDecoderInit(&decoder);
+  ASSERT_EQ(ArrowIpcDecoderVerifyHeader(&decoder, buffer_view, nullptr), 
NANOARROW_OK);
+  EXPECT_EQ(decoder.header_size_bytes, buffer_view.size_bytes);
+  EXPECT_EQ(decoder.body_size_bytes, 0);
 
-  ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
-  auto maybe_schema = arrow::ImportSchema(&reader.schema);
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, buffer_view, nullptr), 
NANOARROW_OK);
+  struct ArrowSchema schema;
+  ASSERT_EQ(ArrowIpcDecoderDecodeSchema(&decoder, &schema, nullptr), 
NANOARROW_OK);
+  auto maybe_schema = arrow::ImportSchema(&schema);
   ASSERT_TRUE(maybe_schema.ok());
 
   // Better failure message if we first check for string equality
   EXPECT_EQ(maybe_schema.ValueUnsafe()->ToString(), arrow_schema->ToString());
   EXPECT_TRUE(maybe_schema.ValueUnsafe()->Equals(arrow_schema, true));
 
-  ArrowIpcReaderReset(&reader);
+  ArrowIpcDecoderReset(&decoder);
 }
 
 INSTANTIATE_TEST_SUITE_P(

Reply via email to