[ https://issues.apache.org/jira/browse/CALCITE-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005426#comment-16005426 ]

Julian Hyde commented on CALCITE-1783:
--------------------------------------

So, let's figure out how to model this as a relational operator.

I think you're describing what in Hadoop terminology would be called a reducer.
Take a look at our [Exchange operator|https://calcite.apache.org/apidocs/org/apache/calcite/rel/core/Exchange.html],
which moves data so that it is partitioned by a particular key. It has a
subclass, {{SortExchange}}, which does that and also sorts, and therefore models
Hadoop's shuffle operation. You might expect {{Exchange}} to have multiple
inputs and outputs, but in fact it has one input and one output, which are
physically partitioned.
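
To make the {{Exchange}}/{{SortExchange}} description concrete, here is a minimal in-memory sketch of the data movement a sort-exchange describes. It assumes rows are simple key/value pairs and that partitioning is by hash of the key. The class and method names are hypothetical, and this is not Calcite's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a sort-exchange: one logical input becomes one
 * logical output that is physically split into numPartitions partitions,
 * each sorted on the key.
 */
final class SortExchangeSketch {
  static List<List<Map.Entry<String, Integer>>> sortExchange(
      List<Map.Entry<String, Integer>> rows, int numPartitions) {
    List<List<Map.Entry<String, Integer>>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    // Exchange: every row with the same key goes to the same partition.
    // floorMod keeps the index non-negative when hashCode() is negative.
    for (Map.Entry<String, Integer> row : rows) {
      int target = Math.floorMod(row.getKey().hashCode(), numPartitions);
      partitions.get(target).add(row);
    }
    // Sort: order each partition on the key (List.sort is stable).
    for (List<Map.Entry<String, Integer>> partition : partitions) {
      partition.sort(Map.Entry.comparingByKey());
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> rows = List.of(
        Map.entry("b", 1), Map.entry("a", 2), Map.entry("b", 3), Map.entry("c", 4));
    System.out.println(sortExchange(rows, 2)); // prints [[b=1, b=3], [a=2, c=4]]
  }
}
```

Because rows that share a key always land in the same partition, a downstream per-partition {{Aggregate}} sees every row for the keys it owns.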

In relational algebra terms, you need an {{Aggregate}}. Like {{Exchange}}'s 
output, the input to your {{Aggregate}} is physically partitioned on the key 
into multiple "streams". (The input may also be unique on the key, and/or 
sorted on the key, and your implementation might require that, or at least 
exploit it.)

So, it follows that your "merge" operation is an aggregate function (albeit one
that we don't expose to the end user). You can accomplish
{{merge(v0, v1, ..., vn)}} by {{init().add(v0).add(v1)....add(vn).result()}},
and the latter should be just as efficient after the optimizing compiler has
kicked in.
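
The following is a minimal sketch of that point. It assumes a hypothetical three-method aggregate interface ({{AggFn}} with {{init}}, {{add}} and {{result}}); none of these names are Calcite API. The merge of partial results is not a dedicated method. Instead, it is expressed as an ordinary aggregate folded over the partial results. The example uses MAX, whose merge is MAX itself.

```java
import java.util.List;

/** Hypothetical minimal aggregate shape (not Calcite API): init, add, result. */
interface AggFn<V, A, R> {
  A init();
  A add(A acc, V value);
  R result(A acc);
}

final class MergeAsAggregate {
  /** Computes fn.init().add(v0).add(v1)...add(vn).result() over the given values. */
  static <V, A, R> R fold(AggFn<V, A, R> fn, List<V> values) {
    A acc = fn.init();
    for (V v : values) {
      acc = fn.add(acc, v);
    }
    return fn.result(acc);
  }

  /** MAX over longs: null for no input, and null inputs are skipped, as in SQL. */
  static final AggFn<Long, Long, Long> MAX = new AggFn<Long, Long, Long>() {
    @Override public Long init() { return null; }

    @Override public Long add(Long acc, Long value) {
      if (value == null) {
        return acc;
      }
      if (acc == null) {
        return value;
      }
      return Math.max(acc, value);
    }

    @Override public Long result(Long acc) { return acc; }
  };

  /** merge(v0, ..., vn) for MAX is simply MAX folded over the partial results. */
  static Long merge(List<Long> partials) {
    return fold(MAX, partials);
  }

  public static void main(String[] args) {
    // Phase 1: each partition computes its own partial MAX.
    Long p0 = fold(MAX, List.of(3L, 9L));
    Long p1 = fold(MAX, List.of(4L));
    // Phase 2: merging is just another fold, this time over the partials.
    System.out.println(merge(List.of(p0, p1))); // prints 9
  }
}
```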

As an example, the "merge" operation for MIN is MIN. The "merge" operation for 
COUNT is SUM0 (a variant of SUM that returns 0 when given no input values).
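
The next sketch shows two-phase aggregation under similar assumptions: hypothetical names, and in-memory partitions of {{long}} values. Each partition computes a partial COUNT or MIN, and a second aggregate merges the partials. COUNT uses SUM0 rather than SUM because, with no partial counts at all, SUM would return NULL while COUNT must return 0. An empty {{OptionalLong}} stands in for SQL NULL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical two-phase aggregation over in-memory partitions (not Calcite code). */
final class TwoPhaseSketch {
  /** Phase 1 for COUNT: the number of rows in one partition. */
  static long count(List<Long> partition) {
    return partition.size();
  }

  /** SUM0: like SUM, but returns 0 rather than null when there are no inputs. */
  static long sum0(List<Long> values) {
    long total = 0;
    for (long v : values) {
      total += v;
    }
    return total;
  }

  /** MIN; an empty OptionalLong stands in for SQL NULL. */
  static OptionalLong min(List<Long> values) {
    return values.stream().mapToLong(Long::longValue).min();
  }

  /** COUNT across partitions: COUNT per partition, merged with SUM0. */
  static long twoPhaseCount(List<List<Long>> partitions) {
    List<Long> partials = new ArrayList<>();
    for (List<Long> p : partitions) {
      partials.add(count(p));
    }
    return sum0(partials);
  }

  /** MIN across partitions: MIN per partition, merged with MIN itself. */
  static OptionalLong twoPhaseMin(List<List<Long>> partitions) {
    List<Long> partials = new ArrayList<>();
    for (List<Long> p : partitions) {
      OptionalLong m = min(p);
      // Empty partitions contribute nothing, just as MIN ignores NULLs.
      if (m.isPresent()) {
        partials.add(m.getAsLong());
      }
    }
    return min(partials);
  }
}
```

For partitions {{[5, 2]}}, {{[]}} and {{[7]}}, the partial counts are 2, 0 and 1, so {{twoPhaseCount}} returns 3, and {{twoPhaseMin}} returns 2.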

I think that
[SubstitutionVisitor.getRollup()|https://insight.io/github.com/apache/calcite/blob/60b4f4eb10a018e7d6ab8ae4f6ac0f4d0b598b1f/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java?line=1316]
is along the right lines by returning a {{SqlAggFunction}}, and that we were
wrong to add an {{A merge(A, A)}} method to our UDAF API.
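
To illustrate the design choice, here is a hypothetical enum that plays the role the comment ascribes to {{getRollup()}}. Given an aggregate, it returns the aggregate that merges that aggregate's partial results, if one exists. This is a sketch, not Calcite's code. AVG is included for contrast: partial averages alone cannot be merged into a correct average, so AVG has no single rollup function.

```java
import java.util.Optional;

/** Hypothetical stand-in for a rollup lookup (not Calcite's SubstitutionVisitor code). */
enum AggKind {
  COUNT, SUM, SUM0, MIN, MAX, AVG;

  /** The aggregate that merges partial results of this one, if a single one exists. */
  Optional<AggKind> rollup() {
    switch (this) {
      case SUM:
      case SUM0:
      case MIN:
      case MAX:
        return Optional.of(this); // partials are merged with the same function
      case COUNT:
        return Optional.of(SUM0); // partial counts are added; no partials gives 0
      default:
        return Optional.empty(); // AVG: would need SUM and COUNT carried separately
    }
  }

  public static void main(String[] args) {
    for (AggKind kind : values()) {
      System.out.println(kind + " -> " + kind.rollup().map(AggKind::name).orElse("none"));
    }
  }
}
```

With this shape, as the comment argues, merging is described in terms of aggregate functions the planner already knows, rather than a per-UDAF {{merge}} method.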

> Support merge method in AggregateFunction
> -----------------------------------------
>
>                 Key: CALCITE-1783
>                 URL: https://issues.apache.org/jira/browse/CALCITE-1783
>             Project: Calcite
>          Issue Type: New Feature
>            Reporter: James Xu
>            Assignee: James Xu
>
> I was implementing UDAFs for Beam SQL using Calcite. Because Beam is used for
> big data processing, where aggregate functions are implemented using
> distributed algorithms, a {{merge}} method is a must for supporting UDAFs in
> Beam. The corresponding aggregator interface in Beam is {{CombineFn}}:
> {code:java}
>   public abstract static class CombineFn<InputT, AccumT, OutputT>
>       extends AbstractGlobalCombineFn<InputT, AccumT, OutputT> {
>     /**
>      * Returns a new, mutable accumulator value, representing the
>      * accumulation of zero input values.
>      */
>     public abstract AccumT createAccumulator();
>     /**
>      * Adds the given input value to the given accumulator, returning the
>      * new accumulator value.
>      *
>      * <p>For efficiency, the input accumulator may be modified and returned.
>      */
>     public abstract AccumT addInput(AccumT accumulator, InputT input);
>     /**
>      * Returns an accumulator representing the accumulation of all the
>      * input values accumulated in the merging accumulators.
>      *
>      * <p>May modify any of the argument accumulators.  May return a
>      * fresh accumulator, or may return one of the (modified) argument
>      * accumulators.
>      */
>     public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);
>     /**
>      * Returns the output value that is the result of combining all
>      * the input values represented by the given accumulator.
>      */
>     public abstract OutputT extractOutput(AccumT accumulator);
>   }
> {code}
> {{mergeAccumulators}} is Beam's analogue of {{merge}}.
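
The following sketch connects the two APIs. It assumes a hypothetical abstract class, {{MiniCombineFn}}, with the same four-method shape as the {{CombineFn}} excerpt above. It is not Beam's class and has no Beam dependency. {{MeanFn}} (also hypothetical) keeps a (sum, count) pair in a {{long[]}}. Its {{mergeAccumulators}} starts from a fresh accumulator and folds each partial accumulator into it, which is the {{init().add(...)...result()}} pattern from the comment, applied to accumulators.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in with the same method shape as Beam's CombineFn; not Beam's class. */
abstract class MiniCombineFn<InputT, AccumT, OutputT> {
  abstract AccumT createAccumulator();
  abstract AccumT addInput(AccumT accumulator, InputT input);
  abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);
  abstract OutputT extractOutput(AccumT accumulator);
}

/** Mean of longs; the accumulator is a long[] holding (sum, count). */
final class MeanFn extends MiniCombineFn<Long, long[], Double> {
  @Override
  long[] createAccumulator() {
    return new long[] {0L, 0L};
  }

  @Override
  long[] addInput(long[] acc, Long input) {
    acc[0] += input; // modifies the accumulator in place, which the contract allows
    acc[1] += 1;
    return acc;
  }

  @Override
  long[] mergeAccumulators(Iterable<long[]> accumulators) {
    // A fold: start from an empty accumulator and add each partial in turn.
    long[] merged = createAccumulator();
    for (long[] a : accumulators) {
      merged[0] += a[0];
      merged[1] += a[1];
    }
    return merged;
  }

  @Override
  Double extractOutput(long[] acc) {
    if (acc[1] == 0) {
      return null; // no inputs: the mean is undefined
    }
    return (double) acc[0] / acc[1];
  }

  /** Accumulates each partition separately, then merges, as a distributed runner might. */
  static Double meanOfPartitions(List<List<Long>> partitions) {
    MeanFn fn = new MeanFn();
    List<long[]> partials = new ArrayList<>();
    for (List<Long> partition : partitions) {
      long[] acc = fn.createAccumulator();
      for (Long v : partition) {
        acc = fn.addInput(acc, v);
      }
      partials.add(acc);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }
}
```

For partitions {{[1, 2]}}, {{[3]}} and {{[]}}, the merged accumulator is (6, 3) and the mean is 2.0. The accumulator here is a pair rather than a partial average, because partial averages cannot be merged on their own.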



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
