This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new b4396bf feat(r): Add ArrowArrayStream implementation to support
keeping a dependent object in scope (#194)
b4396bf is described below
commit b4396bfd32eddb8b898727f1b2c366023f2bcb63
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue May 16 08:42:26 2023 -0400
feat(r): Add ArrowArrayStream implementation to support keeping a dependent
object in scope (#194)
This is useful because the ADBC spec requires that the AdbcStatement
outlive the stream. GDAL's ArrowArrayStream export from OGR imposes a
similar lifetime requirement. If the stream is consumed from R this is
not a problem; however, we want to be able to export these streams
elsewhere (e.g., DuckDB, Polars), and handling it at the C level ensures
that flexibility.
See also https://github.com/r-dbi/adbc/discussions/4
---
r/NAMESPACE | 1 +
r/R/pointers.R | 26 +++++++++++--
r/man/nanoarrow_pointer_is_valid.Rd | 8 ++++
r/src/array_stream.c | 77 +++++++++++++++++++++++++++++++++++++
r/src/array_stream.h | 3 ++
r/src/init.c | 4 ++
r/src/pointers.c | 27 +++++++++++++
r/src/util.h | 7 ++++
r/tests/testthat/test-pointers.R | 31 ++++++++++++++-
9 files changed, 179 insertions(+), 5 deletions(-)
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 71e5324..2ca1ecf 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -147,6 +147,7 @@ export(nanoarrow_pointer_export)
export(nanoarrow_pointer_is_valid)
export(nanoarrow_pointer_move)
export(nanoarrow_pointer_release)
+export(nanoarrow_pointer_set_protected)
export(nanoarrow_schema_modify)
export(nanoarrow_schema_parse)
export(nanoarrow_version)
diff --git a/r/R/pointers.R b/r/R/pointers.R
index e6150d5..1f4d7ec 100644
--- a/r/R/pointers.R
+++ b/r/R/pointers.R
@@ -61,6 +61,10 @@
#'
#' @param ptr,ptr_src,ptr_dst An external pointer to a `struct ArrowSchema`,
#' `struct ArrowArray`, or `struct ArrowArrayStream`.
+#' @param protected An object whose scope must outlive that of `ptr_src`.
+#' This is useful for array streams because at least two specifications
+#' that use the array stream state that the stream is only valid during the
+#' lifetime of another object (e.g., an AdbcStatement or OGRDataset).
#' @return
#' - `nanoarrow_pointer_is_valid()` returns TRUE if the pointer is non-null
#' and has a non-null release callback.
@@ -116,17 +120,18 @@ nanoarrow_pointer_move <- function(ptr_src, ptr_dst) {
#' @export
nanoarrow_pointer_export <- function(ptr_src, ptr_dst) {
if (inherits(ptr_src, "nanoarrow_schema")) {
- invisible(.Call(nanoarrow_c_export_schema, ptr_src, ptr_dst))
+ .Call(nanoarrow_c_export_schema, ptr_src, ptr_dst)
} else if (inherits(ptr_src, "nanoarrow_array")) {
- invisible(.Call(nanoarrow_c_export_array, ptr_src, ptr_dst))
+ .Call(nanoarrow_c_export_array, ptr_src, ptr_dst)
} else if (inherits(ptr_src, "nanoarrow_array_stream")) {
- # for streams, we don't keep the original pointer alive
- nanoarrow_pointer_move(ptr_src, ptr_dst)
+ # Streams are always moved (ptr_src is left released). If a protected
+ # object was attached to ptr_src, the C export wraps the stream so that
+ # the object stays referenced until the exported stream is released.
+ .Call(nanoarrow_c_export_array_stream, ptr_src, ptr_dst)
} else {
stop(
"`ptr_src` must inherit from 'nanoarrow_schema', 'nanoarrow_array', or
'nanoarrow_array_stream'"
)
}
+
+ # Every successful branch returns the destination pointer invisibly
+ invisible(ptr_dst)
}
#' @rdname nanoarrow_pointer_is_valid
@@ -146,3 +151,16 @@ nanoarrow_allocate_array <- function() {
nanoarrow_allocate_array_stream <- function() {
+ # Thin wrapper around the C entry point nanoarrow_c_allocate_array_stream
.Call(nanoarrow_c_allocate_array_stream)
}
+
+#' @rdname nanoarrow_pointer_is_valid
+#' @export
+nanoarrow_pointer_set_protected <- function(ptr_src, protected) {
+  # Accept only the three nanoarrow pointer classes; anything else is an error
+  pointer_classes <- c(
+    "nanoarrow_schema",
+    "nanoarrow_array",
+    "nanoarrow_array_stream"
+  )
+
+  if (inherits(ptr_src, pointer_classes)) {
+    .Call(nanoarrow_c_pointer_set_protected, ptr_src, protected)
+    return(invisible(ptr_src))
+  }
+
+  stop(
+    "`ptr_src` must inherit from 'nanoarrow_schema', 'nanoarrow_array', or
'nanoarrow_array_stream'"
+  )
+}
diff --git a/r/man/nanoarrow_pointer_is_valid.Rd
b/r/man/nanoarrow_pointer_is_valid.Rd
index 557cf62..f2aaf5a 100644
--- a/r/man/nanoarrow_pointer_is_valid.Rd
+++ b/r/man/nanoarrow_pointer_is_valid.Rd
@@ -11,6 +11,7 @@
\alias{nanoarrow_allocate_schema}
\alias{nanoarrow_allocate_array}
\alias{nanoarrow_allocate_array_stream}
+\alias{nanoarrow_pointer_set_protected}
\title{Danger zone: low-level pointer operations}
\usage{
nanoarrow_pointer_is_valid(ptr)
@@ -32,10 +33,17 @@ nanoarrow_allocate_schema()
nanoarrow_allocate_array()
nanoarrow_allocate_array_stream()
+
+nanoarrow_pointer_set_protected(ptr_src, protected)
}
\arguments{
\item{ptr, ptr_src, ptr_dst}{An external pointer to a \verb{struct
ArrowSchema},
\verb{struct ArrowArray}, or \verb{struct ArrowArrayStream}.}
+
+\item{protected}{An object whose scope must outlive that of \code{ptr_src}.
+This is useful for array streams because at least two specifications
+that use the array stream state that the stream is only valid during the
+lifetime of another object (e.g., an AdbcStatement or OGRDataset).}
}
\value{
\itemize{
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 0d879ca..f0832b8 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -23,6 +23,7 @@
#include "array_stream.h"
#include "nanoarrow.h"
#include "schema.h"
+#include "util.h"
void finalize_array_stream_xptr(SEXP array_stream_xptr) {
struct ArrowArrayStream* array_stream =
@@ -104,3 +105,79 @@ SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp,
SEXP schema_xptr,
UNPROTECT(1);
return array_stream_xptr;
}
+
+// Implementation of an ArrowArrayStream that keeps a dependent object valid
+// (used as private_data of the wrapper stream built by array_stream_export()).
+struct WrapperArrayStreamData {
+ // External pointer that owns the moved parent stream; its protected slot
+ // holds the dependent object. Preserved until the wrapper is released.
+ SEXP parent_array_stream_xptr;
+ // Address of the stream owned by parent_array_stream_xptr; every wrapper
+ // callback forwards to it.
+ struct ArrowArrayStream* parent_array_stream;
+};
+
+// Release callback of the wrapper stream. Drops the wrapper's reference to the
+// parent external pointer and frees the wrapper's private data. The parent
+// stream itself is not released here; presumably that is left to the finalizer
+// attached by array_stream_owning_xptr() -- TODO confirm.
+static void finalize_wrapper_array_stream(struct ArrowArrayStream* array_stream) {
+  if (array_stream->private_data != NULL) {
+    struct WrapperArrayStreamData* data =
+        (struct WrapperArrayStreamData*)array_stream->private_data;
+
+    // Undo the nanoarrow_preserve_sexp() performed in array_stream_export().
+    // NOTE(review): release callbacks may be invoked off the R main thread;
+    // this relies on nanoarrow_release_sexp() tolerating that -- confirm.
+    nanoarrow_release_sexp(data->parent_array_stream_xptr);
+
+    ArrowFree(data);
+    // Don't leave a dangling pointer behind in the released struct
+    array_stream->private_data = NULL;
+  }
+
+  // Mark the stream as released, as the Arrow C stream interface requires
+  array_stream->release = NULL;
+}
+
+// get_last_error callback of the wrapper: forwards to the wrapped parent stream.
+static const char* wrapper_array_stream_get_last_error(
+    struct ArrowArrayStream* array_stream) {
+  struct ArrowArrayStream* parent =
+      ((struct WrapperArrayStreamData*)array_stream->private_data)->parent_array_stream;
+  return parent->get_last_error(parent);
+}
+
+// get_schema callback of the wrapper: forwards to the wrapped parent stream
+// and returns its status code unchanged.
+static int wrapper_array_stream_get_schema(struct ArrowArrayStream* array_stream,
+                                           struct ArrowSchema* out) {
+  struct ArrowArrayStream* parent =
+      ((struct WrapperArrayStreamData*)array_stream->private_data)->parent_array_stream;
+  return parent->get_schema(parent, out);
+}
+
+// get_next callback of the wrapper: forwards to the wrapped parent stream
+// and returns its status code unchanged.
+static int wrapper_array_stream_get_next(struct ArrowArrayStream* array_stream,
+                                         struct ArrowArray* out) {
+  struct ArrowArrayStream* parent =
+      ((struct WrapperArrayStreamData*)array_stream->private_data)->parent_array_stream;
+  return parent->get_next(parent, out);
+}
+
+// Export the stream held by parent_array_stream_xptr into array_stream_copy.
+// The parent stream is always moved, leaving parent_array_stream_xptr released.
+// If the parent external pointer has a protected (dependent) object, the
+// exported stream is a thin wrapper. The wrapper holds a preserved reference to
+// a new external pointer that owns the moved stream and protects that object,
+// until the wrapper's release callback runs.
+void array_stream_export(SEXP parent_array_stream_xptr,
+                         struct ArrowArrayStream* array_stream_copy) {
+  struct ArrowArrayStream* parent_array_stream =
+      array_stream_from_xptr(parent_array_stream_xptr);
+
+  // If there is no dependent object, don't bother with this wrapper
+  SEXP dependent_sexp = R_ExternalPtrProtected(parent_array_stream_xptr);
+  if (dependent_sexp == R_NilValue) {
+    ArrowArrayStreamMove(parent_array_stream, array_stream_copy);
+    return;
+  }
+
+  // Acquire everything that can fail (and longjmp via Rf_error) *before* the
+  // parent stream is moved or array_stream_copy is touched. Otherwise a failed
+  // allocation would consume the source stream and/or leave array_stream_copy
+  // with a non-NULL release callback but NULL private_data.
+  SEXP parent_array_stream_xptr_new = PROTECT(array_stream_owning_xptr());
+  struct ArrowArrayStream* parent_array_stream_new =
+      (struct ArrowArrayStream*)R_ExternalPtrAddr(parent_array_stream_xptr_new);
+
+  struct WrapperArrayStreamData* data = (struct WrapperArrayStreamData*)ArrowMalloc(
+      sizeof(struct WrapperArrayStreamData));
+  check_trivial_alloc(data, "struct WrapperArrayStreamData");
+
+  // Transfer responsibility for the new xptr to the C object; the matching
+  // nanoarrow_release_sexp() is in finalize_wrapper_array_stream().
+  // NOTE(review): if this call can itself error, `data` (a few bytes) leaks,
+  // but the source stream is still intact at this point.
+  nanoarrow_preserve_sexp(parent_array_stream_xptr_new);
+
+  // Nothing below can fail: move the stream and wire up the wrapper
+  ArrowArrayStreamMove(parent_array_stream, parent_array_stream_new);
+  R_SetExternalPtrProtected(parent_array_stream_xptr_new, dependent_sexp);
+
+  data->parent_array_stream_xptr = parent_array_stream_xptr_new;
+  data->parent_array_stream = parent_array_stream_new;
+
+  array_stream_copy->get_last_error = &wrapper_array_stream_get_last_error;
+  array_stream_copy->get_schema = &wrapper_array_stream_get_schema;
+  array_stream_copy->get_next = &wrapper_array_stream_get_next;
+  array_stream_copy->private_data = data;
+  array_stream_copy->release = &finalize_wrapper_array_stream;
+
+  UNPROTECT(1);
+}
diff --git a/r/src/array_stream.h b/r/src/array_stream.h
index 28c9b06..b28488d 100644
--- a/r/src/array_stream.h
+++ b/r/src/array_stream.h
@@ -61,4 +61,7 @@ static inline SEXP array_stream_owning_xptr(void) {
return array_stream_xptr;
}
+void array_stream_export(SEXP array_stream_xptr,
+ struct ArrowArrayStream* array_stream_copy);
+
#endif
diff --git a/r/src/init.c b/r/src/init.c
index 3b88b88..371f202 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -62,6 +62,8 @@ extern SEXP nanoarrow_c_pointer_release(SEXP ptr);
extern SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst);
extern SEXP nanoarrow_c_export_schema(SEXP schema_xptr, SEXP ptr_dst);
extern SEXP nanoarrow_c_export_array(SEXP array_xptr, SEXP ptr_dst);
+extern SEXP nanoarrow_c_export_array_stream(SEXP array_stream_xptr, SEXP
ptr_dst);
+extern SEXP nanoarrow_c_pointer_set_protected(SEXP ptr_src, SEXP
protected_sexp);
extern SEXP nanoarrow_c_schema_init(SEXP type_id_sexp, SEXP nullable_sexp);
extern SEXP nanoarrow_c_schema_init_date_time(SEXP type_id_sexp, SEXP
time_unit_sexp, SEXP timezone_sexp, SEXP nullable_sexp);
extern SEXP nanoarrow_c_schema_init_decimal(SEXP type_id_sexp, SEXP
precision_sexp, SEXP scale_sexp, SEXP nullable_sexp);
@@ -121,6 +123,8 @@ static const R_CallMethodDef CallEntries[] = {
{"nanoarrow_c_pointer_move", (DL_FUNC)&nanoarrow_c_pointer_move, 2},
{"nanoarrow_c_export_schema", (DL_FUNC)&nanoarrow_c_export_schema, 2},
{"nanoarrow_c_export_array", (DL_FUNC)&nanoarrow_c_export_array, 2},
+ {"nanoarrow_c_export_array_stream",
(DL_FUNC)&nanoarrow_c_export_array_stream, 2},
+ {"nanoarrow_c_pointer_set_protected",
(DL_FUNC)&nanoarrow_c_pointer_set_protected, 2},
{"nanoarrow_c_schema_init", (DL_FUNC)&nanoarrow_c_schema_init, 2},
{"nanoarrow_c_schema_init_date_time",
(DL_FUNC)&nanoarrow_c_schema_init_date_time, 4},
{"nanoarrow_c_schema_init_decimal",
(DL_FUNC)&nanoarrow_c_schema_init_decimal, 4},
diff --git a/r/src/pointers.c b/r/src/pointers.c
index 5124c62..47871d5 100644
--- a/r/src/pointers.c
+++ b/r/src/pointers.c
@@ -238,3 +238,30 @@ SEXP nanoarrow_c_export_array(SEXP array_xptr, SEXP
ptr_dst) {
UNPROTECT(1);
return R_NilValue;
}
+
+// .Call entry point: export (move) the stream behind array_stream_xptr into
+// the struct ArrowArrayStream that ptr_dst points to. The destination must be
+// non-NULL and not currently hold a stream (its release callback is NULL).
+SEXP nanoarrow_c_export_array_stream(SEXP array_stream_xptr, SEXP ptr_dst) {
+  SEXP dst_xptr = PROTECT(nanoarrow_c_pointer(ptr_dst));
+  struct ArrowArrayStream* dst =
+      (struct ArrowArrayStream*)R_ExternalPtrAddr(dst_xptr);
+
+  const char* problem = NULL;
+  if (dst == NULL) {
+    problem = "`ptr_dst` is a pointer to NULL";
+  } else if (dst->release != NULL) {
+    problem = "`ptr_dst` is a valid struct ArrowArrayStream";
+  }
+
+  if (problem != NULL) {
+    Rf_error("%s", problem);
+  }
+
+  array_stream_export(array_stream_xptr, dst);
+
+  UNPROTECT(1);
+  return R_NilValue;
+}
+
+// .Call entry point: store protected_sexp in the "protected" slot of the
+// external pointer ptr_src so it stays reachable for as long as ptr_src does.
+// Errors if ptr_src is not an external pointer or if the slot is already used.
+SEXP nanoarrow_c_pointer_set_protected(SEXP ptr_src, SEXP protected_sexp) {
+  // The R-level inherits() check accepts any object carrying a nanoarrow class
+  // attribute, and R_ExternalPtrProtected() may not type-check its argument,
+  // so verify the SEXP type here before touching the external pointer slots.
+  if (TYPEOF(ptr_src) != EXTPTRSXP) {
+    Rf_error("`ptr_src` must be an external pointer");
+  }
+
+  if (R_ExternalPtrProtected(ptr_src) != R_NilValue) {
+    Rf_error("External pointer protected value has already been set");
+  }
+
+  R_SetExternalPtrProtected(ptr_src, protected_sexp);
+  return R_NilValue;
+}
diff --git a/r/src/util.h b/r/src/util.h
index dbc7202..bafbd60 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -47,4 +47,11 @@ int64_t nanoarrow_preserved_empty(void);
// For testing
void nanoarrow_preserve_and_release_on_other_thread(SEXP obj);
+// Checker for very small mallocs()
+static inline void check_trivial_alloc(const void* ptr, const char* ptr_type) {
+ if (ptr == NULL) {
+ Rf_error("ArrowMalloc(sizeof(%s)) failed", ptr_type); // # nocov
+ }
+}
+
#endif
diff --git a/r/tests/testthat/test-pointers.R b/r/tests/testthat/test-pointers.R
index 36efa5a..bcc5d3d 100644
--- a/r/tests/testthat/test-pointers.R
+++ b/r/tests/testthat/test-pointers.R
@@ -194,6 +194,8 @@ test_that("nanoarrow_pointer_export() works for
array_stream", {
expect_false(nanoarrow_pointer_is_valid(ptr))
expect_true(nanoarrow_pointer_is_valid(dst))
+ expect_identical(convert_array_stream(dst), data.frame(a = integer()))
+
expect_error(
nanoarrow_pointer_export(ptr, dst),
"`ptr_dst` is a valid struct ArrowArrayStream"
@@ -201,7 +203,34 @@ test_that("nanoarrow_pointer_export() works for
array_stream", {
expect_error(
nanoarrow_pointer_export(nanoarrow_allocate_array_stream(), ptr),
- "is not a valid struct ArrowArrayStream"
+ "has already been released"
+ )
+})
+
+test_that("nanoarrow_pointer_export() works for wrapped array_stream", {
+  some_dependent_object <- list()
+  ptr <- as_nanoarrow_array_stream(data.frame(a = integer()))
+  nanoarrow_pointer_set_protected(ptr, some_dependent_object)
+
+  dst <- nanoarrow_allocate_array_stream()
+  nanoarrow_pointer_export(ptr, dst)
+  expect_false(nanoarrow_pointer_is_valid(ptr))
+  expect_true(nanoarrow_pointer_is_valid(dst))
+
+  expect_identical(convert_array_stream(dst), data.frame(a = integer()))
+})
+
+test_that("exported wrapped array_stream keeps the protected object alive", {
+  # reg.finalizer() needs an environment (or external pointer) to watch
+  finalized <- new.env()
+  finalized$value <- FALSE
+  dependent <- new.env()
+  reg.finalizer(dependent, function(e) finalized$value <- TRUE)
+
+  ptr <- as_nanoarrow_array_stream(data.frame(a = integer()))
+  nanoarrow_pointer_set_protected(ptr, dependent)
+
+  dst <- nanoarrow_allocate_array_stream()
+  nanoarrow_pointer_export(ptr, dst)
+
+  # Drop every R-level reference except the one held by the exported stream
+  rm(dependent, ptr)
+  gc()
+  expect_false(finalized$value)
+
+  expect_identical(convert_array_stream(dst), data.frame(a = integer()))
+})
+
+test_that("nanoarrow_pointer_set_protected() errors appropriately", {
+ # `protected` is omitted on purpose: the class check errors before it is used
+ expect_error(
+ nanoarrow_pointer_set_protected(NULL),
+ "must inherit from 'nanoarrow_schema', 'nanoarrow_array', or
'nanoarrow_array_stream'"
+ )
+
+ # The protected slot of a pointer can only be set once
+ dst <- nanoarrow_allocate_array_stream()
+ nanoarrow_pointer_set_protected(dst, 1234)
+ expect_error(
+ nanoarrow_pointer_set_protected(dst, 5678),
+ "External pointer protected value has already been set"
)
})