rtpsw commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1045247137


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;
-  std::shared_ptr<DataType> output_type;
+};
 
-  PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> function,
-            const std::shared_ptr<DataType>& output_type)
-      : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+  explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
 
   // function needs to be destroyed at process exit
   // and Python may no longer be initialized.
-  ~PythonUdf() {
+  ~PythonScalarUdfKernelInit() {
     if (_Py_IsFinalizing()) {
       function->detach();
     }
   }
 
-  Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext*, const compute::KernelInitArgs&) {
+    return std::make_unique<PythonScalarUdfKernelState>(function);
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+  PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+                           ScalarUdfWrapperCallback cb)
+      : function_maker(function_maker), cb(cb) {
+    Py_INCREF(function_maker->obj());
+  }
+
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+    ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+    std::unique_ptr<OwnedRefNoGIL> function;
+    RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+      OwnedRef empty_tuple(PyTuple_New(0));

Review Comment:
   The UDT kernel structure is explained in [this post](https://github.com/apache/arrow/pull/14043#issuecomment-1255332996); here is the relevant part:
   
   > The general idea is that the end-user is driving from PyArrow and 
registers a UDT. The UDT is a Python-implemented function that may be invoked 
multiple times, each at a different source node in the execution plan. Each 
such invocation returns a stream object implemented in Python that is managed 
in a kernel state. Invoking the kernel returns tabular data that is part of the 
dynamically generated stream.
   
   The empty tuple in the invocation here holds the arguments to the UDT; in the 
current implementation the UDT accepts no arguments (I expect it will in the 
future). This UDT invocation returns a stateful stream in the form of a kernel 
state. It does so by calling a no-argument Python function that returns a 
stateful Python callable, which the kernel state wraps. Each invocation of the 
stateful callable, via the kernel state, returns the next tabular batch of 
data. So, in comparison to what you wrote, the PR has:
   - KernelInit -> return a stateful stream
   - KernelCall -> return the next batch in the stream
   
   I think this is a reasonable kernel structure because the KernelInit creates 
state and the KernelCall uses the state to produce one output batch, as kernels 
normally do. IIUC, in your proposed structure, the KernelCall would produce the 
entire stream of batches, which is technically possible but not typical for 
kernels.
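
   To make the structure concrete, here is a minimal sketch in plain Python. It is only an illustration under stated assumptions, not the PR's code: `make_stream`, `StreamKernelState`, `kernel_init`, and `kernel_call` are hypothetical names, and a plain dict stands in for a tabular batch.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical stand-in for a tabular batch; the PR deals with Arrow data instead.
Batch = Dict[str, List[int]]


def make_stream() -> Callable[[], Optional[Batch]]:
    """Plays the role of the UDT: takes no arguments, returns a stateful callable."""
    pending = iter([{"x": [1, 2]}, {"x": [3]}])

    def next_batch() -> Optional[Batch]:
        # Each call returns the next batch, or None once the stream is exhausted.
        return next(pending, None)

    return next_batch


class StreamKernelState:
    """Hypothetical kernel state that wraps the stateful callable."""

    def __init__(self, stream: Callable[[], Optional[Batch]]) -> None:
        self.stream = stream


def kernel_init(udt: Callable[[], Callable[[], Optional[Batch]]]) -> StreamKernelState:
    # KernelInit: invoke the UDT with an empty argument tuple to get a fresh stream.
    empty_args = ()
    return StreamKernelState(udt(*empty_args))


def kernel_call(state: StreamKernelState) -> Optional[Batch]:
    # KernelCall: produce exactly one batch from the wrapped stream.
    return state.stream()
```

   Calling `kernel_init(make_stream)` twice gives two independent streams, which mirrors the UDT being invoked once per source node; each `kernel_call` on a given state then advances only that stream by one batch.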


