westonpace commented on code in PR #35514:
URL: https://github.com/apache/arrow/pull/35514#discussion_r1205046279
##########
python/pyarrow/_compute.pyx:
##########
@@ -2738,9 +2744,83 @@ def register_scalar_function(func, function_name,
function_doc, in_types, out_ty
21
]
"""
- return _register_scalar_like_function(get_register_scalar_function(),
- func, function_name, function_doc,
in_types,
- out_type, func_registry)
+ return _register_user_defined_function(get_register_scalar_function(),
+ func, function_name, function_doc,
in_types,
+ out_type, func_registry)
+
+
+def register_aggregate_function(func, function_name, function_doc, in_types,
out_type,
+ func_registry=None):
+ """
+ Register a user-defined non-decomposable aggregate function.
+
+ A non-decomposable aggregation function is a function that executes
+ aggregate operations on the whole data that it is aggregating.
+ In other words, non-decomposable aggregate function cannot be
+ split into consume/merge/finalize steps.
+
+ This is mostly useful with segemented aggregation, where the data
+ to be aggregated is continuous.
Review Comment:
"where the data to be aggregated is continuous" is confusing to me.
Perhaps:
This is often used with ordered or segmented aggregation where groups can be
emit before accumulating all of the input data.
##########
python/pyarrow/_compute.pyx:
##########
@@ -2738,9 +2744,83 @@ def register_scalar_function(func, function_name,
function_doc, in_types, out_ty
21
]
"""
- return _register_scalar_like_function(get_register_scalar_function(),
- func, function_name, function_doc,
in_types,
- out_type, func_registry)
+ return _register_user_defined_function(get_register_scalar_function(),
+ func, function_name, function_doc,
in_types,
+ out_type, func_registry)
+
+
+def register_aggregate_function(func, function_name, function_doc, in_types,
out_type,
+ func_registry=None):
+ """
+ Register a user-defined non-decomposable aggregate function.
+
+ A non-decomposable aggregation function is a function that executes
+ aggregate operations on the whole data that it is aggregating.
+ In other words, non-decomposable aggregate function cannot be
+ split into consume/merge/finalize steps.
+
+ This is mostly useful with segemented aggregation, where the data
+ to be aggregated is continuous.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The first argument is the context argument of type
+ UdfContext.
+ Then, it must take arguments equal to the number of
+ in_types defined. It must return Scalar matching the
Review Comment:
```suggestion
in_types defined. It must return a Scalar matching the
```
##########
python/pyarrow/_compute.pyx:
##########
@@ -2783,36 +2863,25 @@ def register_tabular_function(func, function_name,
function_doc, in_types, out_t
with nogil:
c_type =
<shared_ptr[CDataType]>make_shared[CStructType](deref(c_schema).fields())
out_type = pyarrow_wrap_data_type(c_type)
- return _register_scalar_like_function(get_register_tabular_function(),
- func, function_name, function_doc,
in_types,
- out_type, func_registry)
+ return _register_user_defined_function(get_register_tabular_function(),
+ func, function_name, function_doc,
in_types,
+ out_type, func_registry)
-def _register_scalar_like_function(register_func, func, function_name,
function_doc, in_types,
- out_type, func_registry=None):
+def _register_user_defined_function(register_func, func, function_name,
function_doc, in_types,
+ out_type, func_registry=None):
"""
- Register a user-defined scalar-like function.
+ Register a user-defined function.
- A scalar-like function is a callable accepting a first
- context argument of type ScalarUdfContext as well as
- possibly additional Arrow arguments, and returning a
- an Arrow result appropriate for the kind of function.
- A scalar function and a tabular function are examples
- for scalar-like functions.
- This function is normally not called directly but via
- register_scalar_function or register_tabular_function.
+ This method itself doesn't care what the type of the UDF
Review Comment:
Thanks for this.
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -65,6 +69,26 @@ struct PythonUdfKernelInit {
std::shared_ptr<OwnedRefNoGIL> function;
};
+struct ScalarUdfAggregator : public compute::KernelState {
+ virtual Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) = 0;
+ virtual Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&&
src) = 0;
+ virtual Status Finalize(compute::KernelContext* ctx, Datum* out) = 0;
+};
+
+arrow::Status AggregateUdfConsume(compute::KernelContext* ctx, const
compute::ExecSpan& batch) {
+ return checked_cast<ScalarUdfAggregator*>(ctx->state())->Consume(ctx, batch);
+}
+
+arrow::Status AggregateUdfMerge(compute::KernelContext* ctx,
compute::KernelState&& src,
+ compute::KernelState* dst) {
+ return checked_cast<ScalarUdfAggregator*>(dst)->MergeFrom(ctx,
std::move(src));
Review Comment:
I'm not sure the `std::move` has any effect here (`src` is already a `&&`)
but I could be wrong.
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
+ return Status::OK();
+ }
+
+ Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+ const auto& other_state = checked_cast<const
PythonUdfScalarAggregatorImpl&>(src);
+ values.insert(values.end(), other_state.values.begin(),
other_state.values.end());
+ return Status::OK();
+ }
+
+ Status Finalize(compute::KernelContext* ctx, Datum* out) {
+ auto state =
arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+ std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+ const int num_args = input_schema->num_fields();
+
+ OwnedRef arg_tuple(PyTuple_New(num_args));
+ RETURN_NOT_OK(CheckPyError());
+
+ // Note: The way that batches are concatenated together
+ // would result in using double amount of the memory.
+ // This is OK for now because non decomposable aggregate
+ // UDF is supposed to be used with segmented aggregation
+ // where the size of the segment is more or less constant
+ // so doubling that is not a big deal. This can be also
+ // improved in the future to use more efficient way to
+ // concatenate.
+ ARROW_ASSIGN_OR_RAISE(
+ auto table,
+ arrow::Table::FromRecordBatches(input_schema, values)
+ );
+ ARROW_ASSIGN_OR_RAISE(
+ table, table->CombineChunks(ctx->memory_pool())
+ );
+
+ UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+ for (int arg_id = 0; arg_id < num_args; arg_id++) {
+ // Since we combined chunks thComere is only one chunk
+ std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
Review Comment:
I believe there is a small possibility an empty table will give you zero
chunks. However, it's not clear to me that you could get here with an empty
table?
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
+ return Status::OK();
+ }
+
+ Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+ const auto& other_state = checked_cast<const
PythonUdfScalarAggregatorImpl&>(src);
+ values.insert(values.end(), other_state.values.begin(),
other_state.values.end());
+ return Status::OK();
+ }
+
+ Status Finalize(compute::KernelContext* ctx, Datum* out) {
+ auto state =
arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+ std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+ const int num_args = input_schema->num_fields();
+
+ OwnedRef arg_tuple(PyTuple_New(num_args));
+ RETURN_NOT_OK(CheckPyError());
+
+ // Note: The way that batches are concatenated together
+ // would result in using double amount of the memory.
+ // This is OK for now because non decomposable aggregate
+ // UDF is supposed to be used with segmented aggregation
+ // where the size of the segment is more or less constant
+ // so doubling that is not a big deal. This can be also
+ // improved in the future to use more efficient way to
+ // concatenate.
+ ARROW_ASSIGN_OR_RAISE(
+ auto table,
+ arrow::Table::FromRecordBatches(input_schema, values)
+ );
+ ARROW_ASSIGN_OR_RAISE(
+ table, table->CombineChunks(ctx->memory_pool())
+ );
+
+ UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+ for (int arg_id = 0; arg_id < num_args; arg_id++) {
+ // Since we combined chunks thComere is only one chunk
+ std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
+ PyObject* data = wrap_array(c_data);
+ PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
+ }
+
+ OwnedRef result(agg_cb(function->obj(), udf_context, arg_tuple.obj()));
+ RETURN_NOT_OK(CheckPyError());
+
+ // unwrapping the output for expected output type
+ if (is_scalar(result.obj())) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val,
unwrap_scalar(result.obj()));
+ if (*output_type != *val->type) {
+ return Status::TypeError("Expected output datatype ",
output_type->ToString(),
+ ", but function returned datatype ",
+ val->type->ToString());
+ }
+ out->value = std::move(val);
+ return Status::OK();
+ } else {
+ return Status::TypeError("Unexpected output type: ",
Py_TYPE(result.obj())->tp_name,
+ " (expected Scalar)");
+ }
+ return Status::OK();
Review Comment:
```suggestion
}
return Status::TypeError("Unexpected output type: ",
Py_TYPE(result.obj())->tp_name,
" (expected Scalar)");
```
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
+ return Status::OK();
+ }
+
+ Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+ const auto& other_state = checked_cast<const
PythonUdfScalarAggregatorImpl&>(src);
+ values.insert(values.end(), other_state.values.begin(),
other_state.values.end());
+ return Status::OK();
+ }
+
+ Status Finalize(compute::KernelContext* ctx, Datum* out) {
+ auto state =
arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+ std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+ const int num_args = input_schema->num_fields();
+
+ OwnedRef arg_tuple(PyTuple_New(num_args));
+ RETURN_NOT_OK(CheckPyError());
+
+ // Note: The way that batches are concatenated together
+ // would result in using double amount of the memory.
+ // This is OK for now because non decomposable aggregate
+ // UDF is supposed to be used with segmented aggregation
+ // where the size of the segment is more or less constant
+ // so doubling that is not a big deal. This can be also
+ // improved in the future to use more efficient way to
+ // concatenate.
+ ARROW_ASSIGN_OR_RAISE(
+ auto table,
+ arrow::Table::FromRecordBatches(input_schema, values)
+ );
+ ARROW_ASSIGN_OR_RAISE(
+ table, table->CombineChunks(ctx->memory_pool())
+ );
+
+ UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+ for (int arg_id = 0; arg_id < num_args; arg_id++) {
+ // Since we combined chunks thComere is only one chunk
+ std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
+ PyObject* data = wrap_array(c_data);
+ PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
+ }
+
+ OwnedRef result(agg_cb(function->obj(), udf_context, arg_tuple.obj()));
+ RETURN_NOT_OK(CheckPyError());
+
+ // unwrapping the output for expected output type
+ if (is_scalar(result.obj())) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val,
unwrap_scalar(result.obj()));
+ if (*output_type != *val->type) {
+ return Status::TypeError("Expected output datatype ",
output_type->ToString(),
+ ", but function returned datatype ",
+ val->type->ToString());
+ }
+ out->value = std::move(val);
+ return Status::OK();
+ } else {
+ return Status::TypeError("Unexpected output type: ",
Py_TYPE(result.obj())->tp_name,
Review Comment:
Can we add a test case for this?
##########
python/pyarrow/conftest.py:
##########
@@ -278,3 +278,59 @@ def unary_function(ctx, x):
{"array": pa.int64()},
pa.int64())
return unary_function, func_name
+
+
[email protected](scope="session")
+def unary_agg_func_fixture():
+ """
+ Register a unary aggregate function
+ """
+ from pyarrow import compute as pc
+ import numpy as np
+
+ def func(ctx, x):
+ return pa.scalar(np.nanmean(x))
+
+ func_name = "y=avg(x)"
+ func_doc = {"summary": "y=avg(x)",
+ "description": "find mean of x"}
+
+ pc.register_aggregate_function(func,
+ func_name,
+ func_doc,
+ {
+ "x": pa.float64(),
+ },
+ pa.float64()
+ )
+ return func, func_name
+
+
[email protected](scope="session")
+def varargs_agg_func_fixture():
+ """
+ Register a unary aggregate function
+ """
+ from pyarrow import compute as pc
+ import numpy as np
+
+ def func(ctx, *args):
+ sum = 0.0
+ for arg in args:
+ sum += np.nanmean(arg)
+ return pa.scalar(sum)
+
+ func_name = "y=sum_mean(x...)"
+ func_doc = {"summary": "Varargs aggregate",
+ "description": "Varargs aggregate"}
+
+ pc.register_aggregate_function(func,
+ func_name,
+ func_doc,
+ {
+ "x": pa.int64(),
+ "y": pa.float64()
Review Comment:
What are `x` and `y` if this is a varargs function?
##########
python/pyarrow/_compute.pyx:
##########
@@ -2738,9 +2744,83 @@ def register_scalar_function(func, function_name,
function_doc, in_types, out_ty
21
]
"""
- return _register_scalar_like_function(get_register_scalar_function(),
- func, function_name, function_doc,
in_types,
- out_type, func_registry)
+ return _register_user_defined_function(get_register_scalar_function(),
+ func, function_name, function_doc,
in_types,
+ out_type, func_registry)
+
+
+def register_aggregate_function(func, function_name, function_doc, in_types,
out_type,
+ func_registry=None):
+ """
+ Register a user-defined non-decomposable aggregate function.
+
+ A non-decomposable aggregation function is a function that executes
+ aggregate operations on the whole data that it is aggregating.
+ In other words, non-decomposable aggregate function cannot be
+ split into consume/merge/finalize steps.
+
+ This is mostly useful with segemented aggregation, where the data
+ to be aggregated is continuous.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The first argument is the context argument of type
+ UdfContext.
+ Then, it must take arguments equal to the number of
+ in_types defined. It must return Scalar matching the
+ out_type.
+ To define a varargs function, pass a callable that takes
+ varargs. The in_type needs to match in type of inputs when
+ the function gets called.
+
+ function_name : str
+ Name of the function. This name must be globally unique.
+ function_doc : dict
+ A dictionary object with keys "summary" (str),
+ and "description" (str).
+ in_types : Dict[str, DataType]
+ A dictionary mapping function argument names to
+ their respective DataType.
+ The argument names will be used to generate
+ documentation for the function. The number of
+ arguments specified here determines the function
+ arity.
+ out_type : DataType
+ Output type of the function.
+ func_registry : FunctionRegistry
+ Optional function registry to use instead of the default global one.
+
+ Examples
+ --------
+ >>> import numpy as np
+ >>> import pyarrow as pa
+ >>> import pyarrow.compute as pc
+ >>>
+ >>> func_doc = {}
+ >>> func_doc["summary"] = "simple mean udf"
+ >>> func_doc["description"] = "compute mean"
+ >>>
+ >>> def compute_mean(ctx, array):
+ ... return pa.scalar(np.nanmean(array))
+ >>>
+ >>> func_name = "py_compute_mean"
+ >>> in_types = {"array": pa.int64()}
+ >>> out_type = pa.float64()
+ >>> pc.register_aggregate_function(compute_mean, func_name, func_doc,
Review Comment:
This example is ok but we already have a mean aggregate function. Can you
use `np.median` instead?
##########
python/pyarrow/_compute.pyx:
##########
@@ -2738,9 +2744,83 @@ def register_scalar_function(func, function_name,
function_doc, in_types, out_ty
21
]
"""
- return _register_scalar_like_function(get_register_scalar_function(),
- func, function_name, function_doc,
in_types,
- out_type, func_registry)
+ return _register_user_defined_function(get_register_scalar_function(),
+ func, function_name, function_doc,
in_types,
+ out_type, func_registry)
+
+
+def register_aggregate_function(func, function_name, function_doc, in_types,
out_type,
+ func_registry=None):
+ """
+ Register a user-defined non-decomposable aggregate function.
+
+ A non-decomposable aggregation function is a function that executes
+ aggregate operations on the whole data that it is aggregating.
+ In other words, non-decomposable aggregate function cannot be
+ split into consume/merge/finalize steps.
+
+ This is mostly useful with segemented aggregation, where the data
+ to be aggregated is continuous.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The first argument is the context argument of type
+ UdfContext.
+ Then, it must take arguments equal to the number of
+ in_types defined. It must return Scalar matching the
+ out_type.
+ To define a varargs function, pass a callable that takes
+ varargs. The in_type needs to match in type of inputs when
Review Comment:
This sentence is confusing to me:
To define a varargs function, pass a callable that takes varargs.
I don't think varargs is a standard python concept (e.g. I always see it
just referred to as `*args`)
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
+ return Status::OK();
+ }
+
+ Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+ const auto& other_state = checked_cast<const
PythonUdfScalarAggregatorImpl&>(src);
+ values.insert(values.end(), other_state.values.begin(),
other_state.values.end());
Review Comment:
I think there is a slight benefit to moving instead of copying all these
batches (it's just a few shared_ptr's so nothing huge but still):
https://stackoverflow.com/questions/15004517/moving-elements-from-stdvector-to-another-one
##########
python/pyarrow/_compute.pyx:
##########
@@ -2738,9 +2744,83 @@ def register_scalar_function(func, function_name,
function_doc, in_types, out_ty
21
]
"""
- return _register_scalar_like_function(get_register_scalar_function(),
- func, function_name, function_doc,
in_types,
- out_type, func_registry)
+ return _register_user_defined_function(get_register_scalar_function(),
+ func, function_name, function_doc,
in_types,
+ out_type, func_registry)
+
+
+def register_aggregate_function(func, function_name, function_doc, in_types,
out_type,
+ func_registry=None):
+ """
+ Register a user-defined non-decomposable aggregate function.
+
+ A non-decomposable aggregation function is a function that executes
+ aggregate operations on the whole data that it is aggregating.
+ In other words, non-decomposable aggregate function cannot be
+ split into consume/merge/finalize steps.
+
+ This is mostly useful with segemented aggregation, where the data
+ to be aggregated is continuous.
+
+ Parameters
+ ----------
+ func : callable
+ A callable implementing the user-defined function.
+ The first argument is the context argument of type
+ UdfContext.
+ Then, it must take arguments equal to the number of
+ in_types defined. It must return Scalar matching the
+ out_type.
+ To define a varargs function, pass a callable that takes
+ varargs. The in_type needs to match in type of inputs when
+ the function gets called.
+
+ function_name : str
+ Name of the function. This name must be globally unique.
Review Comment:
What does globally unique mean?
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -15,15 +15,19 @@
// specific language governing permissions and limitations
// under the License.
+#include <iostream>
+
Review Comment:
```suggestion
```
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
Review Comment:
```suggestion
values.push_back(std::move(rb));
```
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
+ return Status::OK();
+ }
+
+ Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+ const auto& other_state = checked_cast<const
PythonUdfScalarAggregatorImpl&>(src);
+ values.insert(values.end(), other_state.values.begin(),
other_state.values.end());
+ return Status::OK();
+ }
+
+ Status Finalize(compute::KernelContext* ctx, Datum* out) {
+ auto state =
arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+ std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+ const int num_args = input_schema->num_fields();
+
+ OwnedRef arg_tuple(PyTuple_New(num_args));
+ RETURN_NOT_OK(CheckPyError());
+
+ // Note: The way that batches are concatenated together
+ // would result in using double amount of the memory.
+ // This is OK for now because non decomposable aggregate
+ // UDF is supposed to be used with segmented aggregation
+ // where the size of the segment is more or less constant
+ // so doubling that is not a big deal. This can be also
+ // improved in the future to use more efficient way to
+ // concatenate.
+ ARROW_ASSIGN_OR_RAISE(
+ auto table,
+ arrow::Table::FromRecordBatches(input_schema, values)
+ );
+ ARROW_ASSIGN_OR_RAISE(
+ table, table->CombineChunks(ctx->memory_pool())
+ );
+
+ UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+ for (int arg_id = 0; arg_id < num_args; arg_id++) {
+ // Since we combined chunks thComere is only one chunk
Review Comment:
```suggestion
// Since we combined chunks there is only one chunk
```
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -65,6 +69,26 @@ struct PythonUdfKernelInit {
std::shared_ptr<OwnedRefNoGIL> function;
};
+struct ScalarUdfAggregator : public compute::KernelState {
+ virtual Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) = 0;
+ virtual Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&&
src) = 0;
+ virtual Status Finalize(compute::KernelContext* ctx, Datum* out) = 0;
+};
+
+arrow::Status AggregateUdfConsume(compute::KernelContext* ctx, const
compute::ExecSpan& batch) {
+ return checked_cast<ScalarUdfAggregator*>(ctx->state())->Consume(ctx, batch);
+}
+
+arrow::Status AggregateUdfMerge(compute::KernelContext* ctx,
compute::KernelState&& src,
+ compute::KernelState* dst) {
+ return checked_cast<ScalarUdfAggregator*>(dst)->MergeFrom(ctx,
std::move(src));
+}
+
+arrow::Status AggregateUdfFinalize(compute::KernelContext* ctx, arrow::Datum*
out) {
+ auto udf = checked_cast<ScalarUdfAggregator*>(ctx->state());
+ return SafeCallIntoPython([&]() -> Status {return udf->Finalize(ctx, out);});
Review Comment:
Should `SafeCallIntoPython` be a part of `PythonUdfScalarAggregatorImpl` and
not `ScalarUdfAggregator`?
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -234,6 +351,56 @@ Status RegisterTabularFunction(PyObject* user_function,
UdfWrapperCallback wrapp
wrapper, options, registry);
}
+Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig,
compute::KernelInit init,
+ compute::ScalarAggregateFunction* func) {
+
+ compute::ScalarAggregateKernel kernel(std::move(sig), std::move(init),
AggregateUdfConsume, AggregateUdfMerge, AggregateUdfFinalize,
/*ordered=*/false);
+ RETURN_NOT_OK(func->AddKernel(std::move(kernel)));
+ return Status::OK();
+}
+
+Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback
agg_wrapper,
+ const UdfOptions& options,
+ compute::FunctionRegistry*
registry) {
+ if (!PyCallable_Check(agg_function)) {
+ return Status::TypeError("Expected a callable Python object.");
+ }
+
+ if (registry == NULLPTR) {
+ registry = compute::GetFunctionRegistry();
+ }
+
+ static auto default_scalar_aggregate_options =
compute::ScalarAggregateOptions::Defaults();
+ auto aggregate_func = std::make_shared<compute::ScalarAggregateFunction>(
+ options.func_name, options.arity, options.func_doc,
&default_scalar_aggregate_options);
+
+ Py_INCREF(agg_function);
Review Comment:
Why is this needed? Can we wrap `agg_function` in an `OwnedRef` instead?
##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
UdfWrapperCallback cb;
};
+ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+ PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+ std::shared_ptr<OwnedRefNoGIL> agg_function,
+ std::vector<std::shared_ptr<DataType>> input_types,
+ std::shared_ptr<DataType> output_type):
+ agg_cb(agg_cb),
+ agg_function(agg_function),
+ output_type(output_type) {
+ std::vector<std::shared_ptr<Field>> fields;
+ for (size_t i = 0; i < input_types.size(); i++) {
+ fields.push_back(field("", input_types[i]));
+ }
+ input_schema = schema(fields);
+ };
+
+ ~PythonUdfScalarAggregatorImpl() {
+ if (_Py_IsFinalizing()) {
+ agg_function->detach();
+ }
+ }
+
+ Status Consume(compute::KernelContext* ctx, const compute::ExecSpan&
batch) {
+ ARROW_ASSIGN_OR_RAISE(auto rb,
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+ values.push_back(rb);
+ return Status::OK();
+ }
+
+ Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+ const auto& other_state = checked_cast<const
PythonUdfScalarAggregatorImpl&>(src);
+ values.insert(values.end(), other_state.values.begin(),
other_state.values.end());
+ return Status::OK();
+ }
+
+ Status Finalize(compute::KernelContext* ctx, Datum* out) {
+ auto state =
arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+ std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+ const int num_args = input_schema->num_fields();
+
+ OwnedRef arg_tuple(PyTuple_New(num_args));
+ RETURN_NOT_OK(CheckPyError());
+
+ // Note: The way that batches are concatenated together
+ // would result in using double amount of the memory.
+ // This is OK for now because non decomposable aggregate
+ // UDF is supposed to be used with segmented aggregation
+ // where the size of the segment is more or less constant
+ // so doubling that is not a big deal. This can be also
+ // improved in the future to use more efficient way to
+ // concatenate.
+ ARROW_ASSIGN_OR_RAISE(
+ auto table,
+ arrow::Table::FromRecordBatches(input_schema, values)
+ );
+ ARROW_ASSIGN_OR_RAISE(
+ table, table->CombineChunks(ctx->memory_pool())
+ );
+
+ UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+ for (int arg_id = 0; arg_id < num_args; arg_id++) {
+ // Since we combined chunks thComere is only one chunk
+ std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
+ PyObject* data = wrap_array(c_data);
+ PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
+ }
+
+ OwnedRef result(agg_cb(function->obj(), udf_context, arg_tuple.obj()));
+ RETURN_NOT_OK(CheckPyError());
+
+ // unwrapping the output for expected output type
+ if (is_scalar(result.obj())) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val,
unwrap_scalar(result.obj()));
+ if (*output_type != *val->type) {
+ return Status::TypeError("Expected output datatype ",
output_type->ToString(),
Review Comment:
Can we add a test case for this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]