Yes, but the partially aggregated results will not contain any duplicate rowkeys, since they are also the group-by keys. What we need is to set up the aggregators and call aggregate() for each row. We could write a new, simpler ResultIterator to replace this, but for now it should work correctly.
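For illustration, here is a rough sketch of what such a simpler iterator
might look like. This is not actual Phoenix code: the class name is made up,
and the Aggregators/Tuple/utility method signatures are assumed from the
4.x-era APIs, so they may not match exactly.

import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.KeyValueUtil;

/**
 * Decodes the partial aggregate values of each tuple coming back from an
 * aggregated server-side scan. Unlike GroupedAggregatingResultIterator, it
 * never merges adjacent tuples: each server-side tuple is already a
 * distinct group, so its rowkey is taken as the group-by key as-is.
 */
public class DecodingAggregatingResultIterator implements ResultIterator {
  private final ResultIterator delegate;
  private final Aggregators aggregators;

  public DecodingAggregatingResultIterator(ResultIterator delegate,
      Aggregators aggregators) {
    this.delegate = delegate;
    this.aggregators = aggregators;
  }

  @Override
  public Tuple next() throws SQLException {
    Tuple tuple = delegate.next();
    if (tuple == null) {
      return null;
    }
    // Reset the client-side aggregators and feed them this single row,
    // which decodes the serialized partial values from the server.
    Aggregator[] rowAggregators = aggregators.getAggregators();
    aggregators.reset(rowAggregators);
    aggregators.aggregate(rowAggregators, tuple);
    byte[] value = aggregators.toBytes(rowAggregators);
    // Re-wrap the decoded values under the original rowkey.
    ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
    tuple.getKey(keyPtr);
    byte[] key = ByteUtil.copyKeyBytesIfNecessary(keyPtr);
    return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(key,
        QueryConstants.SINGLE_COLUMN_FAMILY, QueryConstants.SINGLE_COLUMN,
        QueryConstants.AGG_TIMESTAMP, value, 0, value.length));
  }

  @Override
  public void close() throws SQLException {
    delegate.close();
  }

  @Override
  public void explain(List<String> planSteps) {
    delegate.explain(planSteps);
  }
}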
On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <[email protected]> wrote:

> The results we get back from the server-side scan are already the partial
> aggregated values we need. GroupedAggregatingResultIterator will collapse
> adjacent Tuples together which happen to have the same row key. I'm not
> sure we want/need this to happen. Instead I think we just need to decode
> the aggregated values directly from the result of the scan.
>
> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <[email protected]> wrote:
>
>> Hi James,
>>
>> bq. A few questions for you: not sure I understand the changes you made
>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>> produce results with a single tuple per group by key. In Phoenix, the
>> GroupedAggregatingResultIterator's function in life is to do the final
>> merge. Note too that the results aren't sorted that come back from the
>> aggregated scan (while GroupedAggregatingResultIterator needs tuples
>> sorted by the group by key). Or is this just to help in decoding the
>> values coming back from the scan?
>>
>> It is necessary. I suppose what we should return as a partial result
>> from PhoenixRecordReader is exactly the same as what we return in
>> standalone Phoenix+Calcite, except that the result is partial, or say
>> incomplete. For example, if we have "select a, count(*) from t group by
>> a", we should return rows that have "a" as the first expression value
>> and "count(*)" as the second expression value. This "count" expression
>> actually needs a ClientAggregator for evaluation, and that's what this
>> GroupedAggregatingResultIterator is used for.
>> Since "each server-side scan will produce results with a single tuple
>> per group by key", and PhoenixRecordReader is only dealing with one
>> server-side result each time, we don't care how the group-by keys are
>> arranged (ordered or unordered). Actually,
>> GroupedAggregatingResultIterator is not the group-by iterator we use for
>> AggregatePlan. It does not "combine". It treats every row as a different
>> group, by returning its rowkey as the group-by key
>> (GroupedAggregatingResultIterator.java:56).
>>
>> In short, this iterator is for decoding the server-side values. So we
>> may want to optimize this logic in the future by removing this
>> serialization and deserialization and having only one set of aggregators.
>>
>> bq. Also, not sure what impact it has in the way we "combine" the scans
>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>> as each of our scans could include duplicate group by keys. Is it ok to
>> combine them in this case?
>>
>> It should not matter, or at least it is not related to the problem I'm
>> now having.
>>
>> bq. One more question: how is the group by key communicated back to
>> Drill?
>>
>> According to the HashAggPrule, if it decides to create a two-phase
>> aggregate, the first phase is now handled by Phoenix (after applying the
>> PhoenixHashAggPrule). I assume the partial results then get shuffled
>> based on the hash of their group-by keys (returned by
>> PhoenixRecordReader). The final step is the Drill hash aggregation.
>>
>> This is my test table "A.BEER", which has four columns, "B", "E1", "E2"
>> and "R", all of INTEGER type, and the data is generated like this:
>>
>> for (x = 1 to N) { // currently N = 1000
>>   UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>> }
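(For reference, a runnable JDBC version of that load loop -- a sketch only:
the connect string "jdbc:phoenix:localhost" and the CREATE TABLE statement
are assumptions based on the table description above, not taken from the
thread.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

public class LoadBeer {
  public static void main(String[] args) throws SQLException {
    try (Connection conn =
        DriverManager.getConnection("jdbc:phoenix:localhost")) {
      try (Statement stmt = conn.createStatement()) {
        stmt.execute("CREATE TABLE IF NOT EXISTS A.BEER ("
            + "B INTEGER PRIMARY KEY, E1 INTEGER, E2 INTEGER, R INTEGER)");
      }
      try (PreparedStatement ps = conn.prepareStatement(
          "UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
        for (int x = 1; x <= 1000; x++) { // N = 1000, as in the loop above
          ps.setInt(1, x);
          ps.setInt(2, x % 10);
          ps.setInt(3, x % 100);
          ps.setInt(4, x);
          ps.executeUpdate();
        }
      }
      conn.commit(); // Phoenix connections are not auto-commit by default
    }
  }
}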
>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>> GROUP BY e1". The expected result should be:
>>
>> 0 100
>> 1 100
>> 2 100
>> 3 100
>> 4 100
>> 5 100
>> 6 100
>> 7 100
>> 8 100
>> 9 100
>>
>> The actual result was:
>>
>> 6 0
>> 7 0
>> 8 0
>> 9 0
>> 0 0
>> 1 100
>> 2 100
>> 3 100
>> 4 100
>> 5 100
>>
>> Here I just tried another one, "SELECT e2, count(*) FROM a.beer GROUP BY
>> e2". Similarly, the expected result should have group-by keys from 0 to
>> 99, each with a count of 10; in the actual result, group-by keys 86 to
>> 99, together with 0, all had a count of 0, while the rest (1 to 85) had
>> the correct value of 10.
>>
>> It looks to me like the scans were good, but there was a problem with
>> one of the hash buckets.
>>
>> Thanks,
>> Maryann
>>
>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <[email protected]>
>> wrote:
>>
>>> Nice progress, Maryann.
>>>
>>> A few questions for you: not sure I understand the changes you made to
>>> PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>> results in a GroupedAggregatingResultIterator? Each server-side scan
>>> will produce results with a single tuple per group by key. In Phoenix,
>>> the GroupedAggregatingResultIterator's function in life is to do the
>>> final merge. Note too that the results aren't sorted that come back
>>> from the aggregated scan (while GroupedAggregatingResultIterator needs
>>> tuples sorted by the group by key). Or is this just to help in decoding
>>> the values coming back from the scan?
>>>
>>> Also, not sure what impact it has in the way we "combine" the scans in
>>> our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>> as each of our scans could include duplicate group by keys. Is it ok to
>>> combine them in this case?
>>>
>>> One more question: how is the group by key communicated back to Drill?
>>>
>>> Thanks,
>>> James
>>>
>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <[email protected]>
>>> wrote:
>>>
>>>> Added a few fixes in the pull request. Tested with two regions; it
>>>> turned out that half of the results were empty (count = 0).
>>>> Not sure if there's anything wrong with
>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>> Like Julian said, this rule looks a bit hacky.
>>>>
>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>
>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> index b911f6b..58bc918 100644
>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>    // If any of the aggregate functions are not one of these, then we
>>>>    // currently won't generate a 2 phase plan.
>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>> -      return false;
>>>> -    }
>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>> +//      return false;
>>>> +//    }
>>>>
>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>        String name = aggCall.getAggregation().getName();
>>>>
>>>> Thanks,
>>>> Maryann
>>>>
>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <[email protected]> wrote:
>>>>
>>>>> Drill's current approach seems adequate for Drill alone, but
>>>>> extending it to a heterogeneous system that includes Phoenix seems
>>>>> like a hack.
>>>>>
>>>>> I think you should only create Prels for algebra nodes that you know
>>>>> for sure are going to run on the Drill engine. If there's a
>>>>> possibility that it would run in another engine such as Phoenix, then
>>>>> they should still be logical.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <[email protected]>
>>>>> wrote:
>>>>> > The partial aggregate seems to be working now, with one interface
>>>>> > extension and one bug fix in the Phoenix project. Will do some code
>>>>> > cleanup and create a pull request soon.
>>>>> >
>>>>> > Still, there was a hack in the Drill project which I made to force
>>>>> > 2-phase aggregation. I'll try to fix that.
>>>>> >
>>>>> > Jacques, I have one question though: how can I verify that there is
>>>>> > more than one slice and that the shuffle happens?
>>>>> >
>>>>> > Thanks,
>>>>> > Maryann
>>>>> >
>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <[email protected]>
>>>>> > wrote:
>>>>> >
>>>>> >> Maryann,
>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>> >> required for a merge sort to occur - there's something that does
>>>>> >> that, but it's not expected to be used in this context currently.
>>>>> >>
>>>>> >> IMHO, there's more of a clear value in getting the aggregation to
>>>>> >> use Phoenix first, so I'd recommend going down that road as
>>>>> >> Jacques mentioned above if possible. Once that's working, we can
>>>>> >> circle back to the partial sort.
>>>>> >>
>>>>> >> Thoughts?
>>>>> >> James
>>>>> >>
>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue
>>>>> >> <[email protected]> wrote:
>>>>> >>
>>>>> >>> I actually tried implementing partial sort with
>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might
>>>>> >>> be a little easier to start with than partial aggregation. But I
>>>>> >>> found that even though the code worked (returned the right
>>>>> >>> results), the Drill-side sort turned out to be an ordinary sort
>>>>> >>> instead of the merge it should have been. Any idea how to fix
>>>>> >>> that?
>>>>> >>>
>>>>> >>> Thanks,
>>>>> >>> Maryann
>>>>> >>>
>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau
>>>>> >>> <[email protected]> wrote:
>>>>> >>>
>>>>> >>>> Right now this type of work is done here:
>>>>> >>>>
>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> >>>>
>>>>> >>>> With Distribution Trait application here:
>>>>> >>>>
>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>> >>>>
>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>> >>>> providing a rule that matches HashAgg and StreamAgg but requires
>>>>> >>>> Phoenix convention as input. It would replace everywhere but
>>>>> >>>> would only be plannable when it is the first phase of
>>>>> >>>> aggregation.
>>>>> >>>>
>>>>> >>>> Thoughts?
>>>>> >>>>
>>>>> >>>> --
>>>>> >>>> Jacques Nadeau
>>>>> >>>> CTO and Co-Founder, Dremio
>>>>> >>>>
>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <[email protected]>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>>> Phoenix is able to perform quite a few relational operations
>>>>> >>>>> on the region server: scan, filter, project, aggregate, sort
>>>>> >>>>> (optionally with limit). However, the sort and aggregate are
>>>>> >>>>> necessarily "local". They can only deal with data on that
>>>>> >>>>> region server, and there needs to be a further operation to
>>>>> >>>>> combine the results from the region servers.
>>>>> >>>>>
>>>>> >>>>> The question is how to plan such queries. I think the answer
>>>>> >>>>> is an AggregateExchangeTransposeRule.
>>>>> >>>>>
>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>> >>>>> split into multiple locations (partitions) and split it into a
>>>>> >>>>> partial Aggregate that computes sub-totals and a summarizing
>>>>> >>>>> Aggregate that combines those totals.
>>>>> >>>>>
>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>> >>>>> split? Since the data's distribution has changed, there would
>>>>> >>>>> need to be an Exchange operator. It is the Exchange operator
>>>>> >>>>> that triggers the rule to fire.
>>>>> >>>>>
>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if
>>>>> >>>>> the partition key is the same as the aggregation key we don't
>>>>> >>>>> need a summarizing Aggregate, just a Union.
>>>>> >>>>>
>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>>> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
>>>>> >>>>> region server) we can then push the DrillAggregate across the
>>>>> >>>>> drill-to-phoenix membrane and make it into a
>>>>> >>>>> PhoenixServerAggregate that executes in the region server.
>>>>> >>>>>
>>>>> >>>>> Related issues:
>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>> >>>>>
>>>>> >>>>> Julian
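To make the split described above concrete, here is a tiny self-contained
illustration (plain Java, no Drill/Calcite/Phoenix APIs, with made-up data)
of the two phases: each partition computes partial COUNTs per key, and the
summarizing phase must SUM those partials -- re-applying COUNT would just
count the partial rows instead of the underlying ones.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TwoPhaseCountDemo {
  public static void main(String[] args) {
    // Two "region server" partitions of group-by key values.
    List<int[]> partitions = Arrays.asList(
        new int[] {1, 2, 1, 3}, new int[] {2, 3, 3, 1});

    // Phase 1 (local/partial): COUNT(*) per key within each partition.
    List<Map<Integer, Long>> partials = partitions.stream()
        .map(p -> Arrays.stream(p).boxed()
            .collect(Collectors.groupingBy(k -> k, Collectors.counting())))
        .collect(Collectors.toList());

    // Phase 2 (summarizing): merge by SUMming the partial counts.
    Map<Integer, Long> merged = new TreeMap<>();
    for (Map<Integer, Long> partial : partials) {
      partial.forEach((k, v) -> merged.merge(k, v, Long::sum));
    }
    System.out.println(merged); // prints {1=3, 2=2, 3=3}
  }
}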
