[ https://issues.apache.org/jira/browse/CALCITE-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005417#comment-16005417 ]

Julian Hyde commented on CALCITE-1783:
--------------------------------------

In [a dev thread|https://lists.apache.org/thread.html/b196170e083a5e1ad8aa30b313348a7ee578b41e1e03e3732efa3b41@%3Cdev.calcite.apache.org%3E],
[~xumingming] wrote:

I read the source code and found that user-defined aggregate functions are
eventually invoked in
[EnumerableDefaults.groupBy_|https://github.com/apache/calcite/blob/ce2122ff2562340333bfa0ba371872fc9a9c6251/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L826].

The code there is easy to follow, since all the data is aggregated in this
single method, in a single thread. The invocation sequence is roughly:

{noformat}
       (repeatedly)
       +----------+
       |          |
       |          |
init()-+--> add() +--> result()
{noformat}
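A minimal Java sketch of that single-threaded lifecycle, assuming one accumulator per group key kept in a hash map. The class `SequentialGroupBy` and its `groupBy` method are hypothetical: they only mimic the init/add/result shape of the diagram and are not the actual `EnumerableDefaults.groupBy_` implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical single-threaded driver for the init/add/result lifecycle. */
final class SequentialGroupBy {
  private SequentialGroupBy() {}

  /**
   * Keeps one accumulator per group key: init() creates it on the key's first
   * row, add() folds in every row of that key, and result() finishes each group.
   */
  static <T, K, A, R> Map<K, R> groupBy(
      List<T> rows,
      Function<T, K> keySelector,
      Supplier<A> init,
      BiFunction<A, T, A> add,
      Function<A, R> result) {
    Map<K, A> accumulators = new LinkedHashMap<>();
    for (T row : rows) {
      K key = keySelector.apply(row);
      A acc = accumulators.containsKey(key) ? accumulators.get(key) : init.get();
      accumulators.put(key, add.apply(acc, row));
    }
    Map<K, R> results = new LinkedHashMap<>();
    for (Map.Entry<K, A> entry : accumulators.entrySet()) {
      results.put(entry.getKey(), result.apply(entry.getValue()));
    }
    return results;
  }

  public static void main(String[] args) {
    // Total string length per first letter.
    Map<Character, Integer> totals = groupBy(
        Arrays.asList("apple", "avocado", "banana"),
        s -> s.charAt(0),             // group key
        () -> 0,                      // init()
        (acc, s) -> acc + s.length(), // add()
        acc -> acc);                  // result()
    System.out.println(totals);       // prints {a=12, b=6}
  }
}
```

Everything runs on the calling thread, so each group's accumulator only ever sees `add()` calls in input order and no merge step is needed.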

Now, if we introduce `merge` into this process, the input data will first
be split into bundles; each bundle produces an intermediate result, and in
the end these intermediate results are `merged` into a final result,
something like:

{noformat}
                  (repeatedly)
                  +----------+
                  |          |
                  |          |
           init()-+--> add() +--> intermediate result  +
                                                       |
                  (repeatedly)                         |
                  +----------+                         |
                  |          |                         |
                  |          |                         |
           init()-+--> add() +--> intermediate result  +--> merge() +--> final result
                                                       |
                                                       |
                  (repeatedly)                         |
                  +----------+                         |
                  |          |                         |
                  |          |                         |
           init()-+--> add() +--> intermediate result  +
{noformat}
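A sketch of the bundled flow in the diagram above, under stated assumptions: the input is split into contiguous bundles, each bundle is aggregated with `init()`/`add()` on a worker from a fixed thread pool, and the intermediate results are combined with `merge()` and finished with `result()` on the calling thread, which is separate from the workers. `AvgAgg`, its (sum, count) accumulator and `ParallelAggregation` are hypothetical names for illustration; none of this is an existing Calcite API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical AVG aggregate; its accumulator is an immutable (sum, count) pair. */
final class AvgAgg {
  static final class Acc {
    final double sum;
    final long count;

    Acc(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  static Acc init() { return new Acc(0.0, 0L); }

  static Acc add(Acc acc, double value) { return new Acc(acc.sum + value, acc.count + 1); }

  /** Combines the intermediate results of two bundles. */
  static Acc merge(Acc a, Acc b) { return new Acc(a.sum + b.sum, a.count + b.count); }

  /** Returns null for an empty input, like SQL AVG. */
  static Double result(Acc acc) { return acc.count == 0 ? null : acc.sum / acc.count; }
}

final class ParallelAggregation {
  private ParallelAggregation() {}

  /** Splits input into at most `bundles` contiguous bundles; requires bundles >= 1. */
  static Double aggregate(List<Double> input, int bundles) {
    ExecutorService pool = Executors.newFixedThreadPool(bundles);
    try {
      int size = (input.size() + bundles - 1) / bundles; // ceiling division
      List<Future<AvgAgg.Acc>> partials = new ArrayList<>();
      for (int start = 0; start < input.size(); start += size) {
        List<Double> bundle = input.subList(start, Math.min(start + size, input.size()));
        // Worker thread: init() once, then add() for every value in the bundle.
        partials.add(pool.submit(() -> {
          AvgAgg.Acc acc = AvgAgg.init();
          for (double value : bundle) {
            acc = AvgAgg.add(acc, value);
          }
          return acc;
        }));
      }
      // Calling thread: merge() the intermediate results, then result().
      AvgAgg.Acc total = AvgAgg.init();
      for (Future<AvgAgg.Acc> partial : partials) {
        total = AvgAgg.merge(total, partial.get());
      }
      return AvgAgg.result(total);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Double> input = Arrays.asList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0);
    System.out.println(aggregate(input, 3)); // prints 4.5
  }
}
```

With the eight values above split into bundles [1, 2, 3], [4, 5, 6] and [7, 8], merging the (sum, count) accumulators gives 36 / 8 = 4.5, whereas averaging the three bundle averages (2.0, 5.0, 7.5) would give about 4.83. That is why the merge step has to operate on intermediate accumulators rather than on final results.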

There will be multiple threads involved, and the merge method will be
called at the end of this process, in a separate thread.

Two questions here:

* How should the `multiple threads/bundles` logic be represented in Calcite?
* Where should the merge method go?

Is there already a construct or framework I can use or learn from, or do I
need to write this new logic from scratch?

> Support merge method in AggregateFunction
> -----------------------------------------
>
>                 Key: CALCITE-1783
>                 URL: https://issues.apache.org/jira/browse/CALCITE-1783
>             Project: Calcite
>          Issue Type: New Feature
>            Reporter: James Xu
>            Assignee: James Xu
>
> I was implementing UDAF support for BEAM SQL using Calcite. Since BEAM is
> used for big data processing, where aggregate functions are implemented
> with distributed algorithms, a `merge` method is a must for supporting
> UDAFs in BEAM. The corresponding aggregator interface in BEAM is `CombineFn`:
> {code:java}
>   public abstract static class CombineFn<InputT, AccumT, OutputT>
>       extends AbstractGlobalCombineFn<InputT, AccumT, OutputT> {
>     /**
>      * Returns a new, mutable accumulator value, representing the
>      * accumulation of zero input values.
>      */
>     public abstract AccumT createAccumulator();
>     /**
>      * Adds the given input value to the given accumulator, returning the
>      * new accumulator value.
>      *
>      * <p>For efficiency, the input accumulator may be modified and returned.
>      */
>     public abstract AccumT addInput(AccumT accumulator, InputT input);
>     /**
>      * Returns an accumulator representing the accumulation of all the
>      * input values accumulated in the merging accumulators.
>      *
>      * <p>May modify any of the argument accumulators.  May return a
>      * fresh accumulator, or may return one of the (modified) argument
>      * accumulators.
>      */
>     public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);
>     /**
>      * Returns the output value that is the result of combining all
>      * the input values represented by the given accumulator.
>      */
>     public abstract OutputT extractOutput(AccumT accumulator);
>   }
> {code}
> `mergeAccumulators` is BEAM's analogue of `merge`.
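One way the two contracts could be bridged, sketched under the assumption that a Calcite-style UDAF exposes a binary `merge(a, b)`: BEAM's `mergeAccumulators` takes an `Iterable`, so it can be written as a left fold of the binary merge. `CombineFnLike` is a local stand-in that declares the same four methods as the `CombineFn` quoted above (it is not Beam's class), and `UdafCombineFn` is a hypothetical adapter. The fold starts from `init()`, which assumes the initial accumulator is an identity for `merge`.

```java
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

/** Local stand-in declaring the four methods of the CombineFn quoted above; not Beam's class. */
abstract class CombineFnLike<InputT, AccumT, OutputT> {
  abstract AccumT createAccumulator();

  abstract AccumT addInput(AccumT accumulator, InputT input);

  abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);

  abstract OutputT extractOutput(AccumT accumulator);
}

/** Hypothetical adapter from init/add/merge/result functions to the CombineFn-style contract. */
final class UdafCombineFn<InputT, AccumT, OutputT> extends CombineFnLike<InputT, AccumT, OutputT> {
  private final Supplier<AccumT> init;
  private final BiFunction<AccumT, InputT, AccumT> add;
  private final BinaryOperator<AccumT> merge;
  private final Function<AccumT, OutputT> result;

  UdafCombineFn(
      Supplier<AccumT> init,
      BiFunction<AccumT, InputT, AccumT> add,
      BinaryOperator<AccumT> merge,
      Function<AccumT, OutputT> result) {
    this.init = init;
    this.add = add;
    this.merge = merge;
    this.result = result;
  }

  @Override
  AccumT createAccumulator() {
    return init.get();
  }

  @Override
  AccumT addInput(AccumT accumulator, InputT input) {
    return add.apply(accumulator, input);
  }

  /** N-ary merge as a left fold of the binary merge; assumes init() is an identity for merge. */
  @Override
  AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
    AccumT merged = init.get();
    for (AccumT accumulator : accumulators) {
      merged = merge.apply(merged, accumulator);
    }
    return merged;
  }

  @Override
  OutputT extractOutput(AccumT accumulator) {
    return result.apply(accumulator);
  }

  public static void main(String[] args) {
    // COUNT over two partial accumulators built from {"x", "y"} and {"z"}.
    UdafCombineFn<String, Long, Long> count =
        new UdafCombineFn<>(() -> 0L, (acc, s) -> acc + 1, Long::sum, acc -> acc);
    Long left = count.addInput(count.addInput(count.createAccumulator(), "x"), "y");
    Long right = count.addInput(count.createAccumulator(), "z");
    System.out.println(count.extractOutput(count.mergeAccumulators(Arrays.asList(left, right)))); // prints 3
  }
}
```

A binary merge is the smaller contract for a UDAF author to implement, while the `Iterable` form lets the runner hand over any number of partial accumulators at once; the fold connects the two without the UDAF needing to know how many bundles exist.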



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
