westonpace commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1045129058


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;

Review Comment:
   Could this be a raw pointer?  I might be wrong here (I'm not as familiar 
with `kernel.h` as I could be) but I think each execution of the function will 
generate a new state object correct?  I would think then that the kernel 
registry would need to outlive any running kernels.
   
   Certainly not a performance concern but for readability it helps make it 
more clear that this is just a view and the init function is the true owner.
   
   Also, if we think the state could possibly outlive the init function then 
we'd need to do something here similar to the init function's destructor where 
we safely detach from the object if python has already shut down.



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;
-  std::shared_ptr<DataType> output_type;
+};
 
-  PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> 
function,
-            const std::shared_ptr<DataType>& output_type)
-      : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+  explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
 
   // function needs to be destroyed at process exit
   // and Python may no longer be initialized.
-  ~PythonUdf() {
+  ~PythonScalarUdfKernelInit() {
     if (_Py_IsFinalizing()) {
       function->detach();
     }
   }
 
-  Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext*, const compute::KernelInitArgs&) {
+    return std::make_unique<PythonScalarUdfKernelState>(function);
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+  PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+                           ScalarUdfWrapperCallback cb)
+      : function_maker(function_maker), cb(cb) {
+    Py_INCREF(function_maker->obj());
+  }
+
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+    ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+    std::unique_ptr<OwnedRefNoGIL> function;
+    RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+      OwnedRef empty_tuple(PyTuple_New(0));

Review Comment:
   What is the purpose of this empty tuple?  Is it a proxy for potentially 
future use when a tabular UDF might be receiving data (e.g. not nil)?  We 
should probably document this.
   
   If it is a placeholder for potential input then does that mean we would need 
to init a new kernel state (basically start a whole new call) on each batch of 
data?  Would it be better to restructure this slightly so we have:
   
   KernelInit -> return a state object with the table maker function
   KernelCall -> call the table maker function as part of the call to get an 
iterator and iterate it
   



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -83,16 +135,17 @@ struct PythonUdf : public compute::KernelState {
   }
 };
 
-Status PythonUdfExec(compute::KernelContext* ctx, const ExecSpan& batch,
-                     ExecResult* out) {
+Status PythonUdfExec(compute::KernelContext* ctx, const compute::ExecSpan& 
batch,
+                     compute::ExecResult* out) {
   auto udf = static_cast<PythonUdf*>(ctx->kernel()->data.get());
   return SafeCallIntoPython([&]() -> Status { return udf->Exec(ctx, batch, 
out); });
 }
 
-}  // namespace
-
-Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
-                              const ScalarUdfOptions& options) {
+Status RegisterScalarLikeFunction(PyObject* user_function,

Review Comment:
   I think this PR muddles the meaning of the word "scalar" which is already a 
confusing enough word as it is.  Do we want to maybe pick a new word entirely 
instead of "scalar like"?  We should also rename ScalarUdfWrapperCallback, 
ScalarUdfOptions as well (at the very least to ScalarLike).  What would you 
imagine is not a "scalar like" function?  The pattern (something with a python 
function object and options that describe the behavior of that function object) 
seems pretty generic.
   
   If we can't think of anything that wouldn't fit this pattern then maybe 
"PythonFunction", "PythonUdfWrapperCallback", and "PythonUdfOptions"?
   
   Or maybe, compared to aggregate udfs, the unique thing is that there is one 
function.  So maybe "SingleFunction", "SingleFunctionUdfWrapperCallback", and 
"SingleFunctionUdfOptions"?



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +158,117 @@ Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback
   }
   compute::OutputType output_type(options.output_type);
   auto udf_data = std::make_shared<PythonUdf>(
-      wrapper, std::make_shared<OwnedRefNoGIL>(user_function), 
options.output_type);
+      std::make_shared<OwnedRefNoGIL>(user_function), wrapper, 
options.output_type);
   compute::ScalarKernel kernel(
       compute::KernelSignature::Make(std::move(input_types), 
std::move(output_type),
                                      options.arity.is_varargs),
-      PythonUdfExec);
+      PythonUdfExec, kernel_init);
   kernel.data = std::move(udf_data);
 
   kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
   kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
   RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
