paleolimbot commented on code in PR #13397:
URL: https://github.com/apache/arrow/pull/13397#discussion_r916747754
##########
r/src/compute.cpp:
##########
@@ -574,3 +576,186 @@ SEXP compute__CallFunction(std::string func_name,
cpp11::list args, cpp11::list
std::vector<std::string> compute__GetFunctionNames() {
  // Ask Arrow's default compute function registry for every registered name.
  auto* registry = arrow::compute::GetFunctionRegistry();
  return registry->GetFunctionNames();
}
+
+class RScalarUDFKernelState : public arrow::compute::KernelState {
+ public:
+ RScalarUDFKernelState(cpp11::sexp exec_func, cpp11::sexp resolver)
+ : exec_func_(exec_func), resolver_(resolver) {}
+
+ cpp11::function exec_func_;
+ cpp11::function resolver_;
+};
+
+class RScalarUDFOutputTypeResolver : public
arrow::compute::OutputType::Resolver {
+ public:
+ arrow::Result<arrow::ValueDescr> operator()(
+ arrow::compute::KernelContext* context,
+ const std::vector<arrow::ValueDescr>& descr) {
+ return SafeCallIntoR<arrow::ValueDescr>(
+ [&]() -> arrow::ValueDescr {
+ auto kernel =
+ reinterpret_cast<const
arrow::compute::ScalarKernel*>(context->kernel());
+ auto state =
std::dynamic_pointer_cast<RScalarUDFKernelState>(kernel->data);
+
+ cpp11::writable::list input_types_sexp(descr.size());
+ for (size_t i = 0; i < descr.size(); i++) {
+ input_types_sexp[i] = cpp11::to_r6<arrow::DataType>(descr[i].type);
+ }
+
+ cpp11::sexp output_type_sexp = state->resolver_(input_types_sexp);
+ if (!Rf_inherits(output_type_sexp, "DataType")) {
+ cpp11::stop("arrow_scalar_function resolver must return a
DataType");
+ }
+
+ return arrow::ValueDescr(
+
cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(output_type_sexp));
+ },
+ "resolve scalar user-defined function output data type");
+ }
+};
+
+class RScalarUDFCallable : public arrow::compute::ArrayKernelExec {
+ public:
+ arrow::Status operator()(arrow::compute::KernelContext* context,
+ const arrow::compute::ExecSpan& span,
+ arrow::compute::ExecResult* result) {
+ return SafeCallIntoRVoid(
+ [&]() {
+ auto kernel =
+ reinterpret_cast<const
arrow::compute::ScalarKernel*>(context->kernel());
+ auto state =
std::dynamic_pointer_cast<RScalarUDFKernelState>(kernel->data);
+
+ cpp11::writable::list args_sexp(span.num_values());
+
+ for (int i = 0; i < span.num_values(); i++) {
+ const arrow::compute::ExecValue& exec_val = span[i];
+ if (exec_val.is_array()) {
+ std::shared_ptr<arrow::Array> array = exec_val.array.ToArray();
+ args_sexp[i] = cpp11::to_r6<arrow::Array>(array);
+ } else if (exec_val.is_scalar()) {
+ std::shared_ptr<arrow::Scalar> scalar = exec_val.scalar->Copy();
+ args_sexp[i] = cpp11::to_r6<arrow::Scalar>(scalar);
+ }
+ }
+
+ cpp11::sexp batch_length_sexp = cpp11::as_sexp(span.length);
+
+ std::shared_ptr<arrow::DataType> output_type =
result->type()->Copy();
+ cpp11::sexp output_type_sexp =
cpp11::to_r6<arrow::DataType>(output_type);
+ cpp11::writable::list udf_context = {batch_length_sexp,
output_type_sexp};
+ udf_context.names() = {"batch_length", "output_type"};
+
+ cpp11::sexp func_result_sexp = state->exec_func_(udf_context,
args_sexp);
+
+ if (Rf_inherits(func_result_sexp, "Array")) {
+ auto array =
cpp11::as_cpp<std::shared_ptr<arrow::Array>>(func_result_sexp);
+
+ // handle an Array result of the wrong type
+ if (!result->type()->Equals(array->type())) {
+ arrow::Datum out =
+ ValueOrStop(arrow::compute::Cast(array,
result->type()->Copy()));
+ std::shared_ptr<arrow::Array> out_array = out.make_array();
+ array.swap(out_array);
+ }
+
+ // make sure we assign the type that the result is expecting
+ if (result->is_array_data()) {
+ result->value = std::move(array->data());
+ } else if (array->length() == 1) {
+ result->value = ValueOrStop(array->GetScalar(0));
+ } else {
+ cpp11::stop("expected Scalar return value but got Array with
length != 1");
+ }
+ } else if (Rf_inherits(func_result_sexp, "Scalar")) {
+ auto scalar =
cpp11::as_cpp<std::shared_ptr<arrow::Scalar>>(func_result_sexp);
+
+ // handle a Scalar result of the wrong type
+ if (!result->type()->Equals(scalar->type)) {
+ arrow::Datum out =
+ ValueOrStop(arrow::compute::Cast(scalar,
result->type()->Copy()));
+ std::shared_ptr<arrow::Scalar> out_scalar = out.scalar();
+ scalar.swap(out_scalar);
+ }
+
+ // make sure we assign the type that the result is expecting
+ if (result->is_scalar()) {
+ result->value = std::move(scalar);
+ } else {
+ auto array = ValueOrStop(arrow::MakeArrayFromScalar(
+ *scalar, span.length, context->memory_pool()));
+ result->value = std::move(array->data());
+ }
+ } else {
+ cpp11::stop("arrow_scalar_function must return an Array or
Scalar");
+ }
+ },
+ "execute scalar user-defined function");
+ }
+};
+
+// [[arrow::export]]
+void RegisterScalarUDF(std::string name, cpp11::sexp func_sexp) {
+ cpp11::list in_type_r(func_sexp.attr("in_type"));
+ cpp11::list out_type_r(func_sexp.attr("out_type"));
+ R_xlen_t n_kernels = in_type_r.size();
+
+ if (n_kernels == 0) {
+ cpp11::stop("Can't register user-defined function with zero kernels");
+ }
+
+ // compute the Arity from the list of input kernels
+ std::vector<int64_t> n_args(n_kernels);
+ for (R_xlen_t i = 0; i < n_kernels; i++) {
+ auto in_types =
cpp11::as_cpp<std::shared_ptr<arrow::Schema>>(in_type_r[i]);
+ n_args[i] = in_types->num_fields();
Review Comment:
The internet seems to be divided on whether one should `std::move()` an
integer (under the hood, moving an integer is just a copy anyway). I'm
inclined to leave it as is?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]