[
https://issues.apache.org/jira/browse/CALCITE-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005417#comment-16005417
]
Julian Hyde commented on CALCITE-1783:
--------------------------------------
In [a dev thread|https://lists.apache.org/thread.html/b196170e083a5e1ad8aa30b313348a7ee578b41e1e03e3732efa3b41@%3Cdev.calcite.apache.org%3E], [~xumingming] wrote:
I read the source code and found that user-defined aggregations are
eventually invoked in
[EnumerableDefaults.groupBy_|https://github.com/apache/calcite/blob/ce2122ff2562340333bfa0ba371872fc9a9c6251/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L826].
The code there is not hard to understand, since all the data is processed
in this single method, in a single thread. The invocation process is
something like:
{noformat}
             (repeatedly)
            +-----------+
            |           |
            v           |
init() --> add() -------+--> result()
{noformat}
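A minimal Java sketch of the single-threaded call order in the diagram, assuming a hypothetical sum aggregate (`SumAgg` with static `init`, `add` and `result` methods; these names are illustrative, not Calcite code). It does not reproduce `EnumerableDefaults.groupBy_`; it only shows the init/add/result sequence that runs for one group.

{code:java}
import java.util.List;

class SequentialAggregation {
  // Hypothetical sum aggregate in the init()/add()/result() shape; names are illustrative.
  static final class SumAgg {
    static long init() { return 0L; }                            // empty accumulator
    static long add(long acc, int value) { return acc + value; } // fold in one row
    static long result(long acc) { return acc; }                 // final output
  }

  /** Single-threaded driver: init() once, add() for every row, then result(). */
  static long aggregate(List<Integer> rows) {
    long acc = SumAgg.init();
    for (int row : rows) {
      acc = SumAgg.add(acc, row);
    }
    return SumAgg.result(acc);
  }

  public static void main(String[] args) {
    System.out.println(aggregate(List.of(1, 2, 3, 4))); // prints 10
  }
}
{code}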
Now, if we introduce `merge` into this process, the input data will first
be split into bundles, and each bundle will produce an intermediate
result. At the end, these intermediate results will be `merged` into a final
result, something like:
{noformat}
             (repeatedly)
            +-----------+
            |           |
            v           |
init() --> add() -------+--> intermediate result --+
                                                   |
             (repeatedly)                          |
            +-----------+                          |
            |           |                          |
            v           |                          v
init() --> add() -------+--> intermediate result --+--> merge() --> final result
                                                   ^
             (repeatedly)                          |
            +-----------+                          |
            |           |                          |
            v           |                          |
init() --> add() -------+--> intermediate result --+
{noformat}
There will be multiple threads involved here, and the merge method will be
called at the end of this process, in a separate thread.
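A sketch of the bundled flow, assuming the same hypothetical sum aggregate extended with a `merge` method. The bundling and thread-pool code (`ParallelAggregation.aggregate`, a fixed-size `ExecutorService`) is illustrative only and is not how Calcite or BEAM actually schedule work: each bundle runs init/add on a worker thread and yields an intermediate accumulator, and the calling thread merges those accumulators before calling `result`.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelAggregation {
  // Hypothetical sum aggregate with an extra merge() step; names are illustrative only.
  static final class SumAgg {
    static long init() { return 0L; }
    static long add(long acc, int value) { return acc + value; }
    static long merge(long left, long right) { return left + right; }
    static long result(long acc) { return acc; }
  }

  /** Splits rows into bundles, aggregates each bundle on a worker thread, then merges. */
  static long aggregate(List<Integer> rows, int bundles) {
    ExecutorService pool = Executors.newFixedThreadPool(bundles);
    try {
      List<Future<Long>> partials = new ArrayList<>();
      int size = (rows.size() + bundles - 1) / bundles; // ceiling division
      for (int start = 0; start < rows.size(); start += size) {
        List<Integer> bundle = rows.subList(start, Math.min(start + size, rows.size()));
        // Each bundle runs init() -> add()* and yields an intermediate accumulator.
        partials.add(pool.submit(() -> {
          long acc = SumAgg.init();
          for (int row : bundle) {
            acc = SumAgg.add(acc, row);
          }
          return acc;
        }));
      }
      // Merge on the calling thread, which is separate from the worker threads.
      long merged = SumAgg.init();
      for (Future<Long> partial : partials) {
        merged = SumAgg.merge(merged, partial.get());
      }
      return SumAgg.result(merged);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 1; i <= 100; i++) rows.add(i);
    System.out.println(aggregate(rows, 3)); // prints 5050
  }
}
{code}

The merge here is a plain pairwise fold; any associative combine of the accumulators would give the same result regardless of how rows were split into bundles.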
Two questions:
* How should the `multiple threads/bundles` logic be represented in Calcite?
* Where should the merge method go?
Are there already constructs or a framework I can use or learn from, or do I
need to write this new logic from scratch?
> Support merge method in AggregateFunction
> -----------------------------------------
>
> Key: CALCITE-1783
> URL: https://issues.apache.org/jira/browse/CALCITE-1783
> Project: Calcite
> Issue Type: New Feature
> Reporter: James Xu
> Assignee: James Xu
>
> I was implementing UDAFs for BEAM SQL using Calcite. Since BEAM is used in
> big data processing, where aggregate functions are implemented using
> distributed algorithms, a `merge` method is required to support UDAFs in
> BEAM. The corresponding aggregator interface in BEAM is `CombineFn`:
> {code:java}
> public abstract static class CombineFn<InputT, AccumT, OutputT>
>     extends AbstractGlobalCombineFn<InputT, AccumT, OutputT> {
>   /**
>    * Returns a new, mutable accumulator value, representing the
>    * accumulation of zero input values.
>    */
>   public abstract AccumT createAccumulator();
>
>   /**
>    * Adds the given input value to the given accumulator, returning the
>    * new accumulator value.
>    *
>    * <p>For efficiency, the input accumulator may be modified and returned.
>    */
>   public abstract AccumT addInput(AccumT accumulator, InputT input);
>
>   /**
>    * Returns an accumulator representing the accumulation of all the
>    * input values accumulated in the merging accumulators.
>    *
>    * <p>May modify any of the argument accumulators. May return a
>    * fresh accumulator, or may return one of the (modified) argument
>    * accumulators.
>    */
>   public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);
>
>   /**
>    * Returns the output value that is the result of combining all
>    * the input values represented by the given accumulator.
>    */
>   public abstract OutputT extractOutput(AccumT accumulator);
> }
> {code}
> `mergeAccumulators` is BEAM's analogue of `merge`.
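To show why `mergeAccumulators` combines accumulators rather than outputs, here is a hedged sketch of a mean aggregate. It uses a locally defined stand-in class, `CombineFnShape`, that copies the four method names from the `CombineFn` excerpt; it is not the real BEAM class and has no BEAM dependency. Averaging per-bundle means is wrong in general (bundles {1, 2} and {4, 5, 6} give (1.5 + 5.0) / 2 = 3.25), whereas merging (sum, count) accumulators gives the true mean, 18 / 5 = 3.6.

{code:java}
import java.util.List;

class CombineFnSketch {
  /** Local stand-in with the same four methods as the quoted CombineFn; NOT the real BEAM class. */
  abstract static class CombineFnShape<InputT, AccumT, OutputT> {
    abstract AccumT createAccumulator();
    abstract AccumT addInput(AccumT accumulator, InputT input);
    abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    abstract OutputT extractOutput(AccumT accumulator);
  }

  /** Mutable accumulator for a mean: partial sum plus row count. */
  static final class MeanAccum {
    double sum;
    long count;
  }

  /** Mean aggregate: outputs (averages) cannot be merged, but (sum, count) pairs can. */
  static final class MeanFn extends CombineFnShape<Double, MeanAccum, Double> {
    @Override MeanAccum createAccumulator() { return new MeanAccum(); }

    @Override MeanAccum addInput(MeanAccum acc, Double input) {
      acc.sum += input;
      acc.count++;
      return acc; // modified in place, as the quoted Javadoc permits
    }

    @Override MeanAccum mergeAccumulators(Iterable<MeanAccum> accumulators) {
      MeanAccum merged = createAccumulator(); // fresh accumulator for the merged state
      for (MeanAccum acc : accumulators) {
        merged.sum += acc.sum;
        merged.count += acc.count;
      }
      return merged;
    }

    @Override Double extractOutput(MeanAccum acc) {
      return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
    }
  }

  /** Aggregates two bundles separately, then merges their accumulators. */
  static double meanOfBundles(List<Double> first, List<Double> second) {
    MeanFn fn = new MeanFn();
    MeanAccum a = fn.createAccumulator();
    for (double x : first) a = fn.addInput(a, x);
    MeanAccum b = fn.createAccumulator();
    for (double x : second) b = fn.addInput(b, x);
    return fn.extractOutput(fn.mergeAccumulators(List.of(a, b)));
  }

  public static void main(String[] args) {
    System.out.println(meanOfBundles(List.of(1.0, 2.0), List.of(4.0, 5.0, 6.0))); // prints 3.6
  }
}
{code}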