paleolimbot commented on code in PR #13397:
URL: https://github.com/apache/arrow/pull/13397#discussion_r918307791
##########
r/src/compute.cpp:
##########
@@ -574,3 +576,169 @@ SEXP compute__CallFunction(std::string func_name,
cpp11::list args, cpp11::list
std::vector<std::string> compute__GetFunctionNames() {
return arrow::compute::GetFunctionRegistry()->GetFunctionNames();
}
+
+class RScalarUDFKernelState : public arrow::compute::KernelState {
+ public:
+ RScalarUDFKernelState(cpp11::sexp exec_func, cpp11::sexp resolver)
+ : exec_func_(exec_func), resolver_(resolver) {}
+
+ cpp11::function exec_func_;
+ cpp11::function resolver_;
+};
+
+arrow::Result<arrow::TypeHolder> ResolveScalarUDFOutputType(
+ arrow::compute::KernelContext* context,
+ const std::vector<arrow::TypeHolder>& input_types) {
+ return SafeCallIntoR<arrow::TypeHolder>(
+ [&]() -> arrow::TypeHolder {
+ auto kernel =
+ reinterpret_cast<const
arrow::compute::ScalarKernel*>(context->kernel());
+ auto state =
std::dynamic_pointer_cast<RScalarUDFKernelState>(kernel->data);
+
+ cpp11::writable::list input_types_sexp(input_types.size());
+ for (size_t i = 0; i < input_types.size(); i++) {
+ input_types_sexp[i] =
+ cpp11::to_r6<arrow::DataType>(input_types[i].GetSharedPtr());
+ }
+
+ cpp11::sexp output_type_sexp = state->resolver_(input_types_sexp);
+ if (!Rf_inherits(output_type_sexp, "DataType")) {
+ cpp11::stop("arrow_scalar_function resolver must return a DataType");
+ }
+
+ return arrow::TypeHolder(
+ cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(output_type_sexp));
+ },
+ "resolve scalar user-defined function output data type");
+}
+
+arrow::Status CallRScalarUDF(arrow::compute::KernelContext* context,
+ const arrow::compute::ExecSpan& span,
+ arrow::compute::ExecResult* result) {
+ if (result->is_array_span()) {
+ return arrow::Status::NotImplemented("ArraySpan result from R scalar UDF");
+ }
+
+ return SafeCallIntoRVoid(
+ [&]() {
+ auto kernel =
+ reinterpret_cast<const
arrow::compute::ScalarKernel*>(context->kernel());
+ auto state =
std::dynamic_pointer_cast<RScalarUDFKernelState>(kernel->data);
+
+ cpp11::writable::list args_sexp(span.num_values());
+
+ for (int i = 0; i < span.num_values(); i++) {
+ const arrow::compute::ExecValue& exec_val = span[i];
+ if (exec_val.is_array()) {
+ args_sexp[i] =
cpp11::to_r6<arrow::Array>(exec_val.array.ToArray());
+ } else if (exec_val.is_scalar()) {
+ args_sexp[i] =
cpp11::to_r6<arrow::Scalar>(exec_val.scalar->GetSharedPtr());
+ }
+ }
+
+ cpp11::sexp batch_length_sexp = cpp11::as_sexp(span.length);
+
+ std::shared_ptr<arrow::DataType> output_type =
result->type()->GetSharedPtr();
+ cpp11::sexp output_type_sexp =
cpp11::to_r6<arrow::DataType>(output_type);
+ cpp11::writable::list udf_context = {batch_length_sexp,
output_type_sexp};
+ udf_context.names() = {"batch_length", "output_type"};
+
+ cpp11::sexp func_result_sexp = state->exec_func_(udf_context,
args_sexp);
+
+ if (Rf_inherits(func_result_sexp, "Array")) {
+ auto array =
cpp11::as_cpp<std::shared_ptr<arrow::Array>>(func_result_sexp);
+
+ // handle an Array result of the wrong type
+ if (!result->type()->Equals(array->type())) {
+ arrow::Datum out = ValueOrStop(arrow::compute::Cast(array,
result->type()));
Review Comment:
I took this out - the "advanced" scalar function no longer casts the result
automatically (but `arrow_scalar_function()` does, since if you do
something like `return(5)` in R you'll get a float64 Array). That auto-casting
is handled in R (and there is skepticism about the advanced/non-advanced
distinction, and one of them might go away).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]