-  auto registry = compute::GetFunctionRegistry();
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
   RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
   return Status::OK();
 }
 
-}  // namespace py
+}  // namespace
+
+Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                              const ScalarUdfOptions& options,
+                              compute::FunctionRegistry* registry) {
+  return RegisterScalarLikeFunction(
+      user_function,
+      
PythonScalarUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, 
wrapper,
+      options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                               const ScalarUdfOptions& options,
+                               compute::FunctionRegistry* registry) {
+  if (options.arity.num_args != 0 || options.arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
+  }
+  if (options.output_type->id() != Type::type::STRUCT) {
+    return Status::Invalid("tabular function with non-struct output");
+  }
+  return RegisterScalarLikeFunction(
+      user_function,
+      PythonTableUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function), 
wrapper},
+      wrapper, options, registry);
+}
 
+namespace  {
+
+Result<std::shared_ptr<RecordBatch>> RecordBatchFromArray(
+    std::shared_ptr<Schema> schema, std::shared_ptr<Array> array) {
+  auto& data = const_cast<std::shared_ptr<ArrayData>&>(array->data());
+  if (data->child_data.size() != static_cast<size_t>(schema->num_fields())) {
+    return Status::Invalid("UDF result with shape not conforming to schema");
+  }
+  return RecordBatch::Make(std::move(schema), data->length, 
std::move(data->child_data));
+}
+
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> CallTabularFunction(
+    const std::string& func_name, const std::vector<Datum>& args,
+    compute::FunctionRegistry* registry) {
+  if (args.size() != 0) {
+    return Status::NotImplemented("non-empty arguments to tabular function");
+  }
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
+  ARROW_ASSIGN_OR_RAISE(auto func, registry->GetFunction(func_name));
+  if (func->kind() != compute::Function::SCALAR) {

Review Comment:
   I think we can tackle this in a future PR but I think we will eventually 
want to introduce a new function kind for tabular functions.  Or, if we have to 
pick one of the two options we have today, I think vector would be more 
accurate.  Since we have no inputs I think we might get away with this for now 
but if we had inputs I would expect the length of the array returned from a 
scalar function to match the length of the incoming inputs and I don't think 
this will be true here.



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -68,8 +116,12 @@ struct PythonUdf : public compute::KernelState {
     // unwrapping the output for expected output type
     if (is_array(result.obj())) {
       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> val, 
unwrap_array(result.obj()));
-      if (!output_type->Equals(*val->type())) {
-        return Status::TypeError("Expected output datatype ", 
output_type->ToString(),
+      ARROW_ASSIGN_OR_RAISE(TypeHolder type, output_type.Resolve(ctx, 
batch.GetTypes()));

Review Comment:
   It seems strange that the type resolver would need to run on each batch 
(though maybe this is entirely correct).  Do you think there is a way the type 
resolver can run on init and put the output type in the state?



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +158,117 @@ Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback
   }
   compute::OutputType output_type(options.output_type);
   auto udf_data = std::make_shared<PythonUdf>(
-      wrapper, std::make_shared<OwnedRefNoGIL>(user_function), 
options.output_type);
+      std::make_shared<OwnedRefNoGIL>(user_function), wrapper, 
options.output_type);
   compute::ScalarKernel kernel(
       compute::KernelSignature::Make(std::move(input_types), 
std::move(output_type),
                                      options.arity.is_varargs),
-      PythonUdfExec);
+      PythonUdfExec, kernel_init);
   kernel.data = std::move(udf_data);
 
   kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
   kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
   RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
-  auto registry = compute::GetFunctionRegistry();
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
   RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
   return Status::OK();
 }
 
-}  // namespace py
+}  // namespace
+
+Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                              const ScalarUdfOptions& options,
+                              compute::FunctionRegistry* registry) {
+  return RegisterScalarLikeFunction(
+      user_function,
+      
PythonScalarUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, 
wrapper,
+      options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                               const ScalarUdfOptions& options,
+                               compute::FunctionRegistry* registry) {
+  if (options.arity.num_args != 0 || options.arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
+  }
+  if (options.output_type->id() != Type::type::STRUCT) {
+    return Status::Invalid("tabular function with non-struct output");
+  }
+  return RegisterScalarLikeFunction(
+      user_function,
+      PythonTableUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function), 
wrapper},
+      wrapper, options, registry);
+}
 
+namespace  {
+
+Result<std::shared_ptr<RecordBatch>> RecordBatchFromArray(
+    std::shared_ptr<Schema> schema, std::shared_ptr<Array> array) {
+  auto& data = const_cast<std::shared_ptr<ArrayData>&>(array->data());
+  if (data->child_data.size() != static_cast<size_t>(schema->num_fields())) {
+    return Status::Invalid("UDF result with shape not conforming to schema");
+  }
+  return RecordBatch::Make(std::move(schema), data->length, 
std::move(data->child_data));
+}
+
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> CallTabularFunction(
+    const std::string& func_name, const std::vector<Datum>& args,
+    compute::FunctionRegistry* registry) {
+  if (args.size() != 0) {
+    return Status::NotImplemented("non-empty arguments to tabular function");
+  }
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
+  ARROW_ASSIGN_OR_RAISE(auto func, registry->GetFunction(func_name));
+  if (func->kind() != compute::Function::SCALAR) {
+    return Status::Invalid("tabular function of non-scalar kind");
+  }
+  auto arity = func->arity();
+  if (arity.num_args != 0 || arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
+  }
+  auto kernels =
+      
arrow::internal::checked_pointer_cast<compute::ScalarFunction>(func)->kernels();
+  if (kernels.size() != 1) {
+    return Status::NotImplemented("tabular function with non-single kernel");
+  }
+  const compute::ScalarKernel* kernel = kernels[0];
+  auto out_type = kernel->signature->out_type();
+  if (out_type.kind() != compute::OutputType::FIXED) {
+    return Status::Invalid("tabular kernel of non-fixed kind");
+  }
+  auto datatype = out_type.type();
+  if (datatype->id() != Type::type::STRUCT) {
+    return Status::Invalid("tabular kernel with non-struct output");
+  }
+  auto struct_type = 
arrow::internal::checked_cast<StructType*>(datatype.get());
+  auto schema = ::arrow::schema(struct_type->fields());
+  std::vector<TypeHolder> in_types;
+  ARROW_ASSIGN_OR_RAISE(auto func_exec,
+                        GetFunctionExecutor(func_name, in_types, NULLPTR, 
registry));
+  auto next_func =
+      [schema,
+       func_exec = std::move(func_exec)]() -> 
Result<std::shared_ptr<RecordBatch>> {
+    std::vector<Datum> args;
+    // passed_length of -1 or 0 with args.size() of 0 leads to an empty 
ExecSpanIterator
+    // in exec.cc and to never invoking the source function, so 1 is passed 
instead
+    ARROW_ASSIGN_OR_RAISE(auto datum, func_exec->Execute(args, 
/*passed_length=*/1));
+    if (!datum.is_array()) {
+      return Status::Invalid("UDF result of non-array kind");
+    }
+    std::shared_ptr<Array> array = datum.make_array();
+    if (array->length() == 0) {
+      return IterationTraits<std::shared_ptr<RecordBatch>>::End();
+    }
+    return RecordBatchFromArray(std::move(schema), std::move(array));

Review Comment:
   Right now it seems a tabular function must return a struct array.  I think 
that is "ok" but for consistency with the rest of the kernels might it be 
better to return an ExecSpan?
   
   Of course, we have no concept of exec span in python so the python UDF would 
still return a struct array.  The C++ wrapper could then convert that returned 
struct array into an exec span.
   
   Although, continuing down this line, would it make more sense to have the 
python UDFs return record batches?  pyarrow doesn't really have much concept / 
functionality for struct array.



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,83 @@
 
 #include "arrow/python/udf.h"
 #include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
-
-using compute::ExecResult;
-using compute::ExecSpan;
-
 namespace py {
 
 namespace {
 
-struct PythonUdf : public compute::KernelState {
-  ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+  explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
+
   std::shared_ptr<OwnedRefNoGIL> function;
-  std::shared_ptr<DataType> output_type;
+};
 
-  PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> 
function,
-            const std::shared_ptr<DataType>& output_type)
-      : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+  explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+      : function(function) {}
 
   // function needs to be destroyed at process exit
   // and Python may no longer be initialized.
-  ~PythonUdf() {
+  ~PythonScalarUdfKernelInit() {
     if (_Py_IsFinalizing()) {
       function->detach();
     }
   }
 
-  Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* 
out) {
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext*, const compute::KernelInitArgs&) {
+    return std::make_unique<PythonScalarUdfKernelState>(function);
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+  PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+                           ScalarUdfWrapperCallback cb)
+      : function_maker(function_maker), cb(cb) {
+    Py_INCREF(function_maker->obj());
+  }
+
+  Result<std::unique_ptr<compute::KernelState>> operator()(
+      compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+    ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+    std::unique_ptr<OwnedRefNoGIL> function;
+    RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+      OwnedRef empty_tuple(PyTuple_New(0));
+      function = std::make_unique<OwnedRefNoGIL>(
+          cb(function_maker->obj(), udf_context, empty_tuple.obj()));
+      RETURN_NOT_OK(CheckPyError());
+      return Status::OK();
+    }));
+    if (!PyCallable_Check(function->obj())) {
+      return Status::TypeError("Expected a callable Python object.");
+    }
+    return std::make_unique<PythonScalarUdfKernelState>(
+        std::move(function));
+  }
+
+  std::shared_ptr<OwnedRefNoGIL> function_maker;

Review Comment:
   We need to potentially detach from this in the destructor (see the 
destructor for the scalar init).  The problem (if I'm remembering correctly) is 
that it is not safe to destroy the `OwnedRefNoGIL` (without detaching) if 
python has already started shutting down.  Since the function registry is 
global (at least the default one) it might be destroyed after python has 
already shut down.



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +158,117 @@ Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback
   }
   compute::OutputType output_type(options.output_type);
   auto udf_data = std::make_shared<PythonUdf>(
-      wrapper, std::make_shared<OwnedRefNoGIL>(user_function), 
options.output_type);
+      std::make_shared<OwnedRefNoGIL>(user_function), wrapper, 
options.output_type);
   compute::ScalarKernel kernel(
       compute::KernelSignature::Make(std::move(input_types), 
std::move(output_type),
                                      options.arity.is_varargs),
-      PythonUdfExec);
+      PythonUdfExec, kernel_init);
   kernel.data = std::move(udf_data);
 
   kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
   kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
   RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
-  auto registry = compute::GetFunctionRegistry();
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
   RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
   return Status::OK();
 }
 
-}  // namespace py
+}  // namespace
+
+Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                              const ScalarUdfOptions& options,
+                              compute::FunctionRegistry* registry) {
+  return RegisterScalarLikeFunction(
+      user_function,
+      
PythonScalarUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, 
wrapper,
+      options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                               const ScalarUdfOptions& options,
+                               compute::FunctionRegistry* registry) {
+  if (options.arity.num_args != 0 || options.arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
+  }
+  if (options.output_type->id() != Type::type::STRUCT) {
+    return Status::Invalid("tabular function with non-struct output");
+  }
+  return RegisterScalarLikeFunction(
+      user_function,
+      PythonTableUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function), 
wrapper},
+      wrapper, options, registry);
+}
 
+namespace  {
+
+Result<std::shared_ptr<RecordBatch>> RecordBatchFromArray(

Review Comment:
   Can you use `RecordBatch::FromStructArray` defined in `record_batch.h`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to