Repository: calcite
Updated Branches:
refs/heads/master 52eda6d91 -> 40503ff65
[CALCITE-2578] Support ANY_VALUE Aggregate Function in ElasticSearch adapter
Allow queries of type `SELECT foo, ANY_VALUE(bar) FROM elastic GROUP BY foo` in
Elastic. They're
implemented as Terms aggregations with size 1.
Intermediate JSON operations stored in the implementor are now valid JSON documents.
Previously they were missing the leading and
trailing curly brackets ({ and }) and were manually concatenated in
ElasticsearchTable.
Closes apache/calcite#848
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/40503ff6
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/40503ff6
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/40503ff6
Branch: refs/heads/master
Commit: 40503ff658b7a02d43841f9df7cc655dcbe5de79
Parents: 52eda6d
Author: Andrei Sereda <[email protected]>
Authored: Thu Sep 20 00:02:19 2018 -0400
Committer: Andrei Sereda <[email protected]>
Committed: Thu Sep 20 15:09:33 2018 -0400
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchAggregate.java | 13 ++++---
.../elasticsearch/ElasticsearchFilter.java | 2 +-
.../elasticsearch/ElasticsearchJson.java | 36 ++++++++++++++------
.../elasticsearch/ElasticsearchProject.java | 2 +-
.../elasticsearch/ElasticsearchTable.java | 15 ++++----
.../adapter/elasticsearch/AggregationTest.java | 26 ++++++++++++++
6 files changed, 70 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
----------------------------------------------------------------------
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
index 9627aca..e2810f5 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
@@ -44,7 +44,8 @@ import java.util.Set;
public class ElasticsearchAggregate extends Aggregate implements
ElasticsearchRel {
private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
- EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG,
SqlKind.SUM);
+ EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG,
+ SqlKind.SUM, SqlKind.ANY_VALUE);
/** Creates a ElasticsearchAggregate */
ElasticsearchAggregate(RelOptCluster cluster,
@@ -106,7 +107,6 @@ public class ElasticsearchAggregate extends Aggregate
implements ElasticsearchRe
@Override public void implement(Implementor implementor) {
implementor.visitChild(0, getInput());
List<String> inputFields = fieldNames(getInput().getRowType());
-
for (int group : groupSet) {
implementor.addGroupBy(inputFields.get(group));
}
@@ -118,10 +118,13 @@ public class ElasticsearchAggregate extends Aggregate
implements ElasticsearchRe
}
final String name = names.isEmpty() ? ElasticsearchConstants.ID :
names.get(0);
+ // for ANY_VALUE return just a single result
+ final String size = aggCall.getAggregation().getKind() ==
SqlKind.ANY_VALUE ? ", \"size\": 1"
+ : "";
- String op = String.format(Locale.ROOT, "\"%s\":{\"field\": \"%s\"}",
+ final String op = String.format(Locale.ROOT, "{\"%s\":{\"field\": \"%s\"
%s}}",
toElasticAggregate(aggCall),
- name);
+ name, size);
implementor.addAggregation(aggCall.getName(), op);
}
@@ -146,6 +149,8 @@ public class ElasticsearchAggregate extends Aggregate
implements ElasticsearchRe
return "max";
case AVG:
return "avg";
+ case ANY_VALUE:
+ return "terms";
default:
throw new IllegalArgumentException("Unknown aggregation kind " + kind +
" for " + call);
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
----------------------------------------------------------------------
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
index c339671..67363d2 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
@@ -99,7 +99,7 @@ public class ElasticsearchFilter extends Filter implements
ElasticsearchRel {
QueryBuilders.constantScoreQuery(PredicateAnalyzer.analyze(condition)).writeJson(generator);
generator.flush();
generator.close();
- return "\"query\" : " + writer.toString();
+ return "{\"query\" : " + writer.toString() + "}";
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index 7c80e82..a05d2a4 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -73,13 +73,11 @@ class ElasticsearchJson {
Objects.requireNonNull(aggregations, "aggregations");
Objects.requireNonNull(consumer, "consumer");
- List<Bucket> buckets = new ArrayList<>();
-
Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
BiConsumer<RowKey, MultiValue> cons = (r, v) ->
rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
- aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
+ aggregations.forEach(a -> visitValueNodes(a, new ArrayList<>(), cons));
rows.forEach((k, v) -> {
Map<String, Object> row = new LinkedHashMap<>(k.keys);
v.forEach(val -> row.put(val.getName(), val.value()));
@@ -131,7 +129,7 @@ class ElasticsearchJson {
BiConsumer<RowKey, MultiValue> consumer) {
if (aggregation instanceof MultiValue) {
- // publish one value of the row
+ // this is a leaf. publish value of the row.
RowKey key = new RowKey(parents);
consumer.accept(key, (MultiValue) aggregation);
return;
@@ -139,6 +137,11 @@ class ElasticsearchJson {
if (aggregation instanceof Bucket) {
Bucket bucket = (Bucket) aggregation;
+ if (bucket.hasNoAggregations()) {
+ // bucket with no aggregations is also considered a leaf node
+ visitValueNodes(MultiValue.of(bucket.getName(), bucket.key()),
parents, consumer);
+ return;
+ }
parents.add(bucket);
bucket.getAggregations().forEach(a -> visitValueNodes(a, parents,
consumer));
parents.remove(parents.size() - 1);
@@ -147,11 +150,7 @@ class ElasticsearchJson {
children.getAggregations().forEach(a -> visitValueNodes(a, parents,
consumer));
} else if (aggregation instanceof MultiBucketsAggregation) {
MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
- multi.buckets().forEach(b -> {
- parents.add(b);
- b.getAggregations().forEach(a -> visitValueNodes(a, parents,
consumer));
- parents.remove(parents.size() - 1);
- });
+ multi.buckets().forEach(b -> visitValueNodes(b, parents, consumer));
}
}
@@ -462,6 +461,13 @@ class ElasticsearchJson {
}
/**
+ * Means current bucket has no aggregations.
+ */
+ boolean hasNoAggregations() {
+ return aggregations.asList().isEmpty();
+ }
+
+ /**
* @return The sub-aggregations of this bucket
*/
@Override public Aggregations getAggregations() {
@@ -500,12 +506,22 @@ class ElasticsearchJson {
*/
Object value() {
if (!values().containsKey("value")) {
- throw new IllegalStateException("'value' field not present in this
aggregation");
+ String message = String.format(Locale.ROOT, "'value' field not present
in "
+ + "%s aggregation", getName());
+
+ throw new IllegalStateException(message);
}
return values().get("value");
}
+ /**
+ * Constructs a {@link MultiValue} instance with a single value.
+ */
+ static MultiValue of(String name, Object value) {
+ return new MultiValue(name, Collections.singletonMap("value", value));
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
----------------------------------------------------------------------
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
index d0841c3..234b5f0 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -102,7 +102,7 @@ public class ElasticsearchProject extends Project
implements ElasticsearchRel {
}
implementor.list.removeIf(l -> l.startsWith("\"_source\""));
- implementor.add(query.toString());
+ implementor.add("{" + query.toString() + "}");
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index c404da7..f8cea1f 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -34,7 +34,6 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTableQueryable;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.Util;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
@@ -168,11 +167,10 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
}
final ObjectNode query = mapper.createObjectNode();
-
// manually parse from previously concatenated string
- query.setAll(
- (ObjectNode) mapper.readTree("{"
- + Util.toString(ops, "", ", ", "") + "}"));
+ for (String op: ops) {
+ query.setAll((ObjectNode) mapper.readTree(op));
+ }
if (!sort.isEmpty()) {
ArrayNode sortNode = query.withArray("sort");
@@ -219,9 +217,10 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
}
final ObjectNode query = mapper.createObjectNode();
-
// manually parse into JSON from previously concatenated strings
- query.setAll((ObjectNode) mapper.readTree("{" + Util.toString(ops, "", ",
", "") + "}"));
+ for (String op: ops) {
+ query.setAll((ObjectNode) mapper.readTree(op));
+ }
// remove / override attributes which are not applicable to aggregations
query.put("_source", false);
@@ -270,7 +269,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
// simple version for queries like "select count(*), max(col1) from table"
(no GROUP BY cols)
if (!groupBy.isEmpty() || !aggregations.stream().allMatch(isCountStar)) {
for (Map.Entry<String, String> aggregation : aggregations) {
- JsonNode value = mapper.readTree("{" + aggregation.getValue() + "}");
+ JsonNode value = mapper.readTree(aggregation.getValue());
parent.set(aggregation.getKey(), value);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
----------------------------------------------------------------------
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
index 8115bc7..d06cc7d 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
@@ -220,6 +220,32 @@ public class AggregationTest {
"cat1=b; cat3=z; EXPR$2=7.0; EXPR$3=42.0");
}
+ /**
+ * Testing {@link org.apache.calcite.sql.SqlKind#ANY_VALUE} aggregate
function
+ */
+ @Test
+ public void anyValue() throws Exception {
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select cat1, any_value(cat2) from view group by cat1")
+ .returnsUnordered("cat1=a; EXPR$1=g",
+ "cat1=null; EXPR$1=g",
+ "cat1=b; EXPR$1=h");
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select cat2, any_value(cat1) from view group by cat2")
+ .returnsUnordered("cat2=g; EXPR$1=a", // EXPR$1=null is also valid
+ "cat2=h; EXPR$1=b");
+
+ CalciteAssert.that()
+ .with(newConnectionFactory())
+ .query("select cat2, any_value(cat3) from view group by cat2")
+ .returnsUnordered("cat2=g; EXPR$1=y", // EXPR$1=null is also valid
+ "cat2=h; EXPR$1=z");
+
+ }
+
@Test
public void cat1Cat2Cat3() throws Exception {
CalciteAssert.that()