Repository: calcite
Updated Branches:
  refs/heads/master 52eda6d91 -> 40503ff65


[CALCITE-2578] Support ANY_VALUE Aggregate Function in ElasticSearch adapter

Allow queries of the form `SELECT foo, ANY_VALUE(bar) FROM elastic GROUP BY foo`
in Elasticsearch. They are implemented as terms aggregations with size 1.

Intermediate JSON operations stored in the implementor are now valid JSON
documents. Previously they were missing the leading and trailing curly braces
({ and }) and were manually concatenated in ElasticsearchTable.

Closes apache/calcite#848


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/40503ff6
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/40503ff6
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/40503ff6

Branch: refs/heads/master
Commit: 40503ff658b7a02d43841f9df7cc655dcbe5de79
Parents: 52eda6d
Author: Andrei Sereda <[email protected]>
Authored: Thu Sep 20 00:02:19 2018 -0400
Committer: Andrei Sereda <[email protected]>
Committed: Thu Sep 20 15:09:33 2018 -0400

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchAggregate.java   | 13 ++++---
 .../elasticsearch/ElasticsearchFilter.java      |  2 +-
 .../elasticsearch/ElasticsearchJson.java        | 36 ++++++++++++++------
 .../elasticsearch/ElasticsearchProject.java     |  2 +-
 .../elasticsearch/ElasticsearchTable.java       | 15 ++++----
 .../adapter/elasticsearch/AggregationTest.java  | 26 ++++++++++++++
 6 files changed, 70 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
index 9627aca..e2810f5 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java
@@ -44,7 +44,8 @@ import java.util.Set;
 public class ElasticsearchAggregate extends Aggregate implements 
ElasticsearchRel {
 
   private static final Set<SqlKind> SUPPORTED_AGGREGATIONS =
-      EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG, 
SqlKind.SUM);
+      EnumSet.of(SqlKind.COUNT, SqlKind.MAX, SqlKind.MIN, SqlKind.AVG,
+          SqlKind.SUM, SqlKind.ANY_VALUE);
 
   /** Creates a ElasticsearchAggregate */
   ElasticsearchAggregate(RelOptCluster cluster,
@@ -106,7 +107,6 @@ public class ElasticsearchAggregate extends Aggregate 
implements ElasticsearchRe
   @Override public void implement(Implementor implementor) {
     implementor.visitChild(0, getInput());
     List<String> inputFields = fieldNames(getInput().getRowType());
-
     for (int group : groupSet) {
       implementor.addGroupBy(inputFields.get(group));
     }
@@ -118,10 +118,13 @@ public class ElasticsearchAggregate extends Aggregate 
implements ElasticsearchRe
       }
 
       final String name = names.isEmpty() ? ElasticsearchConstants.ID : 
names.get(0);
+      // for ANY_VALUE return just a single result
+      final String size = aggCall.getAggregation().getKind() == 
SqlKind.ANY_VALUE ? ", \"size\": 1"
+           : "";
 
-      String op = String.format(Locale.ROOT, "\"%s\":{\"field\": \"%s\"}",
+      final String op = String.format(Locale.ROOT, "{\"%s\":{\"field\": \"%s\" 
%s}}",
           toElasticAggregate(aggCall),
-          name);
+          name, size);
 
       implementor.addAggregation(aggCall.getName(), op);
     }
@@ -146,6 +149,8 @@ public class ElasticsearchAggregate extends Aggregate 
implements ElasticsearchRe
       return "max";
     case AVG:
       return "avg";
+    case ANY_VALUE:
+      return "terms";
     default:
       throw new IllegalArgumentException("Unknown aggregation kind " + kind + 
" for " + call);
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
index c339671..67363d2 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchFilter.java
@@ -99,7 +99,7 @@ public class ElasticsearchFilter extends Filter implements 
ElasticsearchRel {
       
QueryBuilders.constantScoreQuery(PredicateAnalyzer.analyze(condition)).writeJson(generator);
       generator.flush();
       generator.close();
-      return "\"query\" : " + writer.toString();
+      return "{\"query\" : " + writer.toString() + "}";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index 7c80e82..a05d2a4 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -73,13 +73,11 @@ class ElasticsearchJson {
     Objects.requireNonNull(aggregations, "aggregations");
     Objects.requireNonNull(consumer, "consumer");
 
-    List<Bucket> buckets = new ArrayList<>();
-
     Map<RowKey, List<MultiValue>> rows = new LinkedHashMap<>();
 
     BiConsumer<RowKey, MultiValue> cons = (r, v) ->
         rows.computeIfAbsent(r, ignore -> new ArrayList<>()).add(v);
-    aggregations.forEach(a -> visitValueNodes(a, buckets, cons));
+    aggregations.forEach(a -> visitValueNodes(a, new ArrayList<>(), cons));
     rows.forEach((k, v) -> {
       Map<String, Object> row = new LinkedHashMap<>(k.keys);
       v.forEach(val -> row.put(val.getName(), val.value()));
@@ -131,7 +129,7 @@ class ElasticsearchJson {
       BiConsumer<RowKey, MultiValue> consumer) {
 
     if (aggregation instanceof MultiValue) {
-      // publish one value of the row
+      // this is a leaf. publish value of the row.
       RowKey key = new RowKey(parents);
       consumer.accept(key, (MultiValue) aggregation);
       return;
@@ -139,6 +137,11 @@ class ElasticsearchJson {
 
     if (aggregation instanceof Bucket) {
       Bucket bucket = (Bucket) aggregation;
+      if (bucket.hasNoAggregations()) {
+        // bucket with no aggregations is also considered a leaf node
+        visitValueNodes(MultiValue.of(bucket.getName(), bucket.key()), 
parents, consumer);
+        return;
+      }
       parents.add(bucket);
       bucket.getAggregations().forEach(a -> visitValueNodes(a, parents, 
consumer));
       parents.remove(parents.size() - 1);
@@ -147,11 +150,7 @@ class ElasticsearchJson {
       children.getAggregations().forEach(a -> visitValueNodes(a, parents, 
consumer));
     } else if (aggregation instanceof MultiBucketsAggregation) {
       MultiBucketsAggregation multi = (MultiBucketsAggregation) aggregation;
-      multi.buckets().forEach(b -> {
-        parents.add(b);
-        b.getAggregations().forEach(a -> visitValueNodes(a, parents, 
consumer));
-        parents.remove(parents.size() - 1);
-      });
+      multi.buckets().forEach(b -> visitValueNodes(b, parents, consumer));
     }
 
   }
@@ -462,6 +461,13 @@ class ElasticsearchJson {
     }
 
     /**
+     * Means current bucket has no aggregations.
+     */
+    boolean hasNoAggregations() {
+      return aggregations.asList().isEmpty();
+    }
+
+    /**
      * @return  The sub-aggregations of this bucket
      */
     @Override public Aggregations getAggregations() {
@@ -500,12 +506,22 @@ class ElasticsearchJson {
      */
     Object value() {
       if (!values().containsKey("value")) {
-        throw new IllegalStateException("'value' field not present in this 
aggregation");
+        String message = String.format(Locale.ROOT, "'value' field not present 
in "
+            + "%s aggregation", getName());
+
+        throw new IllegalStateException(message);
       }
 
       return values().get("value");
     }
 
+    /**
+     * Constructs a {@link MultiValue} instance with a single value.
+     */
+    static MultiValue of(String name, Object value) {
+      return new MultiValue(name, Collections.singletonMap("value", value));
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
index d0841c3..234b5f0 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -102,7 +102,7 @@ public class ElasticsearchProject extends Project 
implements ElasticsearchRel {
     }
 
     implementor.list.removeIf(l -> l.startsWith("\"_source\""));
-    implementor.add(query.toString());
+    implementor.add("{" + query.toString() + "}");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index c404da7..f8cea1f 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -34,7 +34,6 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTableQueryable;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.Util;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
@@ -168,11 +167,10 @@ public class ElasticsearchTable extends 
AbstractQueryableTable implements Transl
     }
 
     final ObjectNode query = mapper.createObjectNode();
-
     // manually parse from previously concatenated string
-    query.setAll(
-        (ObjectNode) mapper.readTree("{"
-            + Util.toString(ops, "", ", ", "") + "}"));
+    for (String op: ops) {
+      query.setAll((ObjectNode) mapper.readTree(op));
+    }
 
     if (!sort.isEmpty()) {
       ArrayNode sortNode = query.withArray("sort");
@@ -219,9 +217,10 @@ public class ElasticsearchTable extends 
AbstractQueryableTable implements Transl
     }
 
     final ObjectNode query = mapper.createObjectNode();
-
     // manually parse into JSON from previously concatenated strings
-    query.setAll((ObjectNode) mapper.readTree("{" + Util.toString(ops, "", ", 
", "") + "}"));
+    for (String op: ops) {
+      query.setAll((ObjectNode) mapper.readTree(op));
+    }
 
     // remove / override attributes which are not applicable to aggregations
     query.put("_source", false);
@@ -270,7 +269,7 @@ public class ElasticsearchTable extends 
AbstractQueryableTable implements Transl
     // simple version for queries like "select count(*), max(col1) from table" 
(no GROUP BY cols)
     if (!groupBy.isEmpty() || !aggregations.stream().allMatch(isCountStar)) {
       for (Map.Entry<String, String> aggregation : aggregations) {
-        JsonNode value = mapper.readTree("{" + aggregation.getValue()  + "}");
+        JsonNode value = mapper.readTree(aggregation.getValue());
         parent.set(aggregation.getKey(), value);
       }
     }

http://git-wip-us.apache.org/repos/asf/calcite/blob/40503ff6/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
----------------------------------------------------------------------
diff --git 
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
 
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
index 8115bc7..d06cc7d 100644
--- 
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
+++ 
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
@@ -220,6 +220,32 @@ public class AggregationTest {
             "cat1=b; cat3=z; EXPR$2=7.0; EXPR$3=42.0");
   }
 
+  /**
+   * Testing {@link org.apache.calcite.sql.SqlKind#ANY_VALUE} aggregate 
function
+   */
+  @Test
+  public void anyValue() throws Exception {
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat1, any_value(cat2) from view group by cat1")
+        .returnsUnordered("cat1=a; EXPR$1=g",
+            "cat1=null; EXPR$1=g",
+            "cat1=b; EXPR$1=h");
+
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat2, any_value(cat1) from view group by cat2")
+        .returnsUnordered("cat2=g; EXPR$1=a", // EXPR$1=null is also valid
+            "cat2=h; EXPR$1=b");
+
+    CalciteAssert.that()
+        .with(newConnectionFactory())
+        .query("select cat2, any_value(cat3) from view group by cat2")
+        .returnsUnordered("cat2=g; EXPR$1=y", // EXPR$1=null is also valid
+            "cat2=h; EXPR$1=z");
+
+  }
+
   @Test
   public void cat1Cat2Cat3() throws Exception {
     CalciteAssert.that()

Reply via email to