Hi Julian, I need some help here:

I read the source code and found that user-defined aggregations are
eventually invoked here:


https://github.com/apache/calcite/blob/ce2122ff2562340333bfa0ba371872fc9a9c6251/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L826

The code here is not hard to understand, since all the data is processed
in this single method, in a single thread. The invocation sequence is
something like:

       (repeatedly)
       +----------+
       |          |
       |          |
init()-+--> add() +--> result()
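
To make sure I read the flow correctly, here is a rough sketch of it in
Java. This is not Calcite's actual code: `MySum` is a hypothetical UDAF that
uses the init/add/result method names, and `SingleThreadedFold.aggregate`
is a made-up driver that only shows the call order for the values of one
group.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical UDAF: sums ints using the init/add/result method names.
class MySum {
  public long init() { return 0L; }
  public long add(long accumulator, int value) { return accumulator + value; }
  public long result(long accumulator) { return accumulator; }
}

// Made-up driver (not Calcite code): shows the call order for one group.
class SingleThreadedFold {
  static long aggregate(MySum udaf, List<Integer> values) {
    long acc = udaf.init();        // called once
    for (int v : values) {
      acc = udaf.add(acc, v);      // called repeatedly, once per row
    }
    return udaf.result(acc);       // called once
  }

  public static void main(String[] args) {
    System.out.println(aggregate(new MySum(), Arrays.asList(1, 2, 3, 4)));
  }
}
```

Running `main` prints 10 for the four input values.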

Now, if we introduce `merge` into this process, the input data will first
be split into bundles. Each bundle produces an intermediate result, and in
the end these intermediate results are merged into a final result,
something like:

                  (repeatedly)
                  +----------+
                  |          |
                  |          |
           init()-+--> add() +--> intermediate result  +
                                                       |
                  (repeatedly)                         |
                  +----------+                         |
                  |          |                         |
                  |          |                         |
           init()-+--> add() +--> intermediate result  +--> merge() +--> final result
                                                       |
                                                       |
                  (repeatedly)                         |
                  +----------+                         |
                  |          |                         |
                  |          |                         |
           init()-+--> add() +--> intermediate result  +



There will be multiple threads involved here, and the merge method will be
called at the end of this process in a separate thread.
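
A minimal sketch of what I have in mind, under my own assumptions:
`MergeableSum` and `ParallelFold` are hypothetical names, the bundles are
plain lists, and a `java.util.concurrent` thread pool stands in for whatever
parallel or distributed runtime would really be used. For simplicity the
merge step runs on the calling thread here rather than on a dedicated one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical UDAF with a merge step that combines two accumulators.
class MergeableSum {
  public long init() { return 0L; }
  public long add(long accumulator, int value) { return accumulator + value; }
  public long merge(long left, long right) { return left + right; }
  public long result(long accumulator) { return accumulator; }
}

// Made-up driver (not Calcite code): one task per bundle, then merge.
class ParallelFold {
  static long aggregate(MergeableSum udaf, List<List<Integer>> bundles) {
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, bundles.size()));
    try {
      List<CompletableFuture<Long>> partials = new ArrayList<>();
      for (List<Integer> bundle : bundles) {
        // Each bundle runs init() once and add() repeatedly on a pool thread.
        partials.add(CompletableFuture.supplyAsync(() -> {
          long acc = udaf.init();
          for (int v : bundle) {
            acc = udaf.add(acc, v);
          }
          return acc;              // intermediate result for this bundle
        }, pool));
      }
      // Combine the intermediate results as each bundle completes.
      long merged = udaf.init();
      for (CompletableFuture<Long> partial : partials) {
        merged = udaf.merge(merged, partial.join());
      }
      return udaf.result(merged);  // final result
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<List<Integer>> bundles = Arrays.asList(
        Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
    System.out.println(aggregate(new MergeableSum(), bundles));
  }
}
```

Running `main` prints 15, the same answer a single-threaded fold over all
five values would give. That only holds because merging two sums is itself
a sum; other aggregates would need their own accumulator type and merge
logic.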

Two questions here:

* How should the multiple threads/bundles logic be represented in Calcite?
* Where should the call to the merge method go?

Are there already some constructs or a framework I can use or learn from,
or do I need to write this new logic from scratch?


On Wed, May 10, 2017 at 10:10 AM, James <[email protected]> wrote:

> Julian, thanks for the guidance. I will log an issue and try to implement it.
>
> On Wed, May 10, 2017 at 1:36 AM, Julian Hyde <[email protected]> wrote:
>
>> I don't think it's that hard. We just never got round to it. Can you
>> log a JIRA case, and submit a pull request when you're done? Be sure
>> to add a test similar to UdfTest.testUserDefinedAggregateFunction.
>>
>> Also, there's no code currently that would call merge, but don't let
>> that stop you. EnumerableAggregate generates code that calls
>> EnumerableDefaults.groupBy, which groups all values of the same key
>> together. We'd use merge if we used a parallel or distributed
>> algorithm.
>>
>> Another piece of code that could call merge is roll-up: see
>> SubstitutionVisitor.getRollup.
>>
>> Julian
>>
>>
>> On Tue, May 9, 2017 at 8:03 AM, James <[email protected]> wrote:
>> > I was implementing a UDAF based on Calcite, but found that the merge
>> > method is not implemented:
>> >
>> >
>> > https://github.com/apache/calcite/blob/0938c7b6d767e3242874d87a30d9112512d9243a/core/src/main/java/org/apache/calcite/schema/impl/AggregateFunctionImpl.java#L86
>> >
>> > I just wonder why it is not implemented. As a newbie to Calcite, it
>> > seems not so hard to implement merge. Is there any issue blocking the
>> > implementation of merge, or is it just a matter of time? If it is just
>> > a matter of time, I'd like to spend some time implementing it.
>> >
>> > Thanks in advance.
>> > James
>>
>
>
