rtpsw commented on code in PR #14682: URL: https://github.com/apache/arrow/pull/14682#discussion_r1048668717
########## python/pyarrow/src/arrow/python/udf.cc: ########## @@ -17,35 +17,83 @@ #include "arrow/python/udf.h" #include "arrow/compute/function.h" +#include "arrow/compute/kernel.h" #include "arrow/python/common.h" +#include "arrow/util/checked_cast.h" namespace arrow { - -using compute::ExecResult; -using compute::ExecSpan; - namespace py { namespace { -struct PythonUdf : public compute::KernelState { - ScalarUdfWrapperCallback cb; +struct PythonScalarUdfKernelState : public compute::KernelState { + explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function) + : function(function) {} + std::shared_ptr<OwnedRefNoGIL> function; - std::shared_ptr<DataType> output_type; +}; - PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> function, - const std::shared_ptr<DataType>& output_type) - : cb(cb), function(function), output_type(output_type) {} +struct PythonScalarUdfKernelInit { + explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function) + : function(function) {} // function needs to be destroyed at process exit // and Python may no longer be initialized. 
- ~PythonUdf() { + ~PythonScalarUdfKernelInit() { if (_Py_IsFinalizing()) { function->detach(); } } - Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + Result<std::unique_ptr<compute::KernelState>> operator()( + compute::KernelContext*, const compute::KernelInitArgs&) { + return std::make_unique<PythonScalarUdfKernelState>(function); + } + + std::shared_ptr<OwnedRefNoGIL> function; +}; + +struct PythonTableUdfKernelInit { + PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker, + ScalarUdfWrapperCallback cb) + : function_maker(function_maker), cb(cb) { + Py_INCREF(function_maker->obj()); + } + + Result<std::unique_ptr<compute::KernelState>> operator()( + compute::KernelContext* ctx, const compute::KernelInitArgs&) { + ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0}; + std::unique_ptr<OwnedRefNoGIL> function; + RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] { + OwnedRef empty_tuple(PyTuple_New(0)); Review Comment: OK, keeping the existing kernel structure. Just to clarify: KernelInit is invoked once in both structures; mine returns a stream, and yours would return a stream maker. Then, in my structure, KernelCall is invoked multiple times, returning an output batch each time, whereas in your structure it would be called just once and would return an iterator that is invoked multiple times, returning an output batch on each invocation. So the difference between the two structures is syntactic. As noted, I chose my structure because it is typical of kernels. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org