This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new dea6733  feat(r): Add ability to deterministically run a finalizer on an array stream (#196)
dea6733 is described below

commit dea6733717ff09eac3a36e3078e67f3886b8b0a3
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed May 17 14:58:53 2023 -0400

    feat(r): Add ability to deterministically run a finalizer on an array stream (#196)
    
    Before, the finalizer would only run on garbage collection. For ADBC, we
    want that to be deterministic so that we can close a
    statement/connection predictably.
---
 r/NAMESPACE                          |  1 +
 r/R/array-stream.R                   | 34 +++++++++++++++++++++++++++
 r/man/array_stream_set_finalizer.Rd  | 33 ++++++++++++++++++++++++++
 r/src/array_stream.c                 | 39 +++++++++++++++++++++++++++++++
 r/src/array_stream.h                 |  1 +
 r/src/nanoarrow_cpp.cc               |  8 ++++++-
 r/src/pointers.c                     | 14 ++++++++---
 r/src/util.h                         |  1 +
 r/tests/testthat/test-array-stream.R | 45 ++++++++++++++++++++++++++++++++++++
 9 files changed, 172 insertions(+), 4 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index 2ca1ecf..7a24d35 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -84,6 +84,7 @@ S3method(str,nanoarrow_array)
 S3method(str,nanoarrow_array_stream)
 S3method(str,nanoarrow_buffer)
 S3method(str,nanoarrow_schema)
+export(array_stream_set_finalizer)
 export(as_nanoarrow_array)
 export(as_nanoarrow_array_stream)
 export(as_nanoarrow_buffer)
diff --git a/r/R/array-stream.R b/r/R/array-stream.R
index d6ac3d8..4c3ca65 100644
--- a/r/R/array-stream.R
+++ b/r/R/array-stream.R
@@ -50,6 +50,40 @@ basic_array_stream <- function(batches, schema = NULL, validate = TRUE) {
   .Call(nanoarrow_c_basic_array_stream, batches, schema, validate)
 }
 
+#' Register an array stream finalizer
+#'
+#' In some cases, R functions that return a [nanoarrow_array_stream][as_nanoarrow_array_stream]
+#' may require that the scope of some other object outlive that of the array
+#' stream. If there is a need for that object to be released deterministically
+#' (e.g., to close open files), you can register a function to run after the
+#' stream's release callback is invoked from the R thread. Note that this
+#' finalizer will **not** be run if the stream's release callback is invoked
+#' from a **non**-R thread. In this case, the finalizer and its chain of
+#' environments will be garbage-collected when `nanoarrow:::preserved_empty()`
+#' is run.
+#'
+#' @param array_stream A [nanoarrow_array_stream][as_nanoarrow_array_stream]
+#' @param finalizer A function that will be called with zero arguments.
+#'
+#' @return `array_stream`, invisibly
+#' @export
+#'
+#' @examples
+#' stream <- basic_array_stream(list(1:5))
+#' array_stream_set_finalizer(stream, function() message("All done!"))
+#' stream$release()
+#'
+array_stream_set_finalizer <- function(array_stream, finalizer) {
+  stopifnot(inherits(array_stream, "nanoarrow_array_stream"))
+  stopifnot(is.function(finalizer))
+  # Name and class must match what the C-level finalizer runner looks for
+  prot <- new.env(parent = emptyenv())
+  prot$array_stream_finalizer <- finalizer
+  class(prot) <- "nanoarrow_array_stream_finalizer"
+  nanoarrow_pointer_set_protected(array_stream, prot)
+  invisible(array_stream)
+}
+
 #' Convert an object to a nanoarrow array_stream
 #'
 #' In nanoarrow, an 'array stream' corresponds to the `struct ArrowArrayStream`
diff --git a/r/man/array_stream_set_finalizer.Rd b/r/man/array_stream_set_finalizer.Rd
new file mode 100644
index 0000000..0ecc878
--- /dev/null
+++ b/r/man/array_stream_set_finalizer.Rd
@@ -0,0 +1,33 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/array-stream.R
+\name{array_stream_set_finalizer}
+\alias{array_stream_set_finalizer}
+\title{Register an array stream finalizer}
+\usage{
+array_stream_set_finalizer(array_stream, finalizer)
+}
+\arguments{
+\item{array_stream}{A \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}}
+
+\item{finalizer}{A function that will be called with zero arguments.}
+}
+\value{
+\code{array_stream}, invisibly
+}
+\description{
+In some cases, R functions that return a \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream}
+may require that the scope of some other object outlive that of the array
+stream. If there is a need for that object to be released deterministically
+(e.g., to close open files), you can register a function to run after the
+stream's release callback is invoked from the R thread. Note that this
+finalizer will \strong{not} be run if the stream's release callback is invoked
+from a \strong{non}-R thread. In this case, the finalizer and its chain of
+environments will be garbage-collected when \code{nanoarrow:::preserved_empty()}
+is run.
+}
+\examples{
+stream <- basic_array_stream(list(1:5))
+array_stream_set_finalizer(stream, function() message("All done!"))
+stream$release()
+
+}
diff --git a/r/src/array_stream.c b/r/src/array_stream.c
index 685b57c..5e73f19 100644
--- a/r/src/array_stream.c
+++ b/r/src/array_stream.c
@@ -25,6 +25,34 @@
 #include "schema.h"
 #include "util.h"
 
