Repository: calcite
Updated Branches:
  refs/heads/master 9c6dc773e -> be78de942


[CALCITE-2097] Druid adapter: Push Aggregate and Filter operators containing metric columns to Druid

Close apache/calcite#585


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/be78de94
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/be78de94
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/be78de94

Branch: refs/heads/master
Commit: be78de9420c6588d04c4bedf885615f138448d53
Parents: 9c6dc77
Author: Slim <[email protected]>
Authored: Fri Dec 15 19:52:26 2017 -0800
Committer: Jesus Camacho Rodriguez <[email protected]>
Committed: Thu Dec 21 10:45:49 2017 -0800

----------------------------------------------------------------------
 .../adapter/druid/DruidConnectionImpl.java      |   7 +-
 .../calcite/adapter/druid/DruidQuery.java       |  26 ++---
 .../calcite/adapter/druid/DruidRules.java       |  22 +---
 .../org/apache/calcite/test/DruidAdapterIT.java | 109 ++++++++++++++-----
 4 files changed, 94 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index 91fdf90..3c52fce 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -356,6 +356,7 @@ class DruidConnectionImpl implements DruidConnection {
           case "NaN":
             throw new RuntimeException("/ by zero");
           }
+          rowBuilder.set(i, Long.valueOf(s));
           break;
         case FLOAT:
         case PRIMITIVE_FLOAT:
@@ -373,10 +374,12 @@ class DruidConnectionImpl implements DruidConnection {
             rowBuilder.set(i, Double.NaN);
             return;
           }
+          rowBuilder.set(i, Double.valueOf(s));
+          break;
         }
+      } else {
+        rowBuilder.set(i, s);
       }
-      rowBuilder.set(i, s);
-      break;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 6664f00..01cf440 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -204,23 +204,13 @@ public class DruidQuery extends AbstractRelNode 
implements BindableRel {
   }
 
   public boolean isValidFilter(RexNode e) {
-    return isValidFilter(e, false, null);
+    return isValidFilter(e, false);
   }
 
