Thanks for the updates to the patch, Maryann. It's looking very good; I believe this will perform better. I made a few comments on the pull request.
FYI, I filed PHOENIX-2316 to add the missing information (namely, the region server that each parallelized scan will go to) so that I can improve the assignment logic.

James

On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <maryann....@gmail.com> wrote:
> Made another checkin for the pull request. All good now.
>
> In order to compile and run, be sure to update the Phoenix project under Julian's branch.
>
> Thanks,
> Maryann
>
> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <jacq...@dremio.com> wrote:
>> I just filed a jira for the merge issue:
>>
>> https://issues.apache.org/jira/browse/DRILL-3907
>>
>> --
>> Jacques Nadeau
>> CTO and Co-Founder, Dremio
>>
>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <jacq...@dremio.com> wrote:
>>> Drill doesn't currently have a merge-sort operation available outside the context of an exchange. See here:
>>>
>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>
>>> We'll need to do a bit of refactoring to provide this functionality outside the context of an exchange. The one other thing we'll have to think about in this context is how we avoid doing an n-way merge in the case where we're not using the collation.
>>>
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>>
>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <maryann....@gmail.com> wrote:
>>>> One thing from what I asked James offline yesterday, and maybe we can discuss it a little bit in today's meeting:
>>>>
>>>> Phoenix uses a list of lists of Scan objects to indicate region boundaries and guideposts, and if the top-level list contains more than one element, it means that the results from the different Scanners/ResultIterators should be merge-sorted. We now use this list in the Drill integration to generate the different batches or slices. I see from the Drill plan of a simple select like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of the PhoenixTableScan. I guess this is a real sort rather than a merge-sort. So optimally:
>>>> 1) this should be a merge-sort (to be more accurate, a merge);
>>>> 2) furthermore, if Drill has something to indicate the order among slices and batches, we could even turn it into a concat.
>>>>
>>>> The structure of this Scan list might be helpful for 2), or we may have some logical representation for it. Otherwise, we can simply flatten this list to a one-dimensional list as we do now (in my checkin yesterday).
>>>>
>>>> Thanks,
>>>> Maryann
>>>>
>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann....@gmail.com> wrote:
>>>>> Yes, but the partially aggregated results will not contain any duplicate rowkeys, since they are also the group-by keys. What we need is the aggregators, and to call aggregate for each row. We could write a new, simpler ResultIterator to replace this, but for now it should work correctly.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestay...@apache.org> wrote:
>>>>>> The results we get back from the server-side scan are already the partially aggregated values we need. GroupedAggregatingResultIterator will collapse adjacent Tuples that happen to have the same row key. I'm not sure we want/need this to happen. Instead, I think we just need to decode the aggregated values directly from the result of the scan.
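The merge asked for in 1) above is an n-way merge of slices that each arrive already sorted, as opposed to the full re-sort the Sort node performs. A minimal sketch of such a merge over sorted per-slice iterators (hypothetical utility code, not Drill's merge receiver):

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical n-way merge: each input iterator is already sorted by cmp,
// so combining them only needs a heap of current heads, not a full sort.
final class NWayMerge {
  static <T> Iterator<T> merge(List<Iterator<T>> sortedSlices, Comparator<T> cmp) {
    PriorityQueue<Head<T>> heap =
        new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
    for (Iterator<T> slice : sortedSlices) {
      if (slice.hasNext()) {
        heap.add(new Head<>(slice.next(), slice));
      }
    }
    return new Iterator<T>() {
      @Override public boolean hasNext() {
        return !heap.isEmpty();
      }
      @Override public T next() {
        if (heap.isEmpty()) throw new NoSuchElementException();
        Head<T> head = heap.poll();
        // Refill the heap from the slice that just yielded its head.
        if (head.source.hasNext()) {
          heap.add(new Head<>(head.source.next(), head.source));
        }
        return head.value;
      }
    };
  }

  private static final class Head<T> {
    final T value;
    final Iterator<T> source;
    Head(T value, Iterator<T> source) {
      this.value = value;
      this.source = source;
    }
  }
}

The concat in 2) is then the degenerate case: if the slices are known to be disjoint and ordered among themselves, the heap is unnecessary and the iterators can simply be chained.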
>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann....@gmail.com> wrote:
>>>>>>> Hi James,
>>>>>>>
>>>>>>> bq. A few questions for you: not sure I understand the changes you made to PhoenixRecordReader. Is it necessary to wrap the server-side scan results in a GroupedAggregatingResultIterator? Each server-side scan will produce results with a single tuple per group-by key. In Phoenix, the GroupedAggregatingResultIterator's function in life is to do the final merge. Note too that the results that come back from the aggregated scan aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted by the group-by key). Or is this just to help in decoding the values coming back from the scan?
>>>>>>>
>>>>>>> It is necessary. I suppose what we should return as a partial result from PhoenixRecordReader is exactly the same as what we return in standalone Phoenix+Calcite, except that the result is partial, or say incomplete. For example, for "select a, count(*) from t group by a", we should return rows that have "a" as the first expression value and "count(*)" as the second expression value. This "count" expression actually needs a ClientAggregator for evaluation, and that's what this GroupedAggregatingResultIterator is used for.
>>>>>>> Since "each server-side scan will produce results with a single tuple per group by key", and PhoenixRecordReader is only dealing with one server-side result at a time, we don't care how the group-by keys are arranged (ordered or unordered). Actually, GroupedAggregatingResultIterator is not the group-by iterator we use for AggregatePlan. It does not "combine": it treats every row as a different group, by returning the rowkey as the group-by key (GroupedAggregatingResultIterator.java:56).
>>>>>>>
>>>>>>> In short, this iterator is for decoding the server-side values. So in the future we may want to optimize this logic by removing this serialization and deserialization and having only one set of aggregators.
>>>>>>>
>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each of our scans could include duplicate group-by keys. Is it ok to combine them in this case?
>>>>>>>
>>>>>>> It should not matter, or at least it is not related to the problem I'm now having.
>>>>>>>
>>>>>>> bq. One more question: how is the group by key communicated back to Drill?
>>>>>>>
>>>>>>> According to the HashAggPrule, if it decides to create a two-phase aggregate, the first phase is now handled by Phoenix (after applying the PhoenixHashAggPrule). I assume the partial results then get shuffled based on the hash of their group-by keys (returned by PhoenixRecordReader). The final step is the Drill hash aggregation.
>>>>>>>
>>>>>>> This is my test table "A.BEER", which has four columns, "B", "E1", "E2", "R", all of type INTEGER. The data is generated like this:
>>>>>>> for (x = 1 to N) { // currently N = 1000
>>>>>>>   UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>>> }
>>>>>>>
>>>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP BY e1".
>>>>>>> The expected result should be:
>>>>>>> 0 100
>>>>>>> 1 100
>>>>>>> 2 100
>>>>>>> 3 100
>>>>>>> 4 100
>>>>>>> 5 100
>>>>>>> 6 100
>>>>>>> 7 100
>>>>>>> 8 100
>>>>>>> 9 100
>>>>>>> The actual result was:
>>>>>>> 6 0
>>>>>>> 7 0
>>>>>>> 8 0
>>>>>>> 9 0
>>>>>>> 0 0
>>>>>>> 1 100
>>>>>>> 2 100
>>>>>>> 3 100
>>>>>>> 4 100
>>>>>>> 5 100
>>>>>>>
>>>>>>> Here I just tried another one, "SELECT e2, count(*) FROM a.beer GROUP BY e2". Similarly, the expected result should have group-by keys from 0 to 99, each with a count of 10, while in the actual result the group-by keys 86 to 99, together with 0, all had a count of 0; the rest (1 to 85) all had the correct value of 10.
>>>>>>>
>>>>>>> Looks to me like the scans were good but there was a problem with one of the hash buckets.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Maryann
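The data load described above, spelled out as a runnable JDBC sketch (the class name and the connection URL are assumptions for a local test instance, not Maryann's actual script):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Populates A.BEER as described above: B = x, E1 = x % 10, E2 = x % 100, R = x.
public class LoadBeer {
  public static void main(String[] args) throws Exception {
    // "jdbc:phoenix:localhost" assumes a local HBase/Phoenix instance.
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         PreparedStatement upsert =
             conn.prepareStatement("UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
      for (int x = 1; x <= 1000; x++) {
        upsert.setInt(1, x);
        upsert.setInt(2, x % 10);
        upsert.setInt(3, x % 100);
        upsert.setInt(4, x);
        upsert.executeUpdate();
      }
      conn.commit(); // Phoenix connections don't auto-commit by default
    }
  }
}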
>>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestay...@apache.org> wrote:
>>>>>>>> Nice progress, Maryann.
>>>>>>>>
>>>>>>>> A few questions for you: not sure I understand the changes you made to PhoenixRecordReader. Is it necessary to wrap the server-side scan results in a GroupedAggregatingResultIterator? Each server-side scan will produce results with a single tuple per group-by key. In Phoenix, the GroupedAggregatingResultIterator's function in life is to do the final merge. Note too that the results that come back from the aggregated scan aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted by the group-by key). Or is this just to help in decoding the values coming back from the scan?
>>>>>>>>
>>>>>>>> Also, not sure what impact it has in the way we "combine" the scans in our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each of our scans could include duplicate group-by keys. Is it ok to combine them in this case?
>>>>>>>>
>>>>>>>> One more question: how is the group by key communicated back to Drill?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> James
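The two-phase flow under discussion, reduced to its combining semantics: phase 1 (pushed to Phoenix) emits one partial count per group-by key per slice; Drill shuffles those rows by the hash of the key; phase 2 then sums the partials. A minimal sketch of the combining step, with plain maps standing in for Drill's record batches and exchange:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the final phase of a 2-phase COUNT(*) GROUP BY:
// each input map is one slice's partial result (key -> partial count);
// the combining aggregate must SUM the partial counts, not re-count rows.
final class TwoPhaseCount {
  static Map<Integer, Long> combine(List<Map<Integer, Long>> partialsPerSlice) {
    Map<Integer, Long> totals = new HashMap<>();
    for (Map<Integer, Long> partials : partialsPerSlice) {
      for (Map.Entry<Integer, Long> e : partials.entrySet()) {
        totals.merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    return totals;
  }
}

If the combining step re-counted rows instead of summing the partials, every group would report the number of slices that saw it rather than the true count; a whole bucket of zeroed counts, as in the results above, is consistent with Maryann's reading that the scans were fine and something went wrong in one of the hash buckets.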
>>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann....@gmail.com> wrote:
>>>>>>>>> Added a few fixes in the pull request. Tested with two regions; it turned out that half of the result is empty (count = 0). Not sure if there's anything wrong with https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java. Like Julian said, this rule looks a bit hacky.
>>>>>>>>>
>>>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>>>
>>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>> -      return false;
>>>>>>>>> -    }
>>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>> +//      return false;
>>>>>>>>> +//    }
>>>>>>>>>
>>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Maryann
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>>>>>>> Drill's current approach seems adequate for Drill alone, but extending it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>>>>>>>
>>>>>>>>>> I think you should only create Prels for algebra nodes that you know for sure are going to run on the Drill engine. If there's a possibility that a node would run in another engine such as Phoenix, then it should stay logical.
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann....@gmail.com> wrote:
>>>>>>>>>>> The partial aggregate seems to be working now, with one interface extension and one bug fix in the Phoenix project. Will do some code cleanup and create a pull request soon.
>>>>>>>>>>>
>>>>>>>>>>> Still, there was a hack I made in the Drill project to force 2-phase aggregation. I'll try to fix that.
>>>>>>>>>>>
>>>>>>>>>>> Jacques, I have one question though: how can I verify that there is more than one slice and that the shuffle happens?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maryann
>>>>>>>>>>> On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestay...@apache.org> wrote:
>>>>>>>>>>>> Maryann,
>>>>>>>>>>>> I believe Jacques mentioned that a little bit of refactoring is required for a merge sort to occur - there's something that does that, but it's not expected to be used in this context currently.
>>>>>>>>>>>>
>>>>>>>>>>>> IMHO, there's clearer value in getting the aggregation to use Phoenix first, so I'd recommend going down that road as Jacques mentioned above, if possible. Once that's working, we can circle back to the partial sort.
>>>>>>>>>>>>
>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>> James
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann....@gmail.com> wrote:
>>>>>>>>>>>>> I actually tried implementing partial sort with https://github.com/jacques-n/drill/pull/4, which I figured might be a little easier to start with than partial aggregation. But I found that even though the code worked (returned the right results), the Drill-side sort turned out to be an ordinary sort instead of the merge it should have been. Any idea of how to fix that?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Maryann
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacq...@dremio.com> wrote:
>>>>>>>>>>>>>> Right now this type of work is done here:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With Distribution Trait application here:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To me, the easiest way to solve the Phoenix issue is by providing a rule that matches HashAgg and StreamAgg but requires the Phoenix convention as input. It would replace the aggregate everywhere, but the result would only be plannable when it is the first phase of aggregation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jacques Nadeau
>>>>>>>>>>>>>> CTO and Co-Founder, Dremio
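A sketch of the shape such a rule could take: the convention check on the input is the essential part, and the actual rewrite (creating the Phoenix-side aggregate) is elided. The class name and constructor argument are assumptions for illustration, not classes from the branch; only plain Calcite API is used:

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;

// Matches an aggregate only when its input is already in the source engine's
// convention (e.g. Phoenix), so the rewrite can only fire as the first phase
// of a 2-phase aggregation.
public class FirstPhaseAggIntoSourceRule extends RelOptRule {
  private final Convention sourceConvention;

  public FirstPhaseAggIntoSourceRule(Convention sourceConvention) {
    super(operand(Aggregate.class, operand(RelNode.class, any())),
        "FirstPhaseAggIntoSourceRule");
    this.sourceConvention = sourceConvention;
  }

  @Override
  public boolean matches(RelOptRuleCall call) {
    RelNode input = call.rel(1);
    return input.getConvention() == sourceConvention;
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    // Replace the matched Aggregate with the source engine's aggregate
    // (e.g. a PhoenixServerAggregate) over the same input, leaving the
    // summarizing phase to Drill's HashAgg/StreamAgg. Rewrite elided here.
  }
}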
>>>>>>>>>>>>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>>>>>>>>>>>> Phoenix is able to perform quite a few relational operations on the region server: scan, filter, project, aggregate, and sort (optionally with a limit). However, the sort and aggregate are necessarily "local": they can only deal with the data on that region server, and there needs to be a further operation to combine the results from the region servers.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The question is how to plan such queries. I think the answer is an AggregateExchangeTransposeRule.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The rule would spot an Aggregate on a data source that is split into multiple locations (partitions) and split it into a partial Aggregate that computes sub-totals and a summarizing Aggregate that combines those totals.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How does the planner know that the Aggregate needs to be split? Since the data's distribution has changed, there would need to be an Exchange operator. It is the Exchange operator that triggers the rule to fire.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There are some special cases. If the data is sorted as well as partitioned (say, because the local aggregate uses a sort-based algorithm), we could maybe use a more efficient plan. And if the partition key is the same as the aggregation key, we don't need a summarizing Aggregate, just a Union.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix scenario, once the Aggregate has been pushed through the Exchange (i.e. onto the drill-bit residing on the region server), we can then push the DrillAggregate across the drill-to-phoenix membrane and make it into a PhoenixServerAggregate that executes on the region server.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Related issues:
>>>>>>>>>>>>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>>>>>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Julian
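Julian's last special case, in concrete terms: when the partition key equals the aggregation key, each group-by key occurs in exactly one partition, so the per-partition sub-totals are already final. Unlike the summarizing merge sketched earlier, no re-aggregation is needed; a plain union/concat of the partition results suffices. A small sketch, with plain maps standing in for the partitions (hypothetical helper, not a planner rule):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// When partition key == aggregation key, every key lives in exactly one
// partition, so concatenating the per-partition aggregates is already the
// final answer; keys never repeat across partitions.
final class UnionOfPartials {
  static List<Map.Entry<String, Long>> union(List<Map<String, Long>> perPartition) {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    for (Map<String, Long> partition : perPartition) {
      out.addAll(partition.entrySet());
    }
    return out;
  }
}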