This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 62cffa6  feat(extensions/nanoarrow_ipc): Add single-threaded stream 
reader (#164)
62cffa6 is described below

commit 62cffa6a4e182235a82f3ea9f8715258b7b2ca56
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Mar 22 16:05:58 2023 -0400

    feat(extensions/nanoarrow_ipc): Add single-threaded stream reader (#164)
    
    Higher level runtimes may be able to use the `ArrowIpcDecoder` (or more
    than one) and handle IO/parallelization using tools that are difficult
    to provide from C; however, for testing we do need the ability to read
    streams in their entirety. This PR provides a tool to do that from an
    arbitrary input stream of bytes.
    
    This reduces the work of coordinating the various steps required to
    decode a stream to the following:
    
    ```c
    struct ArrowIpcInputStream input;
    ArrowIpcInputStreamInit<Buffer|File|Custom Implementation>(&input, ...);
    
    struct ArrowArrayStream stream;
    ArrowIpcArrayStreamReaderInit(&stream, &input, NULL);
    
    struct ArrowSchema schema;
    stream.get_schema(&stream, &schema);
    
    struct ArrowArray array;
    while (1) {
      stream.get_next(&stream, &array);
    
      if (array.release != NULL) {
        array.release(&array);
      } else {
        break;
      }
    }
    
    schema.release(&schema);
    stream.release(&stream);
    ```
    
    There is also a utility to read an entire stream from file or stdin:
    
    ```bash
    $ ./dump_stream big.arrows
    Read Schema <0.000122 seconds>
    struct
      state: string
      geometry: geoarrow.polygon{list}
        rings: list
          vertices: geoarrow.point{fixed_size_list(2)}
            xy: double
    Read 23548499 rows in 1 batch(es) <1.623532 seconds>
    ```
---
 .github/workflows/build-and-test-ipc.yaml          |  11 +-
 extensions/nanoarrow_ipc/CMakeLists.txt            |  24 +-
 extensions/nanoarrow_ipc/src/apps/dump_stream.c    | 138 +++++++
 .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h    |  62 +++
 .../{nanoarrow_ipc.c => nanoarrow_ipc_decoder.c}   |  20 +-
 ...w_ipc_test.cc => nanoarrow_ipc_decoder_test.cc} |   0
 .../src/nanoarrow/nanoarrow_ipc_reader.c           | 425 +++++++++++++++++++++
 .../src/nanoarrow/nanoarrow_ipc_reader_test.cc     | 314 +++++++++++++++
 8 files changed, 977 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/build-and-test-ipc.yaml 
b/.github/workflows/build-and-test-ipc.yaml
index 5c5b780..af2d543 100644
--- a/.github/workflows/build-and-test-ipc.yaml
+++ b/.github/workflows/build-and-test-ipc.yaml
@@ -41,7 +41,7 @@ jobs:
       fail-fast: false
       matrix:
         config:
-          - {label: default-build, cmake_args: ""}
+          - {label: default-build, cmake_args: "-DNANOARROW_IPC_BUILD_APPS=ON"}
           - {label: namespaced-build, cmake_args: 
"-DNANOARROW_NAMESPACE=SomeUserNamespace"}
           - {label: bundled-build, cmake_args: "-DNANOARROW_IPC_BUNDLE=ON"}
 
@@ -101,6 +101,15 @@ jobs:
           cd build
           ctest -T test --output-on-failure .
 
+      - name: Test dump_stream
+        if: matrix.config.label == 'default-build'
+        run: |
+          $SUBDIR/build/dump_stream || true
+          $SUBDIR/build/dump_stream this_is_not_a_file || true
+          $SUBDIR/build/dump_stream examples/cmake-ipc/invalid.arrows || true
+          $SUBDIR/build/dump_stream examples/cmake-ipc/schema-valid.arrows
+          cat examples/cmake-ipc/schema-valid.arrows | 
$SUBDIR/build/dump_stream -
+
       - name: Run tests with valgrind
         if: matrix.config.label == 'default-build'
         run: |
diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt 
b/extensions/nanoarrow_ipc/CMakeLists.txt
index 38517f7..8830b35 100644
--- a/extensions/nanoarrow_ipc/CMakeLists.txt
+++ b/extensions/nanoarrow_ipc/CMakeLists.txt
@@ -22,6 +22,7 @@ include(FetchContent)
 project(nanoarrow_ipc)
 
 option(NANOARROW_IPC_BUILD_TESTS "Build tests" OFF)
+# Build the command-line utilities in src/apps (currently dump_stream)
+option(NANOARROW_IPC_BUILD_APPS "Build utility applications" OFF)
 option(NANOARROW_IPC_BUNDLE "Create bundled nanoarrow_ipc.h and 
nanoarrow_ipc.c" OFF)
 option(NANOARROW_IPC_FLATCC_ROOT_DIR "Root directory for flatcc include and 
lib directories" OFF)
 option(NANOARROW_IPC_FLATCC_INCLUDE_DIR "Include directory for flatcc 
includes" OFF)
@@ -91,11 +92,13 @@ if (NANOARROW_IPC_BUNDLE)
   file(READ src/nanoarrow/nanoarrow_ipc.h SRC_FILE_CONTENTS)
   file(WRITE ${NANOARROW_IPC_H_TEMP} "${SRC_FILE_CONTENTS}")
 
-  # combine flatcc-generated headers and nanoarrow_ipc.c
+  # combine flatcc-generated headers and nanoarrow_ipc sources
   set(NANOARROW_IPC_C_TEMP 
${CMAKE_BINARY_DIR}/amalgamation/nanoarrow/nanoarrow_ipc.c)
   file(READ src/nanoarrow/nanoarrow_ipc_flatcc_generated.h SRC_FILE_CONTENTS)
   file(WRITE ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
-  file(READ src/nanoarrow/nanoarrow_ipc.c SRC_FILE_CONTENTS)
+  # Append nanoarrow_ipc_decoder.c, then nanoarrow_ipc_reader.c, after the headers
+  file(READ src/nanoarrow/nanoarrow_ipc_decoder.c SRC_FILE_CONTENTS)
+  file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
+  file(READ src/nanoarrow/nanoarrow_ipc_reader.c SRC_FILE_CONTENTS)
   file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
 
   # remove the include for the generated files in the bundled version
@@ -143,7 +146,9 @@ if (NANOARROW_IPC_BUNDLE)
   install(DIRECTORY thirdparty/flatcc/include/flatcc DESTINATION ".")
 else()
   # This is a normal CMake build that builds + installs some includes and a 
static lib
-  add_library(nanoarrow_ipc src/nanoarrow/nanoarrow_ipc.c)
+  # Decoder (message-level API) and reader (ArrowArrayStream over an input stream)
+  add_library(
+    nanoarrow_ipc
+    src/nanoarrow/nanoarrow_ipc_decoder.c
+    src/nanoarrow/nanoarrow_ipc_reader.c
+  )
   target_link_libraries(nanoarrow_ipc PRIVATE flatccrt)
 
   target_include_directories(nanoarrow_ipc PUBLIC
@@ -185,7 +190,8 @@ if (NANOARROW_IPC_BUILD_TESTS)
 
   enable_testing()
 
-  add_executable(nanoarrow_ipc_test src/nanoarrow/nanoarrow_ipc_test.cc)
+  add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/nanoarrow_ipc_decoder_test.cc)
+  add_executable(nanoarrow_ipc_reader_test src/nanoarrow/nanoarrow_ipc_reader_test.cc)
 
   if(NANOARROW_IPC_CODE_COVERAGE)
     target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
@@ -193,8 +199,16 @@ if (NANOARROW_IPC_BUILD_TESTS)
     target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
   endif()
 
-  target_link_libraries(nanoarrow_ipc_test nanoarrow_ipc nanoarrow arrow_shared gtest_main)
+  target_link_libraries(nanoarrow_ipc_decoder_test PRIVATE nanoarrow_ipc nanoarrow arrow_shared gtest_main)
+  target_link_libraries(nanoarrow_ipc_reader_test PRIVATE nanoarrow_ipc nanoarrow gtest_main)
 
   include(GoogleTest)
-  gtest_discover_tests(nanoarrow_ipc_test)
+  # Register every test binary with CTest; an unregistered binary is built but never run
+  gtest_discover_tests(nanoarrow_ipc_decoder_test)
+  gtest_discover_tests(nanoarrow_ipc_reader_test)
+endif()
+
+if (NANOARROW_IPC_BUILD_APPS)
+  add_executable(dump_stream src/apps/dump_stream.c)
+  target_link_libraries(dump_stream PRIVATE nanoarrow_ipc nanoarrow)
 endif()
diff --git a/extensions/nanoarrow_ipc/src/apps/dump_stream.c 
b/extensions/nanoarrow_ipc/src/apps/dump_stream.c
new file mode 100644
index 0000000..1a6b323
--- /dev/null
+++ b/extensions/nanoarrow_ipc/src/apps/dump_stream.c
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "nanoarrow/nanoarrow_ipc.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+
+// Print a schema tree to stdout: one line per node, indented by two spaces per
+// nesting level and prefixed by the node's name when it has one. buf is scratch
+// space of buf_size bytes used to format each node's type; it is safe to reuse
+// across the recursion because each line is printed before visiting children.
+static void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf,
+                                  int buf_size) {
+  // The trailing 0 presumably asks for a non-recursive description of this node
+  // only (children get their own lines below). The returned length is not needed.
+  ArrowSchemaToString(schema, buf, buf_size, 0);
+
+  for (int i = 0; i < level; i++) {
+    fprintf(stdout, "  ");
+  }
+
+  if (schema->name == NULL) {
+    fprintf(stdout, "%s\n", buf);
+  } else {
+    fprintf(stdout, "%s: %s\n", schema->name, buf);
+  }
+
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    dump_schema_to_stdout(schema->children[i], level + 1, buf, buf_size);
+  }
+}
+
+// Read an entire Arrow IPC stream from the file named by argv[1] (or stdin when
+// argv[1] is "-"), print its schema and the total row/batch counts with timings.
+// Returns 0 on success and 1 on any error (a message is written to stderr).
+int main(int argc, char* argv[]) {
+  // Parse arguments
+  if (argc != 2) {
+    fprintf(stderr, "Usage: dump_stream FILENAME (or - for stdin)\n");
+    return 1;
+  }
+
+  // Open the input, reopening stdin in binary mode when the argument is "-"
+  FILE* file_ptr;
+  if (strcmp(argv[1], "-") == 0) {
+    file_ptr = freopen(NULL, "rb", stdin);
+  } else {
+    file_ptr = fopen(argv[1], "rb");
+  }
+
+  if (file_ptr == NULL) {
+    fprintf(stderr, "Failed to open input '%s'\n", argv[1]);
+    return 1;
+  }
+
+  // Give the input stream ownership of file_ptr (close_on_release = 1) so that
+  // releasing the input, or the array stream that takes it over, closes the
+  // file on every exit path below.
+  struct ArrowIpcInputStream input;
+  int result = ArrowIpcInputStreamInitFile(&input, file_ptr, 1);
+  if (result != NANOARROW_OK) {
+    fprintf(stderr, "ArrowIpcInputStreamInitFile() failed\n");
+    fclose(file_ptr);
+    return 1;
+  }
+
+  struct ArrowArrayStream stream;
+  result = ArrowIpcArrayStreamReaderInit(&stream, &input, NULL);
+  if (result != NANOARROW_OK) {
+    // The reader only takes ownership of input when it returns NANOARROW_OK
+    fprintf(stderr, "ArrowIpcArrayStreamReaderInit() failed\n");
+    if (input.release != NULL) {
+      input.release(&input);
+    }
+    return 1;
+  }
+
+  clock_t begin = clock();
+
+  struct ArrowSchema schema;
+  result = stream.get_schema(&stream, &schema);
+  if (result != NANOARROW_OK) {
+    const char* message = stream.get_last_error(&stream);
+    if (message == NULL) {
+      message = "";
+    }
+
+    fprintf(stderr, "stream.get_schema() returned %d with error '%s'\n", result, message);
+    stream.release(&stream);
+    return 1;
+  }
+
+  clock_t end = clock();
+  double elapsed = (end - begin) / ((double)CLOCKS_PER_SEC);
+  fprintf(stdout, "Read Schema <%.06f seconds>\n", elapsed);
+
+  char schema_tmp[8096];
+  memset(schema_tmp, 0, sizeof(schema_tmp));
+  dump_schema_to_stdout(&schema, 0, schema_tmp, sizeof(schema_tmp));
+  schema.release(&schema);
+
+  struct ArrowArray array;
+  array.release = NULL;
+
+  int64_t batch_count = 0;
+  int64_t row_count = 0;
+  begin = clock();
+
+  while (1) {
+    result = stream.get_next(&stream, &array);
+    if (result != NANOARROW_OK) {
+      const char* message = stream.get_last_error(&stream);
+      if (message == NULL) {
+        message = "";
+      }
+
+      fprintf(stderr, "stream.get_next() returned %d with error '%s'\n", result, message);
+      stream.release(&stream);
+      return 1;
+    }
+
+    // A released (NULL release) array marks the end of the stream
+    if (array.release != NULL) {
+      row_count += array.length;
+      batch_count++;
+      array.release(&array);
+    } else {
+      break;
+    }
+  }
+
+  end = clock();
+  elapsed = (end - begin) / ((double)CLOCKS_PER_SEC);
+  fprintf(stdout, "Read %ld rows in %ld batch(es) <%.06f seconds>\n", (long)row_count,
+          (long)batch_count, elapsed);
+
+  // Releasing the stream also releases the input and, with it, closes file_ptr
+  stream.release(&stream);
+  return 0;
+}
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
index 155150e..071b299 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
@@ -39,6 +39,14 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
 #define ArrowIpcDecoderSetEndianness \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness)
+#define ArrowIpcInputStreamInitBuffer \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer)
+#define ArrowIpcInputStreamInitFile \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitFile)
+#define ArrowIpcInputStreamMove \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
+#define ArrowIpcArrayStreamReaderInit \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)
 
 #endif
 
@@ -219,6 +227,60 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct 
ArrowIpcDecoder* decoder,
                                           struct ArrowArray* out,
                                           struct ArrowError* error);
 
+/// \brief A user-extensible input data source
+///
+/// Implementations provide the read() and release() callbacks and keep any
+/// state they need in private_data. A stream whose release member is NULL has
+/// been released (or moved from with ArrowIpcInputStreamMove()).
+struct ArrowIpcInputStream {
+  /// \brief Read up to buf_size_bytes from stream into buf
+  ///
+  /// The actual number of bytes read is placed in the value pointed to by
+  /// size_read_out. Returns NANOARROW_OK on success. The IPC stream reader
+  /// treats a read that returns 0 bytes as the end of the input. On failure an
+  /// implementation may describe the problem in error.
+  ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, uint8_t* buf,
+                         int64_t buf_size_bytes, int64_t* size_read_out,
+                         struct ArrowError* error);
+
+  /// \brief Release the stream and any resources it may be holding
+  ///
+  /// Release callback implementations must set the release member to NULL.
+  /// Callers must check that the release callback is not NULL before calling
+  /// read() or release().
+  void (*release)(struct ArrowIpcInputStream* stream);
+
+  /// \brief Private implementation-defined data
+  void* private_data;
+};
+
+/// \brief Transfer ownership of an ArrowIpcInputStream
+///
+/// Copies src into dst and sets src->release to NULL so that only dst is
+/// ever released.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+                             struct ArrowIpcInputStream* dst);
+
+/// \brief Create an input stream from an ArrowBuffer
+///
+/// On success the contents of input are moved into the stream (via
+/// ArrowBufferMove()) and freed when the stream is released. Returns ENOMEM
+/// if the stream's private state cannot be allocated, in which case input is
+/// not moved.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* 
stream,
+                                             struct ArrowBuffer* input);
+
+/// \brief Create an input stream from a C FILE* pointer
+///
+/// file_ptr must point to a FILE opened for reading. If close_on_release is
+/// nonzero, the stream closes the file as soon as a read comes up short (end of
+/// file or error) or, at the latest, when the stream is released.
+///
+/// Note that the ArrowIpcInputStream has no mechanism to communicate an error
+/// if file_ptr fails to close. If this behaviour is needed, pass false to
+/// close_on_release and handle closing the file independently from stream.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+                                           void* file_ptr, int 
close_on_release);
+
+/// \brief Options for ArrowIpcArrayStreamReaderInit()
+struct ArrowIpcArrayStreamReaderOptions {
+  /// \brief The field index to extract. Defaults to -1 (i.e., read all fields).
+  ///
+  /// Values other than -1 are currently rejected with ENOTSUP when the schema
+  /// is read.
+  int64_t field_index;
+};
+
+/// \brief Initialize an ArrowArrayStream from an input stream of bytes
+///
+/// The stream of bytes must begin with a Schema message and be followed by
+/// zero or more RecordBatch messages as described in the Arrow IPC stream
+/// format specification. Returns NANOARROW_OK on success. If NANOARROW_OK
+/// is returned, the ArrowArrayStream takes ownership of input_stream and
+/// the caller is responsible for releasing out. Otherwise the caller keeps
+/// ownership of input_stream and must release it. options may be NULL to use
+/// the defaults.
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+    struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+    struct ArrowIpcArrayStreamReaderOptions* options);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
similarity index 98%
rename from extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
rename to extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index 91011b8..e9c673e 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -801,21 +801,22 @@ static inline int ArrowIpcDecoderCheckHeader(struct 
ArrowIpcDecoder* decoder,
   }
 
   int swap_endian = private_data->system_endianness == 
