Repository: calcite Updated Branches: refs/heads/master 9c6dc773e -> be78de942
[CALCITE-2097] Druid adapter: Push Aggregate and Filter operators containing metric columns to Druid Close apache/calcite#585 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/be78de94 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/be78de94 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/be78de94 Branch: refs/heads/master Commit: be78de9420c6588d04c4bedf885615f138448d53 Parents: 9c6dc77 Author: Slim <[email protected]> Authored: Fri Dec 15 19:52:26 2017 -0800 Committer: Jesus Camacho Rodriguez <[email protected]> Committed: Thu Dec 21 10:45:49 2017 -0800 ---------------------------------------------------------------------- .../adapter/druid/DruidConnectionImpl.java | 7 +- .../calcite/adapter/druid/DruidQuery.java | 26 ++--- .../calcite/adapter/druid/DruidRules.java | 22 +--- .../org/apache/calcite/test/DruidAdapterIT.java | 109 ++++++++++++++----- 4 files changed, 94 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java index 91fdf90..3c52fce 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java @@ -356,6 +356,7 @@ class DruidConnectionImpl implements DruidConnection { case "NaN": throw new RuntimeException("/ by zero"); } + rowBuilder.set(i, Long.valueOf(s)); break; case FLOAT: case PRIMITIVE_FLOAT: @@ -373,10 +374,12 @@ class DruidConnectionImpl implements DruidConnection { rowBuilder.set(i, Double.NaN); return; } + rowBuilder.set(i, Double.valueOf(s)); + break; } + } else { + rowBuilder.set(i, s); } - rowBuilder.set(i, s); - break; } } http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java index 6664f00..01cf440 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java @@ -204,23 +204,13 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { } public boolean isValidFilter(RexNode e) { - return isValidFilter(e, false, null); + return isValidFilter(e, false); } - public boolean isValidFilter(RexNode e, RelNode input) { - return isValidFilter(e, false, input); - } - - public boolean isValidFilter(RexNode e, boolean boundedComparator, RelNode input) { + public boolean isValidFilter(RexNode e, boolean boundedComparator) { switch (e.getKind()) { case INPUT_REF: - if (input == null) { - return true; - } - int nameIndex = ((RexInputRef) e).getIndex(); - String name = input.getRowType().getFieldList().get(nameIndex).getName(); - // Druid can't filter on metrics - return !druidTable.isMetric(name); + return true; case LITERAL: return ((RexLiteral) e).getValue() != null; case AND: @@ -229,7 +219,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { case IN: case IS_NULL: case IS_NOT_NULL: - return areValidFilters(((RexCall) e).getOperands(), false, input); + return areValidFilters(((RexCall) e).getOperands(), false); case EQUALS: case NOT_EQUALS: case LESS_THAN: @@ -237,21 +227,21 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { case GREATER_THAN: case GREATER_THAN_OR_EQUAL: case BETWEEN: - return areValidFilters(((RexCall) e).getOperands(), true, input); + return areValidFilters(((RexCall) e).getOperands(), true); case CAST: return isValidCast((RexCall) e, boundedComparator); case EXTRACT: return TimeExtractionFunction.isValidTimeExtract((RexCall) e); case IS_TRUE: - return isValidFilter(((RexCall) e).getOperands().get(0), boundedComparator, input); + return isValidFilter(((RexCall) e).getOperands().get(0), boundedComparator); default: return false; } } - private boolean areValidFilters(List<RexNode> es, boolean boundedComparator, RelNode input) { + private boolean areValidFilters(List<RexNode> es, boolean boundedComparator) { for (RexNode e : es) { - if (!isValidFilter(e, boundedComparator, input)) { + if (!isValidFilter(e, boundedComparator)) { return false; } } http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java index 4996970..ae0f8fb 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java @@ -308,19 +308,7 @@ public class DruidRules { timeRangeNodes.add(conj); } } else { - boolean filterOnMetrics = false; - for (Integer i : visitor.inputPosReferenced) { - if (input.druidTable.isMetric(input.getRowType().getFieldList().get(i).getName())) { - // Filter on metrics, not supported in Druid - filterOnMetrics = true; - break; - } - } - if (filterOnMetrics) { - nonPushableNodes.add(conj); - } else { - pushableNodes.add(conj); - } + pushableNodes.add(conj); } } return ImmutableTriple.of(timeRangeNodes, pushableNodes, nonPushableNodes); @@ -668,9 +656,6 @@ public class DruidRules { for (AggregateCall aggCall : aggregate.getAggCallList()) { builder.addAll(aggCall.getArgList()); } - if (checkAggregateOnMetric(aggregate.getGroupSet(), aggregate, query)) { - return false; - } return !checkTimestampRefOnQuery(builder.build(), query.getTopNode(), query); } } @@ -713,7 +698,7 @@ public class DruidRules { // into Druid for (Integer i : filterRefs) { RexNode filterNode = project.getProjects().get(i); - if (!query.isValidFilter(filterNode, project.getInput()) || filterNode.isAlwaysFalse()) { + if (!query.isValidFilter(filterNode) || filterNode.isAlwaysFalse()) { return; } } @@ -724,9 +709,6 @@ public class DruidRules { || !validAggregate(aggregate, timestampIdx, filterRefs.size())) { return; } - if (checkAggregateOnMetric(aggregate.getGroupSet(), project, query)) { - return; - } final RelNode newProject = project.copy(project.getTraitSet(), ImmutableList.of(Util.last(query.rels))); final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(), http://git-wip-us.apache.org/repos/asf/calcite/blob/be78de94/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java ---------------------------------------------------------------------- diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java index c78d3b1..cc33100 100644 --- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java +++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java @@ -423,11 +423,13 @@ public class DruidAdapterIT { .queryContains(druidChecker(druidQuery)); } - @Ignore("TODO: fix invalid cast from Integer to Long") @Test public void testSelectGroupBySum() { - final String explain = "PLAN=" - + "EnumerableInterpreter\n" - + " DruidQuery(table=[[foodmart, foodmart]], projects=[[$29, CAST($88):INTEGER]], groups=[{0}], aggs=[[SUM($1)]])"; + final String explain = "PLAN=EnumerableInterpreter\n" + + " BindableAggregate(group=[{0}], U=[SUM($1)])\n" + + " BindableProject(state_province=[$0], $f1=[CAST($1):INTEGER])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]]," + + " projects=[[$30, $89]])"; final String sql = "select \"state_province\", sum(cast(\"unit_sales\" as integer)) as u\n" + "from \"foodmart\"\n" + "group by \"state_province\""; @@ -441,16 +443,16 @@ public class DruidAdapterIT { @Test public void testGroupbyMetric() { final String sql = "select \"store_sales\" ,\"product_id\" from \"foodmart\" " + "where \"product_id\" = 1020" + "group by \"store_sales\" ,\"product_id\" "; - final String plan = "PLAN=EnumerableInterpreter\n BindableAggregate(group=[{0, 1}])\n" - + " DruidQuery(table=[[foodmart, foodmart]], " - + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[=($1, 1020)]," - + " projects=[[$90, $1]])\n"; - final String druidQuery = "{'queryType':'select','dataSource':'foodmart','descending':false," - + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']," - + "'filter':{'type':'selector','dimension':'product_id','value':'1020'}," - + "'dimensions':['product_id'],'metrics':['store_sales'],'granularity':'all'," - + "'pagingSpec':{'threshold':16384,'fromNext':true}," - + "'context':{'druid.query.fetch':false}}"; + final String plan = "PLAN=EnumerableInterpreter\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[=($1, 1020)]," + + " projects=[[$90, $1]], groups=[{0, 1}], aggs=[[]])"; + final String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart','granularity':'all'," + + "'dimensions':[{'type':'default','dimension':'store_sales'}," + + "{'type':'default','dimension':'product_id'}],'limitSpec':{'type':'default'},'" + + "filter':{'type':'selector','dimension':'product_id','value':'1020'}," + + "'aggregations':[]," + + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}"; sql(sql) .explainContains(plan) .queryContains(druidChecker(druidQuery)) @@ -1462,7 +1464,7 @@ public class DruidAdapterIT { + "FROM \"foodmart\"\n" + "GROUP BY \"store_sales\", floor(\"timestamp\" to DAY)\n ORDER BY \"store_sales\" DESC\n" + "LIMIT 10\n"; - sql(sql).queryContains(druidChecker("{\"queryType\":\"select\"")); + sql(sql).queryContains(druidChecker("{\"queryType\":\"groupBy\"")); } @Test public void testFilterOnDouble() { @@ -2833,16 +2835,59 @@ public class DruidAdapterIT { sql(sql).explainContains(expectedSubExplain); } - /** - * Test to ensure that aggregations with metrics as filters do not get pushed into Druid - */ @Test public void testFilterClauseWithMetricRef() { String sql = "select sum(\"store_sales\") filter (where \"store_cost\" > 10) from \"foodmart\""; String expectedSubExplain = - " BindableAggregate(group=[{}], EXPR$0=[SUM($0) FILTER $1])\n" - + " BindableProject(store_sales=[$0], $f1=[IS TRUE(>($1, 10))])\n"; + "PLAN=EnumerableInterpreter\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[>" + + "($91, 10)], projects=[[$90, IS TRUE(>($91, 10))]], groups=[{}], aggs=[[SUM($0)" + + "]])"; - sql(sql).explainContains(expectedSubExplain); + sql(sql) + .explainContains(expectedSubExplain) + .queryContains( + druidChecker("\"queryType\":\"timeseries\"", "\"filter\":{\"type\":\"bound\"," + + "\"dimension\":\"store_cost\",\"lower\":\"10\",\"lowerStrict\":true," + + "\"ordering\":\"numeric\"}")) + .returnsUnordered("EXPR$0=25.060000000000002"); + } + + @Test public void testFilterClauseWithMetricRefAndAggregates() { + String sql = "select sum(\"store_sales\"), \"product_id\" " + + "from \"foodmart\" where \"product_id\" > 1553 and \"store_cost\" > 5 group by \"product_id\""; + String expectedSubExplain = + "PLAN=EnumerableInterpreter\n" + + " BindableProject(EXPR$0=[$1], product_id=[$0])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[AND(>" + + "(CAST($1):BIGINT, 1553), >($91, 5))], groups=[{1}], aggs=[[SUM($90)]])"; + + sql(sql) + .explainContains(expectedSubExplain) + .queryContains( + druidChecker("\"queryType\":\"groupBy\"", "{\"type\":\"bound\"," + + "\"dimension\":\"store_cost\",\"lower\":\"5\",\"lowerStrict\":true," + + "\"ordering\":\"numeric\"}")) + .returnsUnordered("EXPR$0=10.16; product_id=1554\n" + + "EXPR$0=45.05; product_id=1556\n" + + "EXPR$0=88.5; product_id=1555"); + } + + @Test public void testFilterClauseWithMetricAndTimeAndAggregates() { + String sql = "select sum(\"store_sales\"), \"product_id\"" + + "from \"foodmart\" where \"product_id\" > 1555 and \"store_cost\" > 5 and extract(year " + + "from \"timestamp\") = 1997 " + + "group by floor(\"timestamp\" to DAY),\"product_id\""; + sql(sql) + .queryContains( + druidChecker("\"queryType\":\"groupBy\"", "{\"type\":\"bound\"," + + "\"dimension\":\"store_cost\",\"lower\":\"5\",\"lowerStrict\":true," + + "\"ordering\":\"numeric\"}")) + .returnsUnordered("EXPR$0=10.6; product_id=1556\n" + + "EXPR$0=10.6; product_id=1556\n" + + "EXPR$0=10.6; product_id=1556\n" + + "EXPR$0=13.25; product_id=1556"); } /** @@ -2999,11 +3044,14 @@ public class DruidAdapterIT { @Test public void testDistinctCountOnMetric() { String sql = "select count(distinct \"store_sales\") from \"foodmart\" " + "where \"store_state\" = 'WA'"; - String expectedSubExplain = " BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n" - + " BindableAggregate(group=[{1}])"; + String expectedSubExplain = "PLAN=EnumerableInterpreter\n" + + " BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], filter=[=($63, 'WA')" + + "], groups=[{90}], aggs=[[]])"; - testCountWithApproxDistinct(true, sql, expectedSubExplain); - testCountWithApproxDistinct(false, sql, expectedSubExplain); + testCountWithApproxDistinct(true, sql, expectedSubExplain, "\"queryType\":\"groupBy\""); + testCountWithApproxDistinct(false, sql, expectedSubExplain, "\"queryType\":\"groupBy\""); } /** @@ -3053,13 +3101,14 @@ public class DruidAdapterIT { String sql = "select \"B\", count(distinct \"A\") from " + "(select \"unit_sales\" as \"A\", \"store_state\" as \"B\" from \"foodmart\") " + "group by \"B\""; - String expectedSubExplain = " BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n" - + " DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:" - + "00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$63, $89]], groups=[{0, 1}], " + String expectedSubExplain = "PLAN=EnumerableInterpreter\n" + + " BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], groups=[{63, 89}], " + "aggs=[[]])"; - testCountWithApproxDistinct(true, sql, expectedSubExplain); - testCountWithApproxDistinct(false, sql, expectedSubExplain); + testCountWithApproxDistinct(true, sql, expectedSubExplain, "\"queryType\":\"groupBy\""); + testCountWithApproxDistinct(false, sql, expectedSubExplain, "\"queryType\":\"groupBy\""); } private void testCountWithApproxDistinct(boolean approx, String sql, String expectedExplain) {
