One thing from what I asked James offline yesterday, and maybe we can
discuss a little bit in today's meeting:

Phoenix uses a list of lists of Scan objects to indicate Region boundaries
and guideposts, and if the top-level list contains more than one element it
means that the results from different Scanner/ResultIterator should be
merge-sorted. We now use this list in Drill integration to generate
different batches or slices. I see from the Drill plan of a simple select
like "SELECT * FROM A.BEER" that a Drill Sort node sits on top of the
PhoenixTableScan. I guess this is a real sort rather than a merge-sort.
So optimally,
1) this should be a merge-sort (to be more accurate, a merge)
2) furthermore, if Drill has something to indicate the order among slices
and batches, we could even turn it into a concat.

The structure of this Scan list might be helpful for 2), or we may have
some Logical representation for this. Otherwise, we can simply flatten this
list to a one-dimensional list as we do now (in my ci yesterday).



Thanks,
Maryann

On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann....@gmail.com> wrote:

> Yes, but the partially aggregated results will not contain any duplicate
> rowkeys, since they are also group-by keys. What we need is the aggregators
> and call aggregate for each row. We can write a new simpler ResultIterator
> to replace this, but for now it should work correctly.
>
> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestay...@apache.org>
> wrote:
>
>> The results we get back from the server-side scan are already the partial
>> aggregated values we need. GroupedAggregatingResultIterator will
>> collapse adjacent Tuples together which happen to have the same row key.
>> I'm not sure we want/need this to happen. Instead I think we just need to
>> decode the aggregated values directly from the result of the scan.
>>
>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann....@gmail.com>
>> wrote:
>>
>>> Hi James,
>>>
>>> bq. A few questions for you: not sure I understand the changes you made
>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>>> produce results with a single tuple per group by key. In Phoenix, the
>>> GroupedAggregatingResultIterator's function in life is to do the final
>>> merge. Note too that the results aren't sorted that come back from the
>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>> by the group by key). Or is this just to help in decoding the values coming
>>> back from the scan?
>>>
>>> It is necessary. I suppose what we should return as a partial result
>>> from PhoenixRecordReader is exactly the same as what we do in standalone
>>> Phoenix+Calcite, except that the result is partial or say incomplete. For
>>> example, we have "select a, count(*) from t group by a", we should return
>>> rows that have "a" as the first expression value, and "count(*)" as the
>>> second expression value. For this "count" expression, it actually needs a
>>> ClientAggregator for evaluation, and that's what this
>>> GroupedAggregatingResultIterator is used for.
>>> Since "each server-side scan will produce results with a single tuple
>>> per group by key", and PhoenixRecordReader is only dealing with one
>>> server-side result each time, we don't care how the group-by keys are
>>> arranged (ordered or unordered"). Actually
>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>> for AggregatePlan. It does not "combine". It treats every row as a
>>> different group, by returning its rowkey as the group-by key (
>>> GroupedAggregatingResultIterator.java:56).
>>>
>>> In short, this iterator is for decoding the server-side values. So we
>>> may want to optimize this logic by removing this serialization and
>>> deserialization and having only one set of aggregators in future.
>>>
>>> bq. Also, not sure what impact it has in the way we "combine" the scans
>>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>>> as each of our scans could include duplicate group by keys. Is it ok to
>>> combine them in this case?
>>>
>>> It should not matter, or at least is not related to the problem I'm now
>>> having.
>>>
>>> bq. One more question: how is the group by key communicated back to
>>> Drill?
>>>
>>> According to the HashAggPrule, if it decides to create a two-phase
>>> aggregate, the first phase is now handled by Phoenix (after applying the
>>> PhoenixHashAggPrule). I assume then the partial results gets shuffled based
>>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>>> final step is the Drill hash aggregation.
>>>
>>>
>>> This is my test table "A.BEER", which has for columns: "B", "E1", "E2",
>>> "R", all of INTEGER types. And the data is generated like this:
>>> for (x=1 to N) { //currently N=1000
>>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>> }
>>>
>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP
>>> BY e1".
>>> The expected result should be:
>>> 0 100
>>> 1 100
>>> 2 100
>>> 3 100
>>> 4 100
>>> 5 100
>>> 6 100
>>> 7 100
>>> 8 100
>>> 9 100
>>> The actual result was:
>>> 6 0
>>> 7 0
>>> 8 0
>>> 9 0
>>> 0 0
>>> 1 100
>>> 2 100
>>> 3 100
>>> 4 100
>>> 5 100
>>>
>>> Here I just tried another one "SELECT e2, count(*) FROM a.beer GROUP BY
>>> e2".
>>> Similarly, the expected result should have group-by keys from 0 to 99,
>>> each having a value of 10 as the count, while the actual result was:
>>> from group-by key 86 to 99, together with 0, their count values were all
>>> 0; the rest (1 to 85) all had the correct value 10.
>>>
>>> Looks to me that the scans were good but there was a problem with one of
>>> the hash buckets.
>>>
>>> Thanks,
>>> Maryann
>>>
>>>
>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestay...@apache.org>
>>> wrote:
>>>
>>>> Nice progress, Maryann.
>>>>
>>>> A few questions for you: not sure I understand the changes you made to
>>>> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
>>>> in a GroupedAggregatingResultIterator? Each server-side scan will produce
>>>> results with a single tuple per group by key. In Phoenix, the
>>>> GroupedAggregatingResultIterator's function in life is to do the final
>>>> merge. Note too that the results aren't sorted that come back from the
>>>> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
>>>> by the group by key). Or is this just to help in decoding the values coming
>>>> back from the scan?
>>>>
>>>> Also, not sure what impact it has in the way we "combine" the scans in
>>>> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>>> each of our scans could include duplicate group by keys. Is it ok to
>>>> combine them in this case?
>>>>
>>>> One more question: how is the group by key communicated back to Drill?
>>>>
>>>> Thanks,
>>>> James
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann....@gmail.com>
>>>> wrote:
>>>>
>>>>> Added a few fixes in the pull request. Tested with two regions, turned
>>>>> out that half of the result is empty (count = 0).
>>>>> Not sure if there's anything wrong with
>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>> .
>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>
>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>
>>>>> diff --git
>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>
>>>>> index b911f6b..58bc918 100644
>>>>>
>>>>> ---
>>>>> a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>
>>>>> +++
>>>>> b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>
>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>
>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>
>>>>>    // currently won't generate a 2 phase plan.
>>>>>
>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call,
>>>>> DrillAggregateRel aggregate) {
>>>>>
>>>>> -    PlannerSettings settings =
>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>
>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>
>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>
>>>>> -    if (! settings.isMultiPhaseAggEnabled() ||
>>>>> settings.isSingleMode() || smallInput) {
>>>>>
>>>>> -      return false;
>>>>>
>>>>> -    }
>>>>>
>>>>> +//    PlannerSettings settings =
>>>>> PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>
>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>
>>>>> +//    boolean smallInput = child.getRows() <
>>>>> settings.getSliceTarget();
>>>>>
>>>>> +//    if (! settings.isMultiPhaseAggEnabled() ||
>>>>> settings.isSingleMode() || smallInput) {
>>>>>
>>>>> +//      return false;
>>>>>
>>>>> +//    }
>>>>>
>>>>>
>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>
>>>>>        String name = aggCall.getAggregation().getName();
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jh...@apache.org> wrote:
>>>>>
>>>>>> Drill's current approach seems adequate for Drill alone but extending
>>>>>> it to a heterogenous system that includes Phoenix seems like a hack.
>>>>>>
>>>>>> I think you should only create Prels for algebra nodes that you know
>>>>>> for sure are going to run on the Drill engine. If there's a
>>>>>> possibility that it would run in another engine such as Phoenix then
>>>>>> they should still be logical.
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann....@gmail.com>
>>>>>> wrote:
>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>> extension
>>>>>> > and one bug fix in the Phoenix project. Will do some code cleanup
>>>>>> and
>>>>>> > create a pull request soon.
>>>>>> >
>>>>>> > Still there was a hack in the Drill project which I made to force
>>>>>> 2-phase
>>>>>> > aggregation. I'll try to fix that.
>>>>>> >
>>>>>> > Jacques, I have one question though, how can I verify that there
>>>>>> are more
>>>>>> > than one slice and the shuffle happens?
>>>>>> >
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Maryann
>>>>>> >
>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <
>>>>>> jamestay...@apache.org> wrote:
>>>>>> >
>>>>>> >> Maryann,
>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>> required
>>>>>> >> for a merge sort to occur - there's something that does that, but
>>>>>> it's not
>>>>>> >> expected to be used in this context currently.
>>>>>> >>
>>>>>> >> IMHO, there's more of a clear value in getting the aggregation to
>>>>>> use
>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>> mentioned
>>>>>> >> above if possible. Once that's working, we can circle back to the
>>>>>> partial
>>>>>> >> sort.
>>>>>> >>
>>>>>> >> Thoughts?
>>>>>> >> James
>>>>>> >>
>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <
>>>>>> maryann....@gmail.com>
>>>>>> >> wrote:
>>>>>> >>
>>>>>> >>> I actually tried implementing partial sort with
>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might
>>>>>> be a
>>>>>> >>> little easier to start with than partial aggregation. But I found
>>>>>> that even
>>>>>> >>> though the code worked (returned the right results), the Drill
>>>>>> side sort
>>>>>> >>> turned out to be a ordinary sort instead of a merge which it
>>>>>> should have
>>>>>> >>> been. Any idea of how to fix that?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Thanks,
>>>>>> >>> Maryann
>>>>>> >>>
>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <
>>>>>> jacq...@dremio.com>
>>>>>> >>> wrote:
>>>>>> >>>
>>>>>> >>>> Right now this type of work is done here:
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>> >>>>
>>>>>> >>>>
>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>> >>>>
>>>>>> >>>> With Distribution Trait application here:
>>>>>> >>>>
>>>>>> >>>>
>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>> >>>>
>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>> providing a rule
>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix
>>>>>> convention as
>>>>>> >>>> input. It would replace everywhere but would only be plannable
>>>>>> when it is
>>>>>> >>>> the first phase of aggregation.
>>>>>> >>>>
>>>>>> >>>> Thoughts?
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> --
>>>>>> >>>> Jacques Nadeau
>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>> >>>>
>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jh...@apache.org>
>>>>>> wrote:
>>>>>> >>>>
>>>>>> >>>>> Phoenix is able to perform quite a few relational operations on
>>>>>> the
>>>>>> >>>>> region server: scan, filter, project, aggregate, sort
>>>>>> (optionally with
>>>>>> >>>>> limit). However, the sort and aggregate are necessarily
>>>>>> "local". They
>>>>>> >>>>> can only deal with data on that region server, and there needs
>>>>>> to be a
>>>>>> >>>>> further operation to combine the results from the region
>>>>>> servers.
>>>>>> >>>>>
>>>>>> >>>>> The question is how to plan such queries. I think the answer is
>>>>>> an
>>>>>> >>>>> AggregateExchangeTransposeRule.
>>>>>> >>>>>
>>>>>> >>>>> The rule would spot an Aggregate on a data source that is split
>>>>>> into
>>>>>> >>>>> multiple locations (partitions) and split it into a partial
>>>>>> Aggregate
>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that
>>>>>> combines
>>>>>> >>>>> those totals.
>>>>>> >>>>>
>>>>>> >>>>> How does the planner know that the Aggregate needs to be split?
>>>>>> Since
>>>>>> >>>>> the data's distribution has changed, there would need to be an
>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers
>>>>>> the rule
>>>>>> >>>>> to fire.
>>>>>> >>>>>
>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>>> >>>>> partition key is the same as the aggregation key we don't need a
>>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>>> >>>>>
>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>> Drill-on-Phoenix
>>>>>> >>>>> scenario, once the Aggregate has been pushed through the
>>>>>> Exchange
>>>>>> >>>>> (i.e. onto the drill-bit residing on the region server) we can
>>>>>> then
>>>>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane
>>>>>> and make
>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region
>>>>>> server.
>>>>>> >>>>>
>>>>>> >>>>> Related issues:
>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>> >>>>>
>>>>>> >>>>> Julian
>>>>>> >>>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to