icexelloss commented on code in PR #35514:
URL: https://github.com/apache/arrow/pull/35514#discussion_r1218526390


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -101,6 +125,99 @@ struct PythonTableUdfKernelInit {
   UdfWrapperCallback cb;
 };
 
+  struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+
+    PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+                                 std::shared_ptr<OwnedRefNoGIL> agg_function,
+          std::vector<std::shared_ptr<DataType>> input_types,
+                                 std::shared_ptr<DataType> output_type):
+      agg_cb(agg_cb),
+      agg_function(agg_function),
+      output_type(output_type) {
+        std::vector<std::shared_ptr<Field>> fields;
+        for (size_t i = 0; i < input_types.size(); i++) {
+          fields.push_back(field("", input_types[i]));
+        }
+        input_schema = schema(fields);
+      };
+
+    ~PythonUdfScalarAggregatorImpl() {
+      if (_Py_IsFinalizing()) {
+        agg_function->detach();
+      }
+    }
+
+    Status Consume(compute::KernelContext* ctx, const compute::ExecSpan& 
batch) {
+      ARROW_ASSIGN_OR_RAISE(auto rb, 
batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+      values.push_back(rb);
+      return Status::OK();
+    }
+
+    Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+      const auto& other_state = checked_cast<const 
PythonUdfScalarAggregatorImpl&>(src);
+      values.insert(values.end(), other_state.values.begin(), 
other_state.values.end());
+      return Status::OK();
+    }
+
+    Status Finalize(compute::KernelContext* ctx, Datum* out) {
+      auto state = 
arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+      std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+      const int num_args = input_schema->num_fields();
+
+      OwnedRef arg_tuple(PyTuple_New(num_args));
+      RETURN_NOT_OK(CheckPyError());
+
+      // Note: The way the batches are concatenated together
+      // results in using double the amount of memory.
+      // This is OK for now because a non-decomposable aggregate
+      // UDF is supposed to be used with segmented aggregation,
+      // where the size of the segment is more or less constant,
+      // so doubling that is not a big deal. This can also be
+      // improved in the future to use a more efficient way to
+      // concatenate.
+      ARROW_ASSIGN_OR_RAISE(
+        auto table,
+        arrow::Table::FromRecordBatches(input_schema, values)
+      );
+      ARROW_ASSIGN_OR_RAISE(
+        table, table->CombineChunks(ctx->memory_pool())
+      );
+
+      UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+      for (int arg_id = 0; arg_id < num_args; arg_id++) {
+        // Since we combined chunks there is only one chunk
+        std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
+      }
+
+      OwnedRef result(agg_cb(function->obj(), udf_context, arg_tuple.obj()));
+      RETURN_NOT_OK(CheckPyError());
+
+      // unwrapping the output for expected output type
+      if (is_scalar(result.obj())) {
+        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val, 
unwrap_scalar(result.obj()));
+        if (*output_type != *val->type) {
+          return Status::TypeError("Expected output datatype ", 
output_type->ToString(),
+                                   ", but function returned datatype ",
+                                   val->type->ToString());
+        }
+        out->value = std::move(val);
+        return Status::OK();
+      } else {
+        return Status::TypeError("Unexpected output type: ", 
Py_TYPE(result.obj())->tp_name,

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to