This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 36025e2 [C] Add convenience functions (#80)
36025e2 is described below
commit 36025e25999301f77b0447bee93e00055ff89541
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Dec 2 14:58:01 2022 -0400
[C] Add convenience functions (#80)
* Add ArrowBufferAppendString|BufferView
* add Move functions
* first stab at array stream implementation
* test passes with basic array stream
* maybe fix bundled build and namespaced build
* add a bitmap mover
* add an arrayview mover
* ...and actually use it
* more consistent names for buffer mover
* more array stream tests
* r wrapper for the basic array stream
* test validation
* test validation in C
* Update src/nanoarrow/nanoarrow.h
Co-authored-by: David Li <[email protected]>
Co-authored-by: David Li <[email protected]>
---
CMakeLists.txt | 6 ++
r/NAMESPACE | 1 +
r/R/array-stream.R | 35 ++++++++
r/man/basic_array_stream.Rd | 30 +++++++
r/src/array_stream.c | 37 ++++++++-
r/src/init.c | 2 +
r/src/pointers.c | 14 +---
r/tests/testthat/test-array-stream.R | 53 ++++++++++++
src/nanoarrow/array_inline.h | 6 ++
src/nanoarrow/array_stream.c | 152 +++++++++++++++++++++++++++++++++++
src/nanoarrow/array_stream_test.cc | 125 ++++++++++++++++++++++++++++
src/nanoarrow/array_test.cc | 16 ++++
src/nanoarrow/buffer_inline.h | 25 ++++--
src/nanoarrow/buffer_test.cc | 31 +++++++
src/nanoarrow/nanoarrow.h | 67 ++++++++++++++-
src/nanoarrow/nanoarrow.hpp | 16 ++--
src/nanoarrow/nanoarrow_types.h | 19 +++++
17 files changed, 604 insertions(+), 31 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2bea056..29e3978 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -79,6 +79,8 @@ if(NANOARROW_BUNDLE)
file(APPEND ${NANOARROW_C_TEMP} "${SRC_FILE_CONTENTS}")
file(READ src/nanoarrow/array.c SRC_FILE_CONTENTS)
file(APPEND ${NANOARROW_C_TEMP} "${SRC_FILE_CONTENTS}")
+ file(READ src/nanoarrow/array_stream.c SRC_FILE_CONTENTS)
+ file(APPEND ${NANOARROW_C_TEMP} "${SRC_FILE_CONTENTS}")
# Add a library that the tests can link against (but don't install it)
if(NANOARROW_BUILD_TESTS)
@@ -93,6 +95,7 @@ else()
nanoarrow
src/nanoarrow/array.c
src/nanoarrow/schema.c
+ src/nanoarrow/array_stream.c
src/nanoarrow/utils.c)
target_include_directories(nanoarrow PUBLIC
@@ -154,6 +157,7 @@ if(NANOARROW_BUILD_TESTS)
add_executable(buffer_test src/nanoarrow/buffer_test.cc)
add_executable(array_test src/nanoarrow/array_test.cc)
add_executable(schema_test src/nanoarrow/schema_test.cc)
+ add_executable(array_stream_test src/nanoarrow/array_stream_test.cc)
add_executable(nanoarrow_hpp_test src/nanoarrow/nanoarrow_hpp_test.cc)
if(NANOARROW_CODE_COVERAGE)
@@ -168,6 +172,7 @@ if(NANOARROW_BUILD_TESTS)
target_link_libraries(buffer_test nanoarrow GTest::gtest_main)
target_link_libraries(array_test nanoarrow GTest::gtest_main arrow_shared)
target_link_libraries(schema_test nanoarrow GTest::gtest_main arrow_shared)
+ target_link_libraries(array_stream_test nanoarrow GTest::gtest_main)
target_link_libraries(nanoarrow_hpp_test nanoarrow GTest::gtest_main)
include(GoogleTest)
@@ -175,5 +180,6 @@ if(NANOARROW_BUILD_TESTS)
gtest_discover_tests(buffer_test)
gtest_discover_tests(array_test)
gtest_discover_tests(schema_test)
+ gtest_discover_tests(array_stream_test)
gtest_discover_tests(nanoarrow_hpp_test)
endif()
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 1213a36..897fa1d 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -49,6 +49,7 @@ S3method(str,nanoarrow_schema)
export(as_nanoarrow_array)
export(as_nanoarrow_array_stream)
export(as_nanoarrow_schema)
+export(basic_array_stream)
export(convert_array)
export(convert_array_stream)
export(infer_nanoarrow_ptype)
diff --git a/r/R/array-stream.R b/r/R/array-stream.R
index 6ea7c39..12a9dd0 100644
--- a/r/R/array-stream.R
+++ b/r/R/array-stream.R
@@ -15,6 +15,41 @@
# specific language governing permissions and limitations
# under the License.
+#' Create ArrayStreams from batches
+#'
+#' @param batches A [list()] of [nanoarrow_array][as_nanoarrow_array] objects
+#' or objects that can be coerced via [as_nanoarrow_array()].
+#' @param schema A [nanoarrow_schema][as_nanoarrow_schema] or `NULL` to infer
+#' the schema from the first batch.
+#' @param validate Use `FALSE` to skip the validation step (i.e., if you
+#' know that the arrays are valid).
+#'
+#' @return A [nanoarrow_array_stream][as_nanoarrow_array_stream]
+#' @export
+#'
+#' @examples
+#' (stream <- basic_array_stream(list(data.frame(a = 1, b = 2))))
+#' as.data.frame(stream$get_next())
+#' stream$get_next()
+#'
+basic_array_stream <- function(batches, schema = NULL, validate = TRUE) {
+  # Error for everything except a bare list (e.g., so that calling with
+  # a data.frame() does not unintentionally loop over columns)
+  if (!identical(class(batches), "list")) {
+    stop("`batches` must be an unclassed `list()`")
+  }
+
+  # The C implementation reads only LOGICAL(validate)[0], so guarantee there
+  # is exactly one non-missing logical value for it to read
+  if (!is.logical(validate) || length(validate) != 1 || is.na(validate)) {
+    stop("`validate` must be TRUE or FALSE")
+  }
+
+  batches <- lapply(batches, as_nanoarrow_array, schema = schema)
+
+  # Without an explicit schema, the first batch defines the stream's schema
+  if (is.null(schema) && length(batches) > 0) {
+    schema <- infer_nanoarrow_schema(batches[[1]])
+  } else if (is.null(schema)) {
+    stop("Can't infer schema from first batch if there are zero batches")
+  }
+
+  .Call(nanoarrow_c_basic_array_stream, batches, schema, validate)
+}
+
#' Convert an object to a nanoarrow array_stream
#'
#' In nanoarrow, an 'array stream' corresponds to the `struct ArrowArrayStream`
diff --git a/r/man/basic_array_stream.Rd b/r/man/basic_array_stream.Rd
new file mode 100644
index 0000000..8d850d7
--- /dev/null
+++ b/r/man/basic_array_stream.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/array-stream.R
+\name{basic_array_stream}
+\alias{basic_array_stream}
+\title{Create ArrayStreams from batches}
+\usage{
+basic_array_stream(batches, schema = NULL, validate = TRUE)
+}
+\arguments{
+\item{batches}{A \code{\link[=list]{list()}} of
\link[=as_nanoarrow_array]{nanoarrow_array} objects
+or objects that can be coerced via
\code{\link[=as_nanoarrow_array]{as_nanoarrow_array()}}.}
+
+\item{schema}{A \link[=as_nanoarrow_schema]{nanoarrow_schema} or \code{NULL} to infer
+the schema from the first batch.}
+
+\item{validate}{Use \code{FALSE} to skip the validation step (i.e., if you
+know that the arrays are valid).}
+}
+\value{
+A \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}
+}
+\description{
+Create ArrayStreams from batches
+}
+\examples{
+(stream <- basic_array_stream(list(data.frame(a = 1, b = 2))))
+as.data.frame(stream$get_next())
+stream$get_next()
+
+}
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 7182fe2..0d879ca 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -19,10 +19,10 @@
#include <R.h>
#include <Rinternals.h>
-#include "array_stream.h"
-#include "schema.h"
#include "array.h"
+#include "array_stream.h"
#include "nanoarrow.h"
+#include "schema.h"
void finalize_array_stream_xptr(SEXP array_stream_xptr) {
struct ArrowArrayStream* array_stream =
@@ -61,7 +61,7 @@ SEXP nanoarrow_c_array_stream_get_next(SEXP
array_stream_xptr) {
SEXP array_xptr = PROTECT(array_owning_xptr());
struct ArrowArray* array = (struct ArrowArray*)R_ExternalPtrAddr(array_xptr);
int result = array_stream->get_next(array_stream, array);
-
+
if (result != 0) {
const char* last_error = array_stream->get_last_error(array_stream);
if (last_error == NULL) {
@@ -73,3 +73,34 @@ SEXP nanoarrow_c_array_stream_get_next(SEXP
array_stream_xptr) {
UNPROTECT(1);
return array_xptr;
}
+
+// Build a basic ArrowArrayStream from an R list of nanoarrow_array objects.
+//
+// batches_sexp: list of nanoarrow_array external pointers (exported one by one)
+// schema_xptr: nanoarrow_schema external pointer; it is copied, not consumed
+// validate_sexp: logical scalar; when TRUE the arrays are checked against schema
+SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP schema_xptr,
+                                    SEXP validate_sexp) {
+  int validate = LOGICAL(validate_sexp)[0];
+
+  // Allocate the owning external pointer first so anything moved into the
+  // stream below is released by its finalizer if a later step errors.
+  SEXP array_stream_xptr = PROTECT(array_stream_owning_xptr());
+  struct ArrowArrayStream* array_stream =
+      (struct ArrowArrayStream*)R_ExternalPtrAddr(array_stream_xptr);
+
+  // ArrowBasicArrayStreamInit() takes ownership of the schema it is given.
+  // Hand it a deep copy so the caller's nanoarrow_schema object is not left
+  // in a moved-from (released) state as a side effect of building the stream.
+  struct ArrowSchema* schema = schema_from_xptr(schema_xptr);
+  struct ArrowSchema schema_copy;
+  if (ArrowSchemaDeepCopy(schema, &schema_copy) != NANOARROW_OK) {
+    Rf_error("Failed to copy schema");
+  }
+
+  int64_t n_arrays = Rf_xlength(batches_sexp);
+  if (ArrowBasicArrayStreamInit(array_stream, &schema_copy, n_arrays) != NANOARROW_OK) {
+    // Release the copy if the initializer did not take ownership of it
+    if (schema_copy.release != NULL) {
+      schema_copy.release(&schema_copy);
+    }
+    Rf_error("Failed to initialize array stream");
+  }
+
+  // Each exported array is moved into the stream, which then owns it
+  struct ArrowArray array;
+  for (int64_t i = 0; i < n_arrays; i++) {
+    array_export(VECTOR_ELT(batches_sexp, i), &array);
+    ArrowBasicArrayStreamSetArray(array_stream, i, &array);
+  }
+
+  if (validate) {
+    struct ArrowError error;
+    if (ArrowBasicArrayStreamValidate(array_stream, &error) != NANOARROW_OK) {
+      Rf_error("ArrowBasicArrayStreamValidate(): %s", ArrowErrorMessage(&error));
+    }
+  }
+
+  UNPROTECT(1);
+  return array_stream_xptr;
+}
diff --git a/r/src/init.c b/r/src/init.c
index 1827fe7..2d42ae8 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -29,6 +29,7 @@ extern SEXP nanoarrow_c_altrep_is_materialized(SEXP x_sexp);
extern SEXP nanoarrow_c_altrep_force_materialize(SEXP x_sexp, SEXP
recursive_sexp);
extern SEXP nanoarrow_c_array_stream_get_schema(SEXP array_stream_xptr);
extern SEXP nanoarrow_c_array_stream_get_next(SEXP array_stream_xptr);
+extern SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP
schema_xptr, SEXP validate_sexp);
extern SEXP nanoarrow_c_array_view(SEXP array_xptr, SEXP schema_xptr);
extern SEXP nanoarrow_c_array_set_schema(SEXP array_xptr, SEXP schema_xptr,
SEXP validate_sexp);
extern SEXP nanoarrow_c_infer_schema_array(SEXP array_xptr);
@@ -63,6 +64,7 @@ static const R_CallMethodDef CallEntries[] = {
{"nanoarrow_c_altrep_force_materialize",
(DL_FUNC)&nanoarrow_c_altrep_force_materialize, 2},
{"nanoarrow_c_array_stream_get_schema",
(DL_FUNC)&nanoarrow_c_array_stream_get_schema, 1},
{"nanoarrow_c_array_stream_get_next",
(DL_FUNC)&nanoarrow_c_array_stream_get_next, 1},
+ {"nanoarrow_c_basic_array_stream",
(DL_FUNC)&nanoarrow_c_basic_array_stream, 3},
{"nanoarrow_c_array_view", (DL_FUNC)&nanoarrow_c_array_view, 2},
{"nanoarrow_c_array_set_schema", (DL_FUNC)&nanoarrow_c_array_set_schema,
3},
{"nanoarrow_c_infer_schema_array",
(DL_FUNC)&nanoarrow_c_infer_schema_array, 1},
diff --git a/r/src/pointers.c b/r/src/pointers.c
index 8d118ab..f55cd38 100644
--- a/r/src/pointers.c
+++ b/r/src/pointers.c
@@ -137,10 +137,8 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
if (obj_src == NULL || obj_src->release == NULL) {
Rf_error("`ptr_src` is not a valid struct ArrowSchema");
}
-
- memcpy(obj_dst, obj_src, sizeof(struct ArrowSchema));
- obj_src->release = NULL;
-
+
+ ArrowSchemaMove(obj_src, obj_dst);
} else if (Rf_inherits(ptr_dst, "nanoarrow_array")) {
struct ArrowArray* obj_dst = (struct
ArrowArray*)R_ExternalPtrAddr(ptr_dst);
if (obj_dst == NULL) {
@@ -156,9 +154,7 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
Rf_error("`ptr_src` is not a valid struct ArrowArray");
}
- memcpy(obj_dst, obj_src, sizeof(struct ArrowArray));
- obj_src->release = NULL;
-
+ ArrowArrayMove(obj_src, obj_dst);
} else if (Rf_inherits(ptr_dst, "nanoarrow_array_stream")) {
struct ArrowArrayStream* obj_dst =
(struct ArrowArrayStream*)R_ExternalPtrAddr(ptr_dst);
@@ -176,9 +172,7 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
Rf_error("`ptr_src` is not a valid struct ArrowArrayStream");
}
- memcpy(obj_dst, obj_src, sizeof(struct ArrowArrayStream));
- obj_src->release = NULL;
-
+ ArrowArrayStreamMove(obj_src, obj_dst);
} else {
Rf_error(
"`ptr_dst` must inherit from 'nanoarrow_schema', 'nanoarrow_array', or
"
diff --git a/r/tests/testthat/test-array-stream.R
b/r/tests/testthat/test-array-stream.R
index cc0e243..53f0732 100644
--- a/r/tests/testthat/test-array-stream.R
+++ b/r/tests/testthat/test-array-stream.R
@@ -15,6 +15,59 @@
# specific language governing permissions and limitations
# under the License.
+# An empty stream needs an explicit schema: there is no batch to infer it from
+test_that("basic_array_stream() can create empty streams", {
+  stream <- basic_array_stream(list(), infer_nanoarrow_schema(integer()))
+  expect_identical(stream$get_schema()$format, "i")
+  expect_null(stream$get_next())
+
+  expect_error(
+    basic_array_stream(list()),
+    "Can't infer schema from first batch if there are zero batches"
+  )
+})
+
+# Batches come back in order, followed by NULL once the stream is exhausted
+test_that("basic_array_stream() can create streams from batches", {
+  stream <- basic_array_stream(
+    list(
+      data.frame(a = 1, b = "two", stringsAsFactors = FALSE),
+      data.frame(a = 2, b = "three", stringsAsFactors = FALSE)
+    )
+  )
+
+  expect_identical(stream$get_schema()$format, "+s")
+  expect_identical(
+    as.data.frame(stream$get_next()),
+    data.frame(a = 1, b = "two", stringsAsFactors = FALSE)
+  )
+  expect_identical(
+    as.data.frame(stream$get_next()),
+    data.frame(a = 2, b = "three", stringsAsFactors = FALSE)
+  )
+  expect_null(stream$get_next())
+})
+
+# The schema is inferred from the first (int32) batch, so the second (struct)
+# batch does not match it; this is only detected when validate = TRUE
+test_that("basic_array_stream() can validate input or skip validation", {
+  invalid_stream <- basic_array_stream(
+    list(
+      as_nanoarrow_array(1:5),
+      as_nanoarrow_array(data.frame(a = 1:5))
+    ),
+    validate = FALSE
+  )
+  expect_s3_class(invalid_stream, "nanoarrow_array_stream")
+
+  expect_error(
+    basic_array_stream(
+      list(
+        as_nanoarrow_array(1:5),
+        as_nanoarrow_array(data.frame(a = 1:5))
+      ),
+      validate = TRUE
+    ),
+    "Expected array with 2 buffer"
+  )
+})
+
test_that("nanoarrow_array_stream format, print, and str methods work", {
array_stream <- as_nanoarrow_array_stream(data.frame(x = 1:10))
expect_identical(format(array_stream), "<nanoarrow_array_stream struct<x:
int32>>")
diff --git a/src/nanoarrow/array_inline.h b/src/nanoarrow/array_inline.h
index 751955d..d020a2b 100644
--- a/src/nanoarrow/array_inline.h
+++ b/src/nanoarrow/array_inline.h
@@ -440,6 +440,12 @@ static inline ArrowErrorCode
ArrowArrayFinishElement(struct ArrowArray* array) {
return NANOARROW_OK;
}
+static inline void ArrowArrayViewMove(struct ArrowArrayView* src,
+                                      struct ArrowArrayView* dst) {
+  // Shallow-copy every field of the view (including any pointers it holds)
+  // into dst, then re-initialize src as an uninitialized view so that only
+  // dst refers to that state afterwards.
+  *dst = *src;
+  ArrowArrayViewInit(src, NANOARROW_TYPE_UNINITIALIZED);
+}
+
static inline int8_t ArrowArrayViewIsNull(struct ArrowArrayView* array_view,
int64_t i) {
const uint8_t* validity_buffer = array_view->buffer_views[0].data.as_uint8;
i += array_view->array->offset;
diff --git a/src/nanoarrow/array_stream.c b/src/nanoarrow/array_stream.c
new file mode 100644
index 0000000..07917af
--- /dev/null
+++ b/src/nanoarrow/array_stream.c
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <errno.h>
+
+#include "nanoarrow.h"
+
+// Private state behind an ArrowArrayStream created by ArrowBasicArrayStreamInit()
+struct BasicArrayStreamPrivate {
+  // Schema owned by the stream; get_schema() hands out deep copies of it
+  struct ArrowSchema schema;
+  // Number of array slots allocated in `arrays`
+  int64_t n_arrays;
+  // Array slots; a slot whose release is NULL is empty (unset or already moved out)
+  struct ArrowArray* arrays;
+  // Index of the next slot that get_next() will move out
+  int64_t arrays_i;
+};
+
+// get_schema() callback: copies the stream's schema into `schema`.
+// Returns EINVAL for a NULL or released stream, otherwise the result of
+// ArrowSchemaDeepCopy().
+static int ArrowBasicArrayStreamGetSchema(struct ArrowArrayStream* array_stream,
+                                          struct ArrowSchema* schema) {
+  if (array_stream == NULL || array_stream->release == NULL) {
+    return EINVAL;
+  }
+
+  // Named private_data rather than `private` so this file also compiles as
+  // C++, where `private` is a keyword.
+  struct BasicArrayStreamPrivate* private_data =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  // The stream keeps its own schema; the caller receives an independent copy
+  return ArrowSchemaDeepCopy(&private_data->schema, schema);
+}
+
+// get_next() callback: moves the next stored array into `array`.
+// Returns EINVAL for a NULL or released stream, NANOARROW_OK otherwise.
+static int ArrowBasicArrayStreamGetNext(struct ArrowArrayStream* array_stream,
+                                        struct ArrowArray* array) {
+  if (array_stream == NULL || array_stream->release == NULL) {
+    return EINVAL;
+  }
+
+  // Named private_data rather than `private` (a C++ keyword)
+  struct BasicArrayStreamPrivate* private_data =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  // End of stream is signalled with a released output array. The index is
+  // not advanced, so every later call reports end of stream again.
+  if (private_data->arrays_i == private_data->n_arrays) {
+    array->release = NULL;
+    return NANOARROW_OK;
+  }
+
+  // Transfer ownership of the next array to the caller and advance
+  ArrowArrayMove(&private_data->arrays[private_data->arrays_i++], array);
+  return NANOARROW_OK;
+}
+
+// get_last_error() callback. The callbacks of this implementation report
+// failures only through their return codes and never store a message, so
+// there is never an error message to return.
+static const char* ArrowBasicArrayStreamGetLastError(
+    struct ArrowArrayStream* array_stream) {
+  (void)array_stream;  // unused; silences -Wunused-parameter
+  return NULL;
+}
+
+// release() callback: frees the schema, any arrays still owned by the
+// stream, and the private state, then marks the stream as released.
+// Calling it on a NULL or already-released stream is a no-op.
+static void ArrowBasicArrayStreamRelease(struct ArrowArrayStream* array_stream) {
+  if (array_stream == NULL || array_stream->release == NULL) {
+    return;
+  }
+
+  // Named private_data rather than `private` (a C++ keyword)
+  struct BasicArrayStreamPrivate* private_data =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  if (private_data->schema.release != NULL) {
+    private_data->schema.release(&private_data->schema);
+  }
+
+  // Slots that were never set, or were already moved out by get_next(),
+  // have release == NULL and are skipped
+  for (int64_t i = 0; i < private_data->n_arrays; i++) {
+    if (private_data->arrays[i].release != NULL) {
+      private_data->arrays[i].release(&private_data->arrays[i]);
+    }
+  }
+
+  if (private_data->arrays != NULL) {
+    ArrowFree(private_data->arrays);
+  }
+
+  ArrowFree(private_data);
+  // Do not leave a dangling pointer behind in the released stream
+  array_stream->private_data = NULL;
+  array_stream->release = NULL;
+}
+
+// Initialize array_stream with n_arrays empty slots and the given schema.
+//
+// On NANOARROW_OK, schema has been moved into the stream and the caller
+// owns (must release) array_stream. On failure, array_stream is untouched
+// and schema has not been moved.
+// Returns EINVAL for a negative n_arrays and ENOMEM on allocation failure.
+ArrowErrorCode ArrowBasicArrayStreamInit(struct ArrowArrayStream* array_stream,
+                                         struct ArrowSchema* schema, int64_t n_arrays) {
+  // A negative count would later make get_next() index into NULL storage
+  if (n_arrays < 0) {
+    return EINVAL;
+  }
+
+  // Guard the byte-size computation below against signed overflow
+  if (n_arrays > INT64_MAX / (int64_t)sizeof(struct ArrowArray)) {
+    return ENOMEM;
+  }
+
+  // Named private_data rather than `private` (a C++ keyword)
+  struct BasicArrayStreamPrivate* private_data =
+      (struct BasicArrayStreamPrivate*)ArrowMalloc(sizeof(struct BasicArrayStreamPrivate));
+  if (private_data == NULL) {
+    return ENOMEM;
+  }
+
+  private_data->n_arrays = n_arrays;
+  private_data->arrays = NULL;
+  private_data->arrays_i = 0;
+
+  if (n_arrays > 0) {
+    private_data->arrays =
+        (struct ArrowArray*)ArrowMalloc(n_arrays * (int64_t)sizeof(struct ArrowArray));
+    if (private_data->arrays == NULL) {
+      // array_stream has not been populated yet, so its release callback
+      // cannot be used here: free the private state directly. schema has not
+      // been moved yet and stays owned by the caller.
+      ArrowFree(private_data);
+      return ENOMEM;
+    }
+
+    // Mark every slot empty so release()/validate() can skip unset slots
+    for (int64_t i = 0; i < n_arrays; i++) {
+      private_data->arrays[i].release = NULL;
+    }
+  }
+
+  // Take ownership of schema only after every step that can fail succeeded
+  ArrowSchemaMove(schema, &private_data->schema);
+
+  array_stream->get_schema = &ArrowBasicArrayStreamGetSchema;
+  array_stream->get_next = &ArrowBasicArrayStreamGetNext;
+  array_stream->get_last_error = &ArrowBasicArrayStreamGetLastError;
+  array_stream->release = &ArrowBasicArrayStreamRelease;
+  array_stream->private_data = private_data;
+  return NANOARROW_OK;
+}
+
+// Move `array` into slot i of a stream created by ArrowBasicArrayStreamInit().
+// The caller must ensure 0 <= i < n_arrays; no bounds check is performed.
+void ArrowBasicArrayStreamSetArray(struct ArrowArrayStream* array_stream, int64_t i,
+                                   struct ArrowArray* array) {
+  // Named private_data rather than `private` (a C++ keyword)
+  struct BasicArrayStreamPrivate* private_data =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  struct ArrowArray* slot = &private_data->arrays[i];
+
+  // Replacing an occupied slot must not leak the array it already holds
+  if (slot->release != NULL) {
+    slot->release(slot);
+  }
+
+  ArrowArrayMove(array, slot);
+}
+
+// Check every stored array against the stream's schema.
+// Returns EINVAL for a NULL or released stream; otherwise the first non-OK
+// result of ArrowArrayViewInitFromSchema()/ArrowArrayViewSetArray(), which
+// also populate `error`.
+ArrowErrorCode ArrowBasicArrayStreamValidate(struct ArrowArrayStream* array_stream,
+                                             struct ArrowError* error) {
+  // Same guard as the get_schema()/get_next() callbacks
+  if (array_stream == NULL || array_stream->release == NULL) {
+    return EINVAL;
+  }
+
+  // Named private_data rather than `private` (a C++ keyword)
+  struct BasicArrayStreamPrivate* private_data =
+      (struct BasicArrayStreamPrivate*)array_stream->private_data;
+
+  // One view built from the schema is reused for every array
+  struct ArrowArrayView array_view;
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&array_view, &private_data->schema, error));
+
+  int result = NANOARROW_OK;
+  for (int64_t i = 0; i < private_data->n_arrays && result == NANOARROW_OK; i++) {
+    // Empty slots (unset or already moved out) are skipped
+    if (private_data->arrays[i].release != NULL) {
+      result = ArrowArrayViewSetArray(&array_view, &private_data->arrays[i], error);
+    }
+  }
+
+  ArrowArrayViewReset(&array_view);
+  return result;
+}
diff --git a/src/nanoarrow/array_stream_test.cc
b/src/nanoarrow/array_stream_test.cc
new file mode 100644
index 0000000..26d49ad
--- /dev/null
+++ b/src/nanoarrow/array_stream_test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "nanoarrow/nanoarrow.h"
+
+// Round-trips one array through the stream: ownership transfer on init/set,
+// validation, get_schema(), get_next(), end of stream, and release.
+TEST(ArrayStreamTest, ArrayStreamTestBasic) {
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  EXPECT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 1), NANOARROW_OK);
+  // Init moved the schema into the stream
+  EXPECT_EQ(schema.release, nullptr);
+
+  ASSERT_EQ(ArrowArrayInit(&array, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayAppendInt(&array, 123), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayFinishBuilding(&array, nullptr), NANOARROW_OK);
+
+  // SetArray moved the array into the stream
+  ArrowBasicArrayStreamSetArray(&array_stream, 0, &array);
+  EXPECT_EQ(array.release, nullptr);
+
+  EXPECT_EQ(ArrowBasicArrayStreamValidate(&array_stream, nullptr), NANOARROW_OK);
+
+  // get_schema() hands out a copy that the caller must release
+  struct ArrowSchema schema_copy;
+  EXPECT_EQ(array_stream.get_schema(&array_stream, &schema_copy), NANOARROW_OK);
+  EXPECT_STREQ(schema_copy.format, "i");
+  schema_copy.release(&schema_copy);
+
+  struct ArrowArray array_copy;
+  EXPECT_EQ(array_stream.get_next(&array_stream, &array_copy), NANOARROW_OK);
+  EXPECT_EQ(array_copy.length, 1);
+  EXPECT_EQ(array_copy.n_buffers, 2);
+  array_copy.release(&array_copy);
+
+  // End of stream is signalled by a released output array
+  EXPECT_EQ(array_stream.get_next(&array_stream, &array_copy), NANOARROW_OK);
+  EXPECT_EQ(array_copy.release, nullptr);
+
+  EXPECT_EQ(array_stream.get_last_error(&array_stream), nullptr);
+
+  array_stream.release(&array_stream);
+  EXPECT_EQ(array_stream.release, nullptr);
+}
+
+// A stream with zero arrays reports end of stream on every get_next() call
+TEST(ArrayStreamTest, ArrayStreamTestEmpty) {
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  EXPECT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 0), NANOARROW_OK);
+  EXPECT_EQ(ArrowBasicArrayStreamValidate(&array_stream, nullptr), NANOARROW_OK);
+
+  for (int i = 0; i < 5; i++) {
+    EXPECT_EQ(array_stream.get_next(&array_stream, &array), NANOARROW_OK);
+    EXPECT_EQ(array.release, nullptr);
+  }
+
+  array_stream.release(&array_stream);
+}
+
+// Releasing a partially consumed stream releases the arrays it still owns
+TEST(ArrayStreamTest, ArrayStreamTestIncomplete) {
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  ASSERT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 5), NANOARROW_OK);
+
+  // Add five arrays with length == i
+  for (int i = 0; i < 5; i++) {
+    ASSERT_EQ(ArrowArrayInit(&array, NANOARROW_TYPE_INT32), NANOARROW_OK);
+    ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+    for (int j = 0; j < i; j++) {
+      ASSERT_EQ(ArrowArrayAppendInt(&array, 123), NANOARROW_OK);
+    }
+    ASSERT_EQ(ArrowArrayFinishBuilding(&array, nullptr), NANOARROW_OK);
+    ArrowBasicArrayStreamSetArray(&array_stream, i, &array);
+  }
+
+  // Pull only one of them
+  EXPECT_EQ(array_stream.get_next(&array_stream, &array), NANOARROW_OK);
+  EXPECT_EQ(array.length, 0);
+  array.release(&array);
+
+  // The remaining arrays, owned by the stream, should be released here
+  array_stream.release(&array_stream);
+}
+
+// Validation reports an error when an array (string) does not match the
+// stream's schema (int32)
+TEST(ArrayStreamTest, ArrayStreamTestInvalid) {
+  struct ArrowArrayStream array_stream;
+  struct ArrowArray array;
+  struct ArrowSchema schema;
+  struct ArrowError error;
+
+  ASSERT_EQ(ArrowSchemaInit(&schema, NANOARROW_TYPE_INT32), NANOARROW_OK);
+  ASSERT_EQ(ArrowBasicArrayStreamInit(&array_stream, &schema, 1), NANOARROW_OK);
+
+  ASSERT_EQ(ArrowArrayInit(&array, NANOARROW_TYPE_STRING), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
+  ASSERT_EQ(ArrowArrayFinishBuilding(&array, nullptr), NANOARROW_OK);
+  ArrowBasicArrayStreamSetArray(&array_stream, 0, &array);
+
+  EXPECT_EQ(ArrowBasicArrayStreamValidate(&array_stream, &error), EINVAL);
+  EXPECT_STREQ(ArrowErrorMessage(&error),
+               "Expected array with 2 buffer(s) but found 3 buffer(s)");
+
+  array_stream.release(&array_stream);
+}
diff --git a/src/nanoarrow/array_test.cc b/src/nanoarrow/array_test.cc
index 2696a5e..9ab3b9c 100644
--- a/src/nanoarrow/array_test.cc
+++ b/src/nanoarrow/array_test.cc
@@ -1127,6 +1127,22 @@ TEST(ArrayTest, ArrayViewTestBasic) {
ArrowArrayViewReset(&array_view);
}
+// ArrowArrayViewMove() transfers the view's state to dst and leaves src as
+// an uninitialized view
+TEST(ArrayTest, ArrayViewTestMove) {
+  struct ArrowArrayView array_view;
+  ArrowArrayViewInit(&array_view, NANOARROW_TYPE_STRING);
+  ASSERT_EQ(array_view.storage_type, NANOARROW_TYPE_STRING);
+
+  struct ArrowArrayView array_view2;
+  ArrowArrayViewInit(&array_view2, NANOARROW_TYPE_UNINITIALIZED);
+  ASSERT_EQ(array_view2.storage_type, NANOARROW_TYPE_UNINITIALIZED);
+
+  ArrowArrayViewMove(&array_view, &array_view2);
+  EXPECT_EQ(array_view.storage_type, NANOARROW_TYPE_UNINITIALIZED);
+  EXPECT_EQ(array_view2.storage_type, NANOARROW_TYPE_STRING);
+
+  // Only the destination needs resetting after the move
+  ArrowArrayViewReset(&array_view2);
+}
+
TEST(ArrayTest, ArrayViewTestString) {
struct ArrowArrayView array_view;
struct ArrowError error;
diff --git a/src/nanoarrow/buffer_inline.h b/src/nanoarrow/buffer_inline.h
index 1d8c02c..9a83ceb 100644
--- a/src/nanoarrow/buffer_inline.h
+++ b/src/nanoarrow/buffer_inline.h
@@ -65,11 +65,10 @@ static inline void ArrowBufferReset(struct ArrowBuffer*
buffer) {
buffer->size_bytes = 0;
}
-static inline void ArrowBufferMove(struct ArrowBuffer* buffer,
- struct ArrowBuffer* buffer_out) {
- memcpy(buffer_out, buffer, sizeof(struct ArrowBuffer));
- buffer->data = NULL;
- ArrowBufferReset(buffer);
+static inline void ArrowBufferMove(struct ArrowBuffer* src, struct ArrowBuffer* dst) {
+  memcpy(dst, src, sizeof(struct ArrowBuffer));
+  // Detach src from the data first, presumably so that ArrowBufferReset()
+  // does not free memory now owned by dst (confirm against ArrowBufferReset)
+  src->data = NULL;
+  ArrowBufferReset(src);
}
static inline ArrowErrorCode ArrowBufferResize(struct ArrowBuffer* buffer,
@@ -176,6 +175,16 @@ static inline ArrowErrorCode ArrowBufferAppendFloat(struct
ArrowBuffer* buffer,
return ArrowBufferAppend(buffer, &value, sizeof(float));
}
+static inline ArrowErrorCode ArrowBufferAppendStringView(struct ArrowBuffer* buffer,
+                                                         struct ArrowStringView value) {
+  // Appends exactly n_bytes from the view; no terminating NUL is written
+  const char* bytes = value.data;
+  int64_t n_bytes = value.n_bytes;
+  return ArrowBufferAppend(buffer, bytes, n_bytes);
+}
+
+static inline ArrowErrorCode ArrowBufferAppendBufferView(struct ArrowBuffer* buffer,
+                                                         struct ArrowBufferView value) {
+  // Appends the viewed bytes through the untyped member of the data union
+  const void* bytes = value.data.data;
+  int64_t n_bytes = value.n_bytes;
+  return ArrowBufferAppend(buffer, bytes, n_bytes);
+}
+
static inline ArrowErrorCode ArrowBufferAppendFill(struct ArrowBuffer* buffer,
uint8_t value, int64_t
size_bytes) {
NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(buffer, size_bytes));
@@ -326,6 +335,12 @@ static inline void ArrowBitmapInit(struct ArrowBitmap*
bitmap) {
bitmap->size_bits = 0;
}
+static inline void ArrowBitmapMove(struct ArrowBitmap* src, struct ArrowBitmap* dst) {
+  // Hand the underlying buffer to dst, then carry the bit count across and
+  // zero it on the source so src describes an empty bitmap.
+  ArrowBufferMove(&src->buffer, &dst->buffer);
+  int64_t n_bits = src->size_bits;
+  dst->size_bits = n_bits;
+  src->size_bits = 0;
+}
+
static inline ArrowErrorCode ArrowBitmapReserve(struct ArrowBitmap* bitmap,
int64_t additional_size_bits) {
int64_t min_capacity_bits = bitmap->size_bits + additional_size_bits;
diff --git a/src/nanoarrow/buffer_test.cc b/src/nanoarrow/buffer_test.cc
index 76c925d..78c5bdc 100644
--- a/src/nanoarrow/buffer_test.cc
+++ b/src/nanoarrow/buffer_test.cc
@@ -222,6 +222,19 @@ TEST(BufferTest, BufferTestAppendHelpers) {
EXPECT_EQ(ArrowBufferAppendFloat(&buffer, 123), NANOARROW_OK);
EXPECT_EQ(reinterpret_cast<float*>(buffer.data)[0], 123);
ArrowBufferReset(&buffer);
+
+ EXPECT_EQ(ArrowBufferAppendStringView(&buffer, ArrowCharView("a")),
NANOARROW_OK);
+ EXPECT_EQ(reinterpret_cast<char*>(buffer.data)[0], 'a');
+ EXPECT_EQ(buffer.size_bytes, 1);
+ ArrowBufferReset(&buffer);
+
+ struct ArrowBufferView buffer_view;
+ buffer_view.data.data = "a";
+ buffer_view.n_bytes = 1;
+ EXPECT_EQ(ArrowBufferAppendBufferView(&buffer, buffer_view), NANOARROW_OK);
+ EXPECT_EQ(reinterpret_cast<char*>(buffer.data)[0], 'a');
+ EXPECT_EQ(buffer.size_bytes, 1);
+ ArrowBufferReset(&buffer);
}
TEST(BitmapTest, BitmapTestElement) {
@@ -366,6 +379,24 @@ TEST(BitmapTest, BitmapTestAppend) {
ArrowBitmapReset(&bitmap);
}
+// ArrowBitmapMove() transfers the buffer and bit count to dst and leaves src
+// empty (no data, zero bits)
+TEST(BitmapTest, BitmapTestMove) {
+  struct ArrowBitmap bitmap;
+  ArrowBitmapInit(&bitmap);
+  ASSERT_EQ(ArrowBitmapAppend(&bitmap, 1, 1), NANOARROW_OK);
+  ASSERT_NE(bitmap.buffer.data, nullptr);
+  ASSERT_EQ(bitmap.size_bits, 1);
+
+  struct ArrowBitmap bitmap2;
+  bitmap2.buffer.data = NULL;
+  ArrowBitmapMove(&bitmap, &bitmap2);
+  EXPECT_EQ(bitmap.buffer.data, nullptr);
+  EXPECT_EQ(bitmap.size_bits, 0);
+  EXPECT_NE(bitmap2.buffer.data, nullptr);
+  EXPECT_EQ(bitmap2.size_bits, 1);
+
+  // Only the destination owns memory after the move
+  ArrowBitmapReset(&bitmap2);
+}
+
TEST(BitmapTest, BitmapTestResize) {
struct ArrowBitmap bitmap;
ArrowBitmapInit(&bitmap);
diff --git a/src/nanoarrow/nanoarrow.h b/src/nanoarrow/nanoarrow.h
index 4dc8e68..4bea678 100644
--- a/src/nanoarrow/nanoarrow.h
+++ b/src/nanoarrow/nanoarrow.h
@@ -105,6 +105,12 @@
#define ArrowArrayViewSetArray \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowArrayViewSetArray)
#define ArrowArrayViewReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE,
ArrowArrayViewReset)
+#define ArrowBasicArrayStreamInit \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBasicArrayStreamInit)
+#define ArrowBasicArrayStreamSetArray \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBasicArrayStreamSetArray)
+#define ArrowBasicArrayStreamValidate \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowBasicArrayStreamValidate)
#endif
@@ -483,8 +489,7 @@ static inline void ArrowBufferReset(struct ArrowBuffer*
buffer);
///
/// Transfers the buffer data and lifecycle management to another
/// address and resets buffer.
-static inline void ArrowBufferMove(struct ArrowBuffer* buffer,
- struct ArrowBuffer* buffer_out);
+static inline void ArrowBufferMove(struct ArrowBuffer* src, struct
ArrowBuffer* dst);
/// \brief Grow or shrink a buffer to a given capacity
///
@@ -564,6 +569,14 @@ static inline ArrowErrorCode
ArrowBufferAppendDouble(struct ArrowBuffer* buffer,
static inline ArrowErrorCode ArrowBufferAppendFloat(struct ArrowBuffer* buffer,
float value);
+/// \brief Write an ArrowStringView to a buffer
+static inline ArrowErrorCode ArrowBufferAppendStringView(struct ArrowBuffer*
buffer,
+ struct
ArrowStringView value);
+
+/// \brief Write an ArrowBufferView to a buffer
+static inline ArrowErrorCode ArrowBufferAppendBufferView(struct ArrowBuffer*
buffer,
+ struct
ArrowBufferView value);
+
/// @}
/// \defgroup nanoarrow-bitmap Bitmap utilities
@@ -594,6 +607,12 @@ static inline int64_t ArrowBitCountSet(const uint8_t*
bits, int64_t i_from, int6
/// Initialize the builder's buffer, empty its cache, and reset the size to
zero
static inline void ArrowBitmapInit(struct ArrowBitmap* bitmap);
+/// \brief Move an ArrowBitmap
+///
+/// Transfers the underlying buffer data and lifecycle management to another
+/// address and resets the bitmap.
+static inline void ArrowBitmapMove(struct ArrowBitmap* src, struct
ArrowBitmap* dst);
+
/// \brief Ensure a bitmap builder has at least a given additional capacity
///
/// Ensures that the buffer has space to append at least
@@ -792,6 +811,13 @@ ArrowErrorCode ArrowArrayFinishBuilding(struct ArrowArray*
array,
/// \brief Initialize the contents of an ArrowArrayView
void ArrowArrayViewInit(struct ArrowArrayView* array_view, enum ArrowType
storage_type);
+/// \brief Move an ArrowArrayView
+///
+/// Transfers the ArrowArrayView data and lifecycle management to another
+/// address and resets the contents of src.
+static inline void ArrowArrayViewMove(struct ArrowArrayView* src,
+ struct ArrowArrayView* dst);
+
/// \brief Initialize the contents of an ArrowArrayView from an ArrowSchema
ArrowErrorCode ArrowArrayViewInitFromSchema(struct ArrowArrayView* array_view,
struct ArrowSchema* schema,
@@ -851,6 +877,43 @@ static inline struct ArrowBufferView
ArrowArrayViewGetBytesUnsafe(
/// @}
+/// \defgroup nanoarrow-basic-array-stream Basic ArrowArrayStream
implementation
+///
+/// An implementation of an ArrowArrayStream based on a collection of
+/// zero or more previously-existing ArrowArray objects. Users should
+/// initialize and/or validate the contents before transferring the
+/// responsibility of the ArrowArrayStream elsewhere.
+///
+/// @{
+
+/// \brief Initialize an ArrowArrayStream backed by this implementation
+///
+/// This function moves the ownership of schema to the array_stream. If
+/// this function returns NANOARROW_OK, the caller is responsible for
+/// releasing the ArrowArrayStream.
+ArrowErrorCode ArrowBasicArrayStreamInit(struct ArrowArrayStream* array_stream,
+ struct ArrowSchema* schema, int64_t
n_arrays);
+
+/// \brief Set the ith ArrowArray in this ArrowArrayStream.
+///
+/// array_stream must have been initialized with ArrowBasicArrayStreamInit().
+/// This function moves the ownership of array to the array_stream. i must
+/// be greater than or equal to zero and less than the value of n_arrays passed in
+/// ArrowBasicArrayStreamInit(). Callers are not required to fill all
+/// n_arrays members (i.e., n_arrays is a maximum bound).
+void ArrowBasicArrayStreamSetArray(struct ArrowArrayStream* array_stream,
int64_t i,
+ struct ArrowArray* array);
+
+/// \brief Validate the contents of this ArrowArrayStream
+///
+/// array_stream must have been initialized with ArrowBasicArrayStreamInit().
+/// This function uses ArrowArrayViewInitFromSchema() and
ArrowArrayViewSetArray()
+/// to validate the contents of the arrays.
+ArrowErrorCode ArrowBasicArrayStreamValidate(struct ArrowArrayStream*
array_stream,
+ struct ArrowError* error);
+
+/// @}
+
// Inline function definitions
#include "array_inline.h"
#include "buffer_inline.h"
diff --git a/src/nanoarrow/nanoarrow.hpp b/src/nanoarrow/nanoarrow.hpp
index 992253a..ad851ea 100644
--- a/src/nanoarrow/nanoarrow.hpp
+++ b/src/nanoarrow/nanoarrow.hpp
@@ -40,8 +40,7 @@ namespace internal {
static inline void init_pointer(struct ArrowSchema* data) { data->release =
nullptr; }
static inline void move_pointer(struct ArrowSchema* src, struct ArrowSchema*
dst) {
- memcpy(dst, src, sizeof(struct ArrowSchema));
- src->release = nullptr;
+ ArrowSchemaMove(src, dst);
}
static inline void release_pointer(struct ArrowSchema* data) {
@@ -53,8 +52,7 @@ static inline void release_pointer(struct ArrowSchema* data) {
static inline void init_pointer(struct ArrowArray* data) { data->release =
nullptr; }
static inline void move_pointer(struct ArrowArray* src, struct ArrowArray*
dst) {
- memcpy(dst, src, sizeof(struct ArrowArray));
- src->release = nullptr;
+ ArrowArrayMove(src, dst);
}
static inline void release_pointer(struct ArrowArray* data) {
@@ -69,8 +67,7 @@ static inline void init_pointer(struct ArrowArrayStream*
data) {
static inline void move_pointer(struct ArrowArrayStream* src,
struct ArrowArrayStream* dst) {
- memcpy(dst, src, sizeof(struct ArrowArrayStream));
- src->release = nullptr;
+ ArrowArrayStreamMove(src, dst);
}
static inline void release_pointer(ArrowArrayStream* data) {
@@ -90,9 +87,7 @@ static inline void release_pointer(struct ArrowBuffer* data)
{ ArrowBufferReset(
static inline void init_pointer(struct ArrowBitmap* data) {
ArrowBitmapInit(data); }
static inline void move_pointer(struct ArrowBitmap* src, struct ArrowBitmap*
dst) {
- ArrowBufferMove(&src->buffer, &dst->buffer);
- dst->size_bits = src->size_bits;
- src->size_bits = 0;
+ ArrowBitmapMove(src, dst);
}
static inline void release_pointer(struct ArrowBitmap* data) {
ArrowBitmapReset(data); }
@@ -102,8 +97,7 @@ static inline void init_pointer(struct ArrowArrayView* data)
{
}
static inline void move_pointer(struct ArrowArrayView* src, struct
ArrowArrayView* dst) {
- memcpy(dst, src, sizeof(struct ArrowArrayView));
- init_pointer(src);
+ ArrowArrayViewMove(src, dst);
}
static inline void release_pointer(struct ArrowArrayView* data) {
diff --git a/src/nanoarrow/nanoarrow_types.h b/src/nanoarrow/nanoarrow_types.h
index f62d358..def60b7 100644
--- a/src/nanoarrow/nanoarrow_types.h
+++ b/src/nanoarrow/nanoarrow_types.h
@@ -125,6 +125,25 @@ struct ArrowArrayStream {
#endif // ARROW_C_STREAM_INTERFACE
#endif // ARROW_FLAG_DICTIONARY_ORDERED
+/// \brief Move src to dst (overwrites dst without releasing it) and set src->release to NULL
+static inline void ArrowSchemaMove(struct ArrowSchema* src, struct
ArrowSchema* dst) {
+ memcpy(dst, src, sizeof(struct ArrowSchema));
+ src->release = NULL;
+}
+
+/// \brief Move src to dst (overwrites dst without releasing it) and set src->release to NULL
+static inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray*
dst) {
+ memcpy(dst, src, sizeof(struct ArrowArray));
+ src->release = NULL;
+}
+
+/// \brief Move src to dst (overwrites dst without releasing it) and set src->release to NULL
+static inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
+ struct ArrowArrayStream* dst) {
+ memcpy(dst, src, sizeof(struct ArrowArrayStream));
+ src->release = NULL;
+}
+
/// @}
// Utility macros