Repository: calcite
Updated Branches:
  refs/heads/master 82cdc2fa7 -> 8eb7d0ff2


[CALCITE-2101] Druid adapter: Push count(column) using Druid filtered aggregate

Close apache/calcite#586


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/8eb7d0ff
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/8eb7d0ff
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/8eb7d0ff

Branch: refs/heads/master
Commit: 8eb7d0ff2a6067b7555ef63ddc5422d6523f45cb
Parents: 82cdc2f
Author: Slim <[email protected]>
Authored: Tue Dec 19 11:10:25 2017 -0800
Committer: Jesus Camacho Rodriguez <[email protected]>
Committed: Wed Dec 20 12:58:01 2017 -0800

----------------------------------------------------------------------
 .../calcite/adapter/druid/DruidQuery.java       | 11 ++-
 .../calcite/adapter/druid/DruidRules.java       | 23 ++++--
 .../org/apache/calcite/test/DruidAdapterIT.java | 75 ++++++++++++--------
 3 files changed, 73 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/8eb7d0ff/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 8a6524c..6664f00 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -875,7 +875,16 @@ public class DruidQuery extends AbstractRelNode implements 
BindableRel {
               + " because an approximate count distinct is not acceptable.");
         }
       }
-      aggregation = new JsonAggregation("count", name, only);
+      if (aggCall.getArgList().size() == 1) {
+        // case we have count(column) push it as count(*) where column is not 
null
+        final JsonFilter matchNulls = new JsonSelector(only, null, null);
+        final JsonFilter filterOutNulls = new 
JsonCompositeFilter(JsonFilter.Type.NOT, matchNulls);
+        aggregation = new JsonFilteredAggregation(filterOutNulls,
+            new JsonAggregation("count", name, only));
+      } else {
+        aggregation = new JsonAggregation("count", name, only);
+      }
+
       break;
     case SUM:
     case SUM0:

