This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new dea6733 feat(r): Add ability to deterministically run a finalizer on
an array stream (#196)
dea6733 is described below
commit dea6733717ff09eac3a36e3078e67f3886b8b0a3
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed May 17 14:58:53 2023 -0400
feat(r): Add ability to deterministically run a finalizer on an array
stream (#196)
Previously, the finalizer would only run on garbage collection. For ADBC,
we want it to run deterministically so that a statement/connection can be
closed predictably.
---
r/NAMESPACE | 1 +
r/R/array-stream.R | 34 +++++++++++++++++++++++++++
r/man/array_stream_set_finalizer.Rd | 33 ++++++++++++++++++++++++++
r/src/array_stream.c | 39 +++++++++++++++++++++++++++++++
r/src/array_stream.h | 1 +
r/src/nanoarrow_cpp.cc | 8 ++++++-
r/src/pointers.c | 14 ++++++++---
r/src/util.h | 1 +
r/tests/testthat/test-array-stream.R | 45 ++++++++++++++++++++++++++++++++++++
9 files changed, 172 insertions(+), 4 deletions(-)
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 2ca1ecf..7a24d35 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -84,6 +84,7 @@ S3method(str,nanoarrow_array)
S3method(str,nanoarrow_array_stream)
S3method(str,nanoarrow_buffer)
S3method(str,nanoarrow_schema)
+export(array_stream_set_finalizer)
export(as_nanoarrow_array)
export(as_nanoarrow_array_stream)
export(as_nanoarrow_buffer)
diff --git a/r/R/array-stream.R b/r/R/array-stream.R
index d6ac3d8..4c3ca65 100644
--- a/r/R/array-stream.R
+++ b/r/R/array-stream.R
@@ -50,6 +50,40 @@ basic_array_stream <- function(batches, schema = NULL,
validate = TRUE) {
.Call(nanoarrow_c_basic_array_stream, batches, schema, validate)
}
+#' Register an array stream finalizer
+#'
+#' In some cases, R functions that return a
+#' [nanoarrow_array_stream][as_nanoarrow_array_stream] may require that the
+#' scope of some other object outlive that of the array stream. If there is a
+#' need for that object to be released deterministically (e.g., to close open
+#' files), you can register a function to run after the stream's release
+#' callback is invoked from the main R thread. Note that this finalizer will
+#' **not** be run if the stream's release callback is invoked from a
+#' **non**-R thread. In this case, the finalizer and its chain of environments
+#' will be garbage-collected when `nanoarrow:::preserved_empty()` is run.
+#'
+#' @param array_stream A [nanoarrow_array_stream][as_nanoarrow_array_stream]
+#' @param finalizer A function that will be called with zero arguments.
+#'
+#' @return `array_stream`, invisibly
+#' @export
+#'
+#' @examples
+#' stream <- basic_array_stream(list(1:5))
+#' array_stream_set_finalizer(stream, function() message("All done!"))
+#' stream$release()
+#'
+array_stream_set_finalizer <- function(array_stream, finalizer) {
+  # Only the array stream release/finalize paths look for a registered
+  # finalizer; attaching one to any other pointer type would silently never
+  # run it, so reject that up front.
+  stopifnot(
+    inherits(array_stream, "nanoarrow_array_stream"),
+    is.function(finalizer)
+  )
+
+  # The finalizer is stored in a classed environment that becomes the external
+  # pointer's protected value; the C side recognizes this class and evaluates
+  # `array_stream_finalizer()` inside the environment.
+  prot <- new.env(parent = emptyenv())
+  prot$array_stream_finalizer <- finalizer
+  class(prot) <- "nanoarrow_array_stream_finalizer"
+
+  nanoarrow_pointer_set_protected(array_stream, prot)
+  invisible(array_stream)
+}
+
#' Convert an object to a nanoarrow array_stream
#'
#' In nanoarrow, an 'array stream' corresponds to the `struct ArrowArrayStream`
diff --git a/r/man/array_stream_set_finalizer.Rd
b/r/man/array_stream_set_finalizer.Rd
new file mode 100644
index 0000000..0ecc878
--- /dev/null
+++ b/r/man/array_stream_set_finalizer.Rd
@@ -0,0 +1,33 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/array-stream.R
+\name{array_stream_set_finalizer}
+\alias{array_stream_set_finalizer}
+\title{Register an array stream finalizer}
+\usage{
+array_stream_set_finalizer(array_stream, finalizer)
+}
+\arguments{
+\item{array_stream}{A
\link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}}
+
+\item{finalizer}{A function that will be called with zero arguments.}
+}
+\value{
+\code{array_stream}, invisibly
+}
+\description{
+In some cases, R functions that return a
\link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}
+may require that the scope of some other object outlive that of the array
+stream. If there is a need for that object to be released deterministically
+(e.g., to close open files), you can register a function to run after the
+stream's release callback is invoked from the R thread. Note that this
+finalizer will \strong{not} be run if the stream's release callback is invoked
+from a \strong{non}-R thread. In this case, the finalizer and its chain of
+environments will be garbage-collected when
\code{nanoarrow:::preserved_empty()}
+is run.
+}
+\examples{
+stream <- basic_array_stream(list(1:5))
+array_stream_set_finalizer(stream, function() message("All done!"))
+stream$release()
+
+}
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 685b57c..5e73f19 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -25,6 +25,34 @@
#include "schema.h"
#include "util.h"
+// User-supplied finalizers should ideally never jump out of their evaluation;
+// if one does, memory is likely to leak. To reduce that risk the finalizer is
+// evaluated through R_tryCatchError (see run_user_array_stream_finalizer).
+//
+// `data` is the nanoarrow_array_stream_finalizer environment; the call
+// `array_stream_finalizer()` is evaluated in it so the function is found there.
+static SEXP run_finalizer_wrapper(void* data) {
+  SEXP finalizer_env = (SEXP)data;
+  // Symbols are never garbage collected, so only the call needs protecting.
+  SEXP call = PROTECT(Rf_lang1(Rf_install("array_stream_finalizer")));
+  Rf_eval(call, finalizer_env);
+  UNPROTECT(1);
+  return R_NilValue;
+}
+
+static SEXP run_finalizer_error_handler(SEXP cond, void* hdata) {
+ REprintf("Error evaluating user-supplied array stream finalizer");
+ return R_NilValue;
+}
+
+// Runs the finalizer registered with array_stream_set_finalizer(), if any.
+// That finalizer is stored as the external pointer's protected value: an
+// environment of class "nanoarrow_array_stream_finalizer" that contains
+// `array_stream_finalizer`.
+//
+// When a finalizer is present, the protected slot is cleared *before* it is
+// evaluated, so the finalizer runs at most once even if it errors. Errors are
+// caught and reported by run_finalizer_error_handler().
+//
+// Protected values that are not a finalizer are deliberately left in place.
+// This function is also called on a parent stream from
+// finalize_wrapper_array_stream(), where that stream may not have been
+// released yet. Such values may be SEXP dependencies the stream still needs.
+void run_user_array_stream_finalizer(SEXP array_stream_xptr) {
+  SEXP prot = PROTECT(R_ExternalPtrProtected(array_stream_xptr));
+
+  if (Rf_inherits(prot, "nanoarrow_array_stream_finalizer")) {
+    R_SetExternalPtrProtected(array_stream_xptr, R_NilValue);
+    R_tryCatchError(&run_finalizer_wrapper, prot, &run_finalizer_error_handler,
+                    NULL);
+  }
+
+  UNPROTECT(1);
+}
+
void finalize_array_stream_xptr(SEXP array_stream_xptr) {
struct ArrowArrayStream* array_stream =
(struct ArrowArrayStream*)R_ExternalPtrAddr(array_stream_xptr);
@@ -35,6 +63,8 @@ void finalize_array_stream_xptr(SEXP array_stream_xptr) {
if (array_stream != NULL) {
ArrowFree(array_stream);
}
+
+ run_user_array_stream_finalizer(array_stream_xptr);
}
SEXP nanoarrow_c_array_stream_get_schema(SEXP array_stream_xptr) {
@@ -121,6 +151,15 @@ static void finalize_wrapper_array_stream(struct
ArrowArrayStream* array_stream)
if (array_stream->private_data != NULL) {
struct WrapperArrayStreamData* data =
(struct WrapperArrayStreamData*)array_stream->private_data;
+
+ // If safe to do so, attempt to do an eager evaluation of a release
+ // callback that may have been registered. If it is not safe to do so,
+ // garbage collection will run any finalizers that have been set
+ // on the chain of environments leading up to the finalizer.
+ if (nanoarrow_is_main_thread()) {
+ run_user_array_stream_finalizer(data->parent_array_stream_xptr);
+ }
+
nanoarrow_release_sexp(data->parent_array_stream_xptr);
ArrowFree(array_stream->private_data);
}
diff --git a/r/src/array_stream.h b/r/src/array_stream.h
index b28488d..823e718 100644
--- a/r/src/array_stream.h
+++ b/r/src/array_stream.h
@@ -24,6 +24,7 @@
#include "nanoarrow.h"
#include "util.h"
+void run_user_array_stream_finalizer(SEXP array_stream_xptr);
void finalize_array_stream_xptr(SEXP array_stream_xptr);
// Returns the underlying struct ArrowSchema* from an external pointer,
diff --git a/r/src/nanoarrow_cpp.cc b/r/src/nanoarrow_cpp.cc
index ea7ef5a..9c0e38d 100644
--- a/r/src/nanoarrow_cpp.cc
+++ b/r/src/nanoarrow_cpp.cc
@@ -19,8 +19,8 @@
#include <R.h>
#include <Rinternals.h>
-#include <cstring>
#include <cstdint>
+#include <cstring>
#include <mutex>
#include <string>
#include <thread>
@@ -61,6 +61,8 @@ class PreservedSEXPRegistry {
int64_t size() { return preserved_count_; }
+  // True when the calling thread's id equals main_thread_id_ (presumably the
+  // R main thread captured when the registry was created -- not shown here).
+  bool is_main_thread() {
+    const std::thread::id current = std::this_thread::get_id();
+    return current == main_thread_id_;
+  }
+
void preserve(SEXP obj) {
if (obj == R_NilValue) {
return;
@@ -190,6 +192,10 @@ extern "C" int64_t nanoarrow_preserved_empty(void) {
}
}
+// C-callable wrapper around PreservedSEXPRegistry::is_main_thread(): returns
+// 1 when called from the registry's main thread and 0 otherwise.
+extern "C" int nanoarrow_is_main_thread(void) {
+  const bool on_main = PreservedSEXPRegistry::GetInstance().is_main_thread();
+  return on_main ? 1 : 0;
+}
+
extern "C" void nanoarrow_preserve_and_release_on_other_thread(SEXP obj) {
nanoarrow_preserve_sexp(obj);
std::thread worker([obj] { nanoarrow_release_sexp(obj); });
diff --git a/r/src/pointers.c b/r/src/pointers.c
index 47871d5..23b27f6 100644
--- a/r/src/pointers.c
+++ b/r/src/pointers.c
@@ -96,21 +96,22 @@ SEXP nanoarrow_c_pointer_is_valid(SEXP ptr) {
SEXP nanoarrow_c_pointer_release(SEXP ptr) {
if (Rf_inherits(ptr, "nanoarrow_schema")) {
struct ArrowSchema* obj = (struct ArrowSchema*)R_ExternalPtrAddr(ptr);
- if (Rf_ScalarLogical(obj != NULL && obj->release != NULL)) {
+ if (obj != NULL && obj->release != NULL) {
obj->release(obj);
obj->release = NULL;
}
} else if (Rf_inherits(ptr, "nanoarrow_array")) {
struct ArrowArray* obj = (struct ArrowArray*)R_ExternalPtrAddr(ptr);
- if (Rf_ScalarLogical(obj != NULL && obj->release != NULL)) {
+ if (obj != NULL && obj->release != NULL) {
obj->release(obj);
obj->release = NULL;
}
} else if (Rf_inherits(ptr, "nanoarrow_array_stream")) {
struct ArrowArrayStream* obj = (struct
ArrowArrayStream*)R_ExternalPtrAddr(ptr);
- if (Rf_ScalarLogical(obj != NULL && obj->release != NULL)) {
+ if (obj != NULL && obj->release != NULL) {
obj->release(obj);
obj->release = NULL;
+ run_user_array_stream_finalizer(ptr);
}
} else {
Rf_error(
@@ -183,6 +184,8 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
// also move SEXP dependencies
R_SetExternalPtrProtected(ptr_dst, R_ExternalPtrProtected(xptr_src));
R_SetExternalPtrTag(ptr_dst, R_ExternalPtrTag(xptr_src));
+ R_SetExternalPtrProtected(xptr_src, R_NilValue);
+ R_SetExternalPtrTag(xptr_src, R_NilValue);
UNPROTECT(1);
return R_NilValue;
@@ -253,6 +256,11 @@ SEXP nanoarrow_c_export_array_stream(SEXP
array_stream_xptr, SEXP ptr_dst) {
}
array_stream_export(array_stream_xptr, obj_dst);
+
+ // Remove SEXP dependencies (if important they are kept alive by
array_stream_export)
+ R_SetExternalPtrProtected(array_stream_xptr, R_NilValue);
+ R_SetExternalPtrTag(array_stream_xptr, R_NilValue);
+
UNPROTECT(1);
return R_NilValue;
}
diff --git a/r/src/util.h b/r/src/util.h
index bafbd60..338cd56 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -43,6 +43,7 @@ void nanoarrow_preserve_sexp(SEXP obj);
void nanoarrow_release_sexp(SEXP obj);
int64_t nanoarrow_preserved_count(void);
int64_t nanoarrow_preserved_empty(void);
+int nanoarrow_is_main_thread(void);
// For testing
void nanoarrow_preserve_and_release_on_other_thread(SEXP obj);
diff --git a/r/tests/testthat/test-array-stream.R
b/r/tests/testthat/test-array-stream.R
index d0c8d7c..a6020ca 100644
--- a/r/tests/testthat/test-array-stream.R
+++ b/r/tests/testthat/test-array-stream.R
@@ -165,3 +165,48 @@ test_that("nanoarrow_array_stream get_next() with schema =
NULL", {
array <- stream$get_next(schema = NULL)
expect_error(infer_nanoarrow_schema(array), "has no associated schema")
})
+
+test_that("User array stream finalizers are run on explicit release", {
+  announce_done <- function() cat("All done!")
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, announce_done)
+
+  # The first release runs the finalizer; a second release is a silent no-op
+  expect_output(stream$release(), "All done!")
+  expect_silent(stream$release())
+})
+
+test_that("User array stream finalizers are run on explicit release even when
moved", {
+  announce_done <- function() cat("All done!")
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, announce_done)
+
+  # Moving hands the stream (and its protected finalizer) to the destination
+  # and leaves the source invalid
+  moved <- nanoarrow_allocate_array_stream()
+  nanoarrow_pointer_move(stream, moved)
+  expect_false(nanoarrow_pointer_is_valid(stream))
+  expect_silent(nanoarrow_pointer_release(stream))
+
+  # Only the destination's release triggers the finalizer, exactly once
+  expect_output(moved$release(), "All done!")
+  expect_silent(moved$release())
+})
+
+test_that("User array stream finalizers are run on explicit release even when
exported", {
+  announce_done <- function() cat("All done!")
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, announce_done)
+
+  # Exporting leaves the original pointer invalid; releasing it does nothing
+  exported <- nanoarrow_allocate_array_stream()
+  nanoarrow_pointer_export(stream, exported)
+  expect_false(nanoarrow_pointer_is_valid(stream))
+  expect_silent(nanoarrow_pointer_release(stream))
+
+  # Releasing the exported stream runs the finalizer, exactly once
+  expect_output(exported$release(), "All done!")
+  expect_silent(exported$release())
+})
+
+test_that("Errors from user array stream finalizer are ignored", {
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, function() stop("Error that will be
ignored"))
+
+  # The notice is written with REprintf(), which testthat treats as neither a
+  # message nor "output", so capture the message stream directly
+  stderr_lines <- capture.output(stream$release(), type = "message")
+  expect_identical(
+    stderr_lines,
+    "Error evaluating user-supplied array stream finalizer"
+  )
+
+  # The stream is still released even though its finalizer failed
+  expect_false(nanoarrow_pointer_is_valid(stream))
+  expect_silent(stream$release())
+})