rtpsw commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1048668717


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;
-  std::shared_ptr<DataType> output_type;
+};
 
-  PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> 
function,
-            const std::shared_ptr<DataType>& output_type)
-      : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+  explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
 
   // function needs to be destroyed at process exit
   // and Python may no longer be initialized.
-  ~PythonUdf() {
+  ~PythonScalarUdfKernelInit() {
     if (_Py_IsFinalizing()) {
       function->detach();
     }
   }
 
-  Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext*, const compute::KernelInitArgs&) {
+    return std::make_unique<PythonScalarUdfKernelState>(function);
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+  PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+                           ScalarUdfWrapperCallback cb)
+      : function_maker(function_maker), cb(cb) {
+    Py_INCREF(function_maker->obj());
+  }
+
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+    ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+    std::unique_ptr<OwnedRefNoGIL> function;
+    RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+      OwnedRef empty_tuple(PyTuple_New(0));

Review Comment:
   OK, keeping the existing kernel structure.
   
   Just to clarify, KernelInit gets invoked once in both structures: mine 
returns a stream, and yours would return a stream maker. Then, in my structure, 
KernelCall gets invoked multiple times, returning an output batch each time, 
whereas in your structure it would get called just once but would return an 
iterator that would get invoked multiple times, returning an output batch each 
time. So, the difference between the two structures is syntactic. As noted, the 
reason I chose my structure is that it is typical of kernels.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to