rtpsw commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1045247884


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;

Review Comment:
   I'll look into this.



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;
-  std::shared_ptr<DataType> output_type;
+};
 
-  PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> function,
-            const std::shared_ptr<DataType>& output_type)
-      : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+  explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
 
   // function needs to be destroyed at process exit
   // and Python may no longer be initialized.
-  ~PythonUdf() {
+  ~PythonScalarUdfKernelInit() {
     if (_Py_IsFinalizing()) {
       function->detach();
     }
   }
 
-  Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext*, const compute::KernelInitArgs&) {
+    return std::make_unique<PythonScalarUdfKernelState>(function);
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+  PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+                           ScalarUdfWrapperCallback cb)
+      : function_maker(function_maker), cb(cb) {
+    Py_INCREF(function_maker->obj());
+  }
+
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+    ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+    std::unique_ptr<OwnedRefNoGIL> function;
+    RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+      OwnedRef empty_tuple(PyTuple_New(0));
+      function = std::make_unique<OwnedRefNoGIL>(
+          cb(function_maker->obj(), udf_context, empty_tuple.obj()));
+      RETURN_NOT_OK(CheckPyError());
+      return Status::OK();
+    }));
+    if (!PyCallable_Check(function->obj())) {
+      return Status::TypeError("Expected a callable Python object.");
+    }
+    return std::make_unique<PythonScalarUdfKernelState>(
+        std::move(function));
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function_maker;

Review Comment:
   I'll look into this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to