This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0af89254 fix(r): Collect array streams in C (not R) before conversion
(#828)
0af89254 is described below
commit 0af89254f26fb6bacc59365edf8bab8fbc901e2a
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Dec 3 20:58:01 2025 -0600
fix(r): Collect array streams in C (not R) before conversion (#828)
This PR moves the process of collecting an array stream from R (where
the large volume of preserved/protected objects made garbage collection very,
very slow) into C/C++.
Doesn't quite solve #822 but it should help!
Reproducer for generating an IPC file with a lot of strings:
<details>
```r
library(nanoarrow)
# Raw bytes of the 26 lowercase ASCII letters; random strings are sampled
# from this alphabet.
ascii_bytes <- charToRaw(paste(letters, collapse = ""))
# Build a string array of `n` random lowercase strings, each exactly `n_chars`
# characters long, by supplying its buffers directly: no validity buffer
# (NULL, with null_count = 0), evenly spaced integer offsets, and the raw
# character bytes.
random_string_array <- function(n = 1, n_chars = 16) {
  total_chars <- n_chars * n
  char_bytes <- sample(ascii_bytes, total_chars, replace = TRUE)
  # 0, n_chars, 2 * n_chars, ..., total_chars
  offsets <- as.integer(seq(0, total_chars, length.out = n + 1))

  spec <- list(
    length = n,
    null_count = 0,
    buffers = list(NULL, offsets, char_bytes)
  )
  nanoarrow_array_modify(nanoarrow_array_init(na_string()), spec)
}
# Build a struct array with `n_cols` string columns named col001, col002, ...,
# each holding `n_rows` random strings of `n_chars` characters.
random_string_struct <- function(n_rows = 1024, n_cols = 1, n_chars = 16) {
  col_names <- sprintf("col%03d", seq_len(n_cols))
  col_types <- setNames(rep(list(na_string()), n_cols), col_names)

  children <- lapply(
    col_names,
    \(col_name) random_string_array(n_rows, n_chars = n_chars)
  )

  nanoarrow_array_modify(
    nanoarrow_array_init(na_struct(col_types)),
    list(length = n_rows, null_count = 0, children = children)
  )
}
# Generate a list of `n_batches` independent random struct batches, each with
# `n_rows` rows and `n_cols` string columns of `n_chars` characters.
random_string_batches <- function(n_batches = 1, n_rows = 1, n_cols = 1,
                                  n_chars = 16) {
  batches <- vector("list", n_batches)
  for (i in seq_len(n_batches)) {
    batches[[i]] <- random_string_struct(n_rows, n_cols, n_chars)
  }
  batches
}
# 100 batches x 160 string columns (n_rows keeps its default of 1), written
# as an Arrow IPC stream file.
batches <- random_string_batches(
  n_batches = 100,
  n_cols = 160
)
stream <- basic_array_stream(batches)
write_nanoarrow(stream, "many_strings.arrows")
```
</details>
...then, in a separate R session, garbage collection no longer seemed to take
a long time after reading and converting the file (but it would be great to
have this independently verified!)
```r
library(nanoarrow)
# Read and convert the whole file, then print the result
df <- read_nanoarrow("many_strings.arrows") |>
  convert_array_stream()
df
# No R objects remain preserved by nanoarrow after conversion...
nanoarrow:::preserved_count()
#> [1] 0
# ...and a full garbage collection is fast again
system.time(gc(), gcFirst = FALSE)
#> user system elapsed
#> 0.036 0.001 0.037
```
---------
Co-authored-by: Copilot <[email protected]>
---
r/R/convert-array-stream.R | 22 ++++--------
r/bootstrap.R | 4 ---
r/src/.gitignore | 2 ++
r/src/init.c | 2 ++
r/src/nanoarrow_cpp.cc | 88 ++++++++++++++++++++++++++++++++++++++++++++++
r/tools/make-callentries.R | 2 +-
6 files changed, 99 insertions(+), 21 deletions(-)
diff --git a/r/R/convert-array-stream.R b/r/R/convert-array-stream.R
index b1f3e85e..c8e73aed 100644
--- a/r/R/convert-array-stream.R
+++ b/r/R/convert-array-stream.R
@@ -72,31 +72,21 @@ convert_array_stream <- function(array_stream, to = NULL,
size = NULL, n = Inf)
} else {
# Otherwise, we need to collect all batches and calculate the total length
# before calling nanoarrow_c_convert_array_stream().
- batches <- collect_array_stream(
- array_stream,
- n,
- schema = schema,
- validate = FALSE
- )
+ batch_info <- .Call(nanoarrow_c_collect_array_stream, array_stream, n)
# If there is exactly one batch, use convert_array(). Converting a single
# array currently takes a more efficient code path for types that can be
# converted as ALTREP (e.g., strings).
- if (length(batches) == 1L) {
- return(.Call(nanoarrow_c_convert_array, batches[[1]], to))
+ if (batch_info$n == 1L) {
+ array <- batch_info$stream$get_next(schema)
+ return(.Call(nanoarrow_c_convert_array, array, to))
}
- # Otherwise, compute the final size, create another array stream,
- # and call convert_array_stream() with a known size. Using .Call()
- # directly because we have already type checked the inputs.
- size <- .Call(nanoarrow_c_array_list_total_length, batches)
- basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema,
FALSE)
-
.Call(
nanoarrow_c_convert_array_stream,
- basic_stream,
+ batch_info$stream,
to,
- as.double(size),
+ as.double(batch_info$size),
Inf
)
}
diff --git a/r/bootstrap.R b/r/bootstrap.R
index c285f487..041a4b40 100644
--- a/r/bootstrap.R
+++ b/r/bootstrap.R
@@ -60,7 +60,3 @@ stopifnot(file.exists("../CMakeLists.txt") && run_bundler())
f <- "src/flatcc/portable/pdiagnostic.h"
lines <- readLines(f)
writeLines(gsub("^#pragma", "/**/#pragma", lines), f)
-
-# Remove unused files
-unused_files <- list.files("src", "\\.hpp$", full.names = TRUE)
-unlink(unused_files)
diff --git a/r/src/.gitignore b/r/src/.gitignore
index c5bf0ca6..5093a88f 100644
--- a/r/src/.gitignore
+++ b/r/src/.gitignore
@@ -22,5 +22,7 @@ nanoarrow.c
nanoarrow.h
nanoarrow_ipc.h
nanoarrow_ipc.c
+nanoarrow.hpp
+nanoarrow_ipc.hpp
flatcc*
Makevars
diff --git a/r/src/init.c b/r/src/init.c
index 4d2109a2..70eec867 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -61,6 +61,7 @@ extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP
buffer_xptr);
extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con);
extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con);
extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP
array_stream_xptr);
+extern SEXP nanoarrow_c_collect_array_stream(SEXP array_stream_xptr, SEXP
n_sexp);
extern SEXP nanoarrow_c_allocate_schema(void);
extern SEXP nanoarrow_c_allocate_array(void);
extern SEXP nanoarrow_c_allocate_array_stream(void);
@@ -144,6 +145,7 @@ static const R_CallMethodDef CallEntries[] = {
{"nanoarrow_c_ipc_writer_connection",
(DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1},
{"nanoarrow_c_ipc_writer_write_stream",
(DL_FUNC)&nanoarrow_c_ipc_writer_write_stream,
2},
+ {"nanoarrow_c_collect_array_stream",
(DL_FUNC)&nanoarrow_c_collect_array_stream, 2},
{"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0},
{"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0},
{"nanoarrow_c_allocate_array_stream",
(DL_FUNC)&nanoarrow_c_allocate_array_stream, 0},
diff --git a/r/src/nanoarrow_cpp.cc b/r/src/nanoarrow_cpp.cc
index 9c0e38d6..83ebe07a 100644
--- a/r/src/nanoarrow_cpp.cc
+++ b/r/src/nanoarrow_cpp.cc
@@ -26,6 +26,9 @@
#include <thread>
#include <vector>
+#include "nanoarrow.hpp"
+#include "nanoarrow/r.h"
+
// Without this infrastructure, it's possible to check that all objects
// are released by running devtools::test(); gc() in a fresh session and
// making sure that nanoarrow:::preserved_count() is zero afterward.
@@ -201,3 +204,88 @@ extern "C" void
nanoarrow_preserve_and_release_on_other_thread(SEXP obj) {
std::thread worker([obj] { nanoarrow_release_sexp(obj); });
worker.join();
}
+
+// Collector state for nanoarrow_c_collect_array_stream(): the input stream's
+// schema, the batch currently being read, and the batches collected so far.
+// Keeping all of this in a single heap-allocated object means there is only
+// one pointer (owned by an R external pointer) to delete if R longjmps out of
+// the collection loop.
+struct ArrayVector {
+ // Schema read from the input stream
+ nanoarrow::UniqueSchema schema;
+ // Scratch array that each ArrowArrayStreamGetNext() call writes into
+ nanoarrow::UniqueArray batch;
+ // Batches collected so far (each moved out of `batch`)
+ std::vector<nanoarrow::UniqueArray> vec;
+};
+
+// Finalizer for the R external pointer that owns an ArrayVector. Because
+// ownership is held by the external pointer, the ArrayVector (and any batches
+// it still holds) is deleted when that pointer is garbage collected, including
+// after a longjmp out of the function that created it.
+static void release_array_vector_xptr(SEXP array_vector_xptr) {
+  // Deleting a null pointer is a no-op in C++, so an address that was never
+  // set needs no special case.
+  delete static_cast<ArrayVector*>(R_ExternalPtrAddr(array_vector_xptr));
+}
+
+// Collects an array stream (or at most `n` batches of it) into memory and
+// reports the total number of rows and the number of batches collected, so
+// that the R code calling this can decide how best to proceed.
+//
+// array_stream_xptr: external pointer to the input ArrowArrayStream; batches
+//   are pulled from it with ArrowArrayStreamGetNext().
+// n_sexp: maximum number of batches to collect (length-1 integer or double).
+//   Non-finite values (Inf, NA/NaN) mean "no limit"; values <= 0 collect
+//   nothing.
+//
+// Returns a list with elements `stream` (an owning external pointer to a new
+// array stream yielding the collected batches), `size` (total number of rows
+// across those batches, as a double) and `n` (number of batches collected, as
+// a double). Calls Rf_error() if the schema or a batch cannot be read.
+extern "C" SEXP nanoarrow_c_collect_array_stream(SEXP array_stream_xptr,
+                                                 SEXP n_sexp) {
+  struct ArrowArrayStream* array_stream =
+      nanoarrow_array_stream_from_xptr(array_stream_xptr);
+
+  // Rf_asReal() (unlike REAL()) also accepts integer input and does not index
+  // into a zero-length vector. The value is clamped before the cast because
+  // converting an out-of-range double to int is undefined behaviour.
+  double n_real = Rf_asReal(n_sexp);
+  int n;
+  if (!R_FINITE(n_real) || n_real >= INT_MAX) {
+    n = INT_MAX;
+  } else if (n_real <= 0) {
+    n = 0;
+  } else {
+    n = (int)n_real;
+  }
+
+  // Create and register the owning external pointer *before* allocating the
+  // ArrayVector, so an allocation error raised by R_MakeExternalPtr() or
+  // R_RegisterCFinalizer() cannot leak it. From here on the finalizer frees
+  // the ArrayVector (and any batches it holds) even if Rf_error() or
+  // R_CheckUserInterrupt() longjmps out of this function.
+  SEXP array_vector_xptr =
+      PROTECT(R_MakeExternalPtr(nullptr, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(array_vector_xptr, &release_array_vector_xptr);
+  auto array_vector = new ArrayVector();
+  R_SetExternalPtrAddr(array_vector_xptr, array_vector);
+
+  struct ArrowError error;
+  ArrowErrorInit(&error);
+  int code =
+      ArrowArrayStreamGetSchema(array_stream, array_vector->schema.get(), &error);
+  if (code != NANOARROW_OK) {
+    Rf_error("ArrowArrayStreamGetSchema() failed (%d): %s", code, error.message);
+  }
+
+  int64_t n_actual = 0;
+  int64_t size = 0;
+  while (n > 0) {
+    code = ArrowArrayStreamGetNext(array_stream, array_vector->batch.get(), &error);
+    if (code != NANOARROW_OK) {
+      Rf_error("ArrowArrayStreamGetNext() failed (%d): %s", code, error.message);
+    }
+
+    // A batch with a NULL release callback marks the end of the stream
+    if (array_vector->batch->release == nullptr) {
+      break;
+    }
+
+    size += array_vector->batch->length;
+    ++n_actual;
+    --n;
+    array_vector->vec.push_back(std::move(array_vector->batch));
+    array_vector->batch.reset();
+
+    // Allow long collections to be interrupted; batches collected so far are
+    // released by the ArrayVector finalizer.
+    R_CheckUserInterrupt();
+  }
+
+  SEXP array_stream_out_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
+  struct ArrowArrayStream* array_stream_out =
+      nanoarrow_output_array_stream_from_xptr(array_stream_out_xptr);
+
+  // The collected batches are moved into the output stream.
+  // NOTE(review): the schema is passed as a raw pointer; presumably
+  // VectorArrayStream takes ownership of it -- confirm in nanoarrow.hpp.
+  nanoarrow::VectorArrayStream(array_vector->schema.get(),
+                               std::move(array_vector->vec))
+      .ToArrayStream(array_stream_out);
+
+  SEXP size_sexp = PROTECT(Rf_ScalarReal(size));
+  SEXP n_actual_sexp = PROTECT(Rf_ScalarReal(n_actual));
+  const char* names[] = {"stream", "size", "n", ""};
+  SEXP out = PROTECT(Rf_mkNamed(VECSXP, names));
+  SET_VECTOR_ELT(out, 0, array_stream_out_xptr);
+  SET_VECTOR_ELT(out, 1, size_sexp);
+  SET_VECTOR_ELT(out, 2, n_actual_sexp);
+
+  UNPROTECT(5);
+  return out;
+}
diff --git a/r/tools/make-callentries.R b/r/tools/make-callentries.R
index 403169c3..f40fe654 100644
--- a/r/tools/make-callentries.R
+++ b/r/tools/make-callentries.R
@@ -21,7 +21,7 @@
library(tidyverse)
-src_files <- list.files("src", "\\.(c|cpp)$", full.names = TRUE) %>%
+src_files <- list.files("src", "\\.(c|cc|cpp)$", full.names = TRUE) %>%
setdiff("src/init.c")
src_sources <- src_files %>% set_names() %>% map_chr(readr::read_file)