Changes look great, Maryann. Would you mind pulling those in, Jacques?

Thanks,
James

On Mon, Oct 19, 2015 at 11:50 AM, Maryann Xue <maryann....@gmail.com> wrote:

> Made another two check-ins to https://github.com/jacques-n/drill/pull/5,
> first one for the changes James had suggested. The second check-in included
> some test cases that failed to use Phoenix partial aggregate because of
> https://issues.apache.org/jira/browse/CALCITE-926.
>
> I also reproduced the problem with Phoenix+Calcite, but will make a new
> patch for CALCITE-926 to add some standalone test cases for Calcite.
>
>
> Thanks,
> Maryann
>
> On Fri, Oct 9, 2015 at 1:30 PM, James Taylor <jamestay...@apache.org>
> wrote:
>
>> Thanks for the updates to the patch, Maryann. It's looking very good -
>> this will perform better I believe. I made a few comments on the pull
>> request.
>>
>> FYI, I filed PHOENIX-2316 to add the missing information (namely the
>> region server that the parallelized scan will go to) so that I can improve
>> the assignment logic.
>>
>>      James
>>
>> On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <maryann....@gmail.com>
>> wrote:
>>
>>> Made another checkin for the pull request. All good now.
>>>
>>> In order to compile and run, be sure to update the Phoenix project under
>>> Julian's branch.
>>>
>>>
>>> Thanks,
>>> Maryann
>>>
>>> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <jacq...@dremio.com>
>>> wrote:
>>>
>>>> I just filed a jira for the merge issue:
>>>>
>>>> https://issues.apache.org/jira/browse/DRILL-3907
>>>>
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>>
>>>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <jacq...@dremio.com>
>>>> wrote:
>>>>
>>>>> Drill doesn't currently have a merge-sort operation available outside
>>>>> the context of an exchange. See here:
>>>>>
>>>>>
>>>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>>>
>>>>> We'll need to do a bit of refactoring to provide this functionality
>>>>> outside the context of an exchange. The one other thing we'll have to 
>>>>> think
>>>>> about in this context is how do we avoid doing a n-way merge in the case
>>>>> that the we're not using the collation.
>>>>>
>>>>> --
>>>>> Jacques Nadeau
>>>>> CTO and Co-Founder, Dremio
>>>>>
>>>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <maryann....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> One thing from what I asked James offline yesterday, and maybe we can
>>>>>> discuss a little bit in today's meeting:
>>>>>>
>>>>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>>>>> boundaries and guideposts, and if the top-level list contains more than 
>>>>>> one
>>>>>> element it means that the results from different Scanner/ResultIterator
>>>>>> should be merge-sorted. We now use this list in Drill integration to
>>>>>> generate different batches or slices. I see from the Drill plan of a 
>>>>>> simple
>>>>>> select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of
>>>>>> the PhoenixTableScan. I guess this is a real sort rather than a 
>>>>>> merge-sort.
>>>>>> So optimally,
>>>>>> 1) this should be a merge-sort (to be more accurate, a merge)
>>>>>> 2) furthermore, if Drill has something to indicate the order among
>>>>>> slices and batches, we could even turn it into a concat.
>>>>>>
>>>>>> The structure of this Scan list might be helpful for 2), or we may
>>>>>> have some Logical representation for this. Otherwise, we can simply 
>>>>>> flatten
>>>>>> this list to a one-dimensional list as we do now (in my ci yesterday).
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Maryann
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, but the partially aggregated results will not contain any
>>>>>>> duplicate rowkeys, since they are also group-by keys. What we need is 
>>>>>>> the
>>>>>>> aggregators and call aggregate for each row. We can write a new simpler
>>>>>>> ResultIterator to replace this, but for now it should work correctly.
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestay...@apache.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> The results we get back from the server-side scan are already the
>>>>>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>>>>>> will collapse adjacent Tuples together which happen to have the same 
>>>>>>>> row
>>>>>>>> key. I'm not sure we want/need this to happen. Instead I think we just 
>>>>>>>> need
>>>>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side 
>>>>>>>>> scan
>>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan 
>>>>>>>>> will
>>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>>>>>>> merge. Note too that the results aren't sorted that come back from the
>>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples 
>>>>>>>>> sorted
>>>>>>>>> by the group by key). Or is this just to help in decoding the values 
>>>>>>>>> coming
>>>>>>>>> back from the scan?
>>>>>>>>>
>>>>>>>>> It is necessary. I suppose what we should return as a partial
>>>>>>>>> result from PhoenixRecordReader is exactly the same as what we do in
>>>>>>>>> standalone Phoenix+Calcite, except that the result is partial or say
>>>>>>>>> incomplete. For example, we have "select a, count(*) from t group by 
>>>>>>>>> a", we
>>>>>>>>> should return rows that have "a" as the first expression value, and
>>>>>>>>> "count(*)" as the second expression value. For this "count" 
>>>>>>>>> expression, it
>>>>>>>>> actually needs a ClientAggregator for evaluation, and that's what this
>>>>>>>>> GroupedAggregatingResultIterator is used for.
>>>>>>>>> Since "each server-side scan will produce results with a single
>>>>>>>>> tuple per group by key", and PhoenixRecordReader is only dealing with 
>>>>>>>>> one
>>>>>>>>> server-side result each time, we don't care how the group-by keys are
>>>>>>>>> arranged (ordered or unordered"). Actually
>>>>>>>>> GroupedAggregatingResultIterator is not the group-by iterator we
>>>>>>>>> use for AggregatePlan. It does not "combine". It treats every row as a
>>>>>>>>> different group, by returning its rowkey as the group-by key (
>>>>>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>>>>>
>>>>>>>>> In short, this iterator is for decoding the server-side values. So
>>>>>>>>> we may want to optimize this logic by removing this serialization and
>>>>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>>>>
>>>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>>>>> scans in our Drill parallelization code 
>>>>>>>>> (PhoenixGroupScan.applyAssignments()),
>>>>>>>>> as each of our scans could include duplicate group by keys. Is it ok 
>>>>>>>>> to
>>>>>>>>> combine them in this case?
>>>>>>>>>
>>>>>>>>> It should not matter, or at least is not related to the problem
>>>>>>>>> I'm now having.
>>>>>>>>>
>>>>>>>>> bq. One more question: how is the group by key communicated back
>>>>>>>>> to Drill?
>>>>>>>>>
>>>>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>>>>> aggregate, the first phase is now handled by Phoenix (after applying 
>>>>>>>>> the
>>>>>>>>> PhoenixHashAggPrule). I assume then the partial results gets shuffled 
>>>>>>>>> based
>>>>>>>>> on the hash of their group-by keys (returned by PhoenixRecordReader). 
>>>>>>>>> The
>>>>>>>>> final step is the Drill hash aggregation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is my test table "A.BEER", which has for columns: "B", "E1",
>>>>>>>>> "E2", "R", all of INTEGER types. And the data is generated like this:
>>>>>>>>> for (x=1 to N) { //currently N=1000
>>>>>>>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>>>>>> GROUP BY e1".
>>>>>>>>> The expected result should be:
>>>>>>>>> 0 100
>>>>>>>>> 1 100
>>>>>>>>> 2 100
>>>>>>>>> 3 100
>>>>>>>>> 4 100
>>>>>>>>> 5 100
>>>>>>>>> 6 100
>>>>>>>>> 7 100
>>>>>>>>> 8 100
>>>>>>>>> 9 100
>>>>>>>>> The actual result was:
>>>>>>>>> 6 0
>>>>>>>>> 7 0
>>>>>>>>> 8 0
>>>>>>>>> 9 0
>>>>>>>>> 0 0
>>>>>>>>> 1 100
>>>>>>>>> 2 100
>>>>>>>>> 3 100
>>>>>>>>> 4 100
>>>>>>>>> 5 100
>>>>>>>>>
>>>>>>>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer
>>>>>>>>> GROUP BY e2".
>>>>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>>>>> 99, each having a value of 10 as the count, while the actual result 
>>>>>>>>> was:
>>>>>>>>> from group-by key 86 to 99, together with 0, their count values
>>>>>>>>> were all 0; the rest (1 to 85) all had the correct value 10.
>>>>>>>>>
>>>>>>>>> Looks to me that the scans were good but there was a problem with
>>>>>>>>> one of the hash buckets.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Maryann
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <
>>>>>>>>> jamestay...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Nice progress, Maryann.
>>>>>>>>>>
>>>>>>>>>> A few questions for you: not sure I understand the changes you
>>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side 
>>>>>>>>>> scan
>>>>>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan 
>>>>>>>>>> will
>>>>>>>>>> produce results with a single tuple per group by key. In Phoenix, the
>>>>>>>>>> GroupedAggregatingResultIterator's function in life is to do the 
>>>>>>>>>> final
>>>>>>>>>> merge. Note too that the results aren't sorted that come back from 
>>>>>>>>>> the
>>>>>>>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples 
>>>>>>>>>> sorted
>>>>>>>>>> by the group by key). Or is this just to help in decoding the values 
>>>>>>>>>> coming
>>>>>>>>>> back from the scan?
>>>>>>>>>>
>>>>>>>>>> Also, not sure what impact it has in the way we "combine" the
>>>>>>>>>> scans in our Drill parallelization code
>>>>>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could 
>>>>>>>>>> include
>>>>>>>>>> duplicate group by keys. Is it ok to combine them in this case?
>>>>>>>>>>
>>>>>>>>>> One more question: how is the group by key communicated back to
>>>>>>>>>> Drill?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> James
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <
>>>>>>>>>> maryann....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Added a few fixes in the pull request. Tested with two regions,
>>>>>>>>>>> turned out that half of the result is empty (count = 0).
>>>>>>>>>>> Not sure if there's anything wrong with
>>>>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>>>>> .
>>>>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>>>>
>>>>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>>>>
>>>>>>>>>>> diff --git
>>>>>>>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>
>>>>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>
>>>>>>>>>>> +++
>>>>>>>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>
>>>>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends
>>>>>>>>>>> Prule {
>>>>>>>>>>>
>>>>>>>>>>>    // If any of the aggregate functions are not one of these,
>>>>>>>>>>> then we
>>>>>>>>>>>
>>>>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>>>>
>>>>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call,
>>>>>>>>>>> DrillAggregateRel aggregate) {
>>>>>>>>>>>
>>>>>>>>>>> -    PlannerSettings settings =
>>>>>>>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>>>
>>>>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>>>
>>>>>>>>>>> -    boolean smallInput = child.getRows() <
>>>>>>>>>>> settings.getSliceTarget();
>>>>>>>>>>>
>>>>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() ||
>>>>>>>>>>> settings.isSingleMode() || smallInput) {
>>>>>>>>>>>
>>>>>>>>>>> -      return false;
>>>>>>>>>>>
>>>>>>>>>>> -    }
>>>>>>>>>>>
>>>>>>>>>>> +//    PlannerSettings settings =
>>>>>>>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>>>
>>>>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>>>
>>>>>>>>>>> +//    boolean smallInput = child.getRows() <
>>>>>>>>>>> settings.getSliceTarget();
>>>>>>>>>>>
>>>>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() ||
>>>>>>>>>>> settings.isSingleMode() || smallInput) {
>>>>>>>>>>>
>>>>>>>>>>> +//      return false;
>>>>>>>>>>>
>>>>>>>>>>> +//    }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>>>>
>>>>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maryann
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Drill's current approach seems adequate for Drill alone but
>>>>>>>>>>>> extending
>>>>>>>>>>>> it to a heterogenous system that includes Phoenix seems like a
>>>>>>>>>>>> hack.
>>>>>>>>>>>>
>>>>>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>>>>>> know
>>>>>>>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>>>>>>>> possibility that it would run in another engine such as Phoenix
>>>>>>>>>>>> then
>>>>>>>>>>>> they should still be logical.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <
>>>>>>>>>>>> maryann....@gmail.com> wrote:
>>>>>>>>>>>> > The partial aggregate seems to be working now, with one
>>>>>>>>>>>> interface extension
>>>>>>>>>>>> > and one bug fix in the Phoenix project. Will do some code
>>>>>>>>>>>> cleanup and
>>>>>>>>>>>> > create a pull request soon.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Still there was a hack in the Drill project which I made to
>>>>>>>>>>>> force 2-phase
>>>>>>>>>>>> > aggregation. I'll try to fix that.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Jacques, I have one question though, how can I verify that
>>>>>>>>>>>> there are more
>>>>>>>>>>>> > than one slice and the shuffle happens?
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>> > Maryann
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>>>>>>>> jamestay...@apache.org> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >> Maryann,
>>>>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring
>>>>>>>>>>>> is required
>>>>>>>>>>>> >> for a merge sort to occur - there's something that does
>>>>>>>>>>>> that, but it's not
>>>>>>>>>>>> >> expected to be used in this context currently.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> IMHO, there's more of a clear value in getting the
>>>>>>>>>>>> aggregation to use
>>>>>>>>>>>> >> Phoenix first, so I'd recommend going down that road as
>>>>>>>>>>>> Jacques mentioned
>>>>>>>>>>>> >> above if possible. Once that's working, we can circle back
>>>>>>>>>>>> to the partial
>>>>>>>>>>>> >> sort.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thoughts?
>>>>>>>>>>>> >> James
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>>>>>>>> maryann....@gmail.com>
>>>>>>>>>>>> >> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>>>>>> might be a
>>>>>>>>>>>> >>> little easier to start with than partial aggregation. But I
>>>>>>>>>>>> found that even
>>>>>>>>>>>> >>> though the code worked (returned the right results), the
>>>>>>>>>>>> Drill side sort
>>>>>>>>>>>> >>> turned out to be a ordinary sort instead of a merge which
>>>>>>>>>>>> it should have
>>>>>>>>>>>> >>> been. Any idea of how to fix that?
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Thanks,
>>>>>>>>>>>> >>> Maryann
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>>>>>>>> jacq...@dremio.com>
>>>>>>>>>>>> >>> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>>>>>> providing a rule
>>>>>>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>>>>>> convention as
>>>>>>>>>>>> >>>> input. It would replace everywhere but would only be
>>>>>>>>>>>> plannable when it is
>>>>>>>>>>>> >>>> the first phase of aggregation.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Thoughts?
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> --
>>>>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <
>>>>>>>>>>>> jh...@apache.org> wrote:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>> Phoenix is able to perform quite a few relational
>>>>>>>>>>>> operations on the
>>>>>>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>>>>>>>> (optionally with
>>>>>>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>>>>>>>> "local". They
>>>>>>>>>>>> >>>>> can only deal with data on that region server, and there
>>>>>>>>>>>> needs to be a
>>>>>>>>>>>> >>>>> further operation to combine the results from the region
>>>>>>>>>>>> servers.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> The question is how to plan such queries. I think the
>>>>>>>>>>>> answer is an
>>>>>>>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>>>>>>>> split into
>>>>>>>>>>>> >>>>> multiple locations (partitions) and split it into a
>>>>>>>>>>>> partial Aggregate
>>>>>>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>>>>>>>> combines
>>>>>>>>>>>> >>>>> those totals.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>>>>>>> split? Since
>>>>>>>>>>>> >>>>> the data's distribution has changed, there would need to
>>>>>>>>>>>> be an
>>>>>>>>>>>> >>>>> Exchange operator. It is the Exchange operator that
>>>>>>>>>>>> triggers the rule
>>>>>>>>>>>> >>>>> to fire.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> There are some special cases. If the data is sorted as
>>>>>>>>>>>> well as
>>>>>>>>>>>> >>>>> partitioned (say because the local aggregate uses a
>>>>>>>>>>>> sort-based
>>>>>>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And
>>>>>>>>>>>> if the
>>>>>>>>>>>> >>>>> partition key is the same as the aggregation key we don't
>>>>>>>>>>>> need a
>>>>>>>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>>>>>> Drill-on-Phoenix
>>>>>>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>>>>>>>> Exchange
>>>>>>>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server)
>>>>>>>>>>>> we can then
>>>>>>>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix
>>>>>>>>>>>> membrane and make
>>>>>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the
>>>>>>>>>>>> region server.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Related issues:
>>>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Julian
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to