[ 
https://issues.apache.org/jira/browse/ARROW-17446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580948#comment-17580948
 ] 

Dewey Dunnington commented on ARROW-17446:
------------------------------------------

Maybe something like:

{code:R}
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for
#> more information.
library(rlang)
#> 
#> Attaching package: 'rlang'
#> The following object is masked from 'package:arrow':
#> 
#>     string

# I'm sure there's an easier way to do this
make_dummy_array <- function(type) {
  reader <- RecordBatchReader$create(batches = list(), schema = schema(.x = 
type))
  as_arrow_array(as_arrow_table(reader)[[1]], type = type)
}

# Sanity check: asking for int8() prints an empty <int8> Array.
make_dummy_array(int8())
#> Array
#> <int8>
#> []

# Rewrite a quoted R call so that calls to functions not found in `registry`
# are replaced by calls to temporary, freshly registered scalar functions.
#
# When the head of the call `x` is a symbol that does not exist in `registry`,
# every argument is evaluated with `eval_tidy()` against `mask` and `env`.
# Arguments that evaluate to an "Expression" become the inputs of the new
# scalar function; all other arguments are captured as constants in the
# wrapper's closure. The output type is inferred by running the wrapper once
# on zero-length dummy arrays. For any other call, each element of the call is
# processed recursively with the same `mask`, `env`, `registry` and `schema`.
#
# @param x A quoted expression. Anything that is not a call is returned as is.
# @param mask Data passed to `eval_tidy()` when evaluating the arguments.
# @param env Environment used to evaluate the arguments and, when the
#   registered function is run, the wrapped call itself.
# @param registry Environment whose bindings name the functions that are
#   considered known and therefore left untouched.
# @param schema Passed to `$type()` of each Expression argument to resolve
#   its data type.
# @return `x` itself when it is not a call; otherwise a call in which each
#   unknown function call has been replaced by a call to the temporary
#   function, keeping only the arguments that were Expressions.
add_udfs_from_expr <- function(x, mask, env = caller_env(),
                               registry = new.env(parent = emptyenv()),
                               schema = NULL) {
  if (is_call(x)) {
    if (is_symbol(x[[1]]) && !exists(as.character(x[[1]]), registry)) {
      temp_fun_name <- paste0("r_temp_udf_", hash(x))
      arg_values <- lapply(x[-1], eval_tidy, mask, env)
      arg_is_expression <- vapply(
        arg_values,
        inherits,
        logical(1),
        "Expression"
      )

      args_that_were_not_expressions <- arg_values[!arg_is_expression]

      # The wrapper receives the arguments as c(expressions, constants).
      # Element k of that vector came from original position
      # c(which(expr), which(!expr))[k], so putting the arguments back into
      # call order needs the *inverse* of that permutation, which is what
      # order() returns. Indexing with the forward permutation is only right
      # when it happens to be its own inverse.
      arg_mapping <- order(
        c(which(arg_is_expression), which(!arg_is_expression))
      )

      expression_as_types <- lapply(
        arg_values[arg_is_expression],
        function(e) e$type(schema)
      )
      expression_as_fields <- lapply(
        expression_as_types,
        function(t) field("", t)
      )
      expression_as_dummy_arrays <- lapply(
        expression_as_types,
        make_dummy_array
      )

      # `schema` the argument is not a function, so this call still resolves
      # to the `schema()` constructor.
      in_schema <- schema(!!!expression_as_fields)

      wrapper_fun <- function(context, ...) {
        args_that_were_expressions <- lapply(list(...), as.vector)
        args <- c(
          args_that_were_expressions,
          args_that_were_not_expressions
        )[arg_mapping]
        expr <- call2(x[[1]], !!!args)
        eval_tidy(expr, env = env)
      }

      # Run the wrapper once on zero-length inputs to discover the output type.
      dummy_output <- exec(wrapper_fun, list(), !!!expression_as_dummy_arrays)
      out_type <- infer_type(dummy_output)

      register_scalar_function(
        temp_fun_name,
        wrapper_fun,
        in_type = in_schema,
        out_type = out_type,
        auto_convert = TRUE
      )

      # Only the Expression arguments remain in the rewritten call; the
      # constants now live inside `wrapper_fun`.
      args <- as.list(x[-1])[arg_is_expression]
      call2(temp_fun_name, !!!args)
    } else {
      # Forward `registry` and `schema` as well: without them nested calls
      # would fall back to an empty registry and a NULL schema.
      x[] <- lapply(x, add_udfs_from_expr, mask, env, registry, schema)
      x
    }
  } else {
    x
  }
}

# Example of a plain R function: returns `x` plus the ratio `y / z`.
some_unknown_function <- function(x, y, z) {
  ratio <- y / z
  x + ratio
}

# A plain R value used as the first argument of the call below.
some_variable <- 1

# Rewrite a call that mixes a plain R value, a field reference and a literal.
result <- add_udfs_from_expr(
  quote(some_unknown_function(some_variable, some_col, 3)),
  # `some_col` evaluates to an Expression; the other arguments do not.
  mask = list(
    some_col = Expression$field_ref("some_col")
  ),
  # add_udfs_from_expr() calls `$type(schema)` on each Expression argument,
  # so the schema has to declare the type of `some_col`.
  schema = schema(some_col = float64())
)

# The rewritten call keeps only the Expression argument.
result
#> r_temp_udf_9835e6872b8d3b41297ffc264389487d(some_col)

# Call the registered function by name with some_col = 1: 1 + 1 / 3.
call_function(as.character(result[[1]]), as_arrow_array(1))
#> Array
#> <double>
#> [
#>   1.3333333333333333
#> ]
{code}


> [R] Allow unrecognized R expressions to be callable as compute::Functions
> -------------------------------------------------------------------------
>
>                 Key: ARROW-17446
>                 URL: https://issues.apache.org/jira/browse/ARROW-17446
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>            Reporter: Ben Kietzman
>            Priority: Major
>              Labels: compute
>
> Currently, if an R expression is not entirely supported by the arrow compute 
> engine, the entire input will be pulled into memory for native R to operate 
> on. It would be possible to instead add a custom compute function to 
> the registry (inside {{R_init_arrow}}, probably) which evaluates any sub 
> expressions which couldn't be translated to native arrow compute expressions.
> This would for example allow a filter expression including a call to an R 
> function {{baz}} to evaluate on a dataset larger than memory and with 
> predicate and projection pushdown as normal using the expressions which *are* 
> translatable. The resulting expression might look something like this in c++:
> {code}
> call("and_kleene", {
>   call("greater", {field_ref("a"), scalar(1)}),
>   call("r_expr", {field_ref("b")},
>       /*options=*/RexprOptions{cpp11::function(baz_sexp)}),
> });
> {code}
> In this case although the "r_expr" function is opaque to compute and 
> datasets, we would still recognize that only fields "a" and "b" need to be 
> materialized. Furthermore, the first member of the filter's conjunction is 
> {{a > 1}}, which *is* translatable and could be used for predicate pushdown, 
> for checking against parquet statistics, etc.
> Since R is not multithreaded, the compute function would need to take a 
> global lock to ensure only a single thread of R execution. This would also 
> block the interpreter, so it's not a high-performance solution... but it 
> *would* block the interpreter less than doing everything in pure native R 
> (since at least *some* of the work could be offloaded to worker threads and 
> we could take advantage of batched input). Still, it seems like a worthwhile 
> option to consider



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to