This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 36025e2  [C] Add convenience functions (#80)
36025e2 is described below

commit 36025e25999301f77b0447bee93e00055ff89541
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Dec 2 14:58:01 2022 -0400

    [C] Add convenience functions (#80)
    
    * Add ArrowBufferAppendString|BufferView
    
    * add Move functions
    
    * first stab at array stream implementation
    
    * test passes with basic array stream
    
    * maybe fix bundled build and namespaced build
    
    * add a bitmap mover
    
    * add an arrayview mover
    
    * ...and actually use it
    
    * more consistent names for buffer mover
    
    * more array stream tests
    
    * r wrapper for the basic array stream
    
    * test validation
    
    * test validation in C
    
    * Update src/nanoarrow/nanoarrow.h
    
    Co-authored-by: David Li <[email protected]>
    
    Co-authored-by: David Li <[email protected]>
---
 CMakeLists.txt                       |   6 ++
 r/NAMESPACE                          |   1 +
 r/R/array-stream.R                   |  35 ++++++++
 r/man/basic_array_stream.Rd          |  30 +++++++
 r/src/array_stream.c                 |  37 ++++++++-
 r/src/init.c                         |   2 +
 r/src/pointers.c                     |  14 +---
 r/tests/testthat/test-array-stream.R |  53 ++++++++++++
 src/nanoarrow/array_inline.h         |   6 ++
 src/nanoarrow/array_stream.c         | 152 +++++++++++++++++++++++++++++++++++
 src/nanoarrow/array_stream_test.cc   | 125 ++++++++++++++++++++++++++++
 src/nanoarrow/array_test.cc          |  16 ++++
 src/nanoarrow/buffer_inline.h        |  25 ++++--
 src/nanoarrow/buffer_test.cc         |  31 +++++++
 src/nanoarrow/nanoarrow.h            |  67 ++++++++++++++-
 src/nanoarrow/nanoarrow.hpp          |  16 ++--
 src/nanoarrow/nanoarrow_types.h      |  19 +++++
 17 files changed, 604 insertions(+), 31 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2bea056..29e3978 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -79,6 +79,8 @@ if(NANOARROW_BUNDLE)
     file(APPEND ${NANOARROW_C_TEMP} "${SRC_FILE_CONTENTS}")
     file(READ src/nanoarrow/array.c SRC_FILE_CONTENTS)
     file(APPEND ${NANOARROW_C_TEMP} "${SRC_FILE_CONTENTS}")
+    file(READ src/nanoarrow/array_stream.c SRC_FILE_CONTENTS)
+    file(APPEND ${NANOARROW_C_TEMP} "${SRC_FILE_CONTENTS}")
 
     # Add a library that the tests can link against (but don't install it)
     if(NANOARROW_BUILD_TESTS)
@@ -93,6 +95,7 @@ else()
         nanoarrow
         src/nanoarrow/array.c
         src/nanoarrow/schema.c
+        src/nanoarrow/array_stream.c
         src/nanoarrow/utils.c)
 
     target_include_directories(nanoarrow PUBLIC
@@ -154,6 +157,7 @@ if(NANOARROW_BUILD_TESTS)
     add_executable(buffer_test src/nanoarrow/buffer_test.cc)
     add_executable(array_test src/nanoarrow/array_test.cc)
     add_executable(schema_test src/nanoarrow/schema_test.cc)
+    add_executable(array_stream_test src/nanoarrow/array_stream_test.cc)
     add_executable(nanoarrow_hpp_test src/nanoarrow/nanoarrow_hpp_test.cc)
 
     if(NANOARROW_CODE_COVERAGE)
@@ -168,6 +172,7 @@ if(NANOARROW_BUILD_TESTS)
     target_link_libraries(buffer_test nanoarrow GTest::gtest_main)
     target_link_libraries(array_test nanoarrow GTest::gtest_main arrow_shared)
     target_link_libraries(schema_test nanoarrow GTest::gtest_main arrow_shared)
+    target_link_libraries(array_stream_test nanoarrow GTest::gtest_main)
     target_link_libraries(nanoarrow_hpp_test nanoarrow GTest::gtest_main)
 
     include(GoogleTest)
@@ -175,5 +180,6 @@ if(NANOARROW_BUILD_TESTS)
     gtest_discover_tests(buffer_test)
     gtest_discover_tests(array_test)
     gtest_discover_tests(schema_test)
+    gtest_discover_tests(array_stream_test)
     gtest_discover_tests(nanoarrow_hpp_test)
 endif()
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 1213a36..897fa1d 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -49,6 +49,7 @@ S3method(str,nanoarrow_schema)
 export(as_nanoarrow_array)
 export(as_nanoarrow_array_stream)
 export(as_nanoarrow_schema)
+export(basic_array_stream)
 export(convert_array)
 export(convert_array_stream)
 export(infer_nanoarrow_ptype)
diff --git a/r/R/array-stream.R b/r/R/array-stream.R
index 6ea7c39..12a9dd0 100644
--- a/r/R/array-stream.R
+++ b/r/R/array-stream.R
@@ -15,6 +15,41 @@
 # specific language governing permissions and limitations
 # under the License.
 
+#' Create ArrayStreams from batches
+#'
+#' @param batches A [list()] of [nanoarrow_array][as_nanoarrow_array] objects
+#'   or objects that can be coerced via [as_nanoarrow_array()].
+#' @param schema A [nanoarrow_schema][as_nanoarrow_schema] or `NULL` to guess
+#'   based on the first schema.
+#' @param validate Use `FALSE` to skip the validation step (i.e., if you
+#'   know that the arrays are valid).
+#'
+#' @return An [nanoarrow_array_stream][as_nanoarrow_array_stream]
+#' @export
+#'
+#' @examples
+#' (stream <- basic_array_stream(list(data.frame(a = 1, b = 2))))
+#' as.data.frame(stream$get_next())
+#' stream$get_next()
+#'
+basic_array_stream <- function(batches, schema = NULL, validate = TRUE) {
+  # Error for everything except a bare list (e.g., so that calling with
+  # a data.frame() does not unintentionally loop over columns)
+  if (!identical(class(batches), "list")) {
+    stop("`batches` must be an unclassed `list()`")
+  }
+
+  batches <- lapply(batches, as_nanoarrow_array, schema = schema)  # coerce every element; `schema` may still be NULL here
+
+  if (is.null(schema) && length(batches) > 0) {
+    schema <- infer_nanoarrow_schema(batches[[1]])  # no schema given: use the one inferred from the first batch
+  } else if (is.null(schema)) {
+    stop("Can't infer schema from first batch if there are zero batches")
+  }
+
+  .Call(nanoarrow_c_basic_array_stream, batches, schema, validate)  # the stream is assembled (and optionally validated) in C
+}
+
 #' Convert an object to a nanoarrow array_stream
 #'
 #' In nanoarrow, an 'array stream' corresponds to the `struct ArrowArrayStream`
diff --git a/r/man/basic_array_stream.Rd b/r/man/basic_array_stream.Rd
new file mode 100644
index 0000000..8d850d7
--- /dev/null
+++ b/r/man/basic_array_stream.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/array-stream.R
+\name{basic_array_stream}
+\alias{basic_array_stream}
+\title{Create ArrayStreams from batches}
+\usage{
+basic_array_stream(batches, schema = NULL, validate = TRUE)
+}
+\arguments{
+\item{batches}{A \code{\link[=list]{list()}} of 
\link[=as_nanoarrow_array]{nanoarrow_array} objects
+or objects that can be coerced via 
\code{\link[=as_nanoarrow_array]{as_nanoarrow_array()}}.}
+
+\item{schema}{A \link[=as_nanoarrow_schema]{nanoarrow_schema} or \code{NULL} 
to guess
+based on the first schema.}
+
+\item{validate}{Use \code{FALSE} to skip the validation step (i.e., if you
+know that the arrays are valid).}
+}
+\value{
+An \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}
+}
+\description{
+Create ArrayStreams from batches
+}
+\examples{
+(stream <- basic_array_stream(list(data.frame(a = 1, b = 2))))
+as.data.frame(stream$get_next())
+stream$get_next()
+
+}
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 7182fe2..0d879ca 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -19,10 +19,10 @@
 #include <R.h>
 #include <Rinternals.h>
 
-#include "array_stream.h"
-#include "schema.h"
 #include "array.h"
+#include "array_stream.h"
 #include "nanoarrow.h"
+#include "schema.h"
 
 void finalize_array_stream_xptr(SEXP array_stream_xptr) {
   struct ArrowArrayStream* array_stream =
@@ -61,7 +61,7 @@ SEXP nanoarrow_c_array_stream_get_next(SEXP 
array_stream_xptr) {
   SEXP array_xptr = PROTECT(array_owning_xptr());
   struct ArrowArray* array = (struct ArrowArray*)R_ExternalPtrAddr(array_xptr);
   int result = array_stream->get_next(array_stream, array);
-  
+
   if (result != 0) {
     const char* last_error = array_stream->get_last_error(array_stream);
     if (last_error == NULL) {
@@ -73,3 +73,34 @@ SEXP nanoarrow_c_array_stream_get_next(SEXP 
array_stream_xptr) {
   UNPROTECT(1);
   return array_xptr;
 }
+
+// Create a nanoarrow_array_stream external pointer from an R list of arrays.
+//
+// batches_sexp: list whose elements are exported one at a time with
+//   array_export() and moved into the stream.
+// schema_xptr: schema external pointer. ArrowBasicArrayStreamInit() moves the
+//   schema it is given, so the stream is handed a deep copy; passing the
+//   caller's struct directly would leave the R schema object released.
+//   NOTE(review): assumes schema_from_xptr() returns the struct owned by
+//   schema_xptr - confirm.
+// validate_sexp: logical; when true the arrays are checked against the schema
+//   and an R error is raised if ArrowBasicArrayStreamValidate() fails.
+//
+// Returns the external pointer that owns the new stream.
+SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP schema_xptr,
+                                    SEXP validate_sexp) {
+  int validate = LOGICAL(validate_sexp)[0];
+  struct ArrowSchema* schema = schema_from_xptr(schema_xptr);
+
+  SEXP array_stream_xptr = PROTECT(array_stream_owning_xptr());
+  struct ArrowArrayStream* array_stream =
+      (struct ArrowArrayStream*)R_ExternalPtrAddr(array_stream_xptr);
+
+  // Give the stream its own schema so that schema_xptr stays valid for the caller
+  struct ArrowSchema schema_copy;
+  if (ArrowSchemaDeepCopy(schema, &schema_copy) != NANOARROW_OK) {
+    Rf_error("Failed to copy schema");
+  }
+
+  int64_t n_arrays = Rf_xlength(batches_sexp);
+  if (ArrowBasicArrayStreamInit(array_stream, &schema_copy, n_arrays) != NANOARROW_OK) {
+    // Rf_error() does not return: release the copy first unless it was moved
+    if (schema_copy.release != NULL) {
+      schema_copy.release(&schema_copy);
+    }
+    Rf_error("Failed to initialize array stream");
+  }
+
+  struct ArrowArray array;
+  for (int64_t i = 0; i < n_arrays; i++) {
+    array_export(VECTOR_ELT(batches_sexp, i), &array);
+    ArrowBasicArrayStreamSetArray(array_stream, i, &array);
+  }
+
+  if (validate) {
+    struct ArrowError error;
+    if (ArrowBasicArrayStreamValidate(array_stream, &error) != NANOARROW_OK) {
+      Rf_error("ArrowBasicArrayStreamValidate(): %s", ArrowErrorMessage(&error));
+    }
+  }
+
+  UNPROTECT(1);
+  return array_stream_xptr;
+}
diff --git a/r/src/init.c b/r/src/init.c
index 1827fe7..2d42ae8 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -29,6 +29,7 @@ extern SEXP nanoarrow_c_altrep_is_materialized(SEXP x_sexp);
 extern SEXP nanoarrow_c_altrep_force_materialize(SEXP x_sexp, SEXP 
recursive_sexp);
 extern SEXP nanoarrow_c_array_stream_get_schema(SEXP array_stream_xptr);
 extern SEXP nanoarrow_c_array_stream_get_next(SEXP array_stream_xptr);
+extern SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP 
schema_xptr, SEXP validate_sexp);
 extern SEXP nanoarrow_c_array_view(SEXP array_xptr, SEXP schema_xptr);
 extern SEXP nanoarrow_c_array_set_schema(SEXP array_xptr, SEXP schema_xptr, 
SEXP validate_sexp);
 extern SEXP nanoarrow_c_infer_schema_array(SEXP array_xptr);
@@ -63,6 +64,7 @@ static const R_CallMethodDef CallEntries[] = {
     {"nanoarrow_c_altrep_force_materialize", 
(DL_FUNC)&nanoarrow_c_altrep_force_materialize, 2},
     {"nanoarrow_c_array_stream_get_schema", 
(DL_FUNC)&nanoarrow_c_array_stream_get_schema, 1},
     {"nanoarrow_c_array_stream_get_next", 
(DL_FUNC)&nanoarrow_c_array_stream_get_next, 1},
+    {"nanoarrow_c_basic_array_stream", 
(DL_FUNC)&nanoarrow_c_basic_array_stream, 3},
     {"nanoarrow_c_array_view", (DL_FUNC)&nanoarrow_c_array_view, 2},
     {"nanoarrow_c_array_set_schema", (DL_FUNC)&nanoarrow_c_array_set_schema, 
3},
     {"nanoarrow_c_infer_schema_array", 
(DL_FUNC)&nanoarrow_c_infer_schema_array, 1},
diff --git a/r/src/pointers.c b/r/src/pointers.c
index 8d118ab..f55cd38 100644
--- a/r/src/pointers.c
+++ b/r/src/pointers.c
@@ -137,10 +137,8 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
     if (obj_src == NULL || obj_src->release == NULL) {
       Rf_error("`ptr_src` is not a valid struct ArrowSchema");
     }
-
-    memcpy(obj_dst, obj_src, sizeof(struct ArrowSchema));
-    obj_src->release = NULL;
-
+    
+    ArrowSchemaMove(obj_src, obj_dst);
   } else if (Rf_inherits(ptr_dst, "nanoarrow_array")) {
     struct ArrowArray* obj_dst = (struct 
ArrowArray*)R_ExternalPtrAddr(ptr_dst);
     if (obj_dst == NULL) {
@@ -156,9 +154,7 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
       Rf_error("`ptr_src` is not a valid struct ArrowArray");
     }
 
-    memcpy(obj_dst, obj_src, sizeof(struct ArrowArray));
-    obj_src->release = NULL;
-
+    ArrowArrayMove(obj_src, obj_dst);
   } else if (Rf_inherits(ptr_dst, "nanoarrow_array_stream")) {
     struct ArrowArrayStream* obj_dst =
         (struct ArrowArrayStream*)R_ExternalPtrAddr(ptr_dst);
@@ -176,9 +172,7 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
       Rf_error("`ptr_src` is not a valid struct ArrowArrayStream");
     }
 
-    memcpy(obj_dst, obj_src, sizeof(struct ArrowArrayStream));
-    obj_src->release = NULL;
-
+    ArrowArrayStreamMove(obj_src, obj_dst);
   } else {
     Rf_error(
         "`ptr_dst` must inherit from 'nanoarrow_schema', 'nanoarrow_array', or 
"
diff --git a/r/tests/testthat/test-array-stream.R 
b/r/tests/testthat/test-array-stream.R
index cc0e243..53f0732 100644
--- a/r/tests/testthat/test-array-stream.R
+++ b/r/tests/testthat/test-array-stream.R
@@ -15,6 +15,59 @@
 # specific language governing permissions and limitations
 # under the License.
 
+test_that("basic_array_stream() can create empty streams", {
+  stream <- basic_array_stream(list(), infer_nanoarrow_schema(integer()))  # zero batches with an explicit schema
+  expect_identical(stream$get_schema()$format, "i")
+  expect_null(stream$get_next())  # an empty stream is exhausted immediately
+
+  expect_error(
+    basic_array_stream(list()),  # zero batches and no schema: nothing to infer from
+    "Can't infer schema from first batch if there are zero batches"
+  )
+})
+
+test_that("basic_array_stream() can create streams from batches", {
+  stream <- basic_array_stream(  # schema is omitted, so it is inferred from the first batch
+    list(
+      data.frame(a = 1, b = "two", stringsAsFactors = FALSE),
+      data.frame(a = 2, b = "three", stringsAsFactors = FALSE)
+    )
+  )
+
+  expect_identical(stream$get_schema()$format, "+s")
+  expect_identical(
+    as.data.frame(stream$get_next()),  # batches come back in the order they were supplied
+    data.frame(a = 1, b = "two", stringsAsFactors = FALSE)
+  )
+  expect_identical(
+    as.data.frame(stream$get_next()),
+    data.frame(a = 2, b = "three", stringsAsFactors = FALSE)
+  )
+  expect_null(stream$get_next())  # both batches consumed
+})
+
+test_that("basic_array_stream() can validate input or skip validation", {
+  invalid_stream <- basic_array_stream(  # second batch does not match the schema inferred from the first
+    list(
+      as_nanoarrow_array(1:5),
+      as_nanoarrow_array(data.frame(a = 1:5))
+    ),
+    validate = FALSE
+  )
+  expect_s3_class(invalid_stream, "nanoarrow_array_stream")  # with validation skipped the mismatch goes unreported
+
+  expect_error(
+    basic_array_stream(
+      list(
+        as_nanoarrow_array(1:5),
+        as_nanoarrow_array(data.frame(a = 1:5))
+      ),
+      validate = TRUE
+    ),
+    "Expected array with 2 buffer"  # prefix of the message produced by the C-level validation
+  )
+})
+
 test_that("nanoarrow_array_stream format, print, and str methods work", {
   array_stream <- as_nanoarrow_array_stream(data.frame(x = 1:10))
   expect_identical(format(array_stream), "<nanoarrow_array_stream struct<x: 
int32>>")
diff --git a/src/nanoarrow/array_inline.h b/src/nanoarrow/array_inline.h
index 751955d..d020a2b 100644
--- a/src/nanoarrow/array_inline.h
+++ b/src/nanoarrow/array_inline.h
@@ -440,6 +440,12 @@ static inline ArrowErrorCode 
ArrowArrayFinishElement(struct ArrowArray* array) {
   return NANOARROW_OK;
 }
 
+static inline void ArrowArrayViewMove(struct ArrowArrayView* src,
+                                      struct ArrowArrayView* dst) {  // transfer the contents of src to dst
+  memcpy(dst, src, sizeof(struct ArrowArrayView));  // bitwise copy; whatever dst held before is overwritten
+  ArrowArrayViewInit(src, NANOARROW_TYPE_UNINITIALIZED);  // re-initialize src as an uninitialized-type view
+}
+
 static inline int8_t ArrowArrayViewIsNull(struct ArrowArrayView* array_view, 
int64_t i) {
   const uint8_t* validity_buffer = array_view->buffer_views[0].data.as_uint8;
   i += array_view->array->offset;
diff --git a/src/nanoarrow/array_stream.c b/src/nanoarrow/array_stream.c
new file mode 100644
index 0000000..07917af
--- /dev/null
+++ b/src/nanoarrow/array_stream.c
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+
+#include "nanoarrow.h"
+
+struct BasicArrayStreamPrivate {  // state kept in ArrowArrayStream::private_data for the basic stream
+  struct ArrowSchema schema;  // schema moved in at init; get_schema hands out deep copies of it
+  int64_t n_arrays;  // number of slots allocated in `arrays`
+  struct ArrowArray* arrays;  // slots owned by the stream; stays NULL unless n_arrays > 0
+  int64_t arrays_i;  // index of the next slot that get_next will hand out
+};
+
+static int ArrowBasicArrayStreamGetSchema(struct ArrowArrayStream* 
array_stream,
+                                          struct ArrowSchema* schema) {  // installed as the stream's get_schema callback
+  if (array_stream == NULL || array_stream->release == NULL) {  // reject a missing or already-released stream
+    return EINVAL;
+  }
+
+  struct BasicArrayStreamPrivate* private =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+  return ArrowSchemaDeepCopy(&private->schema, schema);  // the stream keeps its schema; the caller receives a copy
+}
+
+static int ArrowBasicArrayStreamGetNext(struct ArrowArrayStream* array_stream,
+                                        struct ArrowArray* array) {  // installed as the stream's get_next callback
+  if (array_stream == NULL || array_stream->release == NULL) {  // reject a missing or already-released stream
+    return EINVAL;
+  }
+
+  struct BasicArrayStreamPrivate* private =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  if (private->arrays_i == private->n_arrays) {  // every slot has already been handed out
+    array->release = NULL;  // end of stream is reported as success with a released array
+    return NANOARROW_OK;
+  }
+
+  ArrowArrayMove(&private->arrays[private->arrays_i++], array);  // move the current slot to the caller, then advance
+  return NANOARROW_OK;
+}
+
+static const char* ArrowBasicArrayStreamGetLastError(
+    struct ArrowArrayStream* array_stream) {  // installed as the stream's get_last_error callback
+  return NULL;  // this implementation never stores an error message
+}
+
+static void ArrowBasicArrayStreamRelease(struct ArrowArrayStream* 
array_stream) {
+  if (array_stream == NULL || array_stream->release == NULL) {  // nothing to do for a missing or already-released stream
+    return;
+  }
+
+  struct BasicArrayStreamPrivate* private =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  if (private->schema.release != NULL) {  // release the schema the stream took ownership of
+    private->schema.release(&private->schema);
+  }
+
+  for (int64_t i = 0; i < private->n_arrays; i++) {
+    if (private->arrays[i].release != NULL) {  // only slots that still hold an array need releasing
+      private->arrays[i].release(&private->arrays[i]);
+    }
+  }
+
+  if (private->arrays != NULL) {  // arrays is NULL when no slots were allocated
+    ArrowFree(private->arrays);
+  }
+
+  ArrowFree(private);
+  array_stream->release = NULL;  // mark the stream itself as released
+}
+
+// Initialize array_stream as a stream that hands out n_arrays stored arrays.
+//
+// On success (NANOARROW_OK) the schema is moved into the stream, leaving the
+// caller's struct released, and every slot starts out released until it is
+// filled with ArrowBasicArrayStreamSetArray().
+//
+// On failure (EINVAL for a negative n_arrays, ENOMEM if an allocation fails)
+// nothing is leaked and neither array_stream nor schema is modified, so the
+// caller still owns schema.
+ArrowErrorCode ArrowBasicArrayStreamInit(struct ArrowArrayStream* array_stream,
+                                         struct ArrowSchema* schema, int64_t n_arrays) {
+  // A negative count would leave `arrays` NULL while get_next still indexes it
+  if (n_arrays < 0) {
+    return EINVAL;
+  }
+
+  struct BasicArrayStreamPrivate* private = (struct BasicArrayStreamPrivate*)ArrowMalloc(
+      sizeof(struct BasicArrayStreamPrivate));
+  if (private == NULL) {
+    return ENOMEM;
+  }
+
+  private->n_arrays = n_arrays;
+  private->arrays = NULL;
+  private->arrays_i = 0;
+
+  if (n_arrays > 0) {
+    private->arrays =
+        (struct ArrowArray*)ArrowMalloc(n_arrays * sizeof(struct ArrowArray));
+    if (private->arrays == NULL) {
+      // array_stream has not been populated yet, so its release callback must
+      // not be used here; free the only allocation made so far instead
+      ArrowFree(private);
+      return ENOMEM;
+    }
+  }
+
+  // Mark every slot as released until ArrowBasicArrayStreamSetArray() fills it
+  for (int64_t i = 0; i < private->n_arrays; i++) {
+    private->arrays[i].release = NULL;
+  }
+
+  // Nothing below can fail, so it is now safe to take ownership of the schema
+  ArrowSchemaMove(schema, &private->schema);
+
+  array_stream->get_schema = &ArrowBasicArrayStreamGetSchema;
+  array_stream->get_next = &ArrowBasicArrayStreamGetNext;
+  array_stream->get_last_error = ArrowBasicArrayStreamGetLastError;
+  array_stream->release = ArrowBasicArrayStreamRelease;
+  array_stream->private_data = private;
+  return NANOARROW_OK;
+}
+
+void ArrowBasicArrayStreamSetArray(struct ArrowArrayStream* array_stream, 
int64_t i,
+                                   struct ArrowArray* array) {  // move `array` into slot i; i is not range-checked here
+  struct BasicArrayStreamPrivate* private =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+  ArrowArrayMove(array, &private->arrays[i]);  // NOTE(review): an array already in slot i appears to be overwritten without release - confirm
+}
+
+ArrowErrorCode ArrowBasicArrayStreamValidate(struct ArrowArrayStream* 
array_stream,
+                                             struct ArrowError* error) {  // check the stored arrays against the stream's schema
+  struct BasicArrayStreamPrivate* private =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  struct ArrowArrayView array_view;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&array_view, &private->schema, error));  // build a view from the schema
+
+  for (int64_t i = 0; i < private->n_arrays; i++) {
+    if (private->arrays[i].release != NULL) {  // slots that are unset or already handed out are skipped
+      int result = ArrowArrayViewSetArray(&array_view, &private->arrays[i], 
error);  // fails when the array does not fit the schema-derived view
+      if (result != NANOARROW_OK) {
+        ArrowArrayViewReset(&array_view);  // reset the view before reporting the first failure
+        return result;
+      }
+    }
+  }
+
+  ArrowArrayViewReset(&array_view);
+  return NANOARROW_OK;
+}
diff --git a/src/nanoarrow/array_stream_test.cc 
b/src/nanoarrow/array_stream_test.cc
new file mode 100644
index 0000000..26d49ad
--- /dev/null
+++ b/src/nanoarrow/array_stream_test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "nanoarrow/nanoarrow.h"
+
+TEST(ArrayStreamTest, ArrayStreamTestBasic) {  // one-array stream: init, fill, validate, consume, release
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  EXPECT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 1), 
NANOARROW_OK);
+  EXPECT_EQ(schema.release, nullptr);  // the schema was moved into the stream
+
+  ASSERT_EQ(ArrowArrayInit(&array, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayAppendInt(&array, 123), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayFinishBuilding(&array, nullptr), NANOARROW_OK);
+
+  ArrowBasicArrayStreamSetArray(&array_stream, 0, &array);
+  EXPECT_EQ(array.release, nullptr);  // the array was moved into slot 0
+
+  EXPECT_EQ(ArrowBasicArrayStreamValidate(&array_stream, nullptr), 
NANOARROW_OK);
+
+  struct ArrowSchema schema_copy;
+  EXPECT_EQ(array_stream.get_schema(&array_stream, &schema_copy), 
NANOARROW_OK);
+  EXPECT_STREQ(schema_copy.format, "i");
+  schema_copy.release(&schema_copy);  // get_schema returned a copy that the caller releases
+
+  struct ArrowArray array_copy;
+  EXPECT_EQ(array_stream.get_next(&array_stream, &array_copy), NANOARROW_OK);
+  EXPECT_EQ(array_copy.length, 1);
+  EXPECT_EQ(array_copy.n_buffers, 2);
+  array_copy.release(&array_copy);
+
+  EXPECT_EQ(array_stream.get_next(&array_stream, &array_copy), NANOARROW_OK);
+  EXPECT_EQ(array_copy.release, nullptr);  // end of stream is reported as a released array
+
+  EXPECT_EQ(array_stream.get_last_error(&array_stream), nullptr);
+
+  array_stream.release(&array_stream);
+  EXPECT_EQ(array_stream.release, nullptr);  // releasing the stream clears its release callback
+}
+
+TEST(ArrayStreamTest, ArrayStreamTestEmpty) {  // a stream created with zero arrays
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  EXPECT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 0), 
NANOARROW_OK);
+  EXPECT_EQ(ArrowBasicArrayStreamValidate(&array_stream, nullptr), 
NANOARROW_OK);
+  
+  for (int i = 0; i < 5; i++) {  // get_next can be called repeatedly once the stream is exhausted
+    EXPECT_EQ(array_stream.get_next(&array_stream, &array), NANOARROW_OK);
+    EXPECT_EQ(array.release, nullptr);
+  }
+
+  array_stream.release(&array_stream);
+}
+
+TEST(ArrayStreamTest, ArrayStreamTestIncomplete) {  // release a stream before all of its arrays were pulled
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  ASSERT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 5), 
NANOARROW_OK);
+
+  // Add five arrays with length == i
+  for (int i = 0; i < 5; i++) {
+    ASSERT_EQ(ArrowArrayInit(&array, NANOARROW_TYPE_INT32), NANOARROW_OK);
+    ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+    for (int j = 0; j < i; j++) {
+      ASSERT_EQ(ArrowArrayAppendInt(&array, 123), NANOARROW_OK);
+    }
+    ASSERT_EQ(ArrowArrayFinishBuilding(&array, nullptr), NANOARROW_OK);
+    ArrowBasicArrayStreamSetArray(&array_stream, i, &array);  // `array` is moved out, so it can be re-initialized next iteration
+  }
+
+  // Pull only one of them
+  EXPECT_EQ(array_stream.get_next(&array_stream, &array), NANOARROW_OK);
+  EXPECT_EQ(array.length, 0);  // slot 0 holds the zero-length array
+  array.release(&array);
+
+  // The remaining arrays, owned by the stream, should be released here
+  array_stream.release(&array_stream);
+}
+
+TEST(ArrayStreamTest, ArrayStreamTestInvalid) {  // validation reports an array that does not match the schema
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+  struct ArrowError error;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  ASSERT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 1), 
NANOARROW_OK);
+
+  ASSERT_EQ(ArrowArrayInit(&array, NANOARROW_TYPE_STRING), NANOARROW_OK);  // string array placed in an int32 stream
+  ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayFinishBuilding(&array, nullptr), NANOARROW_OK);
+  ArrowBasicArrayStreamSetArray(&array_stream, 0, &array);  // SetArray itself performs no type check
+
+  EXPECT_EQ(ArrowBasicArrayStreamValidate(&array_stream, &error), EINVAL);
+  EXPECT_STREQ(ArrowErrorMessage(&error),
+               "Expected array with 2 buffer(s) but found 3 buffer(s)");
+
+  array_stream.release(&array_stream);
+}
diff --git a/src/nanoarrow/array_test.cc b/src/nanoarrow/array_test.cc
index 2696a5e..9ab3b9c 100644
--- a/src/nanoarrow/array_test.cc
+++ b/src/nanoarrow/array_test.cc
@@ -1127,6 +1127,22 @@ TEST(ArrayTest, ArrayViewTestBasic) {
   ArrowArrayViewReset(&array_view);
 }
 
