westonpace commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1085460682
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +192,110 @@ Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback
   }
   compute::OutputType output_type(options.output_type);
   auto udf_data = std::make_shared<PythonUdf>(
-      wrapper, std::make_shared<OwnedRefNoGIL>(user_function), options.output_type);
+      std::make_shared<OwnedRefNoGIL>(user_function), wrapper,
+      TypeHolder::FromTypes(options.input_types), options.output_type);
   compute::ScalarKernel kernel(
       compute::KernelSignature::Make(std::move(input_types), std::move(output_type),
                                      options.arity.is_varargs),
-      PythonUdfExec);
+      PythonUdfExec, kernel_init);
   kernel.data = std::move(udf_data);
   kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
   kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
   RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
-  auto registry = compute::GetFunctionRegistry();
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
   RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
   return Status::OK();
 }
-}  // namespace py
+}  // namespace
+Status RegisterScalarFunction(PyObject* user_function, UdfWrapperCallback wrapper,
+                              const UdfOptions& options,
+                              compute::FunctionRegistry* registry) {
+  return RegisterUdf(
+      user_function,
+      PythonUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, wrapper,
+      options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, UdfWrapperCallback wrapper,
+                               const UdfOptions& options,
+                               compute::FunctionRegistry* registry) {
+  if (options.arity.num_args != 0 || options.arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
+  }
+  if (options.output_type->id() != Type::type::STRUCT) {
+    return Status::Invalid("tabular function with non-struct output");
+  }
+  return RegisterUdf(
+      user_function,
+      PythonTableUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function), wrapper},
+      wrapper, options, registry);
+}
+
+Result<std::shared_ptr<RecordBatchReader>> CallTabularFunction(
+    const std::string& func_name, const std::vector<Datum>& args,
+    compute::FunctionRegistry* registry) {
+  if (args.size() != 0) {
+    return Status::NotImplemented("non-empty arguments to tabular function");
+  }
+  if (registry == NULLPTR) {
+    registry = compute::GetFunctionRegistry();
+  }
+  ARROW_ASSIGN_OR_RAISE(auto func, registry->GetFunction(func_name));
+  if (func->kind() != compute::Function::SCALAR) {
+    return Status::Invalid("tabular function of non-scalar kind");
+  }
+  auto arity = func->arity();
+  if (arity.num_args != 0 || arity.is_varargs) {
+    return Status::NotImplemented("tabular function of non-null arity");
Review Comment:
```suggestion
    return Status::NotImplemented("tabular function of non-null / vararg arity");
```
##########
python/pyarrow/tests/test_udf.py:
##########
@@ -504,3 +504,112 @@ def test_input_lifetime(unary_func_fixture):
     # Calling a UDF should not have kept `v` alive longer than required
     v = None
     assert proxy_pool.bytes_allocated() == 0
+
+
+def _record_batch_from_iters(schema, *iters):
+    arrays = [pa.array(list(v), type=schema[i].type)
+              for i, v in enumerate(iters)]
+    return pa.RecordBatch.from_arrays(arrays=arrays, schema=schema)
+
+
+def _record_batch_for_range(schema, n):
+    return _record_batch_from_iters(schema,
+                                    range(n, n + 10),
+                                    range(n + 1, n + 11))
+
+
+def make_udt_func(schema, batch_gen):
+    def udf_func(ctx):
+        class UDT:
+            def __init__(self):
+                self.caller = None
+
+            def __call__(self, ctx):
+                try:
+                    if self.caller is None:
+                        self.caller, ctx = batch_gen(ctx).send, None
Review Comment:
Why are you assigning `ctx` to `None` here?
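
(For context: `batch_gen(ctx).send` is the `send` method of a freshly created generator, and a just-started generator only accepts `send(None)`, so presumably `ctx` is nulled for the priming call. A standalone illustration, not code from the PR:)

```python
# A just-started generator must be primed with send(None); only later
# sends may carry a value back into the generator.
def batches(ctx):
    while True:
        ctx = yield ("batch for", ctx)


caller = batches("initial ctx").send
print(caller(None))        # priming call: sending non-None here raises TypeError
print(caller("next ctx"))  # subsequent calls can pass a new value in
```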
##########
python/pyarrow/tests/test_udf.py:
##########
@@ -504,3 +504,112 @@ def test_input_lifetime(unary_func_fixture):
     # Calling a UDF should not have kept `v` alive longer than required
     v = None
     assert proxy_pool.bytes_allocated() == 0
+
+
+def _record_batch_from_iters(schema, *iters):
+    arrays = [pa.array(list(v), type=schema[i].type)
+              for i, v in enumerate(iters)]
+    return pa.RecordBatch.from_arrays(arrays=arrays, schema=schema)
+
+
+def _record_batch_for_range(schema, n):
+    return _record_batch_from_iters(schema,
+                                    range(n, n + 10),
+                                    range(n + 1, n + 11))
+
+
+def make_udt_func(schema, batch_gen):
+    def udf_func(ctx):
+        class UDT:
+            def __init__(self):
+                self.caller = None
+
+            def __call__(self, ctx):
+                try:
+                    if self.caller is None:
+                        self.caller, ctx = batch_gen(ctx).send, None
+                    batch = self.caller(ctx)
+                except StopIteration:
+                    arrays = [pa.array([], type=field.type)
+                              for field in schema]
+                    batch = pa.RecordBatch.from_arrays(
+                        arrays=arrays, schema=schema)
Review Comment:
I'm not sure that is a good enough reason. Let's raise a new issue, then, to support natural `StopIteration`; I think people will expect to be able to write normal generators.

I think one way you could do this today would be, in `register_tabular_udf`, to wrap the iterator-maker with a function that wraps each iterator with something that converts `StopIteration` into an empty table, roughly as sketched below.
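
A minimal sketch of that wrapping idea, assuming the factory yields `RecordBatch`es and that the output schema is known at registration time; `_wrap_batch_gen` and its arguments are hypothetical names, not part of the PR:

```python
import pyarrow as pa


def _wrap_batch_gen(batch_gen, schema):
    # Hypothetical helper: wraps a generator-returning factory so that
    # generator exhaustion surfaces as the empty batch that signals
    # end-of-stream, letting users write plain generators.
    def wrapped(ctx):
        gen = batch_gen(ctx)

        def next_batch(ctx):
            try:
                return next(gen)
            except StopIteration:
                # Convert natural generator exhaustion into an empty batch.
                arrays = [pa.array([], type=field.type) for field in schema]
                return pa.RecordBatch.from_arrays(arrays=arrays,
                                                  schema=schema)

        return next_batch

    return wrapped
```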

I think the issue is that you've grouped both scalar and tabular UDFs into a common "udf" type. In other words, we have "regular UDFs", which have an init function and an execute function, and we have "aggregate UDFs", which have an init function, a consume function, a merge function, and a finalize function (the contrast is sketched below). What you've done here is use regular UDFs, letting the "init" function be "given this input, create a generator" and the "execute" function be "return the next batch of data from the generator".

Since you're reusing the "regular UDF" machinery, you have to maintain the semantics of its "execute" function, which are "batch in, array out". You're passing nothing for "batch in" (the args and batch length go to the init function) and then interpreting an empty array out as stop.

In theory I think this can all work. I suspect you may run into trouble once you want to start passing in arguments (you'd want those to go to the init function, right? I'm not sure how you can do that). It smells a little to me of "using a hammer because you have one lying around, not because it's the best tool for the job", but an alternative approach (creating a new function category) would be a considerable rewrite of the function registry, because "function category" has always been a bit of an implicit concept there.