Repository: phoenix
Updated Branches:
  refs/heads/calcite 9677aeb94 -> f14d5a707


PHOENIX-3488 Support COUNT(DISTINCT x) in Phoenix-Calcite Integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f14d5a70
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f14d5a70
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f14d5a70

Branch: refs/heads/calcite
Commit: f14d5a707011bc6da9e0409b9be71975ad72ea42
Parents: 9677aeb
Author: Eric Lomore <eric.lom...@gmail.com>
Authored: Thu Dec 1 16:49:34 2016 -0800
Committer: maryannxue <maryann....@gmail.com>
Committed: Thu Dec 1 16:49:34 2016 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 46 ++++++++++++++++++++
 .../phoenix/calcite/PhoenixPrepareImpl.java     |  2 +
 .../calcite/rel/PhoenixAbstractAggregate.java   | 18 +++-----
 .../calcite/rules/PhoenixConverterRules.java    |  7 +--
 4 files changed, 58 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 142fa1b..6154a34 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -867,6 +867,52 @@ public class CalciteIT extends BaseCalciteIT {
                 .close();
     }
 
+    @Test public void testCountDistinct() throws Exception {
+        start(false, 1000f).sql("select count(distinct a_string) from aTable")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                        "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT(DISTINCT $2)])\n" +
+                        "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {{3L}})
+                .close();
+
+        start(false, 1000f).sql("select a_string, count(distinct b_string) 
from atable group by a_string")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                        "  PhoenixServerAggregate(group=[{2}], 
EXPR$1=[COUNT(DISTINCT $3)], isOrdered=[false])\n" +
+                        "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                        {"a", 3L},
+                        {"b", 3L},
+                        {"c", 1L}})
+                .close();
+
+        start(false, 1000f).sql("select organization_id, entity_id, 
count(distinct b_string) from atable group by entity_id ,organization_id")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                        "  PhoenixServerAggregate(group=[{0, 1}], 
EXPR$2=[COUNT(DISTINCT $3)], isOrdered=[true])\n" +
+                        "    PhoenixTableScan(table=[[phoenix, ATABLE]], 
scanOrder=[FORWARD])\n")
+                .resultIs(new Object[][] {
+                        {"00D300000000XHP", "00A123122312312", 1L},
+                        {"00D300000000XHP", "00A223122312312", 1L},
+                        {"00D300000000XHP", "00A323122312312", 1L},
+                        {"00D300000000XHP", "00A423122312312", 1L},
+                        {"00D300000000XHP", "00B523122312312", 1L},
+                        {"00D300000000XHP", "00B623122312312", 1L},
+                        {"00D300000000XHP", "00B723122312312", 1L},
+                        {"00D300000000XHP", "00B823122312312", 1L},
+                        {"00D300000000XHP", "00C923122312312", 1L}})
+                .close();
+
+        start(false, 1000f).sql("select organization_id, count(distinct 
entity_id), b_string from atable group by organization_id, b_string")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                        "  PhoenixClientProject(ORGANIZATION_ID=[$0], 
EXPR$1=[$2], B_STRING=[$1])\n" +
+                        "    PhoenixServerAggregate(group=[{0, 3}], 
EXPR$1=[COUNT(DISTINCT $1)], isOrdered=[false])\n" +
+                        "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(0, new Object[][] {
+                        {"00D300000000XHP", 3L, "b"},
+                        {"00D300000000XHP", 3L, "c"},
+                        {"00D300000000XHP", 3L, "e"}})
+                .close();
+    }
+
     @Test public void testOffset() throws Exception {
         start(false, 1000f).sql(
                 "select organization_id, entity_id, a_string from aTable 
offset 3")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java
index dc4e29e..1489ae8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java
@@ -20,6 +20,7 @@ import org.apache.calcite.prepare.Prepare.Materialization;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
 import org.apache.calcite.rel.rules.JoinCommuteRule;
 import org.apache.calcite.rel.rules.SortProjectTransposeRule;
 import org.apache.calcite.rel.rules.SortUnionTransposeRule;
@@ -158,6 +159,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl {
         
         planner.removeRule(EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE);
         planner.removeRule(JoinCommuteRule.INSTANCE);
+        planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE);
         planner.addRule(JoinCommuteRule.SWAP_OUTER);
         planner.removeRule(SortUnionTransposeRule.INSTANCE);
         planner.addRule(SortUnionTransposeRule.MATCH_NULL_FETCH);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
index 023e9f1..5520efc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@ -30,6 +30,8 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.aggregator.ClientAggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.expression.function.AggregateFunction;
+import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.function.DistinctCountAggregateFunction;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
@@ -82,18 +84,6 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
     
     protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet 
traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, 
List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
         super(cluster, traits, child, indicator, groupSet, groupSets, 
aggCalls);
-
-        for (AggregateCall aggCall : aggCalls) {
-            if (aggCall.isDistinct()) {
-                throw new UnsupportedOperationException( "distinct aggregation 
not supported");
-            }
-        }
-        switch (getGroupType()) {
-            case SIMPLE:
-                break;
-            default:
-                throw new UnsupportedOperationException("unsupported group 
type: " + getGroupType());
-        }
         
         this.isOrderedGroupBy = isOrderedGroupSet(groupSet, child);
     }
@@ -162,10 +152,14 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
     }
     
     protected void serializeAggregators(PhoenixRelImplementor implementor, 
StatementContext context, boolean isEmptyGroupBy) {
+        if(getGroupType() != Group.SIMPLE) throw new 
UnsupportedOperationException();
         // TODO sort aggFuncs. same problem with group by key sorting.
         List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
         for (AggregateCall call : aggCalls) {
             AggregateFunction aggFunc = 
CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), 
implementor);
+            if(aggFunc instanceof CountAggregateFunction && call.isDistinct()){
+                aggFunc = new 
DistinctCountAggregateFunction(aggFunc.getChildren(), null);
+            }
             if (!(aggFunc instanceof SingleAggregateFunction)) {
                 throw new UnsupportedOperationException();
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index ee444fd..68db713 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.PhoenixTable;
 import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
@@ -899,13 +900,13 @@ public class PhoenixConverterRules {
         if (input.getGroupSets().size() > 1)
             return false;
         
-        if (input.containsDistinctCall())
-            return false;
-        
         if (input.getGroupType() != Group.SIMPLE)
             return false;
         
         for (AggregateCall aggCall : input.getAggCallList()) {
+            if(!SqlKind.COUNT.equals(aggCall.getAggregation().getKind()) && 
aggCall.isDistinct()) {
+                return false;
+            }
             if 
(!CalciteUtils.isAggregateFunctionSupported(aggCall.getAggregation())) {
                 return false;
             }

Reply via email to