http://git-wip-us.apache.org/repos/asf/calcite/blob/8eb7d0ff/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git 
a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java 
b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 128f1aa..4996970 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -140,18 +140,29 @@ public class DruidRules {
           for (AggregateCall aggregateCall : aggregate.getAggCallList()) {
             switch (aggregateCall.getAggregation().getKind()) {
             case COUNT:
-              // Druid can handle 2 scenarios:
+              // Druid count aggregator can handle 3 scenarios:
               // 1. count(distinct col) when approximate results
-              //    are acceptable and col is not a metric
+              //    are acceptable and col is not a metric.
+              //    Note that exact count(distinct column) is handled
+              //    by being rewritten into group by followed by count
               // 2. count(*)
+              // 3. count(column)
+
               if 
(checkAggregateOnMetric(ImmutableBitSet.of(aggregateCall.getArgList()),
                       node, query)) {
                 return true;
               }
-              if ((aggregateCall.isDistinct()
-                      && (aggregateCall.isApproximate()
-                          || config.approximateDistinctCount()))
-                  || aggregateCall.getArgList().isEmpty()) {
+              // case count(*)
+              if (aggregateCall.getArgList().isEmpty()) {
+                continue;
+              }
+              // case count(column)
+              if (aggregateCall.getArgList().size() == 1 && 
!aggregateCall.isDistinct()) {
+                continue;
+              }
+              // case count(distinct column) when approximate results are acceptable
+              if (aggregateCall.isDistinct()
+                      && (aggregateCall.isApproximate() || 
config.approximateDistinctCount())) {
                 continue;
               }
               return true;

http://git-wip-us.apache.org/repos/asf/calcite/blob/8eb7d0ff/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java 
b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 2df8e57..c78d3b1 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -1047,11 +1047,17 @@ public class DruidAdapterIT {
     final String sql = "select sum(\"unit_sales\") as s,\n"
         + " count(\"store_sqft\") as c\n"
         + "from \"foodmart\"\n"
-        + "group by floor(\"timestamp\" to MONTH)";
-    String druidQuery = "{'queryType':'select','dataSource':'foodmart'";
+        + "group by floor(\"timestamp\" to MONTH) order by s";
+    String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart'";
     sql(sql)
         .limit(3)
-        .returnsUnordered("S=21081; C=5793", "S=23763; C=6762", "S=25270; 
C=7026")
+        .explainContains("PLAN=EnumerableInterpreter\n"
+            + "  BindableSort(sort0=[$0], dir0=[ASC])\n"
+            + "    BindableProject(S=[$1], C=[$2])\n"
+            + "      DruidQuery(table=[[foodmart, foodmart]], "
+            + 
"intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
projects=[[FLOOR"
+            + "($0, FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), 
COUNT($2)]])")
+        .returnsOrdered("S=19958; C=5606", "S=20179; C=5523", "S=20388; 
C=5591")
         .queryContains(druidChecker(druidQuery));
   }
 
@@ -1065,12 +1071,11 @@ public class DruidAdapterIT {
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by floor(\"timestamp\" to MONTH) ASC";
     final String explain = "PLAN=EnumerableInterpreter\n"
-        + "  BindableSort(sort0=[$2], dir0=[ASC])\n"
-        + "    BindableProject(S=[$1], C=[$2], EXPR$2=[$0])\n"
-        + "      BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n"
-        + "        BindableProject($f0=[FLOOR($0, FLAG(MONTH))], 
unit_sales=[$2], store_sqft=[$1])\n"
-        + "          DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
projects=[[$0, $71, $89]])";
+        + "  BindableProject(S=[$1], C=[$2], EXPR$2=[$0])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
projects=[[FLOOR($0, "
+        + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), 
COUNT($2)]], sort0=[0], "
+        + "dir0=[ASC])";
     sql(sql)
         .explainContains(explain)
         .returnsOrdered("S=21628; C=5957",
@@ -1094,11 +1099,13 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "group by floor(\"timestamp\" to MONTH)\n"
         + "order by floor(\"timestamp\" to MONTH) limit 3";
-    final String explain = "BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n"
-        + "      BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n"
-        + "        BindableProject($f0=[FLOOR($0, FLAG(MONTH))], 
unit_sales=[$2], store_sqft=[$1])\n"
-        + "          DruidQuery(table=[[foodmart, foodmart]], "
-        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
projects=[[$0, $71, $89]])";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableProject(M=[CAST($0):TIMESTAMP(0) NOT NULL], S=[$1], 
C=[$2], EXPR$3=[$0])\n"
+        + "    BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n"
+        + "      DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
projects=[[FLOOR($0, "
+        + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), 
COUNT($2)]], sort0=[0], "
+        + "dir0=[ASC])";
     sql(sql)
         .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=5957",
             "M=1997-02-01 00:00:00; S=20957; C=5842",
@@ -1110,12 +1117,12 @@ public class DruidAdapterIT {
     final String sql = "select sum(\"unit_sales\") as s,\n"
         + " count(\"store_sqft\") as c\n"
         + "from \"foodmart\"\n"
-        + "group by floor(\"timestamp\" to DAY)";
-    String druidQuery = "{'queryType':'select','dataSource':'foodmart'";
+        + "group by floor(\"timestamp\" to DAY) order by c desc";
+    String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart'";
     sql(sql)
         .limit(3)
         .queryContains(druidChecker(druidQuery))
-        .returnsUnordered("S=1244; C=391", "S=550; C=112", "S=580; C=171");
+        .returnsOrdered("S=3850; C=1230", "S=3342; C=1071", "S=3219; C=1024");
   }
 
   @Test public void testGroupByMonthGranularityFiltered() {
@@ -1124,11 +1131,11 @@ public class DruidAdapterIT {
         + "from \"foodmart\"\n"
         + "where \"timestamp\" >= '1996-01-01 00:00:00 UTC' and "
         + " \"timestamp\" < '1998-01-01 00:00:00 UTC'\n"
-        + "group by floor(\"timestamp\" to MONTH)";
-    String druidQuery = "{'queryType':'select','dataSource':'foodmart'";
+        + "group by floor(\"timestamp\" to MONTH) order by s asc";
+    String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart'";
     sql(sql)
         .limit(3)
-        .returnsUnordered("S=21081; C=5793", "S=23763; C=6762", "S=25270; 
C=7026")
+        .returnsOrdered("S=19958; C=5606", "S=20179; C=5523", "S=20388; 
C=5591")
         .queryContains(druidChecker(druidQuery));
   }
 
@@ -2910,22 +2917,31 @@ public class DruidAdapterIT {
     final String sql2 = "SELECT count(\"countryName\") FROM (SELECT 
\"countryName\" FROM "
         + "\"wikiticker\") as a";
     final String plan2 = "PLAN=EnumerableInterpreter\n"
-        + "  BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n"
-        + "    DruidQuery(table=[[wiki, wikiticker]], "
-        + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], 
projects=[[$7]])";
+        + "  DruidQuery(table=[[wiki, wikiticker]], "
+        + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], 
projects=[[$7]], "
+        + "groups=[{}], aggs=[[COUNT($0)]])";
     sql(sql2, WIKI_AUTO2)
         .returnsUnordered("EXPR$0=3799")
         .explainContains(plan2);
 
     final String sql3 = "SELECT count(*), count(\"countryName\") FROM 
\"wikiticker\"";
     final String plan3 = "PLAN=EnumerableInterpreter\n"
-        + "  BindableAggregate(group=[{}], EXPR$0=[COUNT()], 
EXPR$1=[COUNT($0)])\n"
-        + "    DruidQuery(table=[[wiki, wikiticker]], "
-        + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], 
projects=[[$7]])";
+        + "  DruidQuery(table=[[wiki, wikiticker]], "
+        + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], 
projects=[[$7]], "
+        + "groups=[{}], aggs=[[COUNT(), COUNT($0)]])";
     sql(sql3, WIKI_AUTO2)
         .explainContains(plan3);
   }
 
+
+  @Test public void testCountColumn2() {
+    final String sql = "SELECT count(\"countryName\") FROM (SELECT 
\"countryName\" FROM "
+        + "\"wikiticker\" WHERE \"countryName\"  IS NOT NULL) as a";
+    sql(sql, WIKI_AUTO2)
+        .queryContains(druidChecker("timeseries"))
+        .returnsUnordered("EXPR$0=3799");
+  }
+
   @Test
   public void testCountWithNonNull() {
     final String sql = "select count(\"timestamp\") from \"foodmart\"\n";
@@ -3024,9 +3040,10 @@ public class DruidAdapterIT {
     String sql = "select \"B\", count(\"A\") from "
         + "(select \"unit_sales\" as \"A\", \"store_state\" as \"B\" from 
\"foodmart\") "
         + "group by \"B\"";
-    String expectedSubExplain = "  BindableAggregate(group=[{0}], 
EXPR$1=[COUNT($1)])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], 
intervals=[[1900-01-09T00:00:00.000Z"
-        + "/2992-01-10T00:00:00.000Z]], projects=[[$63, $89]])\n";
+    String expectedSubExplain = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
projects=[[$63, "
+        + "$89]], groups=[{0}], aggs=[[COUNT($1)]]";
 
     testCountWithApproxDistinct(true, sql, expectedSubExplain);
     testCountWithApproxDistinct(false, sql, expectedSubExplain);

Reply via email to