+// Ideally user-supplied finalizers are written in such a way that they don't jump;
+// however, if they do, it is likely that memory will leak. Here, we use
+// R_tryCatchError so that an R error raised by the finalizer is caught and reported.
+static SEXP run_finalizer_wrapper(void* data) {
+  SEXP finalizer_sym = PROTECT(Rf_install("array_stream_finalizer"));
+  SEXP finalizer_call = PROTECT(Rf_lang1(finalizer_sym));
+  Rf_eval(finalizer_call, (SEXP)data);  // `data` is the finalizer environment
+  UNPROTECT(2);
+  return R_NilValue;
+}
+
+static SEXP run_finalizer_error_handler(SEXP cond, void* hdata) {
+  REprintf("Error evaluating user-supplied array stream finalizer\n");
+  return R_NilValue;
+}
+
+void run_user_array_stream_finalizer(SEXP array_stream_xptr) {
+  SEXP protected = PROTECT(R_ExternalPtrProtected(array_stream_xptr));
+  R_SetExternalPtrProtected(array_stream_xptr, R_NilValue);  // run at most once
+
+  if (Rf_inherits(protected, "nanoarrow_array_stream_finalizer")) {
+    R_tryCatchError(&run_finalizer_wrapper, protected, &run_finalizer_error_handler,
+                    NULL);
+  }
+
+  UNPROTECT(1);
+}
+
 void finalize_array_stream_xptr(SEXP array_stream_xptr) {
   struct ArrowArrayStream* array_stream =
       (struct ArrowArrayStream*)R_ExternalPtrAddr(array_stream_xptr);
@@ -35,6 +63,8 @@ void finalize_array_stream_xptr(SEXP array_stream_xptr) {
   if (array_stream != NULL) {
     ArrowFree(array_stream);
   }
+
+  run_user_array_stream_finalizer(array_stream_xptr);
 }
 
 SEXP nanoarrow_c_array_stream_get_schema(SEXP array_stream_xptr) {
@@ -121,6 +151,15 @@ static void finalize_wrapper_array_stream(struct ArrowArrayStream* array_stream)
   if (array_stream->private_data != NULL) {
     struct WrapperArrayStreamData* data =
         (struct WrapperArrayStreamData*)array_stream->private_data;
+
+    // If safe to do so, attempt to do an eager evaluation of a release
+    // callback that may have been registered. If it is not safe to do so,
+    // garbage collection will run any finalizers that have been set
+    // on the chain of environments leading up to the finalizer.
+    if (nanoarrow_is_main_thread()) {
+      run_user_array_stream_finalizer(data->parent_array_stream_xptr);
+    }
+
     nanoarrow_release_sexp(data->parent_array_stream_xptr);
     ArrowFree(array_stream->private_data);
   }
diff --git a/r/src/array_stream.h b/r/src/array_stream.h
index b28488d..823e718 100644
--- a/r/src/array_stream.h
+++ b/r/src/array_stream.h
@@ -24,6 +24,7 @@
 #include "nanoarrow.h"
 #include "util.h"
 
+void run_user_array_stream_finalizer(SEXP array_stream_xptr);
 void finalize_array_stream_xptr(SEXP array_stream_xptr);
 
 // Returns the underlying struct ArrowSchema* from an external pointer,
diff --git a/r/src/nanoarrow_cpp.cc b/r/src/nanoarrow_cpp.cc
index ea7ef5a..9c0e38d 100644
--- a/r/src/nanoarrow_cpp.cc
+++ b/r/src/nanoarrow_cpp.cc
@@ -19,8 +19,8 @@
 #include <R.h>
 #include <Rinternals.h>
 
-#include <cstring>
 #include <cstdint>
+#include <cstring>
 #include <mutex>
 #include <string>
 #include <thread>
@@ -61,6 +61,8 @@ class PreservedSEXPRegistry {
 
   int64_t size() { return preserved_count_; }
 
+  bool is_main_thread() { return std::this_thread::get_id() == main_thread_id_; }
+
   void preserve(SEXP obj) {
     if (obj == R_NilValue) {
       return;
@@ -190,6 +192,10 @@ extern "C" int64_t nanoarrow_preserved_empty(void) {
   }
 }
 
+extern "C" int nanoarrow_is_main_thread(void) {  // 1 if on main thread, else 0
+  return PreservedSEXPRegistry::GetInstance().is_main_thread();
+}
+
 extern "C" void nanoarrow_preserve_and_release_on_other_thread(SEXP obj) {
   nanoarrow_preserve_sexp(obj);
   std::thread worker([obj] { nanoarrow_release_sexp(obj); });
diff --git a/r/src/pointers.c b/r/src/pointers.c
index 47871d5..23b27f6 100644
--- a/r/src/pointers.c
+++ b/r/src/pointers.c
@@ -96,21 +96,22 @@ SEXP nanoarrow_c_pointer_is_valid(SEXP ptr) {
 SEXP nanoarrow_c_pointer_release(SEXP ptr) {
   if (Rf_inherits(ptr, "nanoarrow_schema")) {
     struct ArrowSchema* obj = (struct ArrowSchema*)R_ExternalPtrAddr(ptr);
-    if (Rf_ScalarLogical(obj != NULL && obj->release != NULL)) {
+    if (obj != NULL && obj->release != NULL) {
       obj->release(obj);
       obj->release = NULL;
     }
   } else if (Rf_inherits(ptr, "nanoarrow_array")) {
     struct ArrowArray* obj = (struct ArrowArray*)R_ExternalPtrAddr(ptr);
-    if (Rf_ScalarLogical(obj != NULL && obj->release != NULL)) {
+    if (obj != NULL && obj->release != NULL) {
       obj->release(obj);
       obj->release = NULL;
     }
   } else if (Rf_inherits(ptr, "nanoarrow_array_stream")) {
     struct ArrowArrayStream* obj = (struct 
ArrowArrayStream*)R_ExternalPtrAddr(ptr);
-    if (Rf_ScalarLogical(obj != NULL && obj->release != NULL)) {
+    if (obj != NULL && obj->release != NULL) {
       obj->release(obj);
       obj->release = NULL;
+      run_user_array_stream_finalizer(ptr);
     }
   } else {
     Rf_error(
@@ -183,6 +184,8 @@ SEXP nanoarrow_c_pointer_move(SEXP ptr_src, SEXP ptr_dst) {
   // also move SEXP dependencies
   R_SetExternalPtrProtected(ptr_dst, R_ExternalPtrProtected(xptr_src));
   R_SetExternalPtrTag(ptr_dst, R_ExternalPtrTag(xptr_src));
+  R_SetExternalPtrProtected(xptr_src, R_NilValue);
+  R_SetExternalPtrTag(xptr_src, R_NilValue);
 
   UNPROTECT(1);
   return R_NilValue;
@@ -253,6 +256,11 @@ SEXP nanoarrow_c_export_array_stream(SEXP array_stream_xptr, SEXP ptr_dst) {
   }
 
   array_stream_export(array_stream_xptr, obj_dst);
+
+  // Remove SEXP dependencies (if important they are kept alive by array_stream_export)
+  R_SetExternalPtrProtected(array_stream_xptr, R_NilValue);
+  R_SetExternalPtrTag(array_stream_xptr, R_NilValue);
+
   UNPROTECT(1);
   return R_NilValue;
 }
diff --git a/r/src/util.h b/r/src/util.h
index bafbd60..338cd56 100644
--- a/r/src/util.h
+++ b/r/src/util.h
@@ -43,6 +43,7 @@ void nanoarrow_preserve_sexp(SEXP obj);
 void nanoarrow_release_sexp(SEXP obj);
 int64_t nanoarrow_preserved_count(void);
 int64_t nanoarrow_preserved_empty(void);
+int nanoarrow_is_main_thread(void);
 
 // For testing
 void nanoarrow_preserve_and_release_on_other_thread(SEXP obj);
diff --git a/r/tests/testthat/test-array-stream.R b/r/tests/testthat/test-array-stream.R
index d0c8d7c..a6020ca 100644
--- a/r/tests/testthat/test-array-stream.R
+++ b/r/tests/testthat/test-array-stream.R
@@ -165,3 +165,48 @@ test_that("nanoarrow_array_stream get_next() with schema = NULL", {
   array <- stream$get_next(schema = NULL)
   expect_error(infer_nanoarrow_schema(array), "has no associated schema")
 })
+
+test_that("User array stream finalizers are run on explicit release", {
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, function() cat("All done!"))
+  expect_output(stream$release(), "All done!")
+  expect_silent(stream$release()) # finalizer must not run twice
+})
+
+test_that("User array stream finalizers are run on explicit release even when moved", {
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, function() cat("All done!"))
+
+  stream2 <- nanoarrow_allocate_array_stream()
+  nanoarrow_pointer_move(stream, stream2)
+  expect_false(nanoarrow_pointer_is_valid(stream))
+  expect_silent(nanoarrow_pointer_release(stream)) # finalizer moved to stream2
+  expect_output(stream2$release(), "All done!")
+  expect_silent(stream2$release())
+})
+
+test_that("User array stream finalizers are run on explicit release even when exported", {
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, function() cat("All done!"))
+
+  stream2 <- nanoarrow_allocate_array_stream()
+  nanoarrow_pointer_export(stream, stream2)
+  expect_false(nanoarrow_pointer_is_valid(stream))
+  expect_silent(nanoarrow_pointer_release(stream)) # finalizer travels with export
+  expect_output(stream2$release(), "All done!")
+  expect_silent(stream2$release())
+})
+
+test_that("Errors from user array stream finalizer are ignored", {
+  stream <- basic_array_stream(list(1:5))
+  array_stream_set_finalizer(stream, function() stop("Error that will be ignored"))
+  # Because this comes from REprintf(), it's not a message and not "output"
+  # according to testthat, so we use capture.output()
+  expect_identical(
+    capture.output(stream$release(), type = "message"),
+    "Error evaluating user-supplied array stream finalizer"
+  )
+
+  expect_false(nanoarrow_pointer_is_valid(stream))
+  expect_silent(stream$release())
+})

Reply via email to