[CALCITE-2679] In Elasticsearch adapter, implement DISTINCT and GROUP BY without aggregate function (Siyuan Liu)
This commit mainly fixes 3 bugs: 1. GROUP BY and DISTINCT queries enter the wrong execution branch. [ElasticsearchTable:126] 2. Values in an agg bucket lose a part after being returned as a result. [ElasticsearchJson:83-93, 546-551] 3. The logic for removing empty agg blocks does not work. [ElasticsearchTable:240-254] Close apache/calcite#927 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/96605a86 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/96605a86 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/96605a86 Branch: refs/heads/master Commit: 96605a86cfc4586951700727f12d33fc00c1ce55 Parents: e809344 Author: liusiyuan1 <[email protected]> Authored: Sun Nov 18 23:28:29 2018 +0800 Committer: Julian Hyde <[email protected]> Committed: Sat Dec 1 14:42:33 2018 -0800 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchJson.java | 42 ++++++--- .../elasticsearch/ElasticsearchTable.java | 29 ++++-- .../elasticsearch/ElasticSearchAdapterTest.java | 96 +++++++++++++++----- 3 files changed, 120 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java index e389ecf..dd49dfa 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java @@ -27,15 +27,14 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import 
com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Deque; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -74,9 +73,17 @@ final class ElasticsearchJson { rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v); aggregations.forEach(a -> visitValueNodes(a, new ArrayList<>(), cons)); rows.forEach((k, v) -> { - Map<String, Object> row = new LinkedHashMap<>(k.keys); - v.forEach(val -> row.put(val.getName(), val.value())); - consumer.accept(row); + if (v.stream().anyMatch(val -> val instanceof GroupValue)) { + v.forEach(tuple -> { + Map<String, Object> groupRow = new LinkedHashMap<>(k.keys); + groupRow.put(tuple.getName(), tuple.value()); + consumer.accept(groupRow); + }); + } else { + Map<String, Object> row = new LinkedHashMap<>(k.keys); + v.forEach(val -> row.put(val.getName(), val.value())); + consumer.accept(row); + } }); } @@ -178,7 +185,7 @@ final class ElasticsearchJson { Bucket bucket = (Bucket) aggregation; if (bucket.hasNoAggregations()) { // bucket with no aggregations is also considered a leaf node - visitValueNodes(MultiValue.of(bucket.getName(), bucket.key()), parents, consumer); + visitValueNodes(GroupValue.of(bucket.getName(), bucket.key()), parents, consumer); return; } parents.add(bucket); @@ -561,13 +568,23 @@ final class ElasticsearchJson { return values().get("value"); } + } + + /** + * Distinguishes from {@link MultiValue}. + * In order that rows which have the same key can be put into result map. 
+ */ + static class GroupValue extends MultiValue { + GroupValue(String name, Map<String, Object> values) { + super(name, values); + } + /** - * Constructs a {@link MultiValue} instance with a single value. + * Constructs a {@link GroupValue} instance with a single value. */ - static MultiValue of(String name, Object value) { - return new MultiValue(name, Collections.singletonMap("value", value)); + static GroupValue of(String name, Object value) { + return new GroupValue(name, Collections.singletonMap("value", value)); } - } /** @@ -575,8 +592,9 @@ final class ElasticsearchJson { */ static class AggregationsDeserializer extends StdDeserializer<Aggregations> { - private static final Set<String> IGNORE_TOKENS = new HashSet<>(Arrays.asList("meta", - "buckets", "value", "values", "value_as_string", "doc_count", "key", "key_as_string")); + private static final Set<String> IGNORE_TOKENS = + ImmutableSet.of("meta", "buckets", "value", "values", "value_as_string", + "doc_count", "key", "key_as_string"); AggregationsDeserializer() { super(Aggregations.class); http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java index b009fff..e288a16 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -122,7 +123,7 @@ public class ElasticsearchTable 
extends AbstractQueryableTable implements Transl List<Map.Entry<String, String>> aggregations, Long offset, Long fetch) throws IOException { - if (!aggregations.isEmpty()) { + if (!aggregations.isEmpty() || !groupBy.isEmpty()) { // process aggregations separately return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch); } @@ -171,10 +172,6 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl List<Map.Entry<String, String>> aggregations, Long offset, Long fetch) throws IOException { - if (aggregations.isEmpty()) { - throw new IllegalArgumentException("Missing Aggregations"); - } - if (!groupBy.isEmpty() && offset != null) { String message = "Currently ES doesn't support generic pagination " + "with aggregations. You can still use LIMIT keyword (without OFFSET). " @@ -245,12 +242,23 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl } } + final Consumer<JsonNode> emptyAggRemover = new Consumer<JsonNode>() { + @Override public void accept(JsonNode node) { + if (!node.has(AGGREGATIONS)) { + node.elements().forEachRemaining(this); + return; + } + JsonNode agg = node.get(AGGREGATIONS); + if (agg.size() == 0) { + ((ObjectNode) node).remove(AGGREGATIONS); + } else { + this.accept(agg); + } + } + }; + // cleanup query. 
remove empty AGGREGATIONS element (if empty) - JsonNode agg = query; - while (agg.has(AGGREGATIONS) && agg.get(AGGREGATIONS).elements().hasNext()) { - agg = agg.get(AGGREGATIONS); - } - ((ObjectNode) agg).remove(AGGREGATIONS); + emptyAggRemover.accept(query); ElasticsearchJson.Result res = transport.search(Collections.emptyMap()).apply(query); @@ -258,6 +266,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl if (res.aggregations() != null) { // collect values ElasticsearchJson.visitValueNodes(res.aggregations(), m -> { + // using 'Collectors.toMap' will trigger Java 8 bug here Map<String, Object> newMap = new LinkedHashMap<>(); for (String key: m.keySet()) { newMap.put(fieldMap.getOrDefault(key, key), m.get(key)); http://git-wip-us.apache.org/repos/asf/calcite/blob/96605a86/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java index 73cf6bf..8b09b88 100644 --- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java +++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java @@ -458,62 +458,108 @@ public class ElasticSearchAdapterTest { @Test public void groupBy() { + // distinct + calciteAssert() + .query("select distinct state\n" + + "from zips\n" + + "limit 6") + .queryContains( + ElasticsearchChecker.elasticsearchChecker("_source:false", + "size:0", + "aggregations:{'g_state':{'terms':{'field':'state','missing':'__MISSING__', 'size' : 6}}}")) + .returnsOrdered("state=AK", + "state=AL", + "state=AR", + "state=AZ", + "state=CA", + "state=CO"); + + // without aggregate function + calciteAssert() + .query("select state, 
city\n" + + "from zips\n" + + "group by state, city\n" + + "order by city limit 10") + .queryContains( + ElasticsearchChecker.elasticsearchChecker("'_source':false", + "size:0", + "aggregations:{'g_city':{'terms':{'field':'city','missing':'__MISSING__','size':10,'order':{'_key':'asc'}}", + "aggregations:{'g_state':{'terms':{'field':'state','missing':'__MISSING__','size':10}}}}}}")) + .returnsOrdered("state=SD; city=ABERDEEN", + "state=SC; city=AIKEN", + "state=TX; city=ALTON", + "state=IA; city=AMES", + "state=AK; city=ANCHORAGE", + "state=MD; city=BALTIMORE", + "state=ME; city=BANGOR", + "state=KS; city=BAVARIA", + "state=NJ; city=BAYONNE", + "state=OR; city=BEAVERTON"); + // ascending calciteAssert() - .query("select min(pop), max(pop), state from zips group by state " + .query("select min(pop), max(pop), state\n" + + "from zips\n" + + "group by state\n" + "order by state limit 3") .queryContains( ElasticsearchChecker.elasticsearchChecker("'_source':false", - "size:0", - "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',size:3," - + " order:{'_key':'asc'}}", - "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:{field:'pop'}}}}}")) + "size:0", + "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__',size:3," + + " order:{'_key':'asc'}}", + "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:{field:'pop'}}}}}")) .returnsOrdered("EXPR$0=23238; EXPR$1=32383; state=AK", "EXPR$0=42124; EXPR$1=44165; state=AL", "EXPR$0=37428; EXPR$1=53532; state=AR"); // just one aggregation function calciteAssert() - .query("select min(pop), state from zips group by state" - + " order by state limit 3") + .query("select min(pop), state\n" + + "from zips\n" + + "group by state\n" + + "order by state limit 3") .queryContains( ElasticsearchChecker.elasticsearchChecker("'_source':false", - "size:0", - "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__'," - + "size:3, order:{'_key':'asc'}}", - 
"aggregations:{'EXPR$0':{min:{field:'pop'}} }}}")) + "size:0", + "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__'," + + "size:3, order:{'_key':'asc'}}", + "aggregations:{'EXPR$0':{min:{field:'pop'}} }}}")) .returnsOrdered("EXPR$0=23238; state=AK", "EXPR$0=42124; state=AL", "EXPR$0=37428; state=AR"); // group by count calciteAssert() - .query("select count(city), state from zips group by state " + .query("select count(city), state\n" + + "from zips\n" + + "group by state\n" + "order by state limit 3") .queryContains( ElasticsearchChecker.elasticsearchChecker("'_source':false", - "size:0", - "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__'," - + " size:3, order:{'_key':'asc'}}", - "aggregations:{'EXPR$0':{'value_count':{field:'city'}} }}}")) + "size:0", + "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__'," + + " size:3, order:{'_key':'asc'}}", + "aggregations:{'EXPR$0':{'value_count':{field:'city'}} }}}")) .returnsOrdered("EXPR$0=3; state=AK", "EXPR$0=3; state=AL", "EXPR$0=3; state=AR"); // descending calciteAssert() - .query("select min(pop), max(pop), state from zips group by state " - + " order by state desc limit 3") + .query("select min(pop), max(pop), state\n" + + "from zips\n" + + "group by state\n" + + "order by state desc limit 3") .queryContains( ElasticsearchChecker.elasticsearchChecker("'_source':false", - "size:0", - "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__'," - + "size:3, order:{'_key':'desc'}}", - "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':" - + "{max:{field:'pop'}}}}}")) + "size:0", + "aggregations:{'g_state':{terms:{field:'state',missing:'__MISSING__'," + + "size:3, order:{'_key':'desc'}}", + "aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':" + + "{max:{field:'pop'}}}}}")) .returnsOrdered("EXPR$0=25968; EXPR$1=33107; state=WY", - "EXPR$0=45196; EXPR$1=70185; state=WV", - "EXPR$0=51008; EXPR$1=57187; state=WI"); + "EXPR$0=45196; EXPR$1=70185; state=WV", 
+ "EXPR$0=51008; EXPR$1=57187; state=WI"); } /**
