rtpsw commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1048668717
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
namespace py {
namespace {
-struct PythonUdf : public compute::KernelState {
- ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+ explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
+
std::shared_ptr<OwnedRefNoGIL> function;
- std::shared_ptr<DataType> output_type;
+};
- PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> function,
- const std::shared_ptr<DataType>& output_type)
- : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+ explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
// function needs to be destroyed at process exit
// and Python may no longer be initialized.
- ~PythonUdf() {
+ ~PythonScalarUdfKernelInit() {
if (_Py_IsFinalizing()) {
function->detach();
}
}
- Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext*, const compute::KernelInitArgs&) {
+ return std::make_unique<PythonScalarUdfKernelState>(function);
+ }
+
+ std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+ PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+ ScalarUdfWrapperCallback cb)
+ : function_maker(function_maker), cb(cb) {
+ Py_INCREF(function_maker->obj());
+ }
+
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+ ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+ std::unique_ptr<OwnedRefNoGIL> function;
+ RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+ OwnedRef empty_tuple(PyTuple_New(0));
Review Comment:
OK, keeping the existing kernel structure.
Just to clarify, KernelInit gets invoked once in both structures - mine
returns a stream and yours would return a stream maker. Then, in my structure
KernelCall gets invoked multiple times, returning an output batch each time,
whereas in your structure it would get called just once but would return an
iterator that would get invoked multiple times, returning an output batch each
time. So, the difference between the two structures is syntactic. As noted, the
reason I chose my structure is that it is typical of kernels.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]