Repository: calcite Updated Branches: refs/heads/master 82cdc2fa7 -> 8eb7d0ff2
[CALCITE-2101] Druid adapter: Push count(column) using Druid filtered aggregate Close apache/calcite#586 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/8eb7d0ff Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/8eb7d0ff Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/8eb7d0ff Branch: refs/heads/master Commit: 8eb7d0ff2a6067b7555ef63ddc5422d6523f45cb Parents: 82cdc2f Author: Slim <[email protected]> Authored: Tue Dec 19 11:10:25 2017 -0800 Committer: Jesus Camacho Rodriguez <[email protected]> Committed: Wed Dec 20 12:58:01 2017 -0800 ---------------------------------------------------------------------- .../calcite/adapter/druid/DruidQuery.java | 11 ++- .../calcite/adapter/druid/DruidRules.java | 23 ++++-- .../org/apache/calcite/test/DruidAdapterIT.java | 75 ++++++++++++-------- 3 files changed, 73 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/8eb7d0ff/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java index 8a6524c..6664f00 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java @@ -875,7 +875,16 @@ public class DruidQuery extends AbstractRelNode implements BindableRel { + " because an approximate count distinct is not acceptable."); } } - aggregation = new JsonAggregation("count", name, only); + if (aggCall.getArgList().size() == 1) { + // case we have count(column) push it as count(*) where column is not null + final JsonFilter matchNulls = new JsonSelector(only, null, null); + final JsonFilter filterOutNulls = new JsonCompositeFilter(JsonFilter.Type.NOT, matchNulls); + aggregation = new JsonFilteredAggregation(filterOutNulls, + new JsonAggregation("count", name, only)); + } else { + aggregation = new JsonAggregation("count", name, only); + } + break; case SUM: case SUM0: http://git-wip-us.apache.org/repos/asf/calcite/blob/8eb7d0ff/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java ---------------------------------------------------------------------- diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java index 128f1aa..4996970 100644 --- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java +++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java @@ -140,18 +140,29 @@ public class DruidRules { for (AggregateCall aggregateCall : aggregate.getAggCallList()) { switch (aggregateCall.getAggregation().getKind()) { case COUNT: - // Druid can handle 2 scenarios: + // Druid count aggregator can handle 3 scenarios: // 1. count(distinct col) when approximate results - // are acceptable and col is not a metric + // are acceptable and col is not a metric. + // Note that exact count(distinct column) is handled + // by being rewritten into group by followed by count // 2. count(*) + // 3. count(column) + if (checkAggregateOnMetric(ImmutableBitSet.of(aggregateCall.getArgList()), node, query)) { return true; } - if ((aggregateCall.isDistinct() - && (aggregateCall.isApproximate() - || config.approximateDistinctCount())) - || aggregateCall.getArgList().isEmpty()) { + // case count(*) + if (aggregateCall.getArgList().isEmpty()) { + continue; + } + // case count(column) + if (aggregateCall.getArgList().size() == 1 && !aggregateCall.isDistinct()) { + continue; + } + // case count(distinct and is approximate) + if (aggregateCall.isDistinct() + && (aggregateCall.isApproximate() || config.approximateDistinctCount())) { continue; } return true; http://git-wip-us.apache.org/repos/asf/calcite/blob/8eb7d0ff/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java ---------------------------------------------------------------------- diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java index 2df8e57..c78d3b1 100644 --- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java +++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java @@ -1047,11 +1047,17 @@ public class DruidAdapterIT { final String sql = "select sum(\"unit_sales\") as s,\n" + " count(\"store_sqft\") as c\n" + "from \"foodmart\"\n" - + "group by floor(\"timestamp\" to MONTH)"; - String druidQuery = "{'queryType':'select','dataSource':'foodmart'"; + + "group by floor(\"timestamp\" to MONTH) order by s"; + String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart'"; sql(sql) .limit(3) - .returnsUnordered("S=21081; C=5793", "S=23763; C=6762", "S=25270; C=7026") + .explainContains("PLAN=EnumerableInterpreter\n" + + " BindableSort(sort0=[$0], dir0=[ASC])\n" + + " BindableProject(S=[$1], C=[$2])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[FLOOR" + + "($0, FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), COUNT($2)]])") + .returnsOrdered("S=19958; C=5606", "S=20179; C=5523", "S=20388; C=5591") .queryContains(druidChecker(druidQuery)); } @@ -1065,12 +1071,11 @@ public class DruidAdapterIT { + "group by floor(\"timestamp\" to MONTH)\n" + "order by floor(\"timestamp\" to MONTH) ASC"; final String explain = "PLAN=EnumerableInterpreter\n" - + " BindableSort(sort0=[$2], dir0=[ASC])\n" - + " BindableProject(S=[$1], C=[$2], EXPR$2=[$0])\n" - + " BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n" - + " BindableProject($f0=[FLOOR($0, FLAG(MONTH))], unit_sales=[$2], store_sqft=[$1])\n" - + " DruidQuery(table=[[foodmart, foodmart]], " - + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$0, $71, $89]])"; + + " BindableProject(S=[$1], C=[$2], EXPR$2=[$0])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, " + + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), COUNT($2)]], sort0=[0], " + + "dir0=[ASC])"; sql(sql) .explainContains(explain) .returnsOrdered("S=21628; C=5957", @@ -1094,11 +1099,13 @@ public class DruidAdapterIT { + "from \"foodmart\"\n" + "group by floor(\"timestamp\" to MONTH)\n" + "order by floor(\"timestamp\" to MONTH) limit 3"; - final String explain = "BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n" - + " BindableAggregate(group=[{0}], S=[SUM($1)], C=[COUNT($2)])\n" - + " BindableProject($f0=[FLOOR($0, FLAG(MONTH))], unit_sales=[$2], store_sqft=[$1])\n" - + " DruidQuery(table=[[foodmart, foodmart]], " - + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$0, $71, $89]])"; + final String explain = "PLAN=EnumerableInterpreter\n" + + " BindableProject(M=[CAST($0):TIMESTAMP(0) NOT NULL], S=[$1], C=[$2], EXPR$3=[$0])\n" + + " BindableSort(sort0=[$0], dir0=[ASC], fetch=[3])\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[FLOOR($0, " + + "FLAG(MONTH)), $89, $71]], groups=[{0}], aggs=[[SUM($1), COUNT($2)]], sort0=[0], " + + "dir0=[ASC])"; sql(sql) .returnsOrdered("M=1997-01-01 00:00:00; S=21628; C=5957", "M=1997-02-01 00:00:00; S=20957; C=5842", @@ -1110,12 +1117,12 @@ public class DruidAdapterIT { final String sql = "select sum(\"unit_sales\") as s,\n" + " count(\"store_sqft\") as c\n" + "from \"foodmart\"\n" - + "group by floor(\"timestamp\" to DAY)"; - String druidQuery = "{'queryType':'select','dataSource':'foodmart'"; + + "group by floor(\"timestamp\" to DAY) order by c desc"; + String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart'"; sql(sql) .limit(3) .queryContains(druidChecker(druidQuery)) - .returnsUnordered("S=1244; C=391", "S=550; C=112", "S=580; C=171"); + .returnsOrdered("S=3850; C=1230", "S=3342; C=1071", "S=3219; C=1024"); } @Test public void testGroupByMonthGranularityFiltered() { @@ -1124,11 +1131,11 @@ public class DruidAdapterIT { + "from \"foodmart\"\n" + "where \"timestamp\" >= '1996-01-01 00:00:00 UTC' and " + " \"timestamp\" < '1998-01-01 00:00:00 UTC'\n" - + "group by floor(\"timestamp\" to MONTH)"; - String druidQuery = "{'queryType':'select','dataSource':'foodmart'"; + + "group by floor(\"timestamp\" to MONTH) order by s asc"; + String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart'"; sql(sql) .limit(3) - .returnsUnordered("S=21081; C=5793", "S=23763; C=6762", "S=25270; C=7026") + .returnsOrdered("S=19958; C=5606", "S=20179; C=5523", "S=20388; C=5591") .queryContains(druidChecker(druidQuery)); } @@ -2910,22 +2917,31 @@ public class DruidAdapterIT { final String sql2 = "SELECT count(\"countryName\") FROM (SELECT \"countryName\" FROM " + "\"wikiticker\") as a"; final String plan2 = "PLAN=EnumerableInterpreter\n" - + " BindableAggregate(group=[{}], EXPR$0=[COUNT($0)])\n" - + " DruidQuery(table=[[wiki, wikiticker]], " - + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], projects=[[$7]])"; + + " DruidQuery(table=[[wiki, wikiticker]], " + + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], projects=[[$7]], " + + "groups=[{}], aggs=[[COUNT($0)]])"; sql(sql2, WIKI_AUTO2) .returnsUnordered("EXPR$0=3799") .explainContains(plan2); final String sql3 = "SELECT count(*), count(\"countryName\") FROM \"wikiticker\""; final String plan3 = "PLAN=EnumerableInterpreter\n" - + " BindableAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[COUNT($0)])\n" - + " DruidQuery(table=[[wiki, wikiticker]], " - + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], projects=[[$7]])"; + + " DruidQuery(table=[[wiki, wikiticker]], " + + "intervals=[[1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z]], projects=[[$7]], " + + "groups=[{}], aggs=[[COUNT(), COUNT($0)]])"; sql(sql3, WIKI_AUTO2) .explainContains(plan3); } + + @Test public void testCountColumn2() { + final String sql = "SELECT count(\"countryName\") FROM (SELECT \"countryName\" FROM " + + "\"wikiticker\" WHERE \"countryName\" IS NOT NULL) as a"; + sql(sql, WIKI_AUTO2) + .queryContains(druidChecker("timeseries")) + .returnsUnordered("EXPR$0=3799"); + } + @Test public void testCountWithNonNull() { final String sql = "select count(\"timestamp\") from \"foodmart\"\n"; @@ -3024,9 +3040,10 @@ public class DruidAdapterIT { String sql = "select \"B\", count(\"A\") from " + "(select \"unit_sales\" as \"A\", \"store_state\" as \"B\" from \"foodmart\") " + "group by \"B\""; - String expectedSubExplain = " BindableAggregate(group=[{0}], EXPR$1=[COUNT($1)])\n" - + " DruidQuery(table=[[foodmart, foodmart]], intervals=[[1900-01-09T00:00:00.000Z" - + "/2992-01-10T00:00:00.000Z]], projects=[[$63, $89]])\n"; + String expectedSubExplain = "PLAN=EnumerableInterpreter\n" + + " DruidQuery(table=[[foodmart, foodmart]], " + + "intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]], projects=[[$63, " + + "$89]], groups=[{0}], aggs=[[COUNT($1)]]"; testCountWithApproxDistinct(true, sql, expectedSubExplain); testCountWithApproxDistinct(false, sql, expectedSubExplain);
