This is an automated email from the ASF dual-hosted git repository. sereda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push: new 542c086 [CALCITE-2814] ElasticSearch adapter. Fix "group by" when using raw item access (eg. _MAP['foo']) 542c086 is described below commit 542c086749d504de5542c37f12f6a1329842f18d Author: Andrei Sereda <25229979+asereda...@users.noreply.github.com> AuthorDate: Sat Jan 26 23:27:33 2019 -0500 [CALCITE-2814] ElasticSearch adapter. Fix "group by" when using raw item access (eg. _MAP['foo']) The following queries were previously failing: ```sql select max(_MAP['a']), _MAP['b'] from elastic group by _MAP['b'] ``` --- .../elasticsearch/ElasticsearchAggregate.java | 7 +++--- .../elasticsearch/ElasticsearchEnumerators.java | 26 +++++++++++++++++----- .../adapter/elasticsearch/ElasticsearchMethod.java | 3 ++- .../adapter/elasticsearch/ElasticsearchRel.java | 5 +++-- .../adapter/elasticsearch/ElasticsearchSort.java | 20 ++--------------- .../adapter/elasticsearch/ElasticsearchTable.java | 6 ++--- .../ElasticsearchToEnumerableConverter.java | 5 ++--- .../elasticsearch/ElasticSearchAdapterTest.java | 7 ++++++ 8 files changed, 44 insertions(+), 35 deletions(-) diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java index 8ca1397..66dfc43 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchAggregate.java @@ -113,9 +113,10 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe @Override public void implement(Implementor implementor) { implementor.visitChild(0, getInput()); - List<String> inputFields = fieldNames(getInput().getRowType()); + final List<String> inputFields = fieldNames(getInput().getRowType()); for (int group : groupSet) { - implementor.addGroupBy(inputFields.get(group)); 
+ final String name = inputFields.get(group); + implementor.addGroupBy(implementor.expressionItemMap.getOrDefault(name, name)); } final ObjectMapper mapper = implementor.elasticsearchTable.mapper; @@ -130,7 +131,7 @@ public class ElasticsearchAggregate extends Aggregate implements ElasticsearchRe final ObjectNode field = aggregation.with(toElasticAggregate(aggCall)); final String name = names.isEmpty() ? ElasticsearchConstants.ID : names.get(0); - field.put("field", name); + field.put("field", implementor.expressionItemMap.getOrDefault(name, name)); if (aggCall.getAggregation().getKind() == SqlKind.ANY_VALUE) { field.put("size", 1); } diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java index be2090a..054d85c 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java @@ -42,12 +42,20 @@ class ElasticsearchEnumerators { final Class fieldClass, final Map<String, String> mapping) { return hit -> { + final String key; + if (hit.sourceOrFields().containsKey(fieldName)) { + key = fieldName; + } else { + key = mapping.getOrDefault(fieldName, fieldName); + } + final Object value; - if (ElasticsearchConstants.ID.equals(mapping.get(fieldName))) { + if (ElasticsearchConstants.ID.equals(key) + || ElasticsearchConstants.ID.equals(mapping.getOrDefault(fieldName, fieldName))) { // is the original projection on _id field ? 
value = hit.id(); } else { - value = hit.valueOrNull(fieldName); + value = hit.valueOrNull(key); } return convert(value, fieldClass); }; @@ -67,13 +75,21 @@ class ElasticsearchEnumerators { Object[] objects = new Object[fields.size()]; for (int i = 0; i < fields.size(); i++) { final Map.Entry<String, Class> field = fields.get(i); - final Object value; + final String key; + if (hit.sourceOrFields().containsKey(field.getKey())) { + key = field.getKey(); + } else { + key = mapping.getOrDefault(field.getKey(), field.getKey()); + } - if (ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))) { + final Object value; + if (ElasticsearchConstants.ID.equals(key) + || ElasticsearchConstants.ID.equals(mapping.get(field.getKey())) + || ElasticsearchConstants.ID.equals(field.getKey())) { // is the original projection on _id field ? value = hit.id(); } else { - value = hit.valueOrNull(field.getKey()); + value = hit.valueOrNull(key); } final Class type = field.getValue(); diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java index 1e8e13e..f2da014 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import java.lang.reflect.Method; import java.util.List; +import java.util.Map; /** * Builtin methods in the Elasticsearch adapter. @@ -35,7 +36,7 @@ enum ElasticsearchMethod { List.class, // sort List.class, // groupBy List.class, // aggregations - List.class, // expression mapping + Map.class, // item to expression mapping. Eg. 
_MAP['a.b.c'] and EXPR$1 Long.class, // offset Long.class); // fetch diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java index c0a9ce0..022a867 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java @@ -24,6 +24,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.util.Pair; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -72,7 +73,7 @@ public interface ElasticsearchRel extends RelNode { * * @see SqlStdOperatorTable#ITEM */ - final List<Map.Entry<String, String>> expressionItemMap = new ArrayList<>(); + final Map<String, String> expressionItemMap = new LinkedHashMap<>(); /** * Starting index (default {@code 0}). Equivalent to {@code start} in ES query. 
@@ -112,7 +113,7 @@ public interface ElasticsearchRel extends RelNode { void addExpressionItemMapping(String expressionId, String item) { Objects.requireNonNull(expressionId, "expressionId"); Objects.requireNonNull(item, "item"); - expressionItemMap.add(new Pair<>(expressionId, item)); + expressionItemMap.put(expressionId, item); } void offset(long offset) { diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java index 7adab31..5dae378 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSort.java @@ -30,9 +30,6 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; /** * Implementation of {@link org.apache.calcite.rel.core.Sort} @@ -61,21 +58,8 @@ public class ElasticsearchSort extends Sort implements ElasticsearchRel { for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { final String name = fields.get(fieldCollation.getFieldIndex()).getName(); - // TODO there should be a better way to extract original ITEM - if (name.toUpperCase(Locale.ROOT).startsWith("EXPR$")) { - Optional<String> item = implementor.expressionItemMap.stream() - .filter(m -> name.equals(m.getKey())) - .map(Map.Entry::getValue).findAny(); - - if (!item.isPresent()) { - final String message = String.format(Locale.ROOT, "No mapping found for %s", name); - throw new IllegalStateException(message); - } - - item.ifPresent(m -> implementor.addSort(m, fieldCollation.getDirection())); - } else { - implementor.addSort(name, fieldCollation.getDirection()); - } + final String rawName = implementor.expressionItemMap.getOrDefault(name, name); + implementor.addSort(rawName, 
fieldCollation.getDirection()); } if (offset != null) { diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java index f4d0028..29dc62a 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java @@ -122,7 +122,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl List<Map.Entry<String, RelFieldCollation.Direction>> sort, List<String> groupBy, List<Map.Entry<String, String>> aggregations, - List<Map.Entry<String, String>> mappings, + Map<String, String> mappings, Long offset, Long fetch) throws IOException { if (!aggregations.isEmpty() || !groupBy.isEmpty()) { @@ -172,7 +172,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl List<Map.Entry<String, RelFieldCollation.Direction>> sort, List<String> groupBy, List<Map.Entry<String, String>> aggregations, - List<Map.Entry<String, String>> mapping, + Map<String, String> mapping, Long offset, Long fetch) throws IOException { if (!groupBy.isEmpty() && offset != null) { @@ -359,7 +359,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl List<Map.Entry<String, RelFieldCollation.Direction>> sort, List<String> groupBy, List<Map.Entry<String, String>> aggregations, - List<Map.Entry<String, String>> mappings, + Map<String, String> mappings, Long offset, Long fetch) { try { return getTable().find(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch); diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java index 8a62728..f301a12 100644 --- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java @@ -82,15 +82,14 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements final Expression table = block.append("table", implementor.table .getExpression(ElasticsearchTable.ElasticsearchQueryable.class)); - List<String> opList = implementor.list; - final Expression ops = block.append("ops", constantArrayList(opList, String.class)); + final Expression ops = block.append("ops", Expressions.constant(implementor.list)); final Expression sort = block.append("sort", constantArrayList(implementor.sort, Pair.class)); final Expression groupBy = block.append("groupBy", Expressions.constant(implementor.groupBy)); final Expression aggregations = block.append("aggregations", constantArrayList(implementor.aggregations, Pair.class)); final Expression mappings = block.append("mappings", - constantArrayList(implementor.expressionItemMap, Pair.class)); + Expressions.constant(implementor.expressionItemMap)); final Expression offset = block.append("offset", Expressions.constant(implementor.offset)); final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch)); diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java index 507d106..5973436 100644 --- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java +++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java @@ -292,6 +292,13 @@ public class ElasticSearchAdapterTest { .returns(sortedResultSetChecker("city", RelFieldCollation.Direction.DESCENDING)) .returnsCount(ZIPS_SIZE); + CalciteAssert.that() + 
.with(newConnectionFactory()) + .query("select max(_MAP['pop']), min(_MAP['pop']), _MAP['state'] from elastic.zips " + + "group by _MAP['state'] order by _MAP['state'] limit 3") + .returnsOrdered("EXPR$0=32383.0; EXPR$1=23238.0; EXPR$2=AK", + "EXPR$0=44165.0; EXPR$1=42124.0; EXPR$2=AL", + "EXPR$0=53532.0; EXPR$1=37428.0; EXPR$2=AR"); } /**