NANOARROW_IPC_ENDIANNESS_BIG;
-  *message_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
-  if ((*message_size_bytes) < 0) {
+  int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
+  *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
+  if (header_body_size_bytes < 0) {
     ArrowErrorSet(
         error, "Expected message body size > 0 but found message body size of 
%ld bytes",
-        (long)(*message_size_bytes));
+        (long)header_body_size_bytes);
     return EINVAL;
-  } else if ((*message_size_bytes) > data_mut->size_bytes) {
+  } else if (header_body_size_bytes > data_mut->size_bytes) {
     ArrowErrorSet(error,
                   "Expected 0 <= message body size <= %ld bytes but found 
message "
                   "body size of %ld bytes",
-                  (long)data_mut->size_bytes, (long)(*message_size_bytes));
+                  (long)data_mut->size_bytes, (long)header_body_size_bytes);
     return ESPIPE;
   }
 
-  if (*message_size_bytes == 0) {
+  if (header_body_size_bytes == 0) {
     ArrowErrorSet(error, "End of Arrow stream");
     return ENODATA;
   }
@@ -832,7 +833,6 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct 
ArrowIpcDecoder* decoder,
   ArrowIpcDecoderResetHeaderInfo(decoder);
   NANOARROW_RETURN_NOT_OK(
       ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
-  decoder->header_size_bytes += 2 * sizeof(int32_t);
   return NANOARROW_OK;
 }
 
@@ -847,14 +847,14 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct 
ArrowIpcDecoder* decoder,
       ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
 
   // Run flatbuffers verification
-  if (ns(Message_verify_as_root(data.data.as_uint8, 
decoder->header_size_bytes)) !=
+  if (ns(Message_verify_as_root(data.data.as_uint8,
+                                decoder->header_size_bytes - (2 * 
sizeof(int32_t)))) !=
       flatcc_verify_ok) {
     ArrowErrorSet(error, "Message flatbuffer verification failed");
     return EINVAL;
   }
 
   // Read some basic information from the message
-  decoder->header_size_bytes += 2 * sizeof(int32_t);
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   decoder->metadata_version = ns(Message_version(message));
   decoder->message_type = ns(Message_header_type(message));
@@ -873,7 +873,6 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct 
ArrowIpcDecoder* decoder,
   ArrowIpcDecoderResetHeaderInfo(decoder);
   NANOARROW_RETURN_NOT_OK(
       ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, 
error));
-  decoder->header_size_bytes += 2 * sizeof(int32_t);
 
   ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
   if (!message) {
@@ -1046,6 +1045,7 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct 
ArrowIpcDecoder* decoder,
     case NANOARROW_IPC_ENDIANNESS_LITTLE:
     case NANOARROW_IPC_ENDIANNESS_BIG:
       private_data->endianness = endianness;
+      return NANOARROW_OK;
     default:
       return EINVAL;
   }
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
similarity index 100%
rename from extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc
rename to extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
new file mode 100644
index 0000000..f813cab
--- /dev/null
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
@@ -0,0 +1,425 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "nanoarrow.h"
+#include "nanoarrow_ipc.h"
+
+// Move src into dst. The moved-from src gets a NULL release callback so that
+// it is never released twice.
+void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
+                             struct ArrowIpcInputStream* dst) {
+  *dst = *src;
+  src->release = NULL;
+}
+
+// Private state of an input stream that serves bytes from an owned ArrowBuffer
+struct ArrowIpcInputStreamBufferPrivate {
+  // Buffer whose contents are returned by read(); reset when the stream is released
+  struct ArrowBuffer input;
+  // Offset of the next unread byte in input
+  int64_t cursor_bytes;
+};
+
+// read() callback for buffer-backed input streams: copies up to buf_size_bytes
+// starting at the cursor into buf and advances the cursor by the amount copied.
+// Once the buffer is exhausted every read reports 0 bytes. Always returns
+// NANOARROW_OK.
+static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* stream,
+                                                    uint8_t* buf, int64_t buf_size_bytes,
+                                                    int64_t* size_read_out,
+                                                    struct ArrowError* error) {
+  if (buf_size_bytes == 0) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+  int64_t available = state->input.size_bytes - state->cursor_bytes;
+  int64_t n = (available > buf_size_bytes) ? buf_size_bytes : available;
+
+  if (n > 0) {
+    memcpy(buf, state->input.data + state->cursor_bytes, n);
+  }
+
+  state->cursor_bytes += n;
+  *size_read_out = n;
+  return NANOARROW_OK;
+}
+
+// release() callback for buffer-backed input streams: frees the owned buffer
+// and the private state, then marks the stream as released.
+static void ArrowIpcInputStreamBufferRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data;
+
+  ArrowBufferReset(&state->input);
+  ArrowFree(state);
+
+  stream->release = NULL;
+}
+
+// Initialize stream to read from input. On success the contents of input are
+// moved into the stream with ArrowBufferMove(). If the private state cannot be
+// allocated, ENOMEM is returned and input is left untouched.
+ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
+                                             struct ArrowBuffer* input) {
+  struct ArrowIpcInputStreamBufferPrivate* state =
+      (struct ArrowIpcInputStreamBufferPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcInputStreamBufferPrivate));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  state->cursor_bytes = 0;
+  ArrowBufferMove(input, &state->input);
+
+  stream->private_data = state;
+  stream->read = &ArrowIpcInputStreamBufferRead;
+  stream->release = &ArrowIpcInputStreamBufferRelease;
+  return NANOARROW_OK;
+}
+
+// Private state of an input stream that reads from a C FILE*
+struct ArrowIpcInputStreamFilePrivate {
+  // File being read; read() may set this to NULL after closing an owned file
+  FILE* file_ptr;
+  // Nonzero after a short read; later reads then report 0 bytes without I/O
+  int stream_finished;
+  // Nonzero if the stream owns file_ptr and is responsible for closing it
+  int close_on_release;
+};
+
+// release() callback for FILE*-backed input streams. The file is closed only if
+// the stream owns it and read() has not already closed it (file_ptr == NULL).
+static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) {
+  struct ArrowIpcInputStreamFilePrivate* state =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  int should_close = state->close_on_release && state->file_ptr != NULL;
+  if (should_close) {
+    fclose(state->file_ptr);
+  }
+
+  ArrowFree(state);
+  stream->release = NULL;
+}
+
+// read() callback for FILE*-backed input streams. Reads up to buf_size_bytes
+// into buf with fread(). A short read marks the stream as finished, so later
+// reads report 0 bytes. At that point an owned file is closed eagerly, and a
+// read error (as opposed to end-of-file) is reported as EIO.
+static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream,
+                                                  uint8_t* buf, int64_t buf_size_bytes,
+                                                  int64_t* size_read_out,
+                                                  struct ArrowError* error) {
+  struct ArrowIpcInputStreamFilePrivate* private_data =
+      (struct ArrowIpcInputStreamFilePrivate*)stream->private_data;
+
+  if (private_data->stream_finished) {
+    *size_read_out = 0;
+    return NANOARROW_OK;
+  }
+
+  // Do the read
+  int64_t bytes_read = (int64_t)fread(buf, 1, buf_size_bytes, private_data->file_ptr);
+  *size_read_out = bytes_read;
+
+  if (bytes_read != buf_size_bytes) {
+    private_data->stream_finished = 1;
+
+    // Inspect error (must happen before the file is closed below)
+    int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr);
+
+    // Close an owned file now. fclose() disassociates the FILE* whether or not
+    // it succeeds, so the pointer is dropped unconditionally. Keeping it after a
+    // failed fclose() would make the release callback close it a second time.
+    if (private_data->close_on_release) {
+      fclose(private_data->file_ptr);
+      private_data->file_ptr = NULL;
+    }
+
+    // Maybe return error
+    if (has_error) {
+      ArrowErrorSet(error, "ArrowIpcInputStreamFile IO error");
+      return EIO;
+    }
+  }
+
+  return NANOARROW_OK;
+}
+
+// Initialize stream to read from file_ptr, which is treated as a FILE*. With a
+// nonzero close_on_release the stream owns the file and closes it after a
+// short read or on release. Returns ENOMEM if the private state cannot be
+// allocated.
+ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
+                                           void* file_ptr, int close_on_release) {
+  struct ArrowIpcInputStreamFilePrivate* state =
+      (struct ArrowIpcInputStreamFilePrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcInputStreamFilePrivate));
+  if (state == NULL) {
+    return ENOMEM;
+  }
+
+  state->stream_finished = 0;
+  state->close_on_release = close_on_release;
+  state->file_ptr = (FILE*)file_ptr;
+
+  stream->private_data = state;
+  stream->read = &ArrowIpcInputStreamFileRead;
+  stream->release = &ArrowIpcInputStreamFileRelease;
+  return NANOARROW_OK;
+}
+
+// Private state of the ArrowArrayStream created by ArrowIpcArrayStreamReaderInit()
+struct ArrowIpcArrayStreamReaderPrivate {
+  // Source of bytes; released by get_next() once the stream is exhausted
+  struct ArrowIpcInputStream input;
+  // Decoder for message headers/bodies; given the stream's schema once it is read
+  struct ArrowIpcDecoder decoder;
+  // Cached schema (release == NULL until the Schema message has been read)
+  struct ArrowSchema out_schema;
+  // Field to extract; -1 (all fields) is the only value currently supported
+  int64_t field_index;
+  // Scratch buffer holding the most recently read message header
+  struct ArrowBuffer header;
+  // Scratch buffer holding the most recently read message body
+  struct ArrowBuffer body;
+  // Error reported through get_last_error()
+  struct ArrowError error;
+};
+
+// release() callback for the reader: releases the input (if still held), the
+// decoder, the cached schema (if any) and the scratch buffers, then frees the
+// private state and marks the stream as released.
+static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) {
+  struct ArrowIpcArrayStreamReaderPrivate* state =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+
+  struct ArrowIpcInputStream* input = &state->input;
+  if (input->release != NULL) {
+    input->release(input);
+  }
+
+  ArrowIpcDecoderReset(&state->decoder);
+
+  struct ArrowSchema* schema = &state->out_schema;
+  if (schema->release != NULL) {
+    schema->release(schema);
+  }
+
+  ArrowBufferReset(&state->header);
+  ArrowBufferReset(&state->body);
+
+  ArrowFree(state);
+  stream->release = NULL;
+}
+
+// Read the next message header from the input into private_data->header and
+// verify it with private_data->decoder.
+//
+// Returns ENODATA when the input has no more bytes or the decoder reports the
+// end-of-stream marker. If the message is not of message_type it is verified
+// but not decoded and NANOARROW_OK is returned, so the caller can produce a
+// more specific error from decoder.message_type.
+static int ArrowIpcArrayStreamReaderNextHeader(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data,
+    enum ArrowIpcMessageType message_type) {
+  private_data->header.size_bytes = 0;
+  int64_t bytes_read = 0;
+
+  // Read 8 bytes (continuation + header size in bytes)
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->header, 8));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input,
+                                                   private_data->header.data, 8,
+                                                   &bytes_read, &private_data->error));
+  private_data->header.size_bytes += bytes_read;
+
+  if (bytes_read == 0) {
+    // The caller might not use this error message (e.g., if the end of the stream
+    // is one of the valid outcomes) but we set the error anyway in case it gets
+    // propagated higher (e.g., if the stream is empty and there's no schema message)
+    ArrowErrorSet(&private_data->error, "No data available on stream");
+    return ENODATA;
+  } else if (bytes_read != 8) {
+    ArrowErrorSet(&private_data->error,
+                  "Expected at least 8 bytes in remainder of stream");
+    return EINVAL;
+  }
+
+  struct ArrowBufferView input_view;
+  input_view.data.data = private_data->header.data;
+  input_view.size_bytes = private_data->header.size_bytes;
+
+  // Use PeekHeader to fill in decoder.header_size_bytes. Only the 8-byte prefix
+  // is available, so ESPIPE (message larger than the bytes provided) is the
+  // expected outcome and still records header_size_bytes. ENODATA is the
+  // end-of-stream marker. Any other error (e.g., a negative size) leaves
+  // header_size_bytes unusable. It must be propagated rather than used to size
+  // the read below, where a negative size would reach read() (and fread()).
+  int result =
+      ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error);
+  if (result != NANOARROW_OK && result != ESPIPE) {
+    return result;
+  }
+
+  // Read the header bytes
+  int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowBufferReserve(&private_data->header, expected_header_bytes));
+  NANOARROW_RETURN_NOT_OK(
+      private_data->input.read(&private_data->input, private_data->header.data + 8,
+                               expected_header_bytes, &bytes_read, &private_data->error));
+  private_data->header.size_bytes += bytes_read;
+
+  // Verify + decode the header
+  input_view.data.data = private_data->header.data;
+  input_view.size_bytes = private_data->header.size_bytes;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view,
+                                                      &private_data->error));
+
+  // Don't decode the message if it's of the wrong type (because the error message
+  // is better communicated by the caller)
+  if (private_data->decoder.message_type != message_type) {
+    return NANOARROW_OK;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view,
+                                                      &private_data->error));
+  return NANOARROW_OK;
+}
+
+// Read the body of the most recently decoded message into private_data->body.
+// The number of bytes requested is decoder.body_size_bytes.
+// NOTE(review): a short read is not detected here; presumably
+// ArrowIpcDecoderDecodeArray() validates the body size — TODO confirm.
+static int ArrowIpcArrayStreamReaderNextBody(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  struct ArrowBuffer* body = &private_data->body;
+  int64_t n_requested = private_data->decoder.body_size_bytes;
+  int64_t n_read;
+
+  body->size_bytes = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(body, n_requested));
+  NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input, body->data,
+                                                   n_requested, &n_read,
+                                                   &private_data->error));
+  body->size_bytes += n_read;
+
+  return NANOARROW_OK;
+}
+
+// Read and decode the Schema message at the start of the input, unless that has
+// already happened. On success the schema is registered with the decoder for the
+// RecordBatch messages that follow and cached in private_data->out_schema.
+// Rejects a first message that is not a Schema, streams using compressed bodies
+// or dictionary replacement, and any field_index other than -1.
+static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded(
+    struct ArrowIpcArrayStreamReaderPrivate* private_data) {
+  // Already read: nothing to do
+  if (private_data->out_schema.release != NULL) {
+    return NANOARROW_OK;
+  }
+
+  struct ArrowIpcDecoder* decoder = &private_data->decoder;
+  struct ArrowError* error = &private_data->error;
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA));
+
+  // The first message must be a Schema...
+  if (decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) {
+    ArrowErrorSet(error, "Unexpected message type at start of input (expected Schema)");
+    return EINVAL;
+  }
+
+  // ...that doesn't rely on features this reader can't handle
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_COMPRESSED_BODY) {
+    ArrowErrorSet(error, "This stream uses unsupported feature COMPRESSED_BODY");
+    return EINVAL;
+  }
+
+  if (decoder->feature_flags & NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT) {
+    ArrowErrorSet(error, "This stream uses unsupported feature DICTIONARY_REPLACEMENT");
+    return EINVAL;
+  }
+
+  // Configure the decoder for the buffer endianness declared by the stream
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetEndianness(decoder, decoder->endianness));
+
+  struct ArrowSchema decoded;
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchema(decoder, &decoded, error));
+
+  // Only "read the whole thing" is supported for now
+  if (private_data->field_index != -1) {
+    decoded.release(&decoded);
+    ArrowErrorSet(error, "Field index != -1 is not yet supported");
+    return ENOTSUP;
+  }
+
+  // Hand the schema to the decoder; only cache it once that has succeeded
+  int status = ArrowIpcDecoderSetSchema(decoder, &decoded, error);
+  if (status != NANOARROW_OK) {
+    decoded.release(&decoded);
+    return status;
+  }
+
+  ArrowSchemaMove(&decoded, &private_data->out_schema);
+  return NANOARROW_OK;
+}
+
+// get_schema() callback: reads the Schema message on first use and returns a
+// deep copy of the cached schema in out.
+static int ArrowIpcArrayStreamReaderGetSchema(struct ArrowArrayStream* stream,
+                                              struct ArrowSchema* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* state =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+
+  // Clear any message left over from a previous call
+  state->error.message[0] = '\0';
+
+  int status = ArrowIpcArrayStreamReaderReadSchemaIfNeeded(state);
+  if (status != NANOARROW_OK) {
+    return status;
+  }
+
+  return ArrowSchemaDeepCopy(&state->out_schema, out);
+}
+
+// get_next() callback: reads the Schema first if needed, then reads the next
+// RecordBatch message and decodes it into out. When the input runs out or the
+// end-of-stream marker is reached, the input is released and out->release is
+// set to NULL to signal the end of the stream.
+static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream,
+                                            struct ArrowArray* out) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  // Check if we are all done
+  if (private_data->input.release == NULL) {
+    out->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  private_data->error.message[0] = '\0';
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data));
+
+  // Read + decode the next header
+  int result = ArrowIpcArrayStreamReaderNextHeader(
+      private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  if (result == ENODATA) {
+    // If the stream is finished, release the input
+    private_data->input.release(&private_data->input);
+    out->release = NULL;
+    return NANOARROW_OK;
+  } else if (result != NANOARROW_OK) {
+    // Propagate any other failure as-is. Falling through would inspect a
+    // message_type that may never have been decoded and would overwrite the
+    // error message already set by the header reader.
+    return result;
+  }
+
+  // Make sure we have a RecordBatch message
+  if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(&private_data->error, "Unexpected message type (expected RecordBatch)");
+    return EINVAL;
+  }
+
+  // Read in the body
+  NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data));
+
+  struct ArrowBufferView body_view;
+  body_view.data.data = private_data->body.data;
+  body_view.size_bytes = private_data->body.size_bytes;
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view,
+                                                     private_data->field_index, out,
+                                                     &private_data->error));
+
+  return NANOARROW_OK;
+}
+
+// get_last_error() callback: returns the reader's error message buffer, which
+// get_schema() and get_next() clear before doing any work.
+static const char* ArrowIpcArrayStreamReaderGetLastError(
+    struct ArrowArrayStream* stream) {
+  struct ArrowIpcArrayStreamReaderPrivate* state =
+      (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data;
+  const char* message = state->error.message;
+  return message;
+}
+
+// Initialize out as an ArrowArrayStream that decodes the IPC stream read from
+// input_stream.
+//
+// On success, input_stream is moved into out (its release callback is set to
+// NULL). If the private state cannot be allocated (ENOMEM) or the decoder fails
+// to initialize, nothing is moved and out is not modified. options may be NULL,
+// in which case field_index defaults to -1 (all fields).
+ArrowErrorCode ArrowIpcArrayStreamReaderInit(
+    struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
+    struct ArrowIpcArrayStreamReaderOptions* options) {
+  struct ArrowIpcArrayStreamReaderPrivate* private_data =
+      (struct ArrowIpcArrayStreamReaderPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcArrayStreamReaderPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  int result = ArrowIpcDecoderInit(&private_data->decoder);
+  if (result != NANOARROW_OK) {
+    ArrowFree(private_data);
+    return result;
+  }
+
+  ArrowBufferInit(&private_data->header);
+  ArrowBufferInit(&private_data->body);
+  private_data->out_schema.release = NULL;
+  // get_last_error() may be called before get_schema()/get_next() have reset
+  // the message, so it must start out as a valid (empty) string
+  private_data->error.message[0] = '\0';
+  ArrowIpcInputStreamMove(input_stream, &private_data->input);
+
+  if (options != NULL) {
+    private_data->field_index = options->field_index;
+  } else {
+    private_data->field_index = -1;
+  }
+
+  out->private_data = private_data;
+  out->get_schema = &ArrowIpcArrayStreamReaderGetSchema;
+  out->get_next = &ArrowIpcArrayStreamReaderGetNext;
+  out->get_last_error = &ArrowIpcArrayStreamReaderGetLastError;
+  out->release = &ArrowIpcArrayStreamReaderRelease;
+
+  return NANOARROW_OK;
+}
diff --git 
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
new file mode 100644
index 0000000..126aaf3
--- /dev/null
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <stdio.h>
+
+#include "nanoarrow_ipc.h"
+
+// Arrow IPC Schema message used as the first message of the test streams.
+// It starts with the 0xFFFFFFFF marker and a little-endian int32 metadata
+// length (0x110 = 272). Together with that 8-byte prefix this accounts for all
+// 280 bytes. get_schema() on it yields a struct ("+s"). Judging by the
+// embedded strings, it carries schema metadata some_key=some_value and one
+// field "some_col" with field metadata some_key_field=some_value_field. The
+// field type appears to be int32; confirm against the generator before
+// relying on it.
+static uint8_t kSimpleSchema[] = {
+    0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
+    0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
+    0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
+    0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x84, 0xff,
+    0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00,
+    0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00, 0x08, 0x00,
+    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00, 0x00, 0x00,
+    0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00, 0x18, 0x00,
+    0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x14, 0x00,
+    0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00, 0x70, 0x00,
+    0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00,
+    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00,
+    0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x00,
+    0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c,
+    0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00,
+    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x65,
+    0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
+// Arrow IPC RecordBatch message that the tests decode against kSimpleSchema.
+// The layout is:
+//   - an 8-byte prefix (0xFFFFFFFF marker, then metadata length 0x88 = 136),
+//   - 136 bytes of metadata,
+//   - a 16-byte body,
+// for 160 bytes in total. NOTE: this is shorter than kSimpleSchema.
+// The decoded batch has length 3 (the tests check array.length == 3). The
+// body appears to hold the int32 values 1, 2, 3 followed by padding.
+static uint8_t kSimpleRecordBatch[] = {
+    0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x10, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00,
+    0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00,
+    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
+    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
+// End-of-stream marker, appended after the last batch to terminate a stream.
+// It is the same 0xFFFFFFFF marker that opens kSimpleSchema and
+// kSimpleRecordBatch, followed by an int32 metadata length of 0.
+static uint8_t kEndOfStream[] = {
+    0xff, 0xff, 0xff, 0xff,  // 0xFFFFFFFF marker
+    0x00, 0x00, 0x00, 0x00,  // metadata length: 0
+};
+
+TEST(NanoarrowIpcReader, InputStreamBuffer) {
+  uint8_t src_bytes[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  struct ArrowBuffer src;
+  ArrowBufferInit(&src);
+  ASSERT_EQ(ArrowBufferAppend(&src, src_bytes, sizeof(src_bytes)), NANOARROW_OK);
+
+  // Initializing from a buffer moves it into the stream and leaves the
+  // caller's ArrowBuffer empty.
+  struct ArrowIpcInputStream stream;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&stream, &src), NANOARROW_OK);
+  EXPECT_EQ(src.data, nullptr);
+
+  // 0xff marks destination bytes that no read has written yet.
+  uint8_t dst[] = {0xff, 0xff, 0xff, 0xff, 0xff};
+  int64_t n_read;
+
+  // Issues one read, then checks both the status and the reported byte count.
+  auto read_and_check = [&](uint8_t* dest, int64_t request, int64_t expected_n) {
+    EXPECT_EQ(stream.read(&stream, dest, request, &n_read, nullptr), NANOARROW_OK);
+    EXPECT_EQ(n_read, expected_n);
+  };
+
+  read_and_check(dst, 2, 2);
+  uint8_t after_first[] = {0x01, 0x02, 0xff, 0xff, 0xff};
+  EXPECT_EQ(memcmp(dst, after_first, sizeof(dst)), 0);
+
+  read_and_check(dst + 2, 2, 2);
+  uint8_t after_second[] = {0x01, 0x02, 0x03, 0x04, 0xff};
+  EXPECT_EQ(memcmp(dst, after_second, sizeof(dst)), 0);
+
+  // Only one byte remains, so a 2-byte request comes back short.
+  read_and_check(dst + 4, 2, 1);
+  uint8_t after_third[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  EXPECT_EQ(memcmp(dst, after_third, sizeof(dst)), 0);
+
+  // Once the stream is exhausted, reads succeed with zero bytes. This holds
+  // even with a NULL destination or a zero-length request.
+  read_and_check(nullptr, 2, 0);
+  read_and_check(nullptr, 0, 0);
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, InputStreamFile) {
+  // Write five known bytes to a temporary file, then rewind it so the input
+  // stream reads them back from the start.
+  uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  FILE* file_ptr = tmpfile();
+  ASSERT_NE(file_ptr, nullptr);
+  ASSERT_EQ(fwrite(input_data, 1, sizeof(input_data), file_ptr), sizeof(input_data));
+  // Check the rewind explicitly. If it failed, the reads below would hit EOF
+  // and report confusing size mismatches instead of the real cause.
+  ASSERT_EQ(fseek(file_ptr, 0, SEEK_SET), 0);
+
+  struct ArrowIpcInputStream stream;
+  uint8_t output_data[] = {0xff, 0xff, 0xff, 0xff, 0xff};
+  int64_t size_read_bytes;
+
+  // The non-zero third argument presumably hands ownership of file_ptr to the
+  // stream, since this test never calls fclose() itself. Confirm against the
+  // ArrowIpcInputStreamInitFile() declaration.
+  ASSERT_EQ(ArrowIpcInputStreamInitFile(&stream, file_ptr, 1), NANOARROW_OK);
+
+  EXPECT_EQ(stream.read(&stream, output_data, 2, &size_read_bytes, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(size_read_bytes, 2);
+  uint8_t output_data1[] = {0x01, 0x02, 0xff, 0xff, 0xff};
+  EXPECT_EQ(memcmp(output_data, output_data1, sizeof(output_data)), 0);
+
+  EXPECT_EQ(stream.read(&stream, output_data + 2, 2, &size_read_bytes, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(size_read_bytes, 2);
+  uint8_t output_data2[] = {0x01, 0x02, 0x03, 0x04, 0xff};
+  EXPECT_EQ(memcmp(output_data, output_data2, sizeof(output_data)), 0);
+
+  // Only one byte is left, so a 2-byte request returns a short read.
+  EXPECT_EQ(stream.read(&stream, output_data + 4, 2, &size_read_bytes, nullptr),
+            NANOARROW_OK);
+  EXPECT_EQ(size_read_bytes, 1);
+  uint8_t output_data3[] = {0x01, 0x02, 0x03, 0x04, 0x05};
+  EXPECT_EQ(memcmp(output_data, output_data3, sizeof(output_data)), 0);
+
+  // At end of file, reads succeed with zero bytes. This holds even with a
+  // NULL destination or a zero-length request.
+  EXPECT_EQ(stream.read(&stream, nullptr, 2, &size_read_bytes, nullptr), NANOARROW_OK);
+  EXPECT_EQ(size_read_bytes, 0);
+
+  EXPECT_EQ(stream.read(&stream, nullptr, 0, &size_read_bytes, nullptr), NANOARROW_OK);
+  EXPECT_EQ(size_read_bytes, 0);
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderBasic) {
+  // Input: one Schema message followed by one RecordBatch message, with no
+  // explicit end-of-stream marker.
+  struct ArrowBuffer bytes;
+  ArrowBufferInit(&bytes);
+  ASSERT_EQ(ArrowBufferAppend(&bytes, kSimpleSchema, sizeof(kSimpleSchema)),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowBufferAppend(&bytes, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+            NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  // The single record batch has three rows.
+  struct ArrowArray batch;
+  ASSERT_EQ(stream.get_next(&stream, &batch), NANOARROW_OK);
+  EXPECT_EQ(batch.length, 3);
+  batch.release(&batch);
+
+  // Running out of input is reported as success with a released (NULL)
+  // array, and it keeps being reported that way on later calls.
+  for (int attempt = 0; attempt < 2; attempt++) {
+    ASSERT_EQ(stream.get_next(&stream, &batch), NANOARROW_OK);
+    EXPECT_EQ(batch.release, nullptr);
+  }
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) {
+  // Input: Schema, one RecordBatch, then an explicit end-of-stream marker.
+  struct ArrowBuffer bytes;
+  ArrowBufferInit(&bytes);
+  const struct {
+    const uint8_t* data;
+    int64_t size;
+  } messages[] = {{kSimpleSchema, sizeof(kSimpleSchema)},
+                  {kSimpleRecordBatch, sizeof(kSimpleRecordBatch)},
+                  {kEndOfStream, sizeof(kEndOfStream)}};
+  for (const auto& message : messages) {
+    ASSERT_EQ(ArrowBufferAppend(&bytes, message.data, message.size), NANOARROW_OK);
+  }
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  struct ArrowArray batch;
+  ASSERT_EQ(stream.get_next(&stream, &batch), NANOARROW_OK);
+  EXPECT_EQ(batch.length, 3);
+  batch.release(&batch);
+
+  // The end-of-stream marker is reported as a released (NULL) array.
+  ASSERT_EQ(stream.get_next(&stream, &batch), NANOARROW_OK);
+  EXPECT_EQ(batch.release, nullptr);
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderExpectedRecordBatch) {
+  // Two Schema messages back to back: the second sits where a RecordBatch is
+  // expected.
+  struct ArrowBuffer bytes;
+  ArrowBufferInit(&bytes);
+  for (int copy = 0; copy < 2; copy++) {
+    ASSERT_EQ(ArrowBufferAppend(&bytes, kSimpleSchema, sizeof(kSimpleSchema)),
+              NANOARROW_OK);
+  }
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  // The leading schema is read normally...
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK);
+  EXPECT_STREQ(schema.format, "+s");
+  schema.release(&schema);
+
+  // ...but the duplicate is rejected with EINVAL and a descriptive message.
+  struct ArrowArray batch;
+  ASSERT_EQ(stream.get_next(&stream, &batch), EINVAL);
+  const char* message = stream.get_last_error(&stream);
+  EXPECT_STREQ(message, "Unexpected message type (expected RecordBatch)");
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderExpectedSchema) {
+  // The stream starts with a RecordBatch where a Schema message is required.
+  struct ArrowBuffer input_buffer;
+  ArrowBufferInit(&input_buffer);
+  // Size the append from the array actually being copied. kSimpleRecordBatch
+  // is shorter than kSimpleSchema, so using sizeof(kSimpleSchema) here would
+  // read past the end of kSimpleRecordBatch.
+  ASSERT_EQ(
+      ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+      NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), EINVAL);
+  EXPECT_STREQ(stream.get_last_error(&stream),
+               "Unexpected message type at start of input (expected Schema)");
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) {
+  struct ArrowBuffer bytes;
+  ArrowBufferInit(&bytes);
+  ASSERT_EQ(ArrowBufferAppend(&bytes, kSimpleSchema, sizeof(kSimpleSchema)),
+            NANOARROW_OK);
+  ASSERT_EQ(ArrowBufferAppend(&bytes, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)),
+            NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &bytes), NANOARROW_OK);
+
+  // Ask for field_index 0. Only -1 is currently supported, so construction
+  // succeeds but get_schema() must refuse it.
+  struct ArrowIpcArrayStreamReaderOptions opts;
+  opts.field_index = 0;
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &opts), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), ENOTSUP);
+  const char* message = stream.get_last_error(&stream);
+  EXPECT_STREQ(message, "Field index != -1 is not yet supported");
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderEmptyInput) {
+  // Zero bytes of input: there is not even a Schema message to read.
+  struct ArrowBuffer no_bytes;
+  ArrowBufferInit(&no_bytes);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &no_bytes), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  // Reading the schema from an empty stream fails with ENODATA.
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), ENODATA);
+  const char* message = stream.get_last_error(&stream);
+  EXPECT_STREQ(message, "No data available on stream");
+
+  stream.release(&stream);
+}
+
+TEST(NanoarrowIpcReader, StreamReaderIncompletePrefix) {
+  // A single byte is too short to hold the 8-byte message prefix.
+  struct ArrowBuffer truncated;
+  ArrowBufferInit(&truncated);
+  ASSERT_EQ(ArrowBufferAppendUInt8(&truncated, 0x00), NANOARROW_OK);
+
+  struct ArrowIpcInputStream input;
+  ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &truncated), NANOARROW_OK);
+
+  struct ArrowArrayStream stream;
+  ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK);
+
+  struct ArrowSchema schema;
+  ASSERT_EQ(stream.get_schema(&stream, &schema), EINVAL);
+  const char* message = stream.get_last_error(&stream);
+  EXPECT_STREQ(message, "Expected at least 8 bytes in remainder of stream");
+
+  stream.release(&stream);
+}


Reply via email to