-  public boolean isValidFilter(RexNode e, RelNode input) {
-    return isValidFilter(e, false, input);
-  }
-
-  public boolean isValidFilter(RexNode e, boolean boundedComparator, RelNode 
input) {
+  public boolean isValidFilter(RexNode e, boolean boundedComparator) {
     switch (e.getKind()) {
     case INPUT_REF:
-      if (input == null) {
-        return true;
-      }
-      int nameIndex = ((RexInputRef) e).getIndex();
-      String name = input.getRowType().getFieldList().get(nameIndex).getName();
-      // Druid can't filter on metrics
-      return !druidTable.isMetric(name);
+      return true;
     case LITERAL:
       return ((RexLiteral) e).getValue() != null;
     case AND:
@@ -229,7 +219,7 @@ public class DruidQuery extends AbstractRelNode implements 
BindableRel {
     case IN:
     case IS_NULL:
     case IS_NOT_NULL:
-      return areValidFilters(((RexCall) e).getOperands(), false, input);
+      return areValidFilters(((RexCall) e).getOperands(), false);
     case EQUALS:
     case NOT_EQUALS:
     case LESS_THAN:
@@ -237,21 +227,21 @@ public class DruidQuery extends AbstractRelNode 
implements BindableRel {
     case GREATER_THAN:
     case GREATER_THAN_OR_EQUAL:
     case BETWEEN:
-      return areValidFilters(((RexCall) e).getOperands(), true, input);
+      return areValidFilters(((RexCall) e).getOperands(), true);
     case CAST:
       return isValidCast((RexCall) e, boundedComparator);
     case EXTRACT:
       return TimeExtractionFunction.isValidTimeExtract((RexCall) e);
     case IS_TRUE:
-      return isValidFilter(((RexCall) e).getOperands().get(0), 
boundedComparator, input);
+      return isValidFilter(((RexCall) e).getOperands().get(0), 
boundedComparator);
     default:
       return false;
     }
   }
 
-  private boolean areValidFilters(List<RexNode> es, boolean boundedComparator, 
RelNode input) {
+  private boolean areValidFilters(List<RexNode> es, boolean boundedComparator) 
{
     for (RexNode e : es) {
-      if (!isValidFilter(e, boundedComparator, input)) {
+      if (!isValidFilter(e, boundedComparator)) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 4996970..ae0f8fb 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -308,19 +308,7 @@ public class DruidRules {
             timeRangeNodes.add(conj);
           }
         } else {
-          boolean filterOnMetrics = false;
-          for (Integer i : visitor.inputPosReferenced) {
-            if 
(input.druidTable.isMetric(input.getRowType().getFieldList().get(i).getName())) 
{
-              // Filter on metrics, not supported in Druid
-              filterOnMetrics = true;
-              break;
-            }
-          }
-          if (filterOnMetrics) {
-            nonPushableNodes.add(conj);
-          } else {
-            pushableNodes.add(conj);
-          }
+          pushableNodes.add(conj);
         }
       }
       return ImmutableTriple.of(timeRangeNodes, pushableNodes, 
nonPushableNodes);
@@ -668,9 +656,6 @@ public class DruidRules {
       for (AggregateCall aggCall : aggregate.getAggCallList()) {
         builder.addAll(aggCall.getArgList());
       }
-      if (checkAggregateOnMetric(aggregate.getGroupSet(), aggregate, query)) {
-        return false;
-      }
       return !checkTimestampRefOnQuery(builder.build(), query.getTopNode(), 
query);
     }
   }
@@ -713,7 +698,7 @@ public class DruidRules {
       // into Druid
       for (Integer i : filterRefs) {
         RexNode filterNode = project.getProjects().get(i);
-        if (!query.isValidFilter(filterNode, project.getInput()) || 
filterNode.isAlwaysFalse()) {
+        if (!query.isValidFilter(filterNode) || filterNode.isAlwaysFalse()) {
           return;
         }
       }
@@ -724,9 +709,6 @@ public class DruidRules {
               || !validAggregate(aggregate, timestampIdx, filterRefs.size())) {
         return;
       }
-      if (checkAggregateOnMetric(aggregate.getGroupSet(), project, query)) {
-        return;
-      }
       final RelNode newProject = project.copy(project.getTraitSet(),
               ImmutableList.of(Util.last(query.rels)));
       final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),

http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index c78d3b1..cc33100 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -423,11 +423,13 @@ public class DruidAdapterIT {
         .queryContains(druidChecker(druidQuery));
   }
 
-  @Ignore("TODO: fix invalid cast from Integer to Long")
   @Test public void testSelectGroupBySum() {
-    final String explain = "PLAN="
-        + "EnumerableInterpreter\n"
-        + "  DruidQuery(table=[[foodmart, foodmart]], projects=[[$29, 
CAST($88):INTEGER]], groups=[{0}], aggs=[[SUM($1)]])";
+    final String explain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableAggregate(group=[{0}], U=[SUM($1)])\n"
+        + "    BindableProject(state_province=[$0], $f1=[CAST($1):INTEGER])\n"
+        + "      DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]],"
+        + " projects=[[$30, $89]])";
     final String sql = "select \"state_province\", sum(cast(\"unit_sales\" as 
integer)) as u\n"
         + "from \"foodmart\"\n"
         + "group by \"state_province\"";
@@ -441,16 +443,16 @@ public class DruidAdapterIT {
   @Test public void testGroupbyMetric() {
     final String sql = "select  \"store_sales\" ,\"product_id\" from 
\"foodmart\" "
             + "where \"product_id\" = 1020" + "group by \"store_sales\" 
,\"product_id\" ";
-    final String plan = "PLAN=EnumerableInterpreter\n  
BindableAggregate(group=[{0, 1}])\n"
-            + "    DruidQuery(table=[[foodmart, foodmart]], "
-            + 
"intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[=($1, 
1020)],"
-            + " projects=[[$90, $1]])\n";
-    final String druidQuery = 
"{'queryType':'select','dataSource':'foodmart','descending':false,"
-            + 
"'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
-            + 
"'filter':{'type':'selector','dimension':'product_id','value':'1020'},"
-            + 
"'dimensions':['product_id'],'metrics':['store_sales'],'granularity':'all',"
-            + "'pagingSpec':{'threshold':16384,'fromNext':true},"
-            + "'context':{'druid.query.fetch':false}}";
+    final String plan = "PLAN=EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
filter=[=($1, 1020)],"
+        + " projects=[[$90, $1]], groups=[{0, 1}], aggs=[[]])";
+    final String druidQuery = 
"{'queryType':'groupBy','dataSource':'foodmart','granularity':'all',"
+            + "'dimensions':[{'type':'default','dimension':'store_sales'},"
+            + 
"{'type':'default','dimension':'product_id'}],'limitSpec':{'type':'default'},'"
+            + 
"filter':{'type':'selector','dimension':'product_id','value':'1020'},"
+            + "'aggregations':[],"
+            + 
"'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
     sql(sql)
         .explainContains(plan)
         .queryContains(druidChecker(druidQuery))
@@ -1462,7 +1464,7 @@ public class DruidAdapterIT {
         + "FROM \"foodmart\"\n"
         + "GROUP BY \"store_sales\", floor(\"timestamp\" to DAY)\n ORDER BY 
\"store_sales\" DESC\n"
         + "LIMIT 10\n";
-    sql(sql).queryContains(druidChecker("{\"queryType\":\"select\""));
+    sql(sql).queryContains(druidChecker("{\"queryType\":\"groupBy\""));
   }
 
   @Test public void testFilterOnDouble() {
@@ -2833,16 +2835,59 @@ public class DruidAdapterIT {
     sql(sql).explainContains(expectedSubExplain);
   }
 
-  /**
-   * Test to ensure that aggregations with metrics as filters do not get 
pushed into Druid
-   */
   @Test public void testFilterClauseWithMetricRef() {
     String sql = "select sum(\"store_sales\") filter (where \"store_cost\" > 
10) from \"foodmart\"";
     String expectedSubExplain =
-            "  BindableAggregate(group=[{}], EXPR$0=[SUM($0) FILTER $1])\n"
-                    + "    BindableProject(store_sales=[$0], $f1=[IS 
TRUE(>($1, 10))])\n";
+            "PLAN=EnumerableInterpreter\n"
+                + "  DruidQuery(table=[[foodmart, foodmart]], "
+                + 
"intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[>"
+                + "($91, 10)], projects=[[$90, IS TRUE(>($91, 10))]], 
groups=[{}], aggs=[[SUM($0)"
+                + "]])";
 
-    sql(sql).explainContains(expectedSubExplain);
+    sql(sql)
+        .explainContains(expectedSubExplain)
+        .queryContains(
+            druidChecker("\"queryType\":\"timeseries\"", 
"\"filter\":{\"type\":\"bound\","
+                + 
"\"dimension\":\"store_cost\",\"lower\":\"10\",\"lowerStrict\":true,"
+                + "\"ordering\":\"numeric\"}"))
+        .returnsUnordered("EXPR$0=25.060000000000002");
+  }
+
+  @Test public void testFilterClauseWithMetricRefAndAggregates() {
+    String sql = "select sum(\"store_sales\"), \"product_id\" "
+        + "from \"foodmart\" where \"product_id\" > 1553 and \"store_cost\" > 
5 group by \"product_id\"";
+    String expectedSubExplain =
+        "PLAN=EnumerableInterpreter\n"
+            + "  BindableProject(EXPR$0=[$1], product_id=[$0])\n"
+            + "    DruidQuery(table=[[foodmart, foodmart]], "
+            + 
"intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[AND(>"
+            + "(CAST($1):BIGINT, 1553), >($91, 5))], groups=[{1}], 
aggs=[[SUM($90)]])";
+
+    sql(sql)
+        .explainContains(expectedSubExplain)
+        .queryContains(
+            druidChecker("\"queryType\":\"groupBy\"", "{\"type\":\"bound\","
+                + 
"\"dimension\":\"store_cost\",\"lower\":\"5\",\"lowerStrict\":true,"
+                + "\"ordering\":\"numeric\"}"))
+        .returnsUnordered("EXPR$0=10.16; product_id=1554\n"
+            + "EXPR$0=45.05; product_id=1556\n"
+            + "EXPR$0=88.5; product_id=1555");
+  }
+
+  @Test public void testFilterClauseWithMetricAndTimeAndAggregates() {
+    String sql = "select sum(\"store_sales\"), \"product_id\""
+        + "from \"foodmart\" where \"product_id\" > 1555 and \"store_cost\" > 
5 and extract(year "
+        + "from \"timestamp\") = 1997 "
+        + "group by floor(\"timestamp\" to DAY),\"product_id\"";
+    sql(sql)
+        .queryContains(
+            druidChecker("\"queryType\":\"groupBy\"", "{\"type\":\"bound\","
+                + 
"\"dimension\":\"store_cost\",\"lower\":\"5\",\"lowerStrict\":true,"
+                + "\"ordering\":\"numeric\"}"))
+        .returnsUnordered("EXPR$0=10.6; product_id=1556\n"
+            + "EXPR$0=10.6; product_id=1556\n"
+            + "EXPR$0=10.6; product_id=1556\n"
+            + "EXPR$0=13.25; product_id=1556");
   }
 
   /**
@@ -2999,11 +3044,14 @@ public class DruidAdapterIT {
   @Test public void testDistinctCountOnMetric() {
     String sql = "select count(distinct \"store_sales\") from \"foodmart\" "
         + "where \"store_state\" = 'WA'";
-    String expectedSubExplain = "  BindableAggregate(group=[{}], 
EXPR$0=[COUNT($0)])\n"
-        + "    BindableAggregate(group=[{1}])";
+    String expectedSubExplain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
filter=[=($63, 'WA')"
+        + "], groups=[{90}], aggs=[[]])";
 
-    testCountWithApproxDistinct(true, sql, expectedSubExplain);
-    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+    testCountWithApproxDistinct(true, sql, expectedSubExplain, 
"\"queryType\":\"groupBy\"");
+    testCountWithApproxDistinct(false, sql, expectedSubExplain, 
"\"queryType\":\"groupBy\"");
   }
 
   /**
@@ -3053,13 +3101,14 @@ public class DruidAdapterIT {
     String sql = "select \"B\", count(distinct \"A\") from "
         + "(select \"unit_sales\" as \"A\", \"store_state\" as \"B\" from 
\"foodmart\") "
         + "group by \"B\"";
-    String expectedSubExplain = "  BindableAggregate(group=[{0}], 
EXPR$1=[COUNT($1)])\n"
-        + "    DruidQuery(table=[[foodmart, foodmart]], 
intervals=[[1900-01-09T00:00:"
-        + "00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$63, $89]], 
groups=[{0, 1}], "
+    String expectedSubExplain = "PLAN=EnumerableInterpreter\n"
+        + "  BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n"
+        + "    DruidQuery(table=[[foodmart, foodmart]], "
+        + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], 
groups=[{63, 89}], "
         + "aggs=[[]])";
 
-    testCountWithApproxDistinct(true, sql, expectedSubExplain);
-    testCountWithApproxDistinct(false, sql, expectedSubExplain);
+    testCountWithApproxDistinct(true, sql, expectedSubExplain, 
"\"queryType\":\"groupBy\"");
+    testCountWithApproxDistinct(false, sql, expectedSubExplain, 
"\"queryType\":\"groupBy\"");
   }
 
   private void testCountWithApproxDistinct(boolean approx, String sql, String 
expectedExplain) {

Reply via email to