+TEST(ArrayTest, ArrayViewTestMove) {  // ArrowArrayViewMove() transfers contents and resets the source
+  struct ArrowArrayView array_view;
+  ArrowArrayViewInit(&array_view, NANOARROW_TYPE_STRING);
+  ASSERT_EQ(array_view.storage_type, NANOARROW_TYPE_STRING);
+
+  struct ArrowArrayView array_view2;
+  ArrowArrayViewInit(&array_view2, NANOARROW_TYPE_UNINITIALIZED);
+  ASSERT_EQ(array_view2.storage_type, NANOARROW_TYPE_UNINITIALIZED);
+
+  ArrowArrayViewMove(&array_view, &array_view2);
+  EXPECT_EQ(array_view.storage_type, NANOARROW_TYPE_UNINITIALIZED);  // the source was reset
+  EXPECT_EQ(array_view2.storage_type, NANOARROW_TYPE_STRING);  // the destination received the source's type
+
+  ArrowArrayViewReset(&array_view2);
+}
+
 TEST(ArrayTest, ArrayViewTestString) {
   struct ArrowArrayView array_view;
   struct ArrowError error;
diff --git a/src/nanoarrow/buffer_inline.h b/src/nanoarrow/buffer_inline.h
index 1d8c02c..9a83ceb 100644
--- a/src/nanoarrow/buffer_inline.h
+++ b/src/nanoarrow/buffer_inline.h
@@ -65,11 +65,10 @@ static inline void ArrowBufferReset(struct ArrowBuffer* 
buffer) {
   buffer->size_bytes = 0;
 }
 
-static inline void ArrowBufferMove(struct ArrowBuffer* buffer,
-                                   struct ArrowBuffer* buffer_out) {
-  memcpy(buffer_out, buffer, sizeof(struct ArrowBuffer));
-  buffer->data = NULL;
-  ArrowBufferReset(buffer);
+static inline void ArrowBufferMove(struct ArrowBuffer* src, struct 
ArrowBuffer* dst) {
+  memcpy(dst, src, sizeof(struct ArrowBuffer));  // dst takes over every field of src bitwise
+  src->data = NULL;  // presumably so that ArrowBufferReset() leaves the data now held by dst alone - confirm
+  ArrowBufferReset(src);
+}
 
 static inline ArrowErrorCode ArrowBufferResize(struct ArrowBuffer* buffer,
@@ -176,6 +175,16 @@ static inline ArrowErrorCode ArrowBufferAppendFloat(struct 
ArrowBuffer* buffer,
   return ArrowBufferAppend(buffer, &value, sizeof(float));
 }
 
+static inline ArrowErrorCode ArrowBufferAppendStringView(struct ArrowBuffer* 
buffer,
+                                                         struct 
ArrowStringView value) {
+  return ArrowBufferAppend(buffer, value.data, value.n_bytes);  // appends exactly value.n_bytes bytes starting at value.data
+}
+
+static inline ArrowErrorCode ArrowBufferAppendBufferView(struct ArrowBuffer* 
buffer,
+                                                         struct 
ArrowBufferView value) {
+  return ArrowBufferAppend(buffer, value.data.data, value.n_bytes);  // data.data is the `data` member of the view's `data` field
+}
+
 static inline ArrowErrorCode ArrowBufferAppendFill(struct ArrowBuffer* buffer,
                                                    uint8_t value, int64_t 
size_bytes) {
   NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(buffer, size_bytes));
@@ -326,6 +335,12 @@ static inline void ArrowBitmapInit(struct ArrowBitmap* 
bitmap) {
   bitmap->size_bits = 0;
 }
 
+static inline void ArrowBitmapMove(struct ArrowBitmap* src, struct 
ArrowBitmap* dst) {
+  ArrowBufferMove(&src->buffer, &dst->buffer);  // dst takes over the underlying buffer; src's buffer is reset
+  dst->size_bits = src->size_bits;
+  src->size_bits = 0;  // src is left as an empty bitmap
+}
+
 static inline ArrowErrorCode ArrowBitmapReserve(struct ArrowBitmap* bitmap,
                                                 int64_t additional_size_bits) {
   int64_t min_capacity_bits = bitmap->size_bits + additional_size_bits;
diff --git a/src/nanoarrow/buffer_test.cc b/src/nanoarrow/buffer_test.cc
index 76c925d..78c5bdc 100644
--- a/src/nanoarrow/buffer_test.cc
+++ b/src/nanoarrow/buffer_test.cc
@@ -222,6 +222,19 @@ TEST(BufferTest, BufferTestAppendHelpers) {
   EXPECT_EQ(ArrowBufferAppendFloat(&buffer, 123), NANOARROW_OK);
   EXPECT_EQ(reinterpret_cast<float*>(buffer.data)[0], 123);
   ArrowBufferReset(&buffer);
+
+  EXPECT_EQ(ArrowBufferAppendStringView(&buffer, ArrowCharView("a")), 
NANOARROW_OK);
+  EXPECT_EQ(reinterpret_cast<char*>(buffer.data)[0], 'a');
+  EXPECT_EQ(buffer.size_bytes, 1);
+  ArrowBufferReset(&buffer);
+
+  struct ArrowBufferView buffer_view;
+  buffer_view.data.data = "a";
+  buffer_view.n_bytes = 1;
+  EXPECT_EQ(ArrowBufferAppendBufferView(&buffer, buffer_view), NANOARROW_OK);
+  EXPECT_EQ(reinterpret_cast<char*>(buffer.data)[0], 'a');
+  EXPECT_EQ(buffer.size_bytes, 1);
+  ArrowBufferReset(&buffer);
 }
 
 TEST(BitmapTest, BitmapTestElement) {
@@ -366,6 +379,24 @@ TEST(BitmapTest, BitmapTestAppend) {
   ArrowBitmapReset(&bitmap);
 }
 
+TEST(BitmapTest, BitmapTestMove) {  // ArrowBitmapMove() transfers the buffer and the bit count
+  struct ArrowBitmap bitmap;
+  ArrowBitmapInit(&bitmap);
+  ASSERT_EQ(ArrowBitmapAppend(&bitmap, 1, 1), NANOARROW_OK);
+  ASSERT_NE(bitmap.buffer.data, nullptr);
+  ASSERT_EQ(bitmap.size_bits, 1);
+
+  struct ArrowBitmap bitmap2;
+  bitmap2.buffer.data = NULL;  // the destination is deliberately not initialized with ArrowBitmapInit()
+  ArrowBitmapMove(&bitmap, &bitmap2);
+  EXPECT_EQ(bitmap.buffer.data, nullptr);  // the source no longer holds the data
+  EXPECT_EQ(bitmap.size_bits, 0);
+  EXPECT_NE(bitmap2.buffer.data, nullptr);
+  EXPECT_EQ(bitmap2.size_bits, 1);
+
+  ArrowBitmapReset(&bitmap2);
+}
+
 TEST(BitmapTest, BitmapTestResize) {
   struct ArrowBitmap bitmap;
   ArrowBitmapInit(&bitmap);
diff --git a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h
index 4dc8e68..4bea678 100644
--- a/src/nanoarrow/nanoarrow.h
+++ b/src/nanoarrow/nanoarrow.h
@@ -105,6 +105,12 @@
 #define ArrowArrayViewSetArray \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayViewSetArray)
 #define ArrowArrayViewReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, 
ArrowArrayViewReset)
+#define ArrowBasicArrayStreamInit \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBasicArrayStreamInit)
+#define ArrowBasicArrayStreamSetArray \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBasicArrayStreamSetArray)
+#define ArrowBasicArrayStreamValidate \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBasicArrayStreamValidate)
 
 #endif
 
@@ -483,8 +489,7 @@ static inline void ArrowBufferReset(struct ArrowBuffer* 
buffer);
 ///
 /// Transfers the buffer data and lifecycle management to another
 /// address and resets buffer.
-static inline void ArrowBufferMove(struct ArrowBuffer* buffer,
-                                   struct ArrowBuffer* buffer_out);
+static inline void ArrowBufferMove(struct ArrowBuffer* src, struct 
ArrowBuffer* dst);
 
 /// \brief Grow or shrink a buffer to a given capacity
 ///
@@ -564,6 +569,14 @@ static inline ArrowErrorCode 
ArrowBufferAppendDouble(struct ArrowBuffer* buffer,
 static inline ArrowErrorCode ArrowBufferAppendFloat(struct ArrowBuffer* buffer,
                                                     float value);
 
+/// \brief Write an ArrowStringView to a buffer
+static inline ArrowErrorCode ArrowBufferAppendStringView(struct ArrowBuffer* 
buffer,
+                                                         struct 
ArrowStringView value);
+
+/// \brief Write an ArrowBufferView to a buffer
+static inline ArrowErrorCode ArrowBufferAppendBufferView(struct ArrowBuffer* 
buffer,
+                                                         struct 
ArrowBufferView value);
+
 /// @}
 
 /// \defgroup nanoarrow-bitmap Bitmap utilities
@@ -594,6 +607,12 @@ static inline int64_t ArrowBitCountSet(const uint8_t* 
bits, int64_t i_from, int6
 /// Initialize the builder's buffer, empty its cache, and reset the size to 
zero
 static inline void ArrowBitmapInit(struct ArrowBitmap* bitmap);
 
+/// \brief Move an ArrowBitmap
+///
+/// Transfers the underlying buffer data and lifecycle management to another
+/// address and resets the bitmap.
+static inline void ArrowBitmapMove(struct ArrowBitmap* src, struct 
ArrowBitmap* dst);
+
 /// \brief Ensure a bitmap builder has at least a given additional capacity
 ///
 /// Ensures that the buffer has space to append at least
@@ -792,6 +811,13 @@ ArrowErrorCode ArrowArrayFinishBuilding(struct ArrowArray* 
array,
 /// \brief Initialize the contents of an ArrowArrayView
 void ArrowArrayViewInit(struct ArrowArrayView* array_view, enum ArrowType 
storage_type);
 
+/// \brief Move an ArrowArrayView
+///
+/// Transfers the ArrowArrayView data and lifecycle management to another
+/// address and resets the contents of src.
+static inline void ArrowArrayViewMove(struct ArrowArrayView* src,
+                                      struct ArrowArrayView* dst);
+
 /// \brief Initialize the contents of an ArrowArrayView from an ArrowSchema
 ArrowErrorCode ArrowArrayViewInitFromSchema(struct ArrowArrayView* array_view,
                                             struct ArrowSchema* schema,
@@ -851,6 +877,43 @@ static inline struct ArrowBufferView 
ArrowArrayViewGetBytesUnsafe(
 
 /// @}
 
+/// \defgroup nanoarrow-basic-array-stream Basic ArrowArrayStream 
implementation
+///
+/// An implementation of an ArrowArrayStream based on a collection of
+/// zero or more previously-existing ArrowArray objects. Users should
+/// initialize and/or validate the contents before transferring the
+/// responsibility of the ArrowArrayStream elsewhere.
+///
+/// @{
+
+/// \brief Initialize an ArrowArrayStream backed by this implementation
+///
+/// This function moves the ownership of schema to the array_stream. If
+/// this function returns NANOARROW_OK, the caller is responsible for
+/// releasing the ArrowArrayStream.
+ArrowErrorCode ArrowBasicArrayStreamInit(struct ArrowArrayStream* array_stream,
+                                         struct ArrowSchema* schema, int64_t 
n_arrays);
+
+/// \brief Set the ith ArrowArray in this ArrowArrayStream.
+///
+/// array_stream must have been initialized with ArrowBasicArrayStreamInit().
+/// This function moves the ownership of array to the array_stream. i must
+/// be greater than or equal to zero and less than the value of n_arrays
+/// passed in ArrowBasicArrayStreamInit(). Callers are not required to fill
+/// all n_arrays members (i.e., n_arrays is a maximum bound).
+void ArrowBasicArrayStreamSetArray(struct ArrowArrayStream* array_stream, 
int64_t i,
+                                   struct ArrowArray* array);
+
+/// \brief Validate the contents of this ArrowArrayStream
+///
+/// array_stream must have been initialized with ArrowBasicArrayStreamInit().
+/// This function uses ArrowArrayViewInitFromSchema() and
+/// ArrowArrayViewSetArray() to validate the contents of the arrays.
+ArrowErrorCode ArrowBasicArrayStreamValidate(struct ArrowArrayStream* 
array_stream,
+                                             struct ArrowError* error);
+
+/// @}
+
 // Inline function definitions
 #include "array_inline.h"
 #include "buffer_inline.h"
diff --git a/src/nanoarrow/nanoarrow.hpp b/src/nanoarrow/nanoarrow.hpp
index 992253a..ad851ea 100644
--- a/src/nanoarrow/nanoarrow.hpp
+++ b/src/nanoarrow/nanoarrow.hpp
@@ -40,8 +40,7 @@ namespace internal {
 static inline void init_pointer(struct ArrowSchema* data) { data->release = 
nullptr; }
 
 static inline void move_pointer(struct ArrowSchema* src, struct ArrowSchema* 
dst) {
-  memcpy(dst, src, sizeof(struct ArrowSchema));
-  src->release = nullptr;
+  ArrowSchemaMove(src, dst);
 }
 
 static inline void release_pointer(struct ArrowSchema* data) {
@@ -53,8 +52,7 @@ static inline void release_pointer(struct ArrowSchema* data) {
 static inline void init_pointer(struct ArrowArray* data) { data->release = 
nullptr; }
 
 static inline void move_pointer(struct ArrowArray* src, struct ArrowArray* 
dst) {
-  memcpy(dst, src, sizeof(struct ArrowArray));
-  src->release = nullptr;
+  ArrowArrayMove(src, dst);
 }
 
 static inline void release_pointer(struct ArrowArray* data) {
@@ -69,8 +67,7 @@ static inline void init_pointer(struct ArrowArrayStream* 
data) {
 
 static inline void move_pointer(struct ArrowArrayStream* src,
                                 struct ArrowArrayStream* dst) {
-  memcpy(dst, src, sizeof(struct ArrowArrayStream));
-  src->release = nullptr;
+  ArrowArrayStreamMove(src, dst);
 }
 
 static inline void release_pointer(ArrowArrayStream* data) {
@@ -90,9 +87,7 @@ static inline void release_pointer(struct ArrowBuffer* data) 
{ ArrowBufferReset(
 static inline void init_pointer(struct ArrowBitmap* data) { 
ArrowBitmapInit(data); }
 
 static inline void move_pointer(struct ArrowBitmap* src, struct ArrowBitmap* 
dst) {
-  ArrowBufferMove(&src->buffer, &dst->buffer);
-  dst->size_bits = src->size_bits;
-  src->size_bits = 0;
+  ArrowBitmapMove(src, dst);
 }
 
 static inline void release_pointer(struct ArrowBitmap* data) { 
ArrowBitmapReset(data); }
@@ -102,8 +97,7 @@ static inline void init_pointer(struct ArrowArrayView* data) 
{
 }
 
 static inline void move_pointer(struct ArrowArrayView* src, struct 
ArrowArrayView* dst) {
-  memcpy(dst, src, sizeof(struct ArrowArrayView));
-  init_pointer(src);
+  ArrowArrayViewMove(src, dst);
 }
 
 static inline void release_pointer(struct ArrowArrayView* data) {
diff --git a/src/nanoarrow/nanoarrow_types.h b/src/nanoarrow/nanoarrow_types.h
index f62d358..def60b7 100644
--- a/src/nanoarrow/nanoarrow_types.h
+++ b/src/nanoarrow/nanoarrow_types.h
@@ -125,6 +125,25 @@ struct ArrowArrayStream {
 #endif  // ARROW_C_STREAM_INTERFACE
 #endif  // ARROW_FLAG_DICTIONARY_ORDERED
 
+/// \brief Move the contents of src into dst and set src->release to NULL
+static inline void ArrowSchemaMove(struct ArrowSchema* src, struct 
ArrowSchema* dst) {
+  memcpy(dst, src, sizeof(struct ArrowSchema));
+  src->release = NULL;
+}
+
+/// \brief Move the contents of src into dst and set src->release to NULL
+static inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* 
dst) {
+  memcpy(dst, src, sizeof(struct ArrowArray));
+  src->release = NULL;
+}
+
+/// \brief Move the contents of src into dst and set src->release to NULL
+static inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
+                                        struct ArrowArrayStream* dst) {
+  memcpy(dst, src, sizeof(struct ArrowArrayStream));
+  src->release = NULL;
+}
+
 /// @}
 
 // Utility macros


Reply via email to