[
https://issues.apache.org/jira/browse/ARROW-17446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580948#comment-17580948
]
Dewey Dunnington commented on ARROW-17446:
------------------------------------------
Maybe something like:
{code:R}
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for
#> more information.
library(rlang)
#>
#> Attaching package: 'rlang'
#> The following object is masked from 'package:arrow':
#>
#>     string
# I'm sure there's an easier way to do this
make_dummy_array <- function(type) {
  reader <- RecordBatchReader$create(
    batches = list(),
    schema = schema(.x = type)
  )
  as_arrow_array(as_arrow_table(reader)[[1]], type = type)
}
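# (untested guess: concat_arrays(type = type) might be that easier way, if it
# accepts zero input arrays)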
make_dummy_array(int8())
#> Array
#> <int8>
#> []
add_udfs_from_expr <- function(x, mask, env = caller_env(),
                               registry = new.env(parent = emptyenv()),
                               schema = NULL) {
  if (is_call(x)) {
    if (is_symbol(x[[1]]) && !exists(as.character(x[[1]]), registry)) {
      temp_fun_name <- paste0("r_temp_udf_", hash(x))
      arg_values <- lapply(x[-1], eval_tidy, mask, env)
      arg_is_expression <- vapply(arg_values, inherits, logical(1), "Expression")
      args_that_were_not_expressions <- arg_values[!arg_is_expression]
      arg_mapping <- c(which(arg_is_expression), which(!arg_is_expression))
      expression_as_types <- lapply(
        arg_values[arg_is_expression],
        function(e) e$type(schema)
      )
      expression_as_fields <- lapply(
        expression_as_types,
        function(t) field("", t)
      )
      expression_as_dummy_arrays <- lapply(
        expression_as_types,
        make_dummy_array
      )
      in_schema <- schema(!!!expression_as_fields)
      wrapper_fun <- function(context, ...) {
        args_that_were_expressions <- lapply(list(...), as.vector)
        # index by order(arg_mapping) to restore the original argument order
        args <- c(args_that_were_expressions,
                  args_that_were_not_expressions)[order(arg_mapping)]
        expr <- call2(x[[1]], !!!args)
        eval_tidy(expr, env = env)
      }
      dummy_output <- exec(wrapper_fun, list(), !!!expression_as_dummy_arrays)
      out_type <- infer_type(dummy_output)
      register_scalar_function(
        temp_fun_name,
        wrapper_fun,
        in_type = in_schema,
        out_type = out_type,
        auto_convert = TRUE
      )
      args <- as.list(x[-1])[arg_is_expression]
      call2(temp_fun_name, !!!args)
    } else {
      # a translatable call (or non-symbol callee): recurse into its elements,
      # carrying the registry and schema along
      x[] <- lapply(x, add_udfs_from_expr, mask, env,
                    registry = registry, schema = schema)
      x
    }
  } else {
    x
  }
}
some_unknown_function <- function(x, y, z) {
  x + y / z
}
some_variable <- 1
result <- add_udfs_from_expr(
  quote(some_unknown_function(some_variable, some_col, 3)),
  mask = list(
    some_col = Expression$field_ref("some_col")
  ),
  # needed so that Expression$type() can resolve the type of some_col
  schema = schema(some_col = float64())
)
result
#> r_temp_udf_9835e6872b8d3b41297ffc264389487d(some_col)
call_function(as.character(result[[1]]), as_arrow_array(1))
#> Array
#> <double>
#> [
#> 1.3333333333333333
#> ]
{code}
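Untested follow-on: since the wrapper was registered with {{auto_convert = TRUE}}, the rewritten call should (if I'm reading the UDF docs right) also work when spliced back into a regular dplyr pipeline. The table below is made up:
{code:R}
library(dplyr, warn.conflicts = FALSE)

tab <- arrow_table(some_col = c(1, 2, 4))

# splice the rewritten call back into a dplyr verb; the temporary UDF is
# resolved like any other registered compute function
tab %>%
  mutate(out = !!result) %>%
  collect()
{code}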
> [R] Allow unrecognized R expressions to be callable as compute::Functions
> -------------------------------------------------------------------------
>
> Key: ARROW-17446
> URL: https://issues.apache.org/jira/browse/ARROW-17446
> Project: Apache Arrow
> Issue Type: Bug
> Components: R
> Reporter: Ben Kietzman
> Priority: Major
> Labels: compute
>
> Currently, if an R expression is not entirely supported by the arrow compute
> engine, the entire input will be pulled into memory for native R to operate
> on. It would be possible to instead add a custom compute function to the
> registry (inside {{R_init_arrow}}, probably) which evaluates any
> sub-expressions which couldn't be translated to native arrow compute
> expressions. This would, for example, allow a filter expression including a
> call to an R function {{baz}} to be evaluated on a dataset larger than memory,
> with predicate and projection pushdown working as normal for the expressions
> which *are* translatable. The resulting expression might look something like
> this in C++:
> {code}
> call("and_kleene", {
>   call("greater", {field_ref("a"), scalar(1)}),
>   call("r_expr", {field_ref("b")},
>        /*options=*/RexprOptions{cpp11::function(baz_sexp)}),
> });
> {code}
> In this case although the "r_expr" function is opaque to compute and
> datasets, we would still recognize that only fields "a" and "b" need to be
> materialized. Furthermore, the first member of the filter's conjunction is
> {{a > 1}}, which *is* translatable and could be used for predicate pushdown,
> for checking against parquet statistics, etc.
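> For illustration, the dplyr-level query that would produce a plan like this
> might look roughly as follows ({{ds}} is a hypothetical Dataset, {{baz}} a
> plain R function with no arrow binding, and dplyr is loaded):
> {code:R}
> ds %>%
>   filter(a > 1 & baz(b)) %>%   # only `a > 1` is translatable / pushed down
>   collect()
> {code}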
> Since R is not multithreaded, the compute function would need to take a
> global lock to ensure only a single thread of R execution. This would also
> block the interpreter, so it's not a high-performance solution... but it
> *would* block the interpreter less than doing everything in pure native R
> (since at least *some* of the work could be offloaded to worker threads and
> we could take advantage of batched input). Still, it seems like a worthwhile
> option to consider.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)