This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 3dee82d [CALCITE-2755] Expose document _id field when querying
ElasticSearch
3dee82d is described below
commit 3dee82dd7825f4c09dca27a312f9f82a8d6b899d
Author: Andrei Sereda <[email protected]>
AuthorDate: Mon Dec 24 13:56:59 2018 -0500
[CALCITE-2755] Expose document _id field when querying ElasticSearch
Allow users to query (project) the
[_id](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html)
field explicitly.
Note that, by default, meta fields are not returned by `select *` queries;
they have to be listed explicitly in the projection,
e.g. `select _MAP['_id'], _MAP['a'] from elastic`.
Also add a mapping between the Calcite expression name (`EXPR$n`) and the
item name (`foo.bar`, as used in `_MAP['foo.bar']`), because
this information is otherwise lost during query translation.
Closes #982
---
.../elasticsearch/ElasticsearchConstants.java | 5 +
.../elasticsearch/ElasticsearchEnumerators.java | 34 +++--
.../adapter/elasticsearch/ElasticsearchJson.java | 6 +-
.../adapter/elasticsearch/ElasticsearchMethod.java | 1 +
.../elasticsearch/ElasticsearchProject.java | 4 +
.../adapter/elasticsearch/ElasticsearchRel.java | 16 +++
.../adapter/elasticsearch/ElasticsearchRules.java | 12 +-
.../adapter/elasticsearch/ElasticsearchTable.java | 12 +-
.../ElasticsearchToEnumerableConverter.java | 5 +-
.../adapter/elasticsearch/Projection2Test.java | 138 ++++++++++++++++++++-
10 files changed, 217 insertions(+), 16 deletions(-)
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
index 2c4c42c..da875d5 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
@@ -30,6 +30,11 @@ interface ElasticsearchConstants {
String FIELDS = "fields";
String SOURCE_PAINLESS = "params._source";
String SOURCE_GROOVY = "_source";
+
+ /**
+ * Attribute which uniquely identifies a document (ID)
+ * @see <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html">ID
Field</a>
+ */
String ID = "_id";
String UID = "_uid";
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
index a536269..be2090a 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
@@ -39,8 +39,18 @@ class ElasticsearchEnumerators {
private static Function1<ElasticsearchJson.SearchHit, Object>
singletonGetter(
final String fieldName,
- final Class fieldClass) {
- return hits -> convert(hits.valueOrNull(fieldName), fieldClass);
+ final Class fieldClass,
+ final Map<String, String> mapping) {
+ return hit -> {
+ final Object value;
+ if (ElasticsearchConstants.ID.equals(mapping.get(fieldName))) {
+ // is the original projection on _id field ?
+ value = hit.id();
+ } else {
+ value = hit.valueOrNull(fieldName);
+ }
+ return convert(value, fieldClass);
+ };
}
/**
@@ -52,21 +62,29 @@ class ElasticsearchEnumerators {
* @return function that converts the search result into a generic array
*/
private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
- final List<Map.Entry<String, Class>> fields) {
+ final List<Map.Entry<String, Class>> fields, Map<String, String>
mapping) {
return hit -> {
Object[] objects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
final Map.Entry<String, Class> field = fields.get(i);
- final String name = field.getKey();
+ final Object value;
+
+ if (ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))) {
+ // is the original projection on _id field ?
+ value = hit.id();
+ } else {
+ value = hit.valueOrNull(field.getKey());
+ }
+
final Class type = field.getValue();
- objects[i] = convert(hit.valueOrNull(name), type);
+ objects[i] = convert(value, type);
}
return objects;
};
}
static Function1<ElasticsearchJson.SearchHit, Object> getter(
- List<Map.Entry<String, Class>> fields) {
+ List<Map.Entry<String, Class>> fields, Map<String, String> mapping) {
//noinspection unchecked
final Function1 getter;
if (fields == null || fields.size() == 1 &&
"_MAP".equals(fields.get(0).getKey())) {
@@ -74,10 +92,10 @@ class ElasticsearchEnumerators {
getter = mapGetter();
} else if (fields.size() == 1) {
// select foo from table
- getter = singletonGetter(fields.get(0).getKey(),
fields.get(0).getValue());
+ getter = singletonGetter(fields.get(0).getKey(),
fields.get(0).getValue(), mapping);
} else {
// select a, b, c from table
- getter = listGetter(fields);
+ getter = listGetter(fields, mapping);
}
return getter;
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index dd49dfa..eb9c011 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -276,12 +276,16 @@ final class ElasticsearchJson {
*/
@JsonIgnoreProperties(ignoreUnknown = true)
static class SearchHit {
+
+ /**
+ * ID of the document (not available in aggregations)
+ */
private final String id;
private final Map<String, Object> source;
private final Map<String, Object> fields;
@JsonCreator
- SearchHit(@JsonProperty("_id") final String id,
+ SearchHit(@JsonProperty(ElasticsearchConstants.ID) final String id,
@JsonProperty("_source") final Map<String, Object>
source,
@JsonProperty("fields") final Map<String, Object>
fields) {
this.id = Objects.requireNonNull(id, "id");
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
index 7c61345..1e8e13e 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
@@ -35,6 +35,7 @@ enum ElasticsearchMethod {
List.class, // sort
List.class, // groupBy
List.class, // aggregations
+ List.class, // expression mapping
Long.class, // offset
Long.class); // fetch
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
index 234b5f0..a701091 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -68,6 +68,10 @@ public class ElasticsearchProject extends Project implements
ElasticsearchRel {
final String name = pair.right;
final String expr = pair.left.accept(translator);
+ if (ElasticsearchRules.isItem(pair.left)) {
+ implementor.addExpressionItemMapping(name,
ElasticsearchRules.stripQuotes(expr));
+ }
+
if (expr.equals("\"" + name + "\"")) {
fields.add(name);
} else if (expr.matches("\"literal\":.+")) {
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
index 1dad691..c0a9ce0 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.Pair;
import java.util.ArrayList;
@@ -65,6 +66,15 @@ public interface ElasticsearchRel extends RelNode {
final List<String> groupBy = new ArrayList<>();
/**
+ * Keeps mapping between calcite expression identifier (like {@code
EXPR$0}) and
+ * original item call like {@code _MAP['foo.bar']} ({@code foo.bar}
really).
+ * This information otherwise might be lost during query translation.
+ *
+ * @see SqlStdOperatorTable#ITEM
+ */
+ final List<Map.Entry<String, String>> expressionItemMap = new
ArrayList<>();
+
+ /**
* Starting index (default {@code 0}). Equivalent to {@code start} in ES
query.
* @see <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a>
*/
@@ -99,6 +109,12 @@ public interface ElasticsearchRel extends RelNode {
aggregations.add(new Pair<>(field, expression));
}
+ void addExpressionItemMapping(String expressionId, String item) {
+ Objects.requireNonNull(expressionId, "expressionId");
+ Objects.requireNonNull(item, "item");
+ expressionItemMap.add(new Pair<>(expressionId, item));
+ }
+
void offset(long offset) {
this.offset = offset;
}
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
index 311f9bc..9340092 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -82,6 +82,15 @@ class ElasticsearchRules {
return null;
}
+ static boolean isItem(RexNode node) {
+ final Boolean result = node.accept(new RexVisitorImpl<Boolean>(false) {
+ @Override public Boolean visitCall(final RexCall call) {
+ return isItem(call) != null;
+ }
+ });
+ return Boolean.TRUE.equals(result);
+ }
+
static List<String> elasticsearchFieldNames(final RelDataType rowType) {
return SqlValidatorUtil.uniquify(
new AbstractList<String>() {
@@ -102,7 +111,8 @@ class ElasticsearchRules {
}
static String stripQuotes(String s) {
- return s.startsWith("\"") && s.endsWith("\"") ? s.substring(1, s.length()
- 1) : s;
+ return s.length() > 1 && s.startsWith("\"") && s.endsWith("\"")
+ ? s.substring(1, s.length() - 1) : s;
}
/**
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index e288a16..0b32f89 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,11 +122,12 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
+ List<Map.Entry<String, String>> mappings,
Long offset, Long fetch) throws IOException {
if (!aggregations.isEmpty() || !groupBy.isEmpty()) {
// process aggregations separately
- return aggregate(ops, fields, sort, groupBy, aggregations, offset,
fetch);
+ return aggregate(ops, fields, sort, groupBy, aggregations, mappings,
offset, fetch);
}
final ObjectNode query = mapper.createObjectNode();
@@ -151,7 +153,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
}
final Function1<ElasticsearchJson.SearchHit, Object> getter =
- ElasticsearchEnumerators.getter(fields);
+ ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mappings));
Iterable<ElasticsearchJson.SearchHit> iter;
if (offset == null) {
@@ -170,6 +172,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
+ List<Map.Entry<String, String>> mapping,
Long offset, Long fetch) throws IOException {
if (!groupBy.isEmpty() && offset != null) {
@@ -290,7 +293,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
}
final Function1<ElasticsearchJson.SearchHit, Object> getter =
- ElasticsearchEnumerators.getter(fields);
+ ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mapping));
ElasticsearchJson.SearchHits hits =
new ElasticsearchJson.SearchHits(total, result.stream()
@@ -356,9 +359,10 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
+ List<Map.Entry<String, String>> mappings,
Long offset, Long fetch) {
try {
- return getTable().find(ops, fields, sort, groupBy, aggregations,
offset, fetch);
+ return getTable().find(ops, fields, sort, groupBy, aggregations,
mappings, offset, fetch);
} catch (IOException e) {
throw new UncheckedIOException("Failed to query " +
getTable().indexName, e);
}
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
index 5e788a8..8a62728 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -89,12 +89,15 @@ public class ElasticsearchToEnumerableConverter extends
ConverterImpl implements
final Expression aggregations = block.append("aggregations",
constantArrayList(implementor.aggregations, Pair.class));
+ final Expression mappings = block.append("mappings",
+ constantArrayList(implementor.expressionItemMap, Pair.class));
+
final Expression offset = block.append("offset",
Expressions.constant(implementor.offset));
final Expression fetch = block.append("fetch",
Expressions.constant(implementor.fetch));
Expression enumerable = block.append("enumerable",
Expressions.call(table,
ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
- fields, sort, groupBy, aggregations, offset, fetch));
+ fields, sort, groupBy, aggregations, mappings, offset, fetch));
block.add(Expressions.return_(null, enumerable));
return relImplementor.result(physType, block.toBlock());
}
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
index 20d4457..ddbadc2 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
@@ -31,11 +31,17 @@ import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
+import java.util.function.Consumer;
+import java.util.regex.PatternSyntaxException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Checks renaming of fields (also upper, lower cases) during projections
@@ -72,7 +78,8 @@ public class Projection2Test {
"select _MAP['a'] AS \"a\", "
+ " _MAP['b.a'] AS \"b.a\", "
+ " _MAP['b.b'] AS \"b.b\", "
- + " _MAP['b.c.a'] AS \"b.c.a\" "
+ + " _MAP['b.c.a'] AS \"b.c.a\", "
+ + " _MAP['_id'] AS \"id\" " // _id field is implicit
+ " from \"elastic\".\"%s\"", NAME);
ViewTableMacro macro = ViewTable.viewMacro(root, viewSql,
@@ -102,6 +109,135 @@ public class Projection2Test {
.returns("EXPR$0=1; EXPR$1=2; EXPR$2=3; EXPR$3=foo; EXPR$4=null;
EXPR$5=null\n");
}
+ /**
+ * Test that {@code _id} field is available when queried explicitly.
+ * @see <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html">ID
Field</a>
+ */
+ @Test
+ public void projectionWithIdField() {
+
+ final CalciteAssert.AssertThat factory =
CalciteAssert.that().with(newConnectionFactory());
+
+ factory
+ .query("select \"id\" from view")
+ .returns(regexMatch("id=\\p{Graph}+"));
+
+ factory
+ .query("select \"id\", \"id\" from view")
+ .returns(regexMatch("id=\\p{Graph}+; id=\\p{Graph}+"));
+
+ factory
+ .query("select \"id\", \"a\" from view")
+ .returns(regexMatch("id=\\p{Graph}+; a=1"));
+
+ factory
+ .query("select \"a\", \"id\" from view")
+ .returns(regexMatch("a=1; id=\\p{Graph}+"));
+
+ // single _id column
+ final String sql1 = String.format(Locale.ROOT, "select _MAP['_id'] "
+ + " from \"elastic\".\"%s\"", NAME);
+ factory
+ .query(sql1)
+ .returns(regexMatch("EXPR$0=\\p{Graph}+"));
+
+ // multiple columns: _id and a
+ final String sql2 = String.format(Locale.ROOT, "select _MAP['_id'],
_MAP['a'] "
+ + " from \"elastic\".\"%s\"", NAME);
+ factory
+ .query(sql2)
+ .returns(regexMatch("EXPR$0=\\p{Graph}+; EXPR$1=1"));
+
+ // multiple _id columns
+ final String sql3 = String.format(Locale.ROOT, "select _MAP['_id'],
_MAP['_id'] "
+ + " from \"elastic\".\"%s\"", NAME);
+ factory
+ .query(sql3)
+ .returns(regexMatch("EXPR$0=\\p{Graph}+; EXPR$1=\\p{Graph}+"));
+
+ // _id column with same alias
+ final String sql4 = String.format(Locale.ROOT, "select _MAP['_id'] as
\"_id\" "
+ + " from \"elastic\".\"%s\"", NAME);
+ factory
+ .query(sql4)
+ .returns(regexMatch("_id=\\p{Graph}+"));
+
+ // _id field not available implicitly
+ final String sql5 = String.format(Locale.ROOT, "select * from
\"elastic\".\"%s\"", NAME);
+ factory
+ .query(sql5)
+ .returns(regexMatch("_MAP={a=1, b={a=2, b=3, c={a=foo}}}"));
+ }
+
+ /**
+ * Allows values to contain regular expressions instead of exact values.
+ * <pre>
+ * {@code
+ * key1=foo1; key2=\\w+; key4=\\d{3,4}
+ * }
+ * </pre>
+ * @param lines lines with regexp
+ * @return consumer to be used in {@link
org.apache.calcite.test.CalciteAssert.AssertQuery}
+ */
+ private static Consumer<ResultSet> regexMatch(String...lines) {
+ return rset -> {
+ try {
+ final int columnCount = rset.getMetaData().getColumnCount();
+ final StringBuilder actual = new StringBuilder();
+ int processedRows = 0;
+ boolean fail = false;
+ while (rset.next()) {
+ if (processedRows >= lines.length) {
+ fail = true;
+ }
+
+ for (int i = 1; i <= columnCount; i++) {
+ final String name = rset.getMetaData().getColumnName(i);
+ final String value = rset.getString(i);
+ actual.append(name).append('=').append(value);
+ if (i < columnCount) {
+ actual.append("; ");
+ }
+
+ // don't re-check if already failed
+ if (!fail) {
+ // splitting string of type: key1=val1; key2=val2
+ final String keyValue = lines[processedRows].split("; ")[i - 1];
+ final String[] parts = keyValue.split("=", 2);
+ final String expectedName = parts[0];
+ final String expectedValue = parts[1];
+
+ boolean valueMatches = expectedValue.equals(value);
+
+ if (!valueMatches) {
+ // try regex
+ try {
+ valueMatches = value != null && value.matches(expectedValue);
+ } catch (PatternSyntaxException ignore) {
+ // probably not a regular expression
+ }
+ }
+
+ fail = !(name.equals(expectedName) && valueMatches);
+ }
+
+ }
+
+ processedRows++;
+ }
+
+ // also check that processed same number of rows
+ fail &= processedRows == lines.length;
+
+ if (fail) {
+ assertEquals(String.join("\n", Arrays.asList(lines)),
actual.toString());
+ fail("Should have failed on previous line, but for some reason
didn't");
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
}
// End Projection2Test.java