One thing from what I asked James offline yesterday, and maybe we can
discuss it a little bit in today's meeting:

Phoenix uses a list of lists of Scan objects to indicate region boundaries
and guideposts, and if the top-level list contains more than one element,
it means that the results from the different Scanner/ResultIterators
should be merge-sorted. We now use this list in the Drill integration to
generate the different batches or slices. I see from the Drill plan of a
simple select like "SELECT * FROM A.BEER" that a Drill Sort node sits on
top of the PhoenixTableScan. I guess this is a real sort rather than a
merge-sort. So optimally, 1) this should be a merge-sort (or, more
accurately, a merge); 2) furthermore, if Drill has some way to indicate
the order among slices and batches, we could even turn it into a concat.
The structure of this Scan list might be helpful for 2), or we may need
some logical representation for this. Otherwise, we can simply flatten
this list into a one-dimensional list, as we do now (in my ci yesterday).
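To make the distinction concrete, here is a minimal sketch of the three
strategies over generic per-scan iterators that are each already sorted on
the row key. The class and method names are illustrative only, not the
actual Phoenix or Drill APIs:

import java.util.*;

public class ScanCombining {

  // Full sort: buffer every row from every scan, then sort. This is what
  // a plain Drill Sort on top of the scan effectively does.
  static <T> List<T> fullSort(List<Iterator<T>> scans, Comparator<T> cmp) {
    List<T> all = new ArrayList<>();
    for (Iterator<T> scan : scans) {
      scan.forEachRemaining(all::add);
    }
    all.sort(cmp);
    return all;
  }

  // Merge (option 1): each scan is already sorted, so a k-way merge with
  // a priority queue keyed on each scan's current head row is enough.
  static <T> List<T> merge(List<Iterator<T>> scans, Comparator<T> cmp) {
    class Head {
      final T value;
      final Iterator<T> rest;
      Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
    }
    PriorityQueue<Head> pq =
        new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
    for (Iterator<T> scan : scans) {
      if (scan.hasNext()) pq.add(new Head(scan.next(), scan));
    }
    List<T> out = new ArrayList<>();
    while (!pq.isEmpty()) {
      Head head = pq.poll();
      out.add(head.value);
      if (head.rest.hasNext()) pq.add(new Head(head.rest.next(), head.rest));
    }
    return out;
  }

  // Concat (option 2): if the scans themselves are known to be ordered
  // relative to one another, no comparisons are needed at all.
  static <T> List<T> concat(List<Iterator<T>> scans) {
    List<T> out = new ArrayList<>();
    for (Iterator<T> scan : scans) {
      scan.forEachRemaining(out::add);
    }
    return out;
  }
}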
Thanks,
Maryann

On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann....@gmail.com> wrote:

> Yes, but the partially aggregated results will not contain any duplicate
> rowkeys, since they are also group-by keys. What we need is the
> aggregators, and to call aggregate for each row. We can write a new,
> simpler ResultIterator to replace this, but for now it should work
> correctly.
>
> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestay...@apache.org>
> wrote:
>
>> The results we get back from the server-side scan are already the
>> partially aggregated values we need. GroupedAggregatingResultIterator
>> will collapse adjacent Tuples together which happen to have the same
>> row key. I'm not sure we want/need this to happen. Instead, I think we
>> just need to decode the aggregated values directly from the result of
>> the scan.
>>
>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann....@gmail.com>
>> wrote:
>>
>>> Hi James,
>>>
>>> bq. A few questions for you: not sure I understand the changes you
>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side
>>> scan results in a GroupedAggregatingResultIterator? Each server-side
>>> scan will produce results with a single tuple per group-by key. In
>>> Phoenix, the GroupedAggregatingResultIterator's function in life is to
>>> do the final merge. Note too that the results that come back from the
>>> aggregated scan aren't sorted (while GroupedAggregatingResultIterator
>>> needs tuples sorted by the group-by key). Or is this just to help in
>>> decoding the values coming back from the scan?
>>>
>>> It is necessary. I suppose what we should return as a partial result
>>> from PhoenixRecordReader is exactly the same as what we return in
>>> standalone Phoenix+Calcite, except that the result is partial, or say
>>> incomplete. For example, if we have "select a, count(*) from t group
>>> by a", we should return rows that have "a" as the first expression
>>> value and "count(*)" as the second expression value. For this "count"
>>> expression, it actually needs a ClientAggregator for evaluation, and
>>> that's what this GroupedAggregatingResultIterator is used for.
>>> Since "each server-side scan will produce results with a single tuple
>>> per group-by key", and PhoenixRecordReader is only dealing with one
>>> server-side result each time, we don't care how the group-by keys are
>>> arranged (ordered or unordered). Actually,
>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>> for AggregatePlan. It does not "combine": it treats every row as a
>>> different group, by returning its rowkey as the group-by key
>>> (GroupedAggregatingResultIterator.java:56).
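A minimal sketch of what that per-row decoding amounts to. ServerRow, the
8-byte count encoding, and all class names here are stand-ins assumed for
illustration, not Phoenix's actual types:

import java.nio.ByteBuffer;
import java.util.*;

public class PartialGroupDecode {
  // Hypothetical stand-in for a scan result row: one row per group-by
  // key, carrying serialized aggregate state.
  static class ServerRow {
    final byte[] rowKey;          // the group-by key
    final byte[] aggregateState;  // e.g. a serialized partial COUNT
    ServerRow(byte[] rowKey, byte[] aggregateState) {
      this.rowKey = rowKey;
      this.aggregateState = aggregateState;
    }
  }

  // Pretend the aggregate state is just an 8-byte big-endian partial count.
  static long decodeCount(byte[] state) {
    return ByteBuffer.wrap(state).getLong();
  }

  // One output row per input row: nothing is combined, because each rowkey
  // is already a distinct group -- this mirrors the "every row is its own
  // group" behavior described above.
  static List<Map.Entry<byte[], Long>> decode(Iterator<ServerRow> scanResults) {
    List<Map.Entry<byte[], Long>> out = new ArrayList<>();
    while (scanResults.hasNext()) {
      ServerRow row = scanResults.next();
      out.add(new AbstractMap.SimpleEntry<>(row.rowKey,
          decodeCount(row.aggregateState)));
    }
    return out;
  }
}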
>>> In short, this iterator is for decoding the server-side values. So we
>>> may want to optimize this logic in the future by removing this
>>> serialization and deserialization and having only one set of
>>> aggregators.
>>>
>>> bq. Also, not sure what impact it has on the way we "combine" the
>>> scans in our Drill parallelization code
>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>> include duplicate group-by keys. Is it OK to combine them in this
>>> case?
>>>
>>> It should not matter, or at least it is not related to the problem I'm
>>> now having.
>>>
>>> bq. One more question: how is the group-by key communicated back to
>>> Drill?
>>>
>>> According to the HashAggPrule, if it decides to create a two-phase
>>> aggregate, the first phase is now handled by Phoenix (after applying
>>> the PhoenixHashAggPrule). I assume the partial results then get
>>> shuffled based on the hash of their group-by keys (returned by
>>> PhoenixRecordReader). The final step is the Drill hash aggregation.
>>>
>>> This is my test table "A.BEER", which has four columns, "B", "E1",
>>> "E2", and "R", all of INTEGER type. The data is generated like this:
>>>
>>> for (x = 1 to N) { // currently N = 1000
>>>   UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>> }
>>>
>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>> GROUP BY e1". The expected result should be:
>>> 0 100
>>> 1 100
>>> 2 100
>>> 3 100
>>> 4 100
>>> 5 100
>>> 6 100
>>> 7 100
>>> 8 100
>>> 9 100
>>> The actual result was:
>>> 6 0
>>> 7 0
>>> 8 0
>>> 9 0
>>> 0 0
>>> 1 100
>>> 2 100
>>> 3 100
>>> 4 100
>>> 5 100
>>>
>>> Here I just tried another one, "SELECT e2, count(*) FROM a.beer GROUP
>>> BY e2". Similarly, the expected result should have group-by keys from
>>> 0 to 99, each with a count of 10, while in the actual result the
>>> group-by keys 86 to 99, together with 0, all had a count of 0; the
>>> rest (1 to 85) all had the correct value, 10.
>>>
>>> It looks to me like the scans were good but there was a problem with
>>> one of the hash buckets.
>>>
>>> Thanks,
>>> Maryann
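As a sanity check on the expected flow, here is a self-contained sketch of
the two-phase pipeline described above (partial counts per region, shuffle
by hash of the group-by key, final combine) over the A.BEER data shape.
The two-region split and the bucket count are arbitrary assumptions:

import java.util.*;
import java.util.stream.*;

public class TwoPhaseCountSketch {
  public static void main(String[] args) {
    // x = 1..1000, e1 = x % 10, as in the test data above.
    List<Integer> e1 = IntStream.rangeClosed(1, 1000)
        .map(x -> x % 10).boxed().collect(Collectors.toList());

    // Phase 1 (Phoenix, per region): partial counts per group-by key.
    // Two "regions" are faked by splitting the rows in half.
    List<List<Integer>> regions =
        Arrays.asList(e1.subList(0, 500), e1.subList(500, 1000));
    List<Map<Integer, Long>> partials = new ArrayList<>();
    for (List<Integer> region : regions) {
      partials.add(region.stream()
          .collect(Collectors.groupingBy(k -> k, Collectors.counting())));
    }

    // Shuffle: route each (key, partialCount) pair to a bucket by key hash.
    int buckets = 4;
    List<List<Map.Entry<Integer, Long>>> shuffled = new ArrayList<>();
    for (int i = 0; i < buckets; i++) {
      shuffled.add(new ArrayList<>());
    }
    for (Map<Integer, Long> partial : partials) {
      for (Map.Entry<Integer, Long> e : partial.entrySet()) {
        shuffled.get(Math.floorMod(e.getKey().hashCode(), buckets)).add(e);
      }
    }

    // Phase 2 (Drill's final HashAgg): each bucket independently sums the
    // partial counts routed to it. A bucket whose keys all come out 0, as
    // in the actual results above, would point at a per-bucket problem.
    Map<Integer, Long> result = new TreeMap<>();
    for (List<Map.Entry<Integer, Long>> bucket : shuffled) {
      for (Map.Entry<Integer, Long> e : bucket) {
        result.merge(e.getKey(), e.getValue(), Long::sum);
      }
    }
    result.forEach((k, c) -> System.out.println(k + " " + c)); // each -> 100
  }
}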
>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestay...@apache.org>
>>> wrote:
>>>
>>>> Nice progress, Maryann.
>>>>
>>>> A few questions for you: not sure I understand the changes you made
>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>> results in a GroupedAggregatingResultIterator? Each server-side scan
>>>> will produce results with a single tuple per group-by key. In
>>>> Phoenix, the GroupedAggregatingResultIterator's function in life is
>>>> to do the final merge. Note too that the results that come back from
>>>> the aggregated scan aren't sorted (while
>>>> GroupedAggregatingResultIterator needs tuples sorted by the group-by
>>>> key). Or is this just to help in decoding the values coming back from
>>>> the scan?
>>>>
>>>> Also, not sure what impact it has on the way we "combine" the scans
>>>> in our Drill parallelization code
>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>> include duplicate group-by keys. Is it OK to combine them in this
>>>> case?
>>>>
>>>> One more question: how is the group-by key communicated back to
>>>> Drill?
>>>>
>>>> Thanks,
>>>> James
>>>>
>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann....@gmail.com>
>>>> wrote:
>>>>
>>>>> Added a few fixes in the pull request. Tested with two regions; it
>>>>> turned out that half of the result was empty (count = 0).
>>>>> Not sure if there's anything wrong with
>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>
>>>>> To force a two-phase HashAgg, I made a temporary change as well:
>>>>>
>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> index b911f6b..58bc918 100644
>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>    // currently won't generate a 2 phase plan.
>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>> -      return false;
>>>>> -    }
>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>> +//      return false;
>>>>> +//    }
>>>>>
>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>        String name = aggCall.getAggregation().getName();
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>>
>>>>>> Drill's current approach seems adequate for Drill alone, but
>>>>>> extending it to a heterogeneous system that includes Phoenix seems
>>>>>> like a hack.
>>>>>>
>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>> know for sure are going to run on the Drill engine. If there's a
>>>>>> possibility that a node would run in another engine such as
>>>>>> Phoenix, then it should still be logical.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann....@gmail.com>
>>>>>> wrote:
>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>> > extension and one bug fix in the Phoenix project. Will do some
>>>>>> > code cleanup and create a pull request soon.
>>>>>> >
>>>>>> > There is still a hack in the Drill project which I made to force
>>>>>> > two-phase aggregation. I'll try to fix that.
>>>>>> >
>>>>>> > Jacques, I have one question though: how can I verify that there
>>>>>> > is more than one slice and that the shuffle happens?
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Maryann
>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor
>>>>>> > <jamestay...@apache.org> wrote:
>>>>>> >
>>>>>> >> Maryann,
>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>> >> required for a merge sort to occur - there's something that does
>>>>>> >> that, but it's not expected to be used in this context currently.
>>>>>> >>
>>>>>> >> IMHO, there's clearer value in getting the aggregation to use
>>>>>> >> Phoenix first, so I'd recommend going down that road, as Jacques
>>>>>> >> mentioned above, if possible. Once that's working, we can circle
>>>>>> >> back to the partial sort.
>>>>>> >>
>>>>>> >> Thoughts?
>>>>>> >> James
>>>>>> >>
>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue
>>>>>> >> <maryann....@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> I actually tried implementing the partial sort with
>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might
>>>>>> >>> be a little easier to start with than partial aggregation. But I
>>>>>> >>> found that even though the code worked (returned the right
>>>>>> >>> results), the Drill-side sort turned out to be an ordinary sort
>>>>>> >>> instead of the merge it should have been. Any idea how to fix
>>>>>> >>> that?
>>>>>> >>>
>>>>>> >>> Thanks,
>>>>>> >>> Maryann
>>>>>> >>>
>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau
>>>>>> >>> <jacq...@dremio.com> wrote:
>>>>>> >>>
>>>>>> >>>> Right now this type of work is done here:
>>>>>> >>>>
>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>> >>>>
>>>>>> >>>> with Distribution Trait application here:
>>>>>> >>>>
>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>> >>>>
>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>> >>>> providing a rule that matches HashAgg and StreamAgg but
>>>>>> >>>> requires Phoenix convention as input. It would replace
>>>>>> >>>> everywhere but would only be plannable when it is the first
>>>>>> >>>> phase of aggregation.
>>>>>> >>>>
>>>>>> >>>> Thoughts?
>>>>>> >>>>
>>>>>> >>>> --
>>>>>> >>>> Jacques Nadeau
>>>>>> >>>> CTO and Co-Founder, Dremio
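A rough sketch of the kind of rule Jacques describes, simplified to match
the logical aggregate rather than the HashAgg/StreamAgg pair.
PhoenixRel.CONVENTION and PhoenixServerAggregate are assumed names for
illustration, not the actual classes in the phoenix_plugin branch:

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;

public class PhoenixHashAggPruleSketch extends RelOptRule {
  public PhoenixHashAggPruleSketch() {
    super(operand(DrillAggregateRel.class, any()), "PhoenixHashAggPruleSketch");
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    DrillAggregateRel agg = call.rel(0);
    RelNode input = agg.getInput();
    // Ask for the input in Phoenix convention. Planning only succeeds when
    // the child can actually be implemented by Phoenix, i.e. when this
    // would be the first phase of aggregation sitting on the scan; in all
    // other placements the alternative is simply not plannable.
    RelNode convertedInput = convert(input,
        input.getTraitSet().replace(PhoenixRel.CONVENTION));
    // PhoenixServerAggregate: hypothetical physical node that pushes the
    // partial aggregation into the Phoenix region-server scan.
    call.transformTo(new PhoenixServerAggregate(agg.getCluster(),
        convertedInput.getTraitSet(), convertedInput,
        agg.getGroupSet(), agg.getGroupSets(), agg.getAggCallList()));
  }
}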
>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>>>> >>>> wrote:
>>>>>> >>>>
>>>>>> >>>>> Phoenix is able to perform quite a few relational operations
>>>>>> >>>>> on the region server: scan, filter, project, aggregate, and
>>>>>> >>>>> sort (optionally with limit). However, the sort and aggregate
>>>>>> >>>>> are necessarily "local": they can only deal with data on that
>>>>>> >>>>> region server, and there needs to be a further operation to
>>>>>> >>>>> combine the results from the region servers.
>>>>>> >>>>>
>>>>>> >>>>> The question is how to plan such queries. I think the answer
>>>>>> >>>>> is an AggregateExchangeTransposeRule.
>>>>>> >>>>>
>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>> >>>>> split into multiple locations (partitions) and split it into a
>>>>>> >>>>> partial Aggregate that computes sub-totals and a summarizing
>>>>>> >>>>> Aggregate that combines those totals.
>>>>>> >>>>>
>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>> >>>>> split? Since the data's distribution has changed, there would
>>>>>> >>>>> need to be an Exchange operator. It is the Exchange operator
>>>>>> >>>>> that triggers the rule to fire.
>>>>>> >>>>>
>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>> >>>>> partitioned (say, because the local aggregate uses a
>>>>>> >>>>> sort-based algorithm), we could maybe use a more efficient
>>>>>> >>>>> plan. And if the partition key is the same as the aggregation
>>>>>> >>>>> key, we don't need a summarizing Aggregate, just a Union.
>>>>>> >>>>>
>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>>>> >>>>> through the Exchange (i.e. onto the drillbit residing on the
>>>>>> >>>>> region server), we can then push the DrillAggregate across the
>>>>>> >>>>> drill-to-phoenix membrane and make it into a
>>>>>> >>>>> PhoenixServerAggregate that executes on the region server.
>>>>>> >>>>>
>>>>>> >>>>> Related issues:
>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>> >>>>>
>>>>>> >>>>> Julian
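The split Julian describes has a simple shape: each aggregate call becomes
a partial call below the Exchange plus a summarizing call above it. A
hedged illustration of the usual mapping (the table below is the common
textbook decomposition, not taken from any of the projects' code; AVG is
handled by rewriting it to SUM and COUNT first):

import java.util.LinkedHashMap;
import java.util.Map;

public class AggregateSplitSketch {
  // Partial function (computed below the Exchange, per partition) mapped
  // to the summarizing function (computed above the Exchange).
  static final Map<String, String> SUMMARIZER = new LinkedHashMap<>();
  static {
    SUMMARIZER.put("COUNT", "SUM"); // sub-counts are combined by summing
    SUMMARIZER.put("SUM", "SUM");
    SUMMARIZER.put("MIN", "MIN");
    SUMMARIZER.put("MAX", "MAX");
    // AVG has no single summarizer: rewrite AVG(x) as SUM(x) / COUNT(x)
    // first, then split each piece as above.
  }

  public static void main(String[] args) {
    // e.g. "SELECT e1, COUNT(*) ... GROUP BY e1" becomes a per-partition
    // COUNT(*) plus a SUM of the sub-counts above the Exchange. If the
    // partition key equals the group-by key, the summarizing Aggregate
    // degenerates into a Union of the partitions' results.
    SUMMARIZER.forEach((partial, summarizing) ->
        System.out.println(partial + " -> partial " + partial
            + " + final " + summarizing));
  }
}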