Repository: phoenix Updated Branches: refs/heads/calcite 500e85cc1 -> 2bfebfd1d
PHOENIX-1837 Detect ordered/unordered group-by Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2bfebfd1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2bfebfd1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2bfebfd1 Branch: refs/heads/calcite Commit: 2bfebfd1d24461a08246f35eb2783311e6b77f44 Parents: 500e85c Author: maryannxue <[email protected]> Authored: Wed Oct 21 15:12:00 2015 -0400 Committer: maryannxue <[email protected]> Committed: Wed Oct 21 15:12:00 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 112 +++++++++++++++---- .../calcite/jdbc/PhoenixPrepareImpl.java | 2 + .../calcite/rel/PhoenixAbstractAggregate.java | 45 +++++++- .../rules/PhoenixOrderedAggregateRule.java | 77 +++++++++++++ 4 files changed, 212 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 5987b32..3f001e0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -647,9 +647,57 @@ public class CalciteIT extends BaseClientManagedTimeIT { } @Test public void testAggregate() { + start(false).sql("select count(b_string) from atable") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT($3)])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {9L}}) + .close(); + + start(false).sql("select organization_id, count(b_string) from atable group by organization_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT($3)], isOrdered=[true])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"00D300000000XHP", 9L}}) + .close(); + + start(false).sql("select organization_id, entity_id, count(b_string) from atable group by organization_id, entity_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0, 1}], EXPR$2=[COUNT($3)], isOrdered=[true])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"00D300000000XHP", "00A123122312312", 1L}, + {"00D300000000XHP", "00A223122312312", 1L}, + {"00D300000000XHP", "00A323122312312", 1L}, + {"00D300000000XHP", "00A423122312312", 1L}, + {"00D300000000XHP", "00B523122312312", 1L}, + {"00D300000000XHP", "00B623122312312", 1L}, + {"00D300000000XHP", "00B723122312312", 1L}, + {"00D300000000XHP", "00B823122312312", 1L}, + {"00D300000000XHP", "00C923122312312", 1L}}) + .close(); + + start(false).sql("select entity_id, count(b_string) from atable group by entity_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{1}], EXPR$1=[COUNT($3)], isOrdered=[false])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"00A123122312312", 1L}, + {"00A223122312312", 1L}, + {"00A323122312312", 1L}, + {"00A423122312312", 1L}, + {"00B523122312312", 1L}, + {"00B623122312312", 1L}, + {"00B723122312312", 1L}, + {"00B823122312312", 1L}, + {"00C923122312312", 1L}}) + .close(); + start(false).sql("select a_string, count(b_string) from atable group by a_string") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerAggregate(group=[{2}], EXPR$1=[COUNT($3)])\n" + + " PhoenixServerAggregate(group=[{2}], EXPR$1=[COUNT($3)], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {"a", 4L}, @@ -660,7 +708,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(false).sql("select count(entity_id), a_string from atable group by a_string") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" + - " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {4L, "a"}, @@ -670,7 +718,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(false).sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($2, $1)], joinType=[inner])\n" + " PhoenixServerProject(item_id=[$0], supplier_id=[$5])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + @@ -683,12 +731,29 @@ public class CalciteIT extends BaseClientManagedTimeIT { {"S5", 1L}, {"S6", 1L}}) .close(); + + // test PhoenixOrderedAggregateRule + start(false).sql("select s.\"supplier_id\", count(*) from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.\"supplier_id\"") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()], isOrdered=[true])\n" + + " PhoenixServerJoin(condition=[=($0, $2)], joinType=[inner])\n" + + " PhoenixServerProject(supplier_id=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, Join, SupplierTable]])\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(item_id=[$0], supplier_id=[$5])\n" + + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n") + .resultIs(new Object[][] { + {"0000000001", 2L}, + {"0000000002", 2L}, + {"0000000005", 1L}, + {"0000000006", 1L}}) + .close(); } @Test public void testDistinct() { start(false).sql("select distinct a_string from aTable") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerAggregate(group=[{2}])\n" + + " PhoenixServerAggregate(group=[{2}], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][]{ {"a"}, @@ -736,7 +801,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" + " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + - " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {1L, "c"}, @@ -747,7 +812,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(false).sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + - " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($2, $1)], joinType=[inner])\n" + " PhoenixServerProject(item_id=[$0], supplier_id=[$5])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + @@ -815,7 +880,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" + " PhoenixLimit(fetch=[2])\n" + " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + - " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {1L, "c"}, @@ -826,7 +891,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixLimit(fetch=[3])\n" + " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" + - " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($2, $1)], joinType=[inner])\n" + " PhoenixServerProject(item_id=[$0], supplier_id=[$5])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + @@ -876,7 +941,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" + " PhoenixLimit(fetch=[2])\n" + - " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{2}], EXPR$0=[COUNT()], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") .resultIs(new Object[][] { {4L, "a"}, @@ -886,7 +951,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(false).sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name limit 3") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixLimit(fetch=[3])\n" + - " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()])\n" + + " PhoenixServerAggregate(group=[{3}], EXPR$1=[COUNT()], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($2, $1)], joinType=[inner])\n" + " PhoenixServerProject(item_id=[$0], supplier_id=[$5])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + @@ -934,7 +999,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[=($0, $7)], joinType=[left], isSingleValueRhs=[true])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + - " PhoenixServerAggregate(group=[{7}], SQ=[MAX($4)])\n" + + " PhoenixServerAggregate(group=[{7}], SQ=[MAX($4)], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($2, $7)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + " PhoenixToClientConverter\n" + @@ -962,7 +1027,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[=($2, $7)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + - " PhoenixServerAggregate(group=[{0}])\n" + + " PhoenixServerAggregate(group=[{0}], isOrdered=[true])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]], filter=[<($0, '0000000006')])\n") .resultIs(new Object[][] { new Object[] {"0000000001", "T1", 1000}, @@ -1022,6 +1087,11 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0], X_INTEGER=[$10], Y_INTEGER=[$11])\n" + " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n") .close(); + start(true).sql("select a_string, count(*) from aTable group by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()], isOrdered=[true])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]])\n") + .close(); } @Test public void testValues() { @@ -1157,10 +1227,10 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[AND(=($2, $7), =($4, $8))], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + - " PhoenixServerAggregate(group=[{7}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerAggregate(group=[{7}], EXPR$0=[MAX($4)], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($7, $2)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + - " PhoenixServerAggregate(group=[{2}])\n" + + " PhoenixServerAggregate(group=[{2}], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n"; start(correlProps).sql(q1).explainIs(p1Correlate).resultIs(r1).close(); start(decorrelProps).sql(q1).explainIs(p1Decorrelated).resultIs(r1).close(); @@ -1188,7 +1258,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[AND(=($0, $7), =($1, $8), =($2, $9))], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + - " PhoenixServerAggregate(group=[{0, 1}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerAggregate(group=[{0, 1}], EXPR$0=[MAX($4)], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])\n" + " PhoenixServerProject(item_id=[$0], NAME=[$1])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + @@ -1298,13 +1368,13 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[=($4, $8)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + - " PhoenixServerAggregate(group=[{14}], EXPR$0=[MAX($4)])\n" + + " PhoenixServerAggregate(group=[{14}], EXPR$0=[MAX($4)], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($2, $7)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[=($7, $5)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + - " PhoenixServerAggregate(group=[{5}])\n" + + " PhoenixServerAggregate(group=[{5}], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[=($9, $0)], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, Join, ItemTable]])\n" + " PhoenixToClientConverter\n" + @@ -1332,10 +1402,10 @@ public class CalciteIT extends BaseClientManagedTimeIT { " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[AND(=($0, $18), =($3, $19), =($4, $20))], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n" + - " PhoenixServerAggregate(group=[{18, 19}], EXPR$0=[MIN($4)])\n" + + " PhoenixServerAggregate(group=[{18, 19}], EXPR$0=[MIN($4)], isOrdered=[false])\n" + " PhoenixServerJoin(condition=[AND(=($18, $0), =($19, $3))], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n" + - " PhoenixServerAggregate(group=[{0, 3}])\n" + + " PhoenixServerAggregate(group=[{0, 3}], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n"; start(correlProps).sql(q6).explainIs(p6Correlate).resultIs(r6).close(); start(decorrelProps).sql(q6).explainIs(p6Decorrelated).resultIs(r6).close(); @@ -1464,10 +1534,10 @@ public class CalciteIT extends BaseClientManagedTimeIT { " EnumerableAggregate(group=[{0}], CNT=[COUNT()])\n" + " EnumerableJoin(condition=[=($0, $11)], joinType=[inner])\n" + " PhoenixToEnumerableConverter\n" + - " PhoenixServerAggregate(group=[{6}])\n" + + " PhoenixServerAggregate(group=[{6}], isOrdered=[false])\n" + " PhoenixTableScan(table=[[phoenix, Join, OrderTable]])\n" + " JdbcToEnumerableConverter\n" + - " JdbcProject(time_id=[$0], the_date=[$1], the_day=[$2], the_month=[$3], the_year=[$4], day_of_month=[$5], week_of_year=[$6], month_of_year=[$7], quarter=[$8], fiscal_period=[$9], $f10=[CAST($4):INTEGER])\n" + + " JdbcProject(time_id=[$0], the_date=[$1], the_day=[$2], the_month=[$3], the_year=[$4], day_of_month=[$5], week_of_year=[$6], month_of_year=[$7], quarter=[$8], fiscal_period=[$9], the_year10=[CAST($4):INTEGER])\n" + " JdbcTableScan(table=[[foodmart, time_by_day]])\n") .resultIs(new Object[][] { new Object[] {1997, 1000, 365L}, http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java index fc7406b..b165c40 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java @@ -28,6 +28,7 @@ import org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule; import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule; import org.apache.phoenix.calcite.rules.PhoenixInnerSortRemoveRule; import org.apache.phoenix.calcite.rules.PhoenixJoinSingleValueAggregateMergeRule; +import org.apache.phoenix.calcite.rules.PhoenixOrderedAggregateRule; import com.google.common.base.Function; @@ -72,6 +73,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl { planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE); planner.addRule(PhoenixJoinSingleValueAggregateMergeRule.INSTANCE); planner.addRule(PhoenixInnerSortRemoveRule.INSTANCE); + planner.addRule(PhoenixOrderedAggregateRule.INSTANCE); if (prepareContext.config().materializationsEnabled()) { for (CalciteSchema subSchema : prepareContext.getRootSchema().getSubSchemaMap().values()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java index b549663..17a4cea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java @@ -6,11 +6,16 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -45,6 +50,26 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe return call.getAggregation().getName().equals("SINGLE_VALUE"); } + public static boolean isOrderedGroupSet(ImmutableBitSet groupSet, RelNode child) { + List<Integer> ordinals = groupSet.asList(); + List<RelCollation> collations = child.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + boolean isOrderedGroupBy = ordinals.isEmpty(); + for (int i = 0; i < collations.size() && !isOrderedGroupBy; i++) { + List<RelFieldCollation> fieldCollations = collations.get(i).getFieldCollations(); + List<Integer> fields = Lists.newArrayListWithExpectedSize(fieldCollations.size()); + for (RelFieldCollation fieldCollation : fieldCollations) { + fields.add(fieldCollation.getFieldIndex()); + } + if (Util.startsWith(fields, ordinals)) { + isOrderedGroupBy = true; + } + } + + return isOrderedGroupBy; + } + + public final boolean isOrderedGroupBy; + protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); @@ -59,6 +84,8 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe default: throw new UnsupportedOperationException("unsupported group type: " + getGroupType()); } + + this.isOrderedGroupBy = isOrderedGroupSet(groupSet, child); } @Override @@ -66,7 +93,14 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe if (isSingleValueCheckAggregate(this)) return planner.getCostFactory().makeInfiniteCost(); - return super.computeSelfCost(planner); + double orderedGroupByFactor = isOrderedGroupBy ? 0.8 : 1.0; + return super.computeSelfCost(planner).multiplyBy(orderedGroupByFactor); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("isOrdered", isOrderedGroupBy, !groupSet.isEmpty()); } protected ImmutableIntList getColumnRefList() { @@ -87,8 +121,13 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe } List<Integer> ordinals = groupSet.asList(); - // TODO check order-preserving - String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; + if (ordinals.isEmpty()) { + return GroupBy.EMPTY_GROUP_BY; + } + + String groupExprAttribName = isOrderedGroupBy? + BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS + : BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; // TODO sort group by keys. not sure if there is a way to avoid this sorting, // otherwise we would have add an extra projection. // TODO convert key types. can be avoided? http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java new file mode 100644 index 0000000..286d58d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java @@ -0,0 +1,77 @@ +package org.apache.phoenix.calcite.rules; + +import java.util.List; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate; +import org.apache.phoenix.calcite.rel.PhoenixRel; + +import com.google.common.base.Predicate; + +/** + * Phoenix rule that transforms an unordered Aggregate into an ordered Aggregate. + * + * The Aggregate's child could have a collation that matches the groupSet and thus + * makes the Aggregate ordered, but the Aggregate wouldn't know this matching + * collation if its child resides in a RelSubset with an empty collation. + * An option would be to use conversion rules that create a subset of a specific + * collation. But since there are so many potential collations that can match the + * groupSet and most of them are meaningless for the actual child expression, we + * do not want to make this rule a ConvertRule. + * Instead, we surface the matching child expression by using RelOptRule and + * reconstruct a new Aggregate with this child. + */ +public class PhoenixOrderedAggregateRule extends RelOptRule { + + private static Predicate<PhoenixAbstractAggregate> UNORDERED_GROUPBY = + new Predicate<PhoenixAbstractAggregate>() { + @Override + public boolean apply(PhoenixAbstractAggregate input) { + return !input.isOrderedGroupBy; + } + }; + + private static Predicate<PhoenixRel> NON_EMPTY_COLLATION = + new Predicate<PhoenixRel>() { + @Override + public boolean apply(PhoenixRel input) { + if (input.getConvention() != PhoenixRel.SERVER_CONVENTION + && input.getConvention() != PhoenixRel.SERVERJOIN_CONVENTION) + return false; + + List<RelCollation> collations = input.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + for (RelCollation collation : collations) { + if (!collation.getFieldCollations().isEmpty()) { + return true; + } + } + return false; + } + }; + + public static final PhoenixOrderedAggregateRule INSTANCE = new PhoenixOrderedAggregateRule(); + + public PhoenixOrderedAggregateRule() { + super(operand(PhoenixAbstractAggregate.class, null, UNORDERED_GROUPBY, + operand(PhoenixRel.class, null, NON_EMPTY_COLLATION, any()))); + } + + @Override + public boolean matches(RelOptRuleCall call) { + PhoenixAbstractAggregate agg = call.rel(0); + RelNode child = call.rel(1); + return PhoenixAbstractAggregate.isOrderedGroupSet(agg.getGroupSet(), child); + } + + @Override + public void onMatch(RelOptRuleCall call) { + PhoenixAbstractAggregate agg = call.rel(0); + RelNode child = call.rel(1); + call.transformTo(agg.copy(agg.getTraitSet(), child, agg.indicator, agg.getGroupSet(), agg.groupSets, agg.getAggCallList())); + } + +}
