So, let's figure out how to model this as a relational operator. I think you're describing what in Hadoop terminology would be called a reducer. Take a look at our Exchange operator [1], which moves data so that it is partitioned by a particular key. It has a subclass, SortExchange, which does that and also sorts, and therefore models Hadoop's shuffle operation. You might expect Exchange to have multiple inputs and outputs, but in fact it has one input and one output, which are physically partitioned.
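As a rough illustration of "one input and one output, physically partitioned by key", here is a plain-Java sketch of hash partitioning. It is not Calcite's Exchange implementation. The class and method names (HashPartitionSketch, partition) are hypothetical, and assigning rows to partitions with Math.floorMod of the key's hash is just one assumed strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch (not Calcite's Exchange): hash-partition rows by key so
 *  that every row with a given key lands in the same partition. */
public class HashPartitionSketch {

  /** Splits one input into numPartitions physical partitions by key hash. */
  static <R, K> List<List<R>> partition(List<R> rows, Function<R, K> keyOf, int numPartitions) {
    List<List<R>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (R row : rows) {
      // floorMod keeps the index non-negative even if hashCode() is negative.
      int p = Math.floorMod(keyOf.apply(row).hashCode(), numPartitions);
      partitions.get(p).add(row);
    }
    return partitions;
  }

  public static void main(String[] args) {
    // Rows are (key, value) pairs.
    List<String[]> rows = List.of(
        new String[] {"a", "1"}, new String[] {"b", "2"},
        new String[] {"a", "3"}, new String[] {"c", "4"});
    List<List<String[]>> parts = partition(rows, row -> row[0], 2);
    for (int i = 0; i < parts.size(); i++) {
      StringBuilder line = new StringBuilder("partition " + i + ":");
      for (String[] row : parts.get(i)) {
        line.append(' ').append(row[0]).append('=').append(row[1]);
      }
      System.out.println(line);
    }
    // Prints:
    // partition 0: b=2
    // partition 1: a=1 a=3 c=4
  }
}
```

Both rows with key "a" end up in the same partition. That is the property a downstream per-key aggregate (the "reducer") relies on, whether or not the partitions are also sorted.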
In relational algebra terms, you need an Aggregate. Like Exchange's output, the input to your Aggregate is physically partitioned on the key into multiple "streams". (The input may also be unique on the key, and/or sorted on the key, and your implementation might require that, or at least exploit it.) It follows that your "merge" operation is itself an aggregate function (albeit one that we don't expose to the end user). You can accomplish "merge(v0, v1, ..., vn)" by "init().add(v0).add(v1)...add(vn).result()", which should be just as efficient once the optimizing compiler has kicked in. For example, the "merge" operation for MIN is MIN, and the "merge" operation for COUNT is SUM0 (a variant of SUM that returns 0 when given no input values).

I think that SubstitutionVisitor.getRollup() [2] is along the right lines in returning a SqlAggFunction, and that we were wrong to add an "A merge(A, A)" method to our UDAF API.

Julian

[1] https://calcite.apache.org/apidocs/org/apache/calcite/rel/core/Exchange.html
[2] https://insight.io/github.com/apache/calcite/blob/60b4f4eb10a018e7d6ab8ae4f6ac0f4d0b598b1f/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java?line=1316

On Tue, May 9, 2017 at 11:35 PM, James wrote:
> Hi Julian, I need some help here:
>
> I read the source code and found that user-defined aggregate functions are
> eventually invoked here:
>
> https://github.com/apache/calcite/blob/ce2122ff2562340333bfa0ba371872fc9a9c6251/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L826
>
> The code there is not hard to understand, since all the data is processed
> in that single method, in a single thread.
> The invocation process looks something like this:
>
>        (repeatedly)
>        +----------+
>        |          |
>        |          |
> init()-+--> add() +--> result()
>
> Now, if we introduce `merge` into this process, the input data will first
> be split into bundles. Each bundle produces an intermediate result, and in
> the end these intermediate results are `merged` into a final result,
> something like:
>
>        (repeatedly)
>        +----------+
>        |          |
>        |          |
> init()-+--> add() +--> intermediate result +
>                                            |
>        (repeatedly)                        |
>        +----------+                        |
>        |          |                        |
>        |          |                        |
> init()-+--> add() +--> intermediate result +--> merge() +--> final result
>                                            |
>                                            |
>        (repeatedly)                        |
>        +----------+                        |
>        |          |                        |
>        |          |                        |
> init()-+--> add() +--> intermediate result +
>
> There will be multiple threads involved here, and the merge method will be
> called at the end of this process in a separate thread.
>
> Two questions:
>
> * How do I represent the "multiple threads/bundles" logic in Calcite?
> * Where do I put the merge method?
>
> Is there already some construct or framework I can use or learn from, or do
> I need to write this new logic from scratch?
>
>
> On Wed, May 10, 2017 at 10:10 AM, James wrote:
>
>> Julian, thanks for the guidance. I will log an issue and try to implement it.
>>
>> On Wed, May 10, 2017 at 1:36 AM, Julian Hyde wrote:
>>
>>> I don't think it's that hard. We just never got round to it. Can you
>>> log a JIRA case, and submit a pull request when you're done? Be sure
>>> to add a test similar to UdfTest.testUserDefinedAggregateFunction.
>>>
>>> Also, there's no code currently that would call merge, but don't let
>>> that stop you. EnumerableAggregate generates code that calls
>>> EnumerableDefaults.groupBy, which groups all values of the same key
>>> together. We'd use merge if we used a parallel or distributed
>>> algorithm.
>>>
>>> Another piece of code that could call merge is roll-up: see
>>> SubstitutionVisitor.getRollup.
>>>
>>> Julian
>>>
>>>
>>> On Tue, May 9, 2017 at 8:03 AM, James wrote:
>>> > I was implementing a UDAF based on Calcite, but found that the merge
>>> > method is not implemented:
>>> >
>>> > https://github.com/apache/calcite/blob/0938c7b6d767e3242874d87a30d9112512d9243a/core/src/main/java/org/apache/calcite/schema/impl/AggregateFunctionImpl.java#L86
>>> >
>>> > I just wonder why it is not implemented. As a newbie to Calcite, it
>>> > seems not so hard to implement merge. Is there any issue blocking the
>>> > implementation of merge, or is it just a matter of time? If it is just
>>> > a matter of time, I'd like to spend some time implementing it.
>>> >
>>> > Thanks in advance.
>>> > James
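Julian's point that "merge" is itself an aggregate function can be made concrete with a small plain-Java sketch of the two-phase process in James's diagram. Each bundle runs init()/add()/result() on its own thread. The intermediate results are then combined by running a second aggregate over them: SUM0 for partial COUNTs, and MIN for partial MINs. This is hypothetical and for illustration only. The Agg interface, TwoPhaseAggSketch, aggregate and twoPhase are not Calcite classes, and a thread pool stands in for whatever parallel or distributed execution would actually produce the intermediate results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch (not Calcite's API): two-phase aggregation in which the
 *  "merge" step is just another aggregate run over the intermediate results. */
public class TwoPhaseAggSketch {

  /** Hypothetical init/add/result contract; A is the accumulator type. */
  interface Agg<V, A, R> {
    A init();
    A add(A acc, V value);
    R result(A acc);
  }

  /** COUNT: the accumulator is a running count of input values. */
  static final Agg<Integer, Long, Long> COUNT = new Agg<Integer, Long, Long>() {
    public Long init() { return 0L; }
    public Long add(Long acc, Integer value) { return acc + 1; }
    public Long result(Long acc) { return acc; }
  };

  /** SUM0: like SUM, but returns 0 instead of null when there is no input. */
  static final Agg<Long, Long, Long> SUM0 = new Agg<Long, Long, Long>() {
    public Long init() { return 0L; }
    public Long add(Long acc, Long value) { return acc + value; }
    public Long result(Long acc) { return acc; }
  };

  /** MIN: a null accumulator means "no value seen yet"; null inputs are ignored. */
  static final Agg<Integer, Integer, Integer> MIN = new Agg<Integer, Integer, Integer>() {
    public Integer init() { return null; }
    public Integer add(Integer acc, Integer value) {
      if (value == null) {
        return acc;
      }
      return acc == null ? value : Integer.valueOf(Math.min(acc, value));
    }
    public Integer result(Integer acc) { return acc; }
  };

  /** Sequential aggregation: init(), then add() per value, then result(). */
  static <V, A, R> R aggregate(Agg<V, A, R> agg, List<V> values) {
    A acc = agg.init();
    for (V value : values) {
      acc = agg.add(acc, value);
    }
    return agg.result(acc);
  }

  /** Phase 1: aggregate each bundle with `partial` on its own thread.
   *  Phase 2: aggregate the intermediate results with `merge`. */
  static <V, P, R> R twoPhase(Agg<V, ?, P> partial, Agg<P, ?, R> merge,
      List<List<V>> bundles) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, bundles.size()));
    try {
      List<Future<P>> futures = new ArrayList<>();
      for (List<V> bundle : bundles) {
        futures.add(pool.submit(() -> aggregate(partial, bundle)));
      }
      List<P> intermediates = new ArrayList<>();
      for (Future<P> future : futures) {
        intermediates.add(future.get());
      }
      return aggregate(merge, intermediates);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // Three bundles, mirroring the three init()/add() chains in the diagram.
    List<List<Integer>> bundles = List.of(List.of(5, 3, 8), List.of(7), List.of(2, 9));
    // Partial COUNTs (3, 1, 2) are merged with SUM0.
    System.out.println("COUNT = " + twoPhase(COUNT, SUM0, bundles)); // COUNT = 6
    // Partial MINs (3, 7, 2) are merged with MIN.
    System.out.println("MIN = " + twoPhase(MIN, MIN, bundles)); // MIN = 2
  }
}
```

In this sketch no separate merge(A, A) method is needed. The merge step reuses the same init/add/result contract, which is the design Julian argues for in his reply.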
