lidavidm commented on code in PR #12590: URL: https://github.com/apache/arrow/pull/12590#discussion_r854069192
########## cpp/src/arrow/python/common.h: ########## @@ -180,6 +180,9 @@ class ARROW_PYTHON_EXPORT OwnedRefNoGIL : public OwnedRef { explicit OwnedRefNoGIL(PyObject* obj) : OwnedRef(obj) {} ~OwnedRefNoGIL() { + if (obj() == NULLPTR) { Review Comment: Why are we creating a ref to nullptr in the first place? ########## cpp/src/arrow/python/udf.cc: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace arrow { + +namespace py { + +Status CheckOutputType(const DataType& expected, const DataType& actual) { + if (!expected.Equals(actual)) { + return Status::TypeError("Expected output type, ", expected.name(), + ", but function returned type ", actual.name()); + } + return Status::OK(); +} + +// struct PythonUdf { +// std::shared_ptr<OwnedRefNoGIL> function; +// compute::OutputType output_type; + +// // function needs to be destroyed at process exit +// // and Python may no longer be initialized. +// ~PythonUdf() { +// if (_Py_IsFinalizing()) { +// function->detach(); +// } +// } + +// Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); +// } + +// Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// const auto num_args = batch.values.size(); +// PyObject* arg_tuple = PyTuple_New(num_args); +// for (size_t arg_id = 0; arg_id < num_args; arg_id++) { +// switch (batch[arg_id].kind()) { +// case Datum::SCALAR: { +// auto c_data = batch[arg_id].scalar(); +// PyObject* data = wrap_scalar(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// case Datum::ARRAY: { +// auto c_data = batch[arg_id].make_array(); +// PyObject* data = wrap_array(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// default: +// auto datum = batch[arg_id]; +// return Status::NotImplemented( +// "User-defined-functions are not supported for the datum kind ", +// datum.ToString(batch[arg_id].kind())); +// } +// } +// PyObject* result; +// result = PyObject_CallObject(function->obj(), arg_tuple); +// RETURN_NOT_OK(CheckPyError()); +// if (result == Py_None) { +// return Status::Invalid("Output is None, but expected an array"); +// } +// // unwrapping the output for expected output type +// if (is_scalar(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type)); +// *out = Datum(val); +// return Status::OK(); +// } else if (is_array(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type())); +// *out = Datum(val); +// return Status::OK(); +// } else { +// return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name, +// " (expected Scalar or Array)"); +// } +// return Status::OK(); +// } +// }; Review Comment: Don't leave commented out code. ########## cpp/src/arrow/datum.h: ########## @@ -283,6 +283,8 @@ struct ARROW_EXPORT Datum { std::string ToString() const; ARROW_EXPORT friend void PrintTo(const Datum&, std::ostream*); + + ARROW_EXPORT std::string ToString(Datum::Kind kind); Review Comment: Why is this a method of Datum and not a free function? ########## python/pyarrow/_compute.pyx: ########## @@ -2251,3 +2339,208 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef class ScalarUdfContext: + """ + A container to hold user-defined-function related Review Comment: Docstrings should follow the proper numpydoc format (https://numpydoc.readthedocs.io/en/latest/format.html, also see https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_numpy.html) ########## python/pyarrow/includes/libarrow.pxd: ########## @@ -1840,6 +1842,36 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: int num_args c_bool is_varargs + CArity() + CArity(int num_args, c_bool is_varargs) + + @staticmethod + CArity Nullary() + + @staticmethod + CArity Unary() + + @staticmethod + CArity Binary() + + @staticmethod + CArity Ternary() + + @staticmethod + CArity VarArgs(int min_args) Review Comment: We don't need all of these overloads do we? ########## cpp/src/arrow/python/udf.cc: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace arrow { + +namespace py { + +Status CheckOutputType(const DataType& expected, const DataType& actual) { + if (!expected.Equals(actual)) { + return Status::TypeError("Expected output type, ", expected.name(), + ", but function returned type ", actual.name()); + } + return Status::OK(); +} + +// struct PythonUdf { +// std::shared_ptr<OwnedRefNoGIL> function; +// compute::OutputType output_type; + +// // function needs to be destroyed at process exit +// // and Python may no longer be initialized. +// ~PythonUdf() { +// if (_Py_IsFinalizing()) { +// function->detach(); +// } +// } + +// Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); +// } + +// Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// const auto num_args = batch.values.size(); +// PyObject* arg_tuple = PyTuple_New(num_args); +// for (size_t arg_id = 0; arg_id < num_args; arg_id++) { +// switch (batch[arg_id].kind()) { +// case Datum::SCALAR: { +// auto c_data = batch[arg_id].scalar(); +// PyObject* data = wrap_scalar(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// case Datum::ARRAY: { +// auto c_data = batch[arg_id].make_array(); +// PyObject* data = wrap_array(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// default: +// auto datum = batch[arg_id]; +// return Status::NotImplemented( +// "User-defined-functions are not supported for the datum kind ", +// datum.ToString(batch[arg_id].kind())); +// } +// } +// PyObject* result; +// result = PyObject_CallObject(function->obj(), arg_tuple); +// RETURN_NOT_OK(CheckPyError()); +// if (result == Py_None) { +// return Status::Invalid("Output is None, but expected an array"); +// } +// // unwrapping the output for expected output type +// if (is_scalar(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type)); +// *out = Datum(val); +// return Status::OK(); +// } else if (is_array(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type())); +// *out = Datum(val); +// return Status::OK(); +// } else { +// return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name, +// " (expected Scalar or Array)"); +// } +// return Status::OK(); +// } +// }; + +struct PythonUdf { + ScalarUdfWrapperCallback cb; + std::shared_ptr<OwnedRefNoGIL> function; + compute::OutputType output_type; + + // function needs to be destroyed at process exit + // and Python may no longer be initialized. + ~PythonUdf() { + if (_Py_IsFinalizing()) { + function->detach(); + } + } + + Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, + Datum* out) { + return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); + } + + Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, + Datum* out) { + const auto num_args = batch.values.size(); + ScalarUdfContext udf_context{ctx->memory_pool(), static_cast<int64_t>(num_args)}; + PyObject* arg_tuple = PyTuple_New(num_args); + for (size_t arg_id = 0; arg_id < num_args; arg_id++) { + switch (batch[arg_id].kind()) { + case Datum::SCALAR: { + auto c_data = batch[arg_id].scalar(); + PyObject* data = wrap_scalar(c_data); + PyTuple_SetItem(arg_tuple, arg_id, data); + break; + } + case Datum::ARRAY: { + auto c_data = batch[arg_id].make_array(); + PyObject* data = wrap_array(c_data); + PyTuple_SetItem(arg_tuple, arg_id, data); + break; + } + default: + auto datum = batch[arg_id]; + return Status::NotImplemented( + "User-defined-functions are not supported for the datum kind ", + datum.ToString(batch[arg_id].kind())); + } + } + PyObject* result; + // result = PyObject_CallObject(function->obj(), arg_tuple); + result = cb(function->obj(), udf_context, arg_tuple); + RETURN_NOT_OK(CheckPyError()); + if (result == Py_None) { + return Status::Invalid("Output is None, but expected an array"); + } + // unwrapping the output for expected output type + if (is_scalar(result)) { + ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result)); + RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type)); + *out = Datum(val); + return Status::OK(); + } else if (is_array(result)) { + ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result)); + RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type())); + *out = Datum(val); + return Status::OK(); + } else { + return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name, + " (expected Scalar or Array)"); + } + return Status::OK(); + } +}; + +// Status RegisterScalarFunction(PyObject* function, const ScalarUdfOptions& options) { +// if (function == nullptr) { +// return Status::Invalid("Python function cannot be null"); +// } +// if (!PyCallable_Check(function)) { +// return Status::TypeError("Expected a callable Python object."); +// } +// auto doc = options.doc(); +// auto arity = options.arity(); +// auto in_types = options.input_types(); +// auto exp_out_type = options.output_type(); +// auto scalar_func = +// std::make_shared<compute::ScalarFunction>(options.name(), arity, std::move(doc)); +// Py_INCREF(function); +// PythonUdf exec{std::make_shared<OwnedRefNoGIL>(function), std::move(exp_out_type)}; +// compute::ScalarKernel kernel( +// compute::KernelSignature::Make(options.input_types(), options.output_type(), +// arity.is_varargs), +// std::move(exec)); +// kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE; +// kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE; +// RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel))); +// auto registry = compute::GetFunctionRegistry(); +// RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func))); +// return Status::OK(); +// } + +Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback wrapper, + const ScalarUdfOptions& options) { + if (user_function == nullptr) { + return Status::Invalid("Python function cannot be null"); + } + if (!PyCallable_Check(user_function)) { + return Status::TypeError("Expected a callable Python object."); + } + auto doc = options.doc(); + auto arity = options.arity(); + auto in_types = options.input_types(); + auto exp_out_type = options.output_type(); Review Comment: nit, but is it really necessary to make copies of all these to just save a few characters over inlining them? ########## cpp/src/arrow/python/udf.h: ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/python/platform.h" + +#include <cstdint> +#include <memory> +#include <unordered_map> + +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/function.h" +#include "arrow/compute/registry.h" + +#include "arrow/python/common.h" +#include "arrow/python/pyarrow.h" +#include "arrow/python/visibility.h" + +namespace arrow { + +namespace py { + +// TODO: TODO(ARROW-16041): UDF Options are not exposed to the Python +// users. This feature will be included when extending to provide advanced +// options for the users. +class ARROW_PYTHON_EXPORT ScalarUdfOptions { + public: + ScalarUdfOptions(const std::string func_name, const compute::Arity arity, + const compute::FunctionDoc func_doc, + const std::vector<compute::InputType> in_types, + const compute::OutputType out_type) + : func_name_(func_name), + kind_(compute::Function::SCALAR), + arity_(arity), + func_doc_(std::move(func_doc)), + in_types_(std::move(in_types)), + out_type_(out_type) {} + + const std::string& name() const { return func_name_; } + + compute::Function::Kind kind() { return kind_; } + + const compute::Arity& arity() const { return arity_; } + + const compute::FunctionDoc& doc() const { return func_doc_; } + + const std::vector<compute::InputType>& input_types() const { return in_types_; } + + const compute::OutputType& output_type() const { return out_type_; } + + private: + std::string func_name_; + compute::Function::Kind kind_; + compute::Arity arity_; + const compute::FunctionDoc func_doc_; + std::vector<compute::InputType> in_types_; + compute::OutputType out_type_; +}; + +struct ARROW_PYTHON_EXPORT ScalarUdfContext { + MemoryPool* pool; + int64_t batch_length; +}; + +/// \brief register a Scalar user-defined-function from Python +Status ARROW_PYTHON_EXPORT RegisterScalarFunction(PyObject* function, + const ScalarUdfOptions& options); Review Comment: Is this needed? ########## python/pyarrow/_compute.pyx: ########## @@ -26,9 +26,14 @@ from collections import namedtuple from pyarrow.lib import frombytes, tobytes, ordered_dict from pyarrow.lib cimport * +from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * import pyarrow.lib as lib +from libc.stdlib cimport free Review Comment: This isn't used. ########## python/pyarrow/_compute.pxd: ########## @@ -22,6 +22,18 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * +cdef class InputType(_Weakrefable): + cdef: + CInputType input_type + + cdef void init(self, const CInputType &input_type) + +cdef class ScalarUdfContext(_Weakrefable): Review Comment: Why not just `UdfContext` or `KernelContext`? ########## cpp/src/arrow/python/udf.cc: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace arrow { + +namespace py { + +Status CheckOutputType(const DataType& expected, const DataType& actual) { + if (!expected.Equals(actual)) { + return Status::TypeError("Expected output type, ", expected.name(), + ", but function returned type ", actual.name()); + } + return Status::OK(); +} + +// struct PythonUdf { +// std::shared_ptr<OwnedRefNoGIL> function; +// compute::OutputType output_type; + +// // function needs to be destroyed at process exit +// // and Python may no longer be initialized. +// ~PythonUdf() { +// if (_Py_IsFinalizing()) { +// function->detach(); +// } +// } + +// Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); +// } + +// Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// const auto num_args = batch.values.size(); +// PyObject* arg_tuple = PyTuple_New(num_args); +// for (size_t arg_id = 0; arg_id < num_args; arg_id++) { +// switch (batch[arg_id].kind()) { +// case Datum::SCALAR: { +// auto c_data = batch[arg_id].scalar(); +// PyObject* data = wrap_scalar(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// case Datum::ARRAY: { +// auto c_data = batch[arg_id].make_array(); +// PyObject* data = wrap_array(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// default: +// auto datum = batch[arg_id]; +// return Status::NotImplemented( +// "User-defined-functions are not supported for the datum kind ", +// datum.ToString(batch[arg_id].kind())); +// } +// } +// PyObject* result; +// result = PyObject_CallObject(function->obj(), arg_tuple); +// RETURN_NOT_OK(CheckPyError()); +// if (result == Py_None) { +// return Status::Invalid("Output is None, but expected an array"); +// } +// // unwrapping the output for expected output type +// if (is_scalar(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type)); +// *out = Datum(val); +// return Status::OK(); +// } else if (is_array(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type())); +// *out = Datum(val); +// return Status::OK(); +// } else { +// return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name, +// " (expected Scalar or Array)"); +// } +// return Status::OK(); +// } +// }; + +struct PythonUdf { + ScalarUdfWrapperCallback cb; + std::shared_ptr<OwnedRefNoGIL> function; + compute::OutputType output_type; + + // function needs to be destroyed at process exit + // and Python may no longer be initialized. + ~PythonUdf() { + if (_Py_IsFinalizing()) { + function->detach(); + } + } Review Comment: Hmm, should we set an atexit hook or something instead? ########## python/pyarrow/_compute.pyx: ########## @@ -2251,3 +2339,208 @@ cdef CExpression _bind(Expression filter, Schema schema) except *: return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) + + +cdef class ScalarUdfContext: + """ + A container to hold user-defined-function related + entities. `batch_length` and `MemoryPool` are important + entities in defining functions which require these details. + """ + + def __init__(self): + raise TypeError("Do not call {}'s constructor directly" + .format(self.__class__.__name__)) + + cdef void init(self, const CScalarUdfContext &c_context): + self.c_context = c_context + + @property + def batch_length(self): + """ + Returns the length of the batch associated with the + user-defined-function. Useful when the batch_length + is required to do computations specially when scalars + are parameters of the user-defined-function. + + Returns + ------- + batch_length: int64 + The number of batches used when calling + user-defined-function. + """ + return self.c_context.batch_length + + @property + def memory_pool(self): + """ + Returns the MemoryPool associated with the + user-defined-function. An already initialized + MemoryPool can be used within the + user-defined-function. + + Returns + ------- + memory_pool: MemoryPool + MemoryPool is obtained from the KernelContext + and passed to the ScalarUdfContext. + """ + return box_memory_pool(self.c_context.pool) + + +cdef CFunctionDoc _make_function_doc(dict func_doc) except *: + """ + Helper function to generate the FunctionDoc + This function accepts a dictionary and expect the + summary(str), description(str) and arg_names(List[str]) keys. + """ + cdef: + CFunctionDoc f_doc + vector[c_string] c_arg_names + + f_doc.summary = tobytes(func_doc["summary"]) + f_doc.description = tobytes(func_doc["description"]) + for arg_name in func_doc["arg_names"]: + c_arg_names.push_back(tobytes(arg_name)) + f_doc.arg_names = c_arg_names + # UDFOptions integration: + # TODO: https://issues.apache.org/jira/browse/ARROW-16041 + f_doc.options_class = tobytes("") + f_doc.options_required = False + return f_doc + +cdef _scalar_udf_callback(user_function, const CScalarUdfContext& c_context, inputs): + """ + Helper callback function used to wrap the ScalarUdfContext from Python to C++ + execution. + """ + cdef ScalarUdfContext context = ScalarUdfContext.__new__(ScalarUdfContext) + context.init(c_context) + return user_function(context, *inputs) + + +def register_scalar_function(func, func_name, function_doc, in_types, + out_type): + """ + Register a user-defined scalar function. + + A scalar function is a function that executes elementwise + operations on arrays or scalars, and therefore whose results + generally do not depend on the order of the values in the + arguments. Accepts and returns arrays that are all of the + same size. These functions roughly correspond to the functions + used in SQL expressions. + + Parameters + ---------- + func : callable + A callable implementing the user-defined function. + It must take arguments equal to the number of + in_types defined. It must return an Array or Scalar + matching the out_type. It must return a Scalar if + all arguments are scalar, else it must return an Array. + + To define a varargs function, pass a callable that takes + varargs. The last in_type will be the type of the all + varargs arguments. + func_name : str + Name of the function. This name must be globally unique. + function_doc : dict + A dictionary object with keys "summary" (str), + and "description" (str). + in_types : Dict[str, InputType] + Dictionary mapping function argument names to + their respective InputType specifications. + The argument names will be used to generate + documentation for the function. The number of + arguments specified here determines the function + arity. + out_type : DataType + Output type of the function. + + Example + ------- + + >>> import pyarrow.compute as pc + >>> + >>> func_doc = {} + >>> func_doc["summary"] = "simple udf" + >>> func_doc["description"] = "add a constant to a scalar" + >>> + >>> def add_constant(ctx, array): + ... return pc.add(array, 1) + >>> + >>> func_name = "py_add_func" + >>> in_types = {"array": pc.InputType.array(pa.int64())} + >>> out_type = pa.int64() + >>> pc.register_scalar_function(add_constant, func_name, func_doc, + ... in_types, out_type) + >>> + >>> func = pc.get_function(func_name) + >>> func.name + 'py_add_func' + >>> answer = pc.call_function(func_name, [pa.array([20])]) + >>> answer + <pyarrow.lib.Int64Array object at 0x10c22e700> + [ + 21 + ] + """ + cdef: + c_string c_func_name + CArity c_arity + CFunctionDoc c_func_doc + CInputType in_tmp + vector[CInputType] c_in_types + PyObject* c_function + shared_ptr[CDataType] c_type + shared_ptr[COutputType] c_out_type + CStatus st + shared_ptr[CScalarUdfOptions] c_options + + c_func_name = tobytes(func_name) + + if callable(func): + c_function = <PyObject*>func + else: + raise TypeError("Object must be a callable") + + func_spec = inspect.getfullargspec(func) + num_args = -1 + if isinstance(in_types, dict): + for in_type in in_types.values(): + if isinstance(in_type, InputType): + in_tmp = (<InputType> in_type).input_type + c_in_types.push_back(in_tmp) + else: + raise TypeError("in_types must be of type InputType") + function_doc["arg_names"] = in_types.keys() + num_args = len(in_types) + else: + raise TypeError( + "in_types must be a dictionary of InputType") + + if func_spec.varargs: + c_arity = CArity.VarArgs(num_args) + else: + c_arity = CArity(num_args, False) + + if not "summary" in function_doc.keys(): + raise ValueError("Function doc must contain a summary") + + if not "description" in function_doc.keys(): + raise ValueError("Function doc must contain a description") + + if not "arg_names" in function_doc.keys(): + raise ValueError("Function doc must contain arg_names") + + c_func_doc = _make_function_doc(function_doc) Review Comment: At this point why not just inline the function? ########## cpp/src/arrow/python/udf.cc: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace arrow { + +namespace py { + +Status CheckOutputType(const DataType& expected, const DataType& actual) { + if (!expected.Equals(actual)) { + return Status::TypeError("Expected output type, ", expected.name(), + ", but function returned type ", actual.name()); + } + return Status::OK(); +} + +// struct PythonUdf { +// std::shared_ptr<OwnedRefNoGIL> function; +// compute::OutputType output_type; + +// // function needs to be destroyed at process exit +// // and Python may no longer be initialized. +// ~PythonUdf() { +// if (_Py_IsFinalizing()) { +// function->detach(); +// } +// } + +// Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); +// } + +// Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// const auto num_args = batch.values.size(); +// PyObject* arg_tuple = PyTuple_New(num_args); +// for (size_t arg_id = 0; arg_id < num_args; arg_id++) { +// switch (batch[arg_id].kind()) { +// case Datum::SCALAR: { +// auto c_data = batch[arg_id].scalar(); +// PyObject* data = wrap_scalar(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// case Datum::ARRAY: { +// auto c_data = batch[arg_id].make_array(); +// PyObject* data = wrap_array(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// default: +// auto datum = batch[arg_id]; +// return Status::NotImplemented( +// "User-defined-functions are not supported for the datum kind ", +// datum.ToString(batch[arg_id].kind())); +// } +// } +// PyObject* result; +// result = PyObject_CallObject(function->obj(), arg_tuple); +// RETURN_NOT_OK(CheckPyError()); +// if (result == Py_None) { +// return Status::Invalid("Output is None, but expected an array"); +// } +// // unwrapping the output for expected output type +// if (is_scalar(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type)); +// *out = Datum(val); +// return Status::OK(); +// } else if (is_array(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type())); +// *out = Datum(val); +// return Status::OK(); +// } else { +// return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name, +// " (expected Scalar or Array)"); +// } +// return Status::OK(); +// } +// }; + +struct PythonUdf { + ScalarUdfWrapperCallback cb; + std::shared_ptr<OwnedRefNoGIL> function; Review Comment: Why does this have to be a shared_ptr? ########## cpp/src/arrow/python/udf.cc: ########## @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/udf.h" + +#include <cstddef> +#include <memory> +#include <sstream> + +#include "arrow/compute/function.h" +#include "arrow/python/common.h" + +namespace arrow { + +namespace py { + +Status CheckOutputType(const DataType& expected, const DataType& actual) { + if (!expected.Equals(actual)) { + return Status::TypeError("Expected output type, ", expected.name(), + ", but function returned type ", actual.name()); + } + return Status::OK(); +} + +// struct PythonUdf { +// std::shared_ptr<OwnedRefNoGIL> function; +// compute::OutputType output_type; + +// // function needs to be destroyed at process exit +// // and Python may no longer be initialized. +// ~PythonUdf() { +// if (_Py_IsFinalizing()) { +// function->detach(); +// } +// } + +// Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); +// } + +// Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, +// Datum* out) { +// const auto num_args = batch.values.size(); +// PyObject* arg_tuple = PyTuple_New(num_args); +// for (size_t arg_id = 0; arg_id < num_args; arg_id++) { +// switch (batch[arg_id].kind()) { +// case Datum::SCALAR: { +// auto c_data = batch[arg_id].scalar(); +// PyObject* data = wrap_scalar(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// case Datum::ARRAY: { +// auto c_data = batch[arg_id].make_array(); +// PyObject* data = wrap_array(c_data); +// PyTuple_SetItem(arg_tuple, arg_id, data); +// break; +// } +// default: +// auto datum = batch[arg_id]; +// return Status::NotImplemented( +// "User-defined-functions are not supported for the datum kind ", +// datum.ToString(batch[arg_id].kind())); +// } +// } +// PyObject* result; +// result = PyObject_CallObject(function->obj(), arg_tuple); +// RETURN_NOT_OK(CheckPyError()); +// if (result == Py_None) { +// return Status::Invalid("Output is None, but expected an array"); +// } +// // unwrapping the output for expected output type +// if (is_scalar(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_scalar(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type)); +// *out = Datum(val); +// return Status::OK(); +// } else if (is_array(result)) { +// ARROW_ASSIGN_OR_RAISE(auto val, unwrap_array(result)); +// RETURN_NOT_OK(CheckOutputType(*output_type.type(), *val->type())); +// *out = Datum(val); +// return Status::OK(); +// } else { +// return Status::TypeError("Unexpected output type: ", Py_TYPE(result)->tp_name, +// " (expected Scalar or Array)"); +// } +// return Status::OK(); +// } +// }; + +struct PythonUdf { + ScalarUdfWrapperCallback cb; + std::shared_ptr<OwnedRefNoGIL> function; + compute::OutputType output_type; + + // function needs to be destroyed at process exit + // and Python may no longer be initialized. + ~PythonUdf() { + if (_Py_IsFinalizing()) { + function->detach(); + } + } + + Status operator()(compute::KernelContext* ctx, const compute::ExecBatch& batch, + Datum* out) { + return SafeCallIntoPython([=]() -> Status { return Execute(ctx, batch, out); }); + } + + Status Execute(compute::KernelContext* ctx, const compute::ExecBatch& batch, + Datum* out) { + const auto num_args = batch.values.size(); + ScalarUdfContext udf_context{ctx->memory_pool(), static_cast<int64_t>(num_args)}; + PyObject* arg_tuple = PyTuple_New(num_args); + for (size_t arg_id = 0; arg_id < num_args; arg_id++) { + switch (batch[arg_id].kind()) { + case Datum::SCALAR: { + auto c_data = batch[arg_id].scalar(); + PyObject* data = wrap_scalar(c_data); + PyTuple_SetItem(arg_tuple, arg_id, data); + break; + } + case Datum::ARRAY: { + auto c_data = batch[arg_id].make_array(); + PyObject* data = wrap_array(c_data); + PyTuple_SetItem(arg_tuple, arg_id, data); + break; + } + default: + auto datum = batch[arg_id]; + return Status::NotImplemented( + "User-defined-functions are not supported for the datum kind ", + datum.ToString(batch[arg_id].kind())); + } + } + PyObject* result; + // result = PyObject_CallObject(function->obj(), arg_tuple); Review Comment: Don't leave commented code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org