westonpace commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1048129423
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
namespace py {
namespace {
-struct PythonUdf : public compute::KernelState {
- ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+ explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
+
std::shared_ptr<OwnedRefNoGIL> function;
- std::shared_ptr<DataType> output_type;
+};
- PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL>
function,
- const std::shared_ptr<DataType>& output_type)
- : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+ explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
// function needs to be destroyed at process exit
// and Python may no longer be initialized.
- ~PythonUdf() {
+ ~PythonScalarUdfKernelInit() {
if (_Py_IsFinalizing()) {
function->detach();
}
}
- Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult*
out) {
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext*, const compute::KernelInitArgs&) {
+ return std::make_unique<PythonScalarUdfKernelState>(function);
+ }
+
+ std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+ PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+ ScalarUdfWrapperCallback cb)
+ : function_maker(function_maker), cb(cb) {
+ Py_INCREF(function_maker->obj());
+ }
+
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+ ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+ std::unique_ptr<OwnedRefNoGIL> function;
+ RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+ OwnedRef empty_tuple(PyTuple_New(0));
Review Comment:
I think your solution will work. It essentially boils down to whether we
want one "init" per input batch/row or one "init" per plan. I don't know
enough to weigh the pros and cons, but I think we can always adjust as needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]