westonpace commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1085459769


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +158,117 @@ Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback
   }
   compute::OutputType output_type(options.output_type);
   auto udf_data = std::make_shared<PythonUdf>(
-      wrapper, std::make_shared<OwnedRefNoGIL>(user_function), 
options.output_type);
+      std::make_shared<OwnedRefNoGIL>(user_function), wrapper, 
options.output_type);
   compute::ScalarKernel kernel(
       compute::KernelSignature::Make(std::move(input_types), 
std::move(output_type),
                                      options.arity.is_varargs),
-      PythonUdfExec);
+      PythonUdfExec, kernel_init);
   kernel.data = std::move(udf_data);
 
   kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
   kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
   RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
-  auto registry = compute::GetFunctionRegistry();
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
   RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
   return Status::OK();
 }
 
-}  // namespace py
+}  // namespace
+
+Status RegisterScalarFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                              const ScalarUdfOptions& options,
+                              compute::FunctionRegistry* registry) {
+  return RegisterScalarLikeFunction(
+      user_function,
+      
PythonScalarUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, 
wrapper,
+      options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, 
ScalarUdfWrapperCallback wrapper,
+                               const ScalarUdfOptions& options,
+                               compute::FunctionRegistry* registry) {
+  if (options.arity.num_args != 0 || options.arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
+  }
+  if (options.output_type->id() != Type::type::STRUCT) {
+    return Status::Invalid("tabular function with non-struct output");
+  }
+  return RegisterScalarLikeFunction(
+      user_function,
+      PythonTableUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function), 
wrapper},
+      wrapper, options, registry);
+}
 
+namespace  {
+
+Result<std::shared_ptr<RecordBatch>> RecordBatchFromArray(
+    std::shared_ptr<Schema> schema, std::shared_ptr<Array> array) {
+  auto& data = const_cast<std::shared_ptr<ArrayData>&>(array->data());
+  if (data->child_data.size() != static_cast<size_t>(schema->num_fields())) {
+    return Status::Invalid("UDF result with shape not conforming to schema");
+  }
+  return RecordBatch::Make(std::move(schema), data->length, 
std::move(data->child_data));
+}
+
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> CallTabularFunction(
+    const std::string& func_name, const std::vector<Datum>& args,
+    compute::FunctionRegistry* registry) {
+  if (args.size() != 0) {
+    return Status::NotImplemented("non-empty arguments to tabular function");
+  }
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
+  ARROW_ASSIGN_OR_RAISE(auto func, registry->GetFunction(func_name));
+  if (func->kind() != compute::Function::SCALAR) {

Review Comment:
   This particular line will still need to change at some point, though, right?
   Right now you're asserting that the function must be scalar, but the tabular
   functions you are interested in (generators) are not scalar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to