Repository: phoenix
Updated Branches:
  refs/heads/calcite 500e85cc1 -> 2bfebfd1d


PHOENIX-1837 Detect ordered/unordered group-by


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2bfebfd1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2bfebfd1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2bfebfd1

Branch: refs/heads/calcite
Commit: 2bfebfd1d24461a08246f35eb2783311e6b77f44
Parents: 500e85c
Author: maryannxue <[email protected]>
Authored: Wed Oct 21 15:12:00 2015 -0400
Committer: maryannxue <[email protected]>
Committed: Wed Oct 21 15:12:00 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 112 +++++++++++++++----
 .../calcite/jdbc/PhoenixPrepareImpl.java        |   2 +
 .../calcite/rel/PhoenixAbstractAggregate.java   |  45 +++++++-
 .../rules/PhoenixOrderedAggregateRule.java      |  77 +++++++++++++
 4 files changed, 212 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 5987b32..3f001e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -647,9 +647,57 @@ public class CalciteIT extends BaseClientManagedTimeIT {
     }
     
     @Test public void testAggregate() {
+        start(false).sql("select count(b_string) from atable")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT($3)])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {9L}})
+                .close();
+        
+        start(false).sql("select organization_id, count(b_string) from atable 
group by organization_id")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerAggregate(group=[{0}], 
EXPR$1=[COUNT($3)], isOrdered=[true])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"00D300000000XHP", 9L}})
+                .close();
+        
+        start(false).sql("select organization_id, entity_id, count(b_string) 
from atable group by organization_id, entity_id")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerAggregate(group=[{0, 1}], 
EXPR$2=[COUNT($3)], isOrdered=[true])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"00D300000000XHP", "00A123122312312", 1L}, 
+                          {"00D300000000XHP", "00A223122312312", 1L}, 
+                          {"00D300000000XHP", "00A323122312312", 1L}, 
+                          {"00D300000000XHP", "00A423122312312", 1L}, 
+                          {"00D300000000XHP", "00B523122312312", 1L}, 
+                          {"00D300000000XHP", "00B623122312312", 1L}, 
+                          {"00D300000000XHP", "00B723122312312", 1L}, 
+                          {"00D300000000XHP", "00B823122312312", 1L}, 
+                          {"00D300000000XHP", "00C923122312312", 1L}})
+                .close();
+        
+        start(false).sql("select entity_id, count(b_string) from atable group 
by entity_id")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerAggregate(group=[{1}], 
EXPR$1=[COUNT($3)], isOrdered=[false])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                        {"00A123122312312", 1L}, 
+                        {"00A223122312312", 1L}, 
+                        {"00A323122312312", 1L}, 
+                        {"00A423122312312", 1L}, 
+                        {"00B523122312312", 1L}, 
+                        {"00B623122312312", 1L}, 
+                        {"00B723122312312", 1L}, 
+                        {"00B823122312312", 1L}, 
+                        {"00C923122312312", 1L}})
+                .close();
+        
         start(false).sql("select a_string, count(b_string) from atable group 
by a_string")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixServerAggregate(group=[{2}], 
EXPR$1=[COUNT($3)])\n" +
+                           "  PhoenixServerAggregate(group=[{2}], 
EXPR$1=[COUNT($3)], isOrdered=[false])\n" +
                            "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
                 .resultIs(new Object[][] {
                           {"a", 4L},
@@ -660,7 +708,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(false).sql("select count(entity_id), a_string from atable group 
by a_string")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixClientProject(EXPR$0=[$1], 
A_STRING=[$0])\n" +
-                           "    PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()])\n" +
+                           "    PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()], isOrdered=[false])\n" +
                            "      PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
                 .resultIs(new Object[][] {
                           {4L, "a"},
@@ -670,7 +718,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         
         start(false).sql("select s.name, count(\"item_id\") from " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on 
s.\"supplier_id\" = i.\"supplier_id\" group by s.name")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()])\n" +
+                           "  PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()], isOrdered=[false])\n" +
                            "    PhoenixServerJoin(condition=[=($2, $1)], 
joinType=[inner])\n" +
                            "      PhoenixServerProject(item_id=[$0], 
supplier_id=[$5])\n" +
                            "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
@@ -683,12 +731,29 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                           {"S5", 1L},
                           {"S6", 1L}})
                 .close();
+        
+        // test PhoenixOrderedAggregateRule
+        start(false).sql("select s.\"supplier_id\", count(*) from " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on 
s.\"supplier_id\" = i.\"supplier_id\" group by s.\"supplier_id\"")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerAggregate(group=[{0}], 
EXPR$1=[COUNT()], isOrdered=[true])\n" +
+                           "    PhoenixServerJoin(condition=[=($0, $2)], 
joinType=[inner])\n" +
+                           "      PhoenixServerProject(supplier_id=[$0])\n" +
+                           "        PhoenixTableScan(table=[[phoenix, Join, 
SupplierTable]])\n" +
+                           "      PhoenixToClientConverter\n" +
+                           "        PhoenixServerProject(item_id=[$0], 
supplier_id=[$5])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n")
+                .resultIs(new Object[][] {
+                          {"0000000001", 2L},
+                          {"0000000002", 2L},
+                          {"0000000005", 1L},
+                          {"0000000006", 1L}})
+                .close();
     }
     
     @Test public void testDistinct() {
         start(false).sql("select distinct a_string from aTable")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixServerAggregate(group=[{2}])\n" +
+                           "  PhoenixServerAggregate(group=[{2}], 
isOrdered=[false])\n" +
                            "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
                 .resultIs(new Object[][]{
                           {"a"}, 
@@ -736,7 +801,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixClientProject(EXPR$0=[$1], 
A_STRING=[$0])\n" +
                            "    PhoenixCompactClientSort(sort0=[$1], 
sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
-                           "      PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()])\n" +
+                           "      PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()], isOrdered=[false])\n" +
                            "        PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
                 .resultIs(new Object[][] {
                           {1L, "c"},
@@ -747,7 +812,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(false).sql("select s.name, count(\"item_id\") from " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on 
s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by 
count(\"item_id\"), s.name desc")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixCompactClientSort(sort0=[$1], sort1=[$0], 
dir0=[ASC], dir1=[DESC])\n" +
-                           "    PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()])\n" +
+                           "    PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()], isOrdered=[false])\n" +
                            "      PhoenixServerJoin(condition=[=($2, $1)], 
joinType=[inner])\n" +
                            "        PhoenixServerProject(item_id=[$0], 
supplier_id=[$5])\n" +
                            "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
@@ -815,7 +880,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                            "  PhoenixClientProject(EXPR$0=[$1], 
A_STRING=[$0])\n" +
                            "    PhoenixLimit(fetch=[2])\n" +
                            "      PhoenixCompactClientSort(sort0=[$1], 
sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
-                           "        PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()])\n" +
+                           "        PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()], isOrdered=[false])\n" +
                            "          PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
                 .resultIs(new Object[][] {
                           {1L, "c"},
@@ -826,7 +891,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixLimit(fetch=[3])\n" +
                            "    PhoenixCompactClientSort(sort0=[$1], 
sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
-                           "      PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()])\n" +
+                           "      PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()], isOrdered=[false])\n" +
                            "        PhoenixServerJoin(condition=[=($2, $1)], 
joinType=[inner])\n" +
                            "          PhoenixServerProject(item_id=[$0], 
supplier_id=[$5])\n" +
                            "            PhoenixTableScan(table=[[phoenix, 
Join, ItemTable]])\n" +
@@ -876,7 +941,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixClientProject(EXPR$0=[$1], 
A_STRING=[$0])\n" +
                            "    PhoenixLimit(fetch=[2])\n" +
-                           "      PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()])\n" +
+                           "      PhoenixServerAggregate(group=[{2}], 
EXPR$0=[COUNT()], isOrdered=[false])\n" +
                            "        PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
                 .resultIs(new Object[][] {
                           {4L, "a"},
@@ -886,7 +951,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(false).sql("select s.name, count(\"item_id\") from " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on 
s.\"supplier_id\" = i.\"supplier_id\" group by s.name limit 3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixLimit(fetch=[3])\n" +
-                           "    PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()])\n" +
+                           "    PhoenixServerAggregate(group=[{3}], 
EXPR$1=[COUNT()], isOrdered=[false])\n" +
                            "      PhoenixServerJoin(condition=[=($2, $1)], 
joinType=[inner])\n" +
                            "        PhoenixServerProject(item_id=[$0], 
supplier_id=[$5])\n" +
                            "          PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
@@ -934,7 +999,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                        "    PhoenixToClientConverter\n" +
                        "      PhoenixServerJoin(condition=[=($0, $7)], 
joinType=[left], isSingleValueRhs=[true])\n" +
                        "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
-                       "        PhoenixServerAggregate(group=[{7}], 
SQ=[MAX($4)])\n" +
+                       "        PhoenixServerAggregate(group=[{7}], 
SQ=[MAX($4)], isOrdered=[false])\n" +
                        "          PhoenixServerJoin(condition=[=($2, $7)], 
joinType=[inner])\n" +
                        "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
                        "            PhoenixToClientConverter\n" +
@@ -962,7 +1027,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                           "          PhoenixToClientConverter\n" +
                           "            PhoenixServerJoin(condition=[=($2, 
$7)], joinType=[inner])\n" +
                           "              PhoenixTableScan(table=[[phoenix, 
Join, OrderTable]])\n" +
-                          "              
PhoenixServerAggregate(group=[{0}])\n" +
+                          "              PhoenixServerAggregate(group=[{0}], 
isOrdered=[true])\n" +
                           "                PhoenixTableScan(table=[[phoenix, 
Join, ItemTable]], filter=[<($0, '0000000006')])\n")
                .resultIs(new Object[][] {
                          new Object[] {"0000000001", "T1", 1000},
@@ -1022,6 +1087,11 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                        "    PhoenixServerProject(A_STRING=[$3], B_STRING=[$0], 
X_INTEGER=[$10], Y_INTEGER=[$11])\n" +
                        "      PhoenixTableScan(table=[[phoenix, IDX_FULL]], 
filter=[=($0, 'b')])\n")
             .close();
+        start(true).sql("select a_string, count(*) from aTable group by 
a_string")
+            .explainIs("PhoenixToEnumerableConverter\n" +
+                       "  PhoenixServerAggregate(group=[{0}], 
EXPR$1=[COUNT()], isOrdered=[true])\n" +
+                       "    PhoenixTableScan(table=[[phoenix, IDX1]])\n")
+            .close();
     }
     
     @Test public void testValues() {
@@ -1157,10 +1227,10 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 "    PhoenixToClientConverter\n" +
                 "      PhoenixServerJoin(condition=[AND(=($2, $7), =($4, 
$8))], joinType=[inner])\n" +
                 "        PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
-                "        PhoenixServerAggregate(group=[{7}], 
EXPR$0=[MAX($4)])\n" +
+                "        PhoenixServerAggregate(group=[{7}], EXPR$0=[MAX($4)], 
isOrdered=[false])\n" +
                 "          PhoenixServerJoin(condition=[=($7, $2)], 
joinType=[inner])\n" +
                 "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
-                "            PhoenixServerAggregate(group=[{2}])\n" +
+                "            PhoenixServerAggregate(group=[{2}], 
isOrdered=[false])\n" +
                 "              PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n";
         start(correlProps).sql(q1).explainIs(p1Correlate).resultIs(r1).close();
         
start(decorrelProps).sql(q1).explainIs(p1Decorrelated).resultIs(r1).close();
@@ -1188,7 +1258,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 "    PhoenixToClientConverter\n" +
                 "      PhoenixServerJoin(condition=[AND(=($0, $7), =($1, $8), 
=($2, $9))], joinType=[inner])\n" +
                 "        PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
-                "        PhoenixServerAggregate(group=[{0, 1}], 
EXPR$0=[MAX($4)])\n" +
+                "        PhoenixServerAggregate(group=[{0, 1}], 
EXPR$0=[MAX($4)], isOrdered=[false])\n" +
                 "          PhoenixServerJoin(condition=[AND(=($0, $2), =($1, 
$3))], joinType=[inner])\n" +
                 "            PhoenixServerProject(item_id=[$0], NAME=[$1])\n" +
                 "              PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
@@ -1298,13 +1368,13 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 "        PhoenixToClientConverter\n" +
                 "          PhoenixServerJoin(condition=[=($4, $8)], 
joinType=[inner])\n" +
                 "            PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
-                "            PhoenixServerAggregate(group=[{14}], 
EXPR$0=[MAX($4)])\n" +
+                "            PhoenixServerAggregate(group=[{14}], 
EXPR$0=[MAX($4)], isOrdered=[false])\n" +
                 "              PhoenixServerJoin(condition=[=($2, $7)], 
joinType=[inner])\n" +
                 "                PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
                 "                PhoenixToClientConverter\n" +
                 "                  PhoenixServerJoin(condition=[=($7, $5)], 
joinType=[inner])\n" +
                 "                    PhoenixTableScan(table=[[phoenix, Join, 
ItemTable]])\n" +
-                "                    PhoenixServerAggregate(group=[{5}])\n" +
+                "                    PhoenixServerAggregate(group=[{5}], 
isOrdered=[false])\n" +
                 "                      PhoenixServerJoin(condition=[=($9, 
$0)], joinType=[inner])\n" +
                 "                        PhoenixTableScan(table=[[phoenix, 
Join, ItemTable]])\n" +
                 "                        PhoenixToClientConverter\n" +
@@ -1332,10 +1402,10 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 "    PhoenixToClientConverter\n" +
                 "      PhoenixServerJoin(condition=[AND(=($0, $18), =($3, 
$19), =($4, $20))], joinType=[inner])\n" +
                 "        PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n" +
-                "        PhoenixServerAggregate(group=[{18, 19}], 
EXPR$0=[MIN($4)])\n" +
+                "        PhoenixServerAggregate(group=[{18, 19}], 
EXPR$0=[MIN($4)], isOrdered=[false])\n" +
                 "          PhoenixServerJoin(condition=[AND(=($18, $0), =($19, 
$3))], joinType=[inner])\n" +
                 "            PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n" +
-                "            PhoenixServerAggregate(group=[{0, 3}])\n" +
+                "            PhoenixServerAggregate(group=[{0, 3}], 
isOrdered=[false])\n" +
                 "              PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n";
         start(correlProps).sql(q6).explainIs(p6Correlate).resultIs(r6).close();
         
start(decorrelProps).sql(q6).explainIs(p6Decorrelated).resultIs(r6).close();
@@ -1464,10 +1534,10 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                        "      EnumerableAggregate(group=[{0}], 
CNT=[COUNT()])\n" +
                        "        EnumerableJoin(condition=[=($0, $11)], 
joinType=[inner])\n" +
                        "          PhoenixToEnumerableConverter\n" +
-                       "            PhoenixServerAggregate(group=[{6}])\n" +
+                       "            PhoenixServerAggregate(group=[{6}], 
isOrdered=[false])\n" +
                        "              PhoenixTableScan(table=[[phoenix, Join, 
OrderTable]])\n" +
                        "          JdbcToEnumerableConverter\n" +
-                       "            JdbcProject(time_id=[$0], the_date=[$1], 
the_day=[$2], the_month=[$3], the_year=[$4], day_of_month=[$5], 
week_of_year=[$6], month_of_year=[$7], quarter=[$8], fiscal_period=[$9], 
$f10=[CAST($4):INTEGER])\n" +
+                       "            JdbcProject(time_id=[$0], the_date=[$1], 
the_day=[$2], the_month=[$3], the_year=[$4], day_of_month=[$5], 
week_of_year=[$6], month_of_year=[$7], quarter=[$8], fiscal_period=[$9], 
the_year10=[CAST($4):INTEGER])\n" +
                        "              JdbcTableScan(table=[[foodmart, 
time_by_day]])\n")
             .resultIs(new Object[][] {
                     new Object[] {1997, 1000, 365L}, 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
index fc7406b..b165c40 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
@@ -28,6 +28,7 @@ import 
org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule;
 import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule;
 import org.apache.phoenix.calcite.rules.PhoenixInnerSortRemoveRule;
 import 
org.apache.phoenix.calcite.rules.PhoenixJoinSingleValueAggregateMergeRule;
+import org.apache.phoenix.calcite.rules.PhoenixOrderedAggregateRule;
 
 import com.google.common.base.Function;
 
@@ -72,6 +73,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl {
         planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE);
         planner.addRule(PhoenixJoinSingleValueAggregateMergeRule.INSTANCE);
         planner.addRule(PhoenixInnerSortRemoveRule.INSTANCE);
+        planner.addRule(PhoenixOrderedAggregateRule.INSTANCE);
         
         if (prepareContext.config().materializationsEnabled()) {
             for (CalciteSchema subSchema : 
prepareContext.getRootSchema().getSubSchemaMap().values()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
index b549663..17a4cea 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@ -6,11 +6,16 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -45,6 +50,26 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
         return call.getAggregation().getName().equals("SINGLE_VALUE");
     }
     
+    public static boolean isOrderedGroupSet(ImmutableBitSet groupSet, RelNode 
child) {
+        List<Integer> ordinals = groupSet.asList();
+        List<RelCollation> collations = 
child.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+        boolean isOrderedGroupBy = ordinals.isEmpty();
+        for (int i = 0; i < collations.size() && !isOrderedGroupBy; i++) {
+            List<RelFieldCollation> fieldCollations = 
collations.get(i).getFieldCollations();
+            List<Integer> fields = 
Lists.newArrayListWithExpectedSize(fieldCollations.size());
+            for (RelFieldCollation fieldCollation : fieldCollations) {
+                fields.add(fieldCollation.getFieldIndex());
+            }
+            if (Util.startsWith(fields, ordinals)) {
+                isOrderedGroupBy = true;
+            }
+        }
+        
+        return isOrderedGroupBy;
+    }
+    
+    public final boolean isOrderedGroupBy;
+    
     protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet 
traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, 
List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
         super(cluster, traits, child, indicator, groupSet, groupSets, 
aggCalls);
 
@@ -59,6 +84,8 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
             default:
                 throw new UnsupportedOperationException("unsupported group 
type: " + getGroupType());
         }
+        
+        this.isOrderedGroupBy = isOrderedGroupSet(groupSet, child);
     }
     
     @Override
@@ -66,7 +93,14 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
         if (isSingleValueCheckAggregate(this))
             return planner.getCostFactory().makeInfiniteCost();
         
-        return super.computeSelfCost(planner);
+        double orderedGroupByFactor = isOrderedGroupBy ? 0.8 : 1.0;
+        return super.computeSelfCost(planner).multiplyBy(orderedGroupByFactor);
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw)
+            .itemIf("isOrdered", isOrderedGroupBy, !groupSet.isEmpty());
     }
     
     protected ImmutableIntList getColumnRefList() {
@@ -87,8 +121,13 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
         }
         
         List<Integer> ordinals = groupSet.asList();
-        // TODO check order-preserving
-        String groupExprAttribName = 
BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
+        if (ordinals.isEmpty()) {
+            return GroupBy.EMPTY_GROUP_BY;
+        }
+        
+        String groupExprAttribName = isOrderedGroupBy?
+                BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS
+              : BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
         // TODO sort group by keys. not sure if there is a way to avoid this 
sorting,
         //      otherwise we would have add an extra projection.
         // TODO convert key types. can be avoided?

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfebfd1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java
new file mode 100644
index 0000000..286d58d
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixOrderedAggregateRule.java
@@ -0,0 +1,77 @@
+package org.apache.phoenix.calcite.rules;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
+import org.apache.phoenix.calcite.rel.PhoenixRel;
+
+import com.google.common.base.Predicate;
+
+/**
+ * Phoenix rule that transforms an unordered Aggregate into an ordered 
Aggregate.
+ * 
+ * The Aggregate's child could have a collation that matches the groupSet and 
thus
+ * makes the Aggregate ordered, but the Aggregate wouldn't know this matching 
+ * collation if its child resides in a RelSubset with an empty collation.
+ * An option would be to use conversion rules that create a subset of a 
specific
+ * collation. But since there are so many potential collations that can match 
the
+ * groupSet and most of them are meaningless for the actual child expression, 
we
+ * do not want to make this rule a ConvertRule.
+ * Instead, we surface the matching child expression by using RelOptRule and
+ * reconstruct a new Aggregate with this child.
+ */
+public class PhoenixOrderedAggregateRule extends RelOptRule {
+    
+    private static Predicate<PhoenixAbstractAggregate> UNORDERED_GROUPBY =
+            new Predicate<PhoenixAbstractAggregate>() {
+                @Override
+                public boolean apply(PhoenixAbstractAggregate input) {
+                    return !input.isOrderedGroupBy;
+                }
+    };
+    
+    private static Predicate<PhoenixRel> NON_EMPTY_COLLATION =
+            new Predicate<PhoenixRel>() {
+                @Override
+                public boolean apply(PhoenixRel input) {
+                    if (input.getConvention() != PhoenixRel.SERVER_CONVENTION
+                            && input.getConvention() != 
PhoenixRel.SERVERJOIN_CONVENTION)
+                        return false;
+                    
+                    List<RelCollation> collations = 
input.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
+                    for (RelCollation collation : collations) {
+                        if (!collation.getFieldCollations().isEmpty()) {
+                            return true;
+                        }
+                    }
+                    return false;
+                }
+    };
+    
+    public static final PhoenixOrderedAggregateRule INSTANCE = new 
PhoenixOrderedAggregateRule();
+    
+    public PhoenixOrderedAggregateRule() {
+        super(operand(PhoenixAbstractAggregate.class, null, UNORDERED_GROUPBY,
+                operand(PhoenixRel.class, null, NON_EMPTY_COLLATION, any())));
+    }
+    
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        PhoenixAbstractAggregate agg = call.rel(0);
+        RelNode child = call.rel(1);
+        return PhoenixAbstractAggregate.isOrderedGroupSet(agg.getGroupSet(), 
child);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        PhoenixAbstractAggregate agg = call.rel(0);
+        RelNode child = call.rel(1);
+        call.transformTo(agg.copy(agg.getTraitSet(), child, agg.indicator, 
agg.getGroupSet(), agg.groupSets, agg.getAggCallList()));
+    }
+
+}

Reply via email to