rtpsw commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1045247137
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
namespace py {
namespace {
-struct PythonUdf : public compute::KernelState {
- ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+ explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
+
std::shared_ptr<OwnedRefNoGIL> function;
- std::shared_ptr<DataType> output_type;
+};
- PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL>
function,
- const std::shared_ptr<DataType>& output_type)
- : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+ explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
// function needs to be destroyed at process exit
// and Python may no longer be initialized.
- ~PythonUdf() {
+ ~PythonScalarUdfKernelInit() {
if (_Py_IsFinalizing()) {
function->detach();
}
}
- Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult*
out) {
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext*, const compute::KernelInitArgs&) {
+ return std::make_unique<PythonScalarUdfKernelState>(function);
+ }
+
+ std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+ PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+ ScalarUdfWrapperCallback cb)
+ : function_maker(function_maker), cb(cb) {
+ Py_INCREF(function_maker->obj());
+ }
+
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+ ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+ std::unique_ptr<OwnedRefNoGIL> function;
+ RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+ OwnedRef empty_tuple(PyTuple_New(0));
Review Comment:
The UDT kernel structure is explained in [this
post](https://github.com/apache/arrow/pull/14043#issuecomment-1255332996) -
here is the relevant part:
> The general idea is that the end-user is driving from PyArrow and registers a UDT. The UDT is a Python-implemented function that may be invoked multiple times, each at a different source node in the execution plan. Each such invocation returns a stream object implemented in Python that is managed in a kernel state. Invoking the kernel returns tabular data that is part of the dynamically generated stream.

The empty tuple in the invocation here holds the arguments to the UDT; in the current implementation the UDT accepts no arguments (in the future I expect it will). This UDT invocation returns a stateful stream in the form of a kernel state. It does so by calling a Python (no-arguments) function that returns a stateful Python callable, which is wrapped by the kernel state. An invocation of the stateful callable, via the kernel state, returns the next tabular batch of data. So, in comparison to what you wrote, the PR has:
- KernelInit -> returns a stateful stream
- KernelCall -> returns the next batch in the stream

I think this is a reasonable kernel structure because the KernelInit creates state and the KernelCall uses the state to produce one output batch, as kernels normally do. IIUC, in your proposed structure, the KernelCall would produce the entire stream of batches, which is technically possible but not typical for kernels.
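
To make the flow concrete, here is a minimal Python-side sketch of the pattern described above. The names (`make_table_source`, `next_batch`), the batch contents, and the end-of-stream convention are illustrative assumptions only, not the PR's actual registration API or semantics:

```python
# Minimal sketch of the UDT stream pattern; names and end-of-stream
# handling are hypothetical, not the PR's real API.
import pyarrow as pa

def make_table_source():
    """No-argument UDT: returns a stateful callable (the 'stream')."""
    remaining = [3]  # mutable state captured by the closure

    def next_batch():
        # Each call corresponds to one kernel invocation and yields the
        # next batch of the dynamically generated stream; None signals
        # end-of-stream in this sketch.
        if remaining[0] == 0:
            return None
        remaining[0] -= 1
        return pa.record_batch([pa.array([remaining[0]])], names=["x"])

    return next_batch

# Driving the stream manually mirrors the kernel structure:
# KernelInit -> make_table_source() returns the stateful callable,
# KernelCall -> each next_batch() call returns the next batch.
stream = make_table_source()
while (batch := stream()) is not None:
    print(batch)
```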