wjones127 commented on code in PR #14682:
URL: https://github.com/apache/arrow/pull/14682#discussion_r1028301304
##########
python/pyarrow/_compute.pyx:
##########
@@ -2558,8 +2602,51 @@ def _get_scalar_udf_context(memory_pool, batch_length):
return context
-def register_scalar_function(func, function_name, function_doc, in_types,
- out_type):
+def udf_result_from_record_batch(record_batch):
Review Comment:
We have `RecordBatch.from_struct_array()` exposed. Could this function
instead be added as `RecordBatch.to_struct_array()`?
https://github.com/apache/arrow/blob/b9dd41607cb7dd7afd50e3ceb99c68e79e7733a0/python/pyarrow/table.pxi#L2468
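For comparison, a minimal sketch of what such a method could boil down to, using only existing public APIs (the standalone `to_struct_array` helper below is hypothetical):
```python
import pyarrow as pa

# Hypothetical sketch of what RecordBatch.to_struct_array() might do:
# combine the batch's columns into one struct array whose fields mirror
# the batch schema.
def to_struct_array(batch: pa.RecordBatch) -> pa.StructArray:
    return pa.StructArray.from_arrays(batch.columns, fields=list(batch.schema))

batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2], type=pa.int32())], names=['x'])
assert to_struct_array(batch).type == pa.struct([('x', pa.int32())])
```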
##########
python/pyarrow/_compute.pyx:
##########
@@ -2629,14 +2718,98 @@ def register_scalar_function(func, function_name, function_doc, in_types,
21
]
"""
+ return register_scalar_like_function(GetRegisterScalarFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_tabular_function(func, function_name, function_doc, in_types, out_type,
+ func_registry=None):
+ """
+ Register a user-defined tabular function.
+
+ A tabular function is one accepting a context argument of type
+ ScalarUdfContext and returning a generator of struct arrays.
+ The in_types argument must be empty and the out_type argument
+ specifies a schema. Each struct array must have field types
+ corresponding to the schema.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The only argument is the context argument of type
+ ScalarUdfContext. It must return a callable that
+ returns on each invocation a StructArray matching
+ the out_type, where an empty array indicates end.
+ function_name : str
+ Name of the function. This name must be globally unique.
+ function_doc : dict
+ A dictionary object with keys "summary" (str),
+ and "description" (str).
+ in_types : Dict[str, DataType]
+ Must be an empty dictionary.
+ out_type : DataType
+ Output type of the function.
+ func_registry : FunctionRegistry
+ Optional function registry to use instead of the default global one.
+ """
+ return register_scalar_like_function(GetRegisterTabularFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_scalar_like_function(register_func, func, function_name, function_doc, in_types,
+ out_type, func_registry=None):
+ """
+ Register a user-defined scalar-like function.
+
+ A scalar-like function is a callable accepting a first
+ context argument of type ScalarUdfContext as well as
+ possibly additional Arrow arguments, and returning an
+ Arrow result appropriate for the kind of function.
+ A scalar function and a tabular function are examples
+ of scalar-like functions.
+ This function is normally not called directly but via
+ register_scalar_function or register_tabular_function.
Review Comment:
Why do we define this publicly? Can this be an internal function?
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,92 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
using compute::ExecResult;
using compute::ExecSpan;
+using compute::Function;
+using compute::OutputType;
Review Comment:
This `using` is only used once in the file, while `compute::OutputType` is
written out many times. Perhaps remove this `using`? Or, if you really want to
keep it, update the other references?
##########
python/pyarrow/compute.py:
##########
@@ -80,7 +80,10 @@
list_functions,
_group_by,
# Udf
+ get_record_batches_from_tabular_function,
Review Comment:
This feels off to me.
There are these function classes here:
https://github.com/apache/arrow/blob/b9dd41607cb7dd7afd50e3ceb99c68e79e7733a0/python/pyarrow/_compute.pyx#L363
Perhaps you should implement a `TableFunction` class, and have this be a
method on that class. So the API would be:
```python
registry: FunctionRegistry
func: TableFunction = registry.get_function("my_table_function")
func.get_batch_iterator()
```
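To make the shape concrete, here is a self-contained toy version (all names are hypothetical, and a plain list stands in for the C++-backed batch source; this is not the pyarrow implementation):
```python
# Toy illustration of the proposed API shape.
class TableFunction:
    def __init__(self, name, batches):
        self.name = name
        self._batches = batches  # stand-in for the C++ tabular-UDF source

    def get_batch_iterator(self):
        # In pyarrow this would delegate to the C++ layer
        # (GetRecordBatchesFromTabularFunction); here we just iterate.
        return iter(self._batches)
```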
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +167,109 @@ Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback
}
compute::OutputType output_type(options.output_type);
auto udf_data = std::make_shared<PythonUdf>(
- wrapper, std::make_shared<OwnedRefNoGIL>(user_function), options.output_type);
+ std::make_shared<OwnedRefNoGIL>(user_function), wrapper, options.output_type);
compute::ScalarKernel kernel(
compute::KernelSignature::Make(std::move(input_types), std::move(output_type),
options.arity.is_varargs),
- PythonUdfExec);
+ PythonUdfExec, kernel_init);
kernel.data = std::move(udf_data);
kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
- auto registry = compute::GetFunctionRegistry();
+ if (registry == NULLPTR) {
+ registry = compute::GetFunctionRegistry();
+ }
RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
return Status::OK();
}
+} // namespace
+
+Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback wrapper,
+ const ScalarUdfOptions& options,
+ compute::FunctionRegistry* registry) {
+ return RegisterScalarLikeFunction(
+ user_function,
+ PythonScalarUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, wrapper,
+ options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, ScalarUdfWrapperCallback wrapper,
+ const ScalarUdfOptions& options,
+ compute::FunctionRegistry* registry) {
+ if (options.arity.num_args != 0 || options.arity.is_varargs) {
+ return Status::Invalid("tabular function must have no arguments");
+ }
+ return RegisterScalarLikeFunction(
Review Comment:
Shouldn't we validate that the `out_type` is a StructType?
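For example, a registration-time check could look roughly like this on the Python side (a sketch; the `_check_tabular_out_type` helper is hypothetical, and the C++ equivalent would test `options.output_type->id() == Type::STRUCT`):
```python
import pyarrow as pa

def _check_tabular_out_type(out_type):
    # Hypothetical helper: reject non-struct output types at registration.
    if not isinstance(out_type, pa.StructType):
        raise TypeError(
            f"tabular function output type must be a struct type, got {out_type}")

_check_tabular_out_type(pa.struct([('x', pa.int32())]))  # passes
```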
##########
python/pyarrow/tests/test_udf.py:
##########
@@ -504,3 +504,49 @@ def test_input_lifetime(unary_func_fixture):
# Calling a UDF should not have kept `v` alive longer than required
v = None
assert proxy_pool.bytes_allocated() == 0
+
+
+def _record_batch_from_iters(schema, *iters):
+ arrays = [pa.array(list(v), type=schema[i].type)
+ for i, v in enumerate(iters)]
+ return pa.RecordBatch.from_arrays(arrays=arrays, schema=schema)
+
+
+def _record_batch_for_range(schema, n):
+ return _record_batch_from_iters(schema,
+ range(n, n + 10),
+ range(n + 1, n + 11))
+
+
+def datasource1(ctx):
+ """A short dataset"""
+ import pyarrow as pa
+ schema = pa.schema([('', pa.int32()), ('', pa.int32())])
+
+ class Generator:
+ def __init__(self):
+ self.n = 3
+
+ def __call__(self, ctx):
+ if self.n == 0:
+ batch = _record_batch_from_iters(schema, [], [])
+ else:
+ self.n -= 1
+ batch = _record_batch_for_range(schema, self.n)
+ return pc.udf_result_from_record_batch(batch)
+ return Generator()
Review Comment:
This works, though I wonder if the API might be better taking a coroutine?
```python
def generator(ctx):
    for i in range(3):
        ctx = yield _record_batch_for_range(schema, i).to_struct_array()
```
Though, this might be a bit awkward: the first `ctx` must be passed in as a
parameter, and then subsequent ones via
[send](https://docs.python.org/3/reference/expressions.html#generator.send).
But this would eliminate the need for the user to send an empty array at the
end, since we could detect StopIteration and map that in the PyArrow code?
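For reference, the driving loop on the PyArrow side might look like this (a sketch; `make_context` stands in for however kernel contexts would be produced):
```python
def drive(generator_func, make_context):
    # The first context is passed as a parameter; later ones go in via send().
    gen = generator_func(make_context())
    results = []
    try:
        batch = next(gen)                  # advance to the first yield
        while True:
            results.append(batch)          # stand-in for downstream handling
            batch = gen.send(make_context())
    except StopIteration:
        pass                               # end of stream; no sentinel needed
    return results
```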
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -105,21 +167,109 @@ Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback
}
compute::OutputType output_type(options.output_type);
auto udf_data = std::make_shared<PythonUdf>(
- wrapper, std::make_shared<OwnedRefNoGIL>(user_function), options.output_type);
+ std::make_shared<OwnedRefNoGIL>(user_function), wrapper, options.output_type);
compute::ScalarKernel kernel(
compute::KernelSignature::Make(std::move(input_types), std::move(output_type),
options.arity.is_varargs),
- PythonUdfExec);
+ PythonUdfExec, kernel_init);
kernel.data = std::move(udf_data);
kernel.mem_allocation = compute::MemAllocation::NO_PREALLOCATE;
kernel.null_handling = compute::NullHandling::COMPUTED_NO_PREALLOCATE;
RETURN_NOT_OK(scalar_func->AddKernel(std::move(kernel)));
- auto registry = compute::GetFunctionRegistry();
+ if (registry == NULLPTR) {
+ registry = compute::GetFunctionRegistry();
+ }
RETURN_NOT_OK(registry->AddFunction(std::move(scalar_func)));
return Status::OK();
}
+} // namespace
+
+Status RegisterScalarFunction(PyObject* user_function, ScalarUdfWrapperCallback wrapper,
+ const ScalarUdfOptions& options,
+ compute::FunctionRegistry* registry) {
+ return RegisterScalarLikeFunction(
+ user_function,
+ PythonScalarUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function)}, wrapper,
+ options, registry);
+}
+
+Status RegisterTabularFunction(PyObject* user_function, ScalarUdfWrapperCallback wrapper,
+ const ScalarUdfOptions& options,
+ compute::FunctionRegistry* registry) {
+ if (options.arity.num_args != 0 || options.arity.is_varargs) {
+ return Status::Invalid("tabular function must have no arguments");
+ }
+ return RegisterScalarLikeFunction(
+ user_function,
+ PythonTableUdfKernelInit{std::make_shared<OwnedRefNoGIL>(user_function), wrapper},
+ wrapper, options, registry);
+}
+
+namespace {
+
+Result<std::shared_ptr<RecordBatch>> RecordBatchFromArray(
+ std::shared_ptr<Schema> schema, std::shared_ptr<Array> array) {
+ auto& data = const_cast<std::shared_ptr<ArrayData>&>(array->data());
+ if (data->child_data.size() != static_cast<size_t>(schema->num_fields())) {
+ return Status::Invalid("UDF result with shape not conforming to schema");
+ }
+ return RecordBatch::Make(std::move(schema), data->length, std::move(data->child_data));
+}
+
+} // namespace
+
+Result<RecordBatchIterator> GetRecordBatchesFromTabularFunction(
+ const std::string& func_name, compute::FunctionRegistry* registry) {
+ if (registry == NULLPTR) {
+ registry = compute::GetFunctionRegistry();
+ }
+ ARROW_ASSIGN_OR_RAISE(auto func, registry->GetFunction(func_name));
+ if (func->kind() != Function::SCALAR) {
+ return Status::Invalid("tabular function of non-scalar kind");
+ }
+ auto arity = func->arity();
+ if (arity.num_args != 0 || arity.is_varargs) {
+ return Status::Invalid("tabular function of non-null arity");
+ }
Review Comment:
Why doesn't this validation happen during registration?
##########
python/pyarrow/_compute.pyx:
##########
@@ -2629,14 +2718,98 @@ def register_scalar_function(func, function_name, function_doc, in_types,
21
]
"""
+ return register_scalar_like_function(GetRegisterScalarFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_tabular_function(func, function_name, function_doc, in_types, out_type,
+ func_registry=None):
+ """
+ Register a user-defined tabular function.
+
+ A tabular function is one accepting a context argument of type
+ ScalarUdfContext and returning a generator of struct arrays.
+ The in_types argument must be empty and the out_type argument
+ specifies a schema. Each struct array must have field types
+ corresponding to the schema.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The only argument is the context argument of type
+ ScalarUdfContext. It must return a callable that
+ returns on each invocation a StructArray matching
+ the out_type, where an empty array indicates end.
+ function_name : str
+ Name of the function. This name must be globally unique.
+ function_doc : dict
+ A dictionary object with keys "summary" (str),
+ and "description" (str).
+ in_types : Dict[str, DataType]
+ Must be an empty dictionary.
+ out_type : DataType
+ Output type of the function.
Review Comment:
Should this be a Schema? or a struct type?
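For concreteness, the two options would look roughly like this (a sketch):
```python
import pyarrow as pa

# As a struct type (fits the current `out_type : DataType` signature):
struct_type = pa.struct([('x', pa.int32()), ('y', pa.int32())])

# As a schema (arguably clearer for a function producing record batches):
schema = pa.schema([('x', pa.int32()), ('y', pa.int32())])

# Both carry the same field information and convert easily:
fields = [struct_type.field(i) for i in range(struct_type.num_fields)]
assert pa.schema(fields) == schema
assert pa.struct(list(schema)) == struct_type
```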
##########
python/pyarrow/tests/test_udf.py:
##########
@@ -504,3 +504,49 @@ def test_input_lifetime(unary_func_fixture):
# Calling a UDF should not have kept `v` alive longer than required
v = None
assert proxy_pool.bytes_allocated() == 0
+
+
+def _record_batch_from_iters(schema, *iters):
+ arrays = [pa.array(list(v), type=schema[i].type)
+ for i, v in enumerate(iters)]
+ return pa.RecordBatch.from_arrays(arrays=arrays, schema=schema)
+
+
+def _record_batch_for_range(schema, n):
+ return _record_batch_from_iters(schema,
+ range(n, n + 10),
+ range(n + 1, n + 11))
+
+
+def datasource1(ctx):
+ """A short dataset"""
+ import pyarrow as pa
+ schema = pa.schema([('', pa.int32()), ('', pa.int32())])
+
+ class Generator:
+ def __init__(self):
+ self.n = 3
+
+ def __call__(self, ctx):
+ if self.n == 0:
+ batch = _record_batch_from_iters(schema, [], [])
+ else:
+ self.n -= 1
+ batch = _record_batch_for_range(schema, self.n)
+ return pc.udf_result_from_record_batch(batch)
+ return Generator()
+
+
+def test_udt():
Review Comment:
Could we add a test validating the behavior when an exception is raised in the
function?
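Something along these lines, perhaps (a sketch reusing the test module's `pa`/`pc`/`pytest` imports; the registration arguments and the assumption that the original Python exception is restored when batches are pulled are untested guesses):
```python
def test_tabular_function_raises():
    def raising_source(ctx):
        def gen(ctx):
            raise RuntimeError("datasource error")
        return gen

    pc.register_tabular_function(
        raising_source, "test_raising_source",
        {"summary": "raising source", "description": "raises when called"},
        {}, pa.schema([('', pa.int32())]))

    with pytest.raises(RuntimeError, match="datasource error"):
        list(pc.get_record_batches_from_tabular_function("test_raising_source"))
```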
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,92 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
using compute::ExecResult;
using compute::ExecSpan;
+using compute::Function;
+using compute::OutputType;
+using compute::ScalarFunction;
+using compute::ScalarKernel;
+using internal::checked_cast;
namespace py {
namespace {
-struct PythonUdf : public compute::KernelState {
- ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+ explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
+
std::shared_ptr<OwnedRefNoGIL> function;
- std::shared_ptr<DataType> output_type;
+};
- PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> function,
- const std::shared_ptr<DataType>& output_type)
- : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+ explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
// function needs to be destroyed at process exit
// and Python may no longer be initialized.
- ~PythonUdf() {
+ ~PythonScalarUdfKernelInit() {
if (_Py_IsFinalizing()) {
function->detach();
}
}
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext*, const compute::KernelInitArgs&) {
+ return std::make_unique<PythonScalarUdfKernelState>(function);
+ }
+
+ std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+ PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
+ ScalarUdfWrapperCallback cb)
+ : function_maker(function_maker), cb(cb) {
+ Py_INCREF(function_maker->obj());
+ }
+
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext* ctx, const compute::KernelInitArgs&) {
+ ScalarUdfContext udf_context{ctx->memory_pool(), /*batch_length=*/0};
+ std::unique_ptr<OwnedRefNoGIL> function;
+ RETURN_NOT_OK(SafeCallIntoPython([this, &udf_context, &function] {
+ OwnedRef empty_tuple(PyTuple_New(0));
+ function = std::make_unique<OwnedRefNoGIL>(
+ cb(function_maker->obj(), udf_context, empty_tuple.obj()));
+ RETURN_NOT_OK(CheckPyError());
+ return Status::OK();
+ }));
+ if (!PyCallable_Check(function->obj())) {
+ return Status::TypeError("Expected a callable Python object.");
+ }
+ return std::make_unique<PythonScalarUdfKernelState>(
+ std::move(function));
+ }
+
+ std::shared_ptr<OwnedRefNoGIL> function_maker;
+ ScalarUdfWrapperCallback cb;
+};
+
+struct PythonUdf : public PythonScalarUdfKernelState {
+ PythonUdf(std::shared_ptr<OwnedRefNoGIL> function, ScalarUdfWrapperCallback cb,
+ compute::OutputType output_type)
+ : PythonScalarUdfKernelState(function), cb(cb), output_type(output_type) {}
+
+ ScalarUdfWrapperCallback cb;
+ compute::OutputType output_type;
+
Status Exec(compute::KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+ auto state =
+ ::arrow::internal::checked_cast<PythonScalarUdfKernelState*>(ctx->state());
Review Comment:
Since you added `using internal::checked_cast;` above:
```suggestion
checked_cast<PythonScalarUdfKernelState*>(ctx->state());
```
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,92 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
using compute::ExecResult;
using compute::ExecSpan;
+using compute::Function;
+using compute::OutputType;
+using compute::ScalarFunction;
+using compute::ScalarKernel;
Review Comment:
Similarly here: this was previously always written `compute::ScalarKernel`, and
there are still references in that style elsewhere in the file.
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -17,35 +17,92 @@
#include "arrow/python/udf.h"
#include "arrow/compute/function.h"
+#include "arrow/compute/kernel.h"
#include "arrow/python/common.h"
+#include "arrow/util/checked_cast.h"
namespace arrow {
using compute::ExecResult;
using compute::ExecSpan;
+using compute::Function;
+using compute::OutputType;
+using compute::ScalarFunction;
+using compute::ScalarKernel;
+using internal::checked_cast;
namespace py {
namespace {
-struct PythonUdf : public compute::KernelState {
- ScalarUdfWrapperCallback cb;
+struct PythonScalarUdfKernelState : public compute::KernelState {
+ explicit PythonScalarUdfKernelState(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
+
std::shared_ptr<OwnedRefNoGIL> function;
- std::shared_ptr<DataType> output_type;
+};
- PythonUdf(ScalarUdfWrapperCallback cb, std::shared_ptr<OwnedRefNoGIL> function,
- const std::shared_ptr<DataType>& output_type)
- : cb(cb), function(function), output_type(output_type) {}
+struct PythonScalarUdfKernelInit {
+ explicit PythonScalarUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function)
+ : function(function) {}
// function needs to be destroyed at process exit
// and Python may no longer be initialized.
- ~PythonUdf() {
+ ~PythonScalarUdfKernelInit() {
if (_Py_IsFinalizing()) {
function->detach();
}
}
+ Result<std::unique_ptr<compute::KernelState>> operator()(
+ compute::KernelContext*, const compute::KernelInitArgs&) {
+ return std::make_unique<PythonScalarUdfKernelState>(function);
+ }
+
+ std::shared_ptr<OwnedRefNoGIL> function;
+};
+
+struct PythonTableUdfKernelInit {
+ PythonTableUdfKernelInit(std::shared_ptr<OwnedRefNoGIL> function_maker,
Review Comment:
Should this perhaps be a `generator_function` or `generator_factory`?
##########
python/pyarrow/_compute.pyx:
##########
@@ -2629,14 +2718,98 @@ def register_scalar_function(func, function_name, function_doc, in_types,
21
]
"""
+ return register_scalar_like_function(GetRegisterScalarFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_tabular_function(func, function_name, function_doc, in_types, out_type,
+ func_registry=None):
+ """
+ Register a user-defined tabular function.
+
+ A tabular function is one accepting a context argument of type
+ ScalarUdfContext and returning a generator of struct arrays.
+ The in_types argument must be empty and the out_type argument
+ specifies a schema. Each struct array must have field types
+ corresponding to the schema.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The only argument is the context argument of type
+ ScalarUdfContext. It must return a callable that
+ returns on each invocation a StructArray matching
+ the out_type, where an empty array indicates end.
+ function_name : str
+ Name of the function. This name must be globally unique.
+ function_doc : dict
+ A dictionary object with keys "summary" (str),
+ and "description" (str).
+ in_types : Dict[str, DataType]
+ Must be an empty dictionary.
Review Comment:
Why have this parameter at all?
##########
python/pyarrow/_compute.pyx:
##########
@@ -2629,14 +2718,98 @@ def register_scalar_function(func, function_name, function_doc, in_types,
21
]
"""
+ return register_scalar_like_function(GetRegisterScalarFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_tabular_function(func, function_name, function_doc, in_types, out_type,
Review Comment:
I'm not sure if "tabular" is the right word. I would have expected that to
be a function that takes a table/record batch and returns a new table / record
batch.
What about "batch generator function"?
```suggestion
def register_batch_generator_function(func, function_name, function_doc,
in_types, out_type,
```
##########
python/pyarrow/_compute.pyx:
##########
@@ -2629,14 +2718,98 @@ def register_scalar_function(func, function_name, function_doc, in_types,
21
]
"""
+ return register_scalar_like_function(GetRegisterScalarFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_tabular_function(func, function_name, function_doc, in_types, out_type,
+ func_registry=None):
+ """
+ Register a user-defined tabular function.
+
+ A tabular function is one accepting a context argument of type
+ ScalarUdfContext and returning a generator of struct arrays.
+ The in_types argument must be empty and the out_type argument
+ specifies a schema. Each struct array must have field types
+ corresponding to the schema.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The only argument is the context argument of type
+ ScalarUdfContext. It must return a callable that
+ returns on each invocation a StructArray matching
+ the out_type, where an empty array indicates end.
+ function_name : str
+ Name of the function. This name must be globally unique.
+ function_doc : dict
+ A dictionary object with keys "summary" (str),
+ and "description" (str).
+ in_types : Dict[str, DataType]
+ Must be an empty dictionary.
+ out_type : DataType
+ Output type of the function.
+ func_registry : FunctionRegistry
+ Optional function registry to use instead of the default global one.
+ """
+ return register_scalar_like_function(GetRegisterTabularFunction(),
+ func, function_name, function_doc, in_types,
+ out_type, func_registry)
+
+
+def register_scalar_like_function(register_func, func, function_name, function_doc, in_types,
Review Comment:
The tabular functions don't take any `in_types`, so I don't see why this needs
to share logic with the scalar-like path. Perhaps we should simply add a utility
function to handle the common doc validation for UDF registration?
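For example, something like this (the helper name is hypothetical):
```python
def _validate_function_doc(function_doc):
    # Hypothetical shared helper for all UDF registration paths.
    if not isinstance(function_doc, dict):
        raise TypeError("function_doc must be a dictionary")
    for key in ("summary", "description"):
        if key not in function_doc:
            raise ValueError(f"function_doc must contain a '{key}' entry")
```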