This is an automated email from the ASF dual-hosted git repository.

sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dee82d  [CALCITE-2755] Expose document _id field when querying ElasticSearch
3dee82d is described below

commit 3dee82dd7825f4c09dca27a312f9f82a8d6b899d
Author: Andrei Sereda <[email protected]>
AuthorDate: Mon Dec 24 13:56:59 2018 -0500

    [CALCITE-2755] Expose document _id field when querying ElasticSearch
    
    Allow users to query (project) the
    [_id](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html)
    field explicitly.
    
    Note that, by default, meta fields are not returned by `select *` queries;
    they have to be listed explicitly in the projection,
    like `select _MAP['_id'], _MAP['a'] from elastic`.
    
    Add a mapping between the Calcite expression name (`EXPR$n`) and the item name
    `foo.bar` (as in `_MAP['foo.bar']`).
    This information is otherwise lost during query translation.
    
    Closes #982
---
 .../elasticsearch/ElasticsearchConstants.java      |   5 +
 .../elasticsearch/ElasticsearchEnumerators.java    |  34 +++--
 .../adapter/elasticsearch/ElasticsearchJson.java   |   6 +-
 .../adapter/elasticsearch/ElasticsearchMethod.java |   1 +
 .../elasticsearch/ElasticsearchProject.java        |   4 +
 .../adapter/elasticsearch/ElasticsearchRel.java    |  16 +++
 .../adapter/elasticsearch/ElasticsearchRules.java  |  12 +-
 .../adapter/elasticsearch/ElasticsearchTable.java  |  12 +-
 .../ElasticsearchToEnumerableConverter.java        |   5 +-
 .../adapter/elasticsearch/Projection2Test.java     | 138 ++++++++++++++++++++-
 10 files changed, 217 insertions(+), 16 deletions(-)

diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
index 2c4c42c..da875d5 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java
@@ -30,6 +30,11 @@ interface ElasticsearchConstants {
   String FIELDS = "fields";
   String SOURCE_PAINLESS = "params._source";
   String SOURCE_GROOVY = "_source";
+
+  /**
+   * Attribute which uniquely identifies a document (ID)
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html">ID Field</a>
+   */
   String ID = "_id";
   String UID = "_uid";
 
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
index a536269..be2090a 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java
@@ -39,8 +39,18 @@ class ElasticsearchEnumerators {
 
   private static Function1<ElasticsearchJson.SearchHit, Object> singletonGetter(
       final String fieldName,
-      final Class fieldClass) {
-    return hits -> convert(hits.valueOrNull(fieldName), fieldClass);
+      final Class fieldClass,
+      final Map<String, String> mapping) {
+    return hit -> {
+      final Object value;
+      if (ElasticsearchConstants.ID.equals(mapping.get(fieldName))) {
+        // is the original projection on _id field ?
+        value = hit.id();
+      } else {
+        value = hit.valueOrNull(fieldName);
+      }
+      return convert(value, fieldClass);
+    };
   }
 
   /**
@@ -52,21 +62,29 @@ class ElasticsearchEnumerators {
    * @return function that converts the search result into a generic array
    */
   private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
-      final List<Map.Entry<String, Class>> fields) {
+      final List<Map.Entry<String, Class>> fields, Map<String, String> mapping) {
     return hit -> {
       Object[] objects = new Object[fields.size()];
       for (int i = 0; i < fields.size(); i++) {
         final Map.Entry<String, Class> field = fields.get(i);
-        final String name = field.getKey();
+        final Object value;
+
+        if (ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))) {
+          // is the original projection on _id field ?
+          value = hit.id();
+        } else {
+          value = hit.valueOrNull(field.getKey());
+        }
+
         final Class type = field.getValue();
-        objects[i] = convert(hit.valueOrNull(name), type);
+        objects[i] = convert(value, type);
       }
       return objects;
     };
   }
 
   static Function1<ElasticsearchJson.SearchHit, Object> getter(
-      List<Map.Entry<String, Class>> fields) {
+      List<Map.Entry<String, Class>> fields, Map<String, String> mapping) {
     //noinspection unchecked
     final Function1 getter;
     if (fields == null || fields.size() == 1 && "_MAP".equals(fields.get(0).getKey())) {
@@ -74,10 +92,10 @@ class ElasticsearchEnumerators {
       getter = mapGetter();
     } else if (fields.size() == 1) {
       // select foo from table
-      getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue());
+      getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue(), mapping);
     } else {
       // select a, b, c from table
-      getter = listGetter(fields);
+      getter = listGetter(fields, mapping);
     }
 
     return getter;
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index dd49dfa..eb9c011 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -276,12 +276,16 @@ final class ElasticsearchJson {
    */
   @JsonIgnoreProperties(ignoreUnknown = true)
   static class SearchHit {
+
+    /**
+     * ID of the document (not available in aggregations)
+     */
     private final String id;
     private final Map<String, Object> source;
     private final Map<String, Object> fields;
 
     @JsonCreator
-    SearchHit(@JsonProperty("_id") final String id,
+    SearchHit(@JsonProperty(ElasticsearchConstants.ID) final String id,
                       @JsonProperty("_source") final Map<String, Object> source,
                       @JsonProperty("fields") final Map<String, Object> fields) {
       this.id = Objects.requireNonNull(id, "id");
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
index 7c61345..1e8e13e 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java
@@ -35,6 +35,7 @@ enum ElasticsearchMethod {
       List.class, // sort
       List.class, // groupBy
       List.class, // aggregations
+      List.class, // expression mapping
       Long.class, // offset
       Long.class); // fetch
 
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
index 234b5f0..a701091 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -68,6 +68,10 @@ public class ElasticsearchProject extends Project implements ElasticsearchRel {
       final String name = pair.right;
       final String expr = pair.left.accept(translator);
 
+      if (ElasticsearchRules.isItem(pair.left)) {
+        implementor.addExpressionItemMapping(name, ElasticsearchRules.stripQuotes(expr));
+      }
+
       if (expr.equals("\"" + name + "\"")) {
         fields.add(name);
       } else if (expr.matches("\"literal\":.+")) {
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
index 1dad691..c0a9ce0 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.Pair;
 
 import java.util.ArrayList;
@@ -65,6 +66,15 @@ public interface ElasticsearchRel extends RelNode {
     final List<String> groupBy = new ArrayList<>();
 
     /**
+     * Keeps mapping between calcite expression identifier (like {@code EXPR$0}) and
+     * original item call like {@code _MAP['foo.bar']} ({@code foo.bar} really).
+     * This information otherwise might be lost during query translation.
+     *
+     * @see SqlStdOperatorTable#ITEM
+     */
+    final List<Map.Entry<String, String>> expressionItemMap = new ArrayList<>();
+
+    /**
      * Starting index (default {@code 0}). Equivalent to {@code start} in ES query.
      * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a>
      */
@@ -99,6 +109,12 @@ public interface ElasticsearchRel extends RelNode {
       aggregations.add(new Pair<>(field, expression));
     }
 
+    void addExpressionItemMapping(String expressionId, String item) {
+      Objects.requireNonNull(expressionId, "expressionId");
+      Objects.requireNonNull(item, "item");
+      expressionItemMap.add(new Pair<>(expressionId, item));
+    }
+
     void offset(long offset) {
       this.offset = offset;
     }
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
index 311f9bc..9340092 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -82,6 +82,15 @@ class ElasticsearchRules {
     return null;
   }
 
+  static boolean isItem(RexNode node) {
+    final Boolean result = node.accept(new RexVisitorImpl<Boolean>(false) {
+      @Override public Boolean visitCall(final RexCall call) {
+        return isItem(call) != null;
+      }
+    });
+    return Boolean.TRUE.equals(result);
+  }
+
   static List<String> elasticsearchFieldNames(final RelDataType rowType) {
     return SqlValidatorUtil.uniquify(
         new AbstractList<String>() {
@@ -102,7 +111,8 @@ class ElasticsearchRules {
   }
 
   static String stripQuotes(String s) {
-    return s.startsWith("\"") && s.endsWith("\"") ? s.substring(1, s.length() - 1) : s;
+    return s.length() > 1 && s.startsWith("\"") && s.endsWith("\"")
+        ? s.substring(1, s.length() - 1) : s;
   }
 
   /**
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index e288a16..0b32f89 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,11 +122,12 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       List<Map.Entry<String, RelFieldCollation.Direction>> sort,
       List<String> groupBy,
       List<Map.Entry<String, String>> aggregations,
+      List<Map.Entry<String, String>> mappings,
       Long offset, Long fetch) throws IOException {
 
     if (!aggregations.isEmpty() || !groupBy.isEmpty()) {
       // process aggregations separately
-      return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch);
+      return aggregate(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch);
     }
 
     final ObjectNode query = mapper.createObjectNode();
@@ -151,7 +153,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
     }
 
     final Function1<ElasticsearchJson.SearchHit, Object> getter =
-        ElasticsearchEnumerators.getter(fields);
+        ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mappings));
 
     Iterable<ElasticsearchJson.SearchHit> iter;
     if (offset == null) {
@@ -170,6 +172,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
       List<Map.Entry<String, RelFieldCollation.Direction>> sort,
       List<String> groupBy,
       List<Map.Entry<String, String>> aggregations,
+      List<Map.Entry<String, String>> mapping,
       Long offset, Long fetch) throws IOException {
 
     if (!groupBy.isEmpty() && offset != null) {
@@ -290,7 +293,7 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
     }
 
     final Function1<ElasticsearchJson.SearchHit, Object> getter =
-        ElasticsearchEnumerators.getter(fields);
+        ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mapping));
 
     ElasticsearchJson.SearchHits hits =
         new ElasticsearchJson.SearchHits(total, result.stream()
@@ -356,9 +359,10 @@ public class ElasticsearchTable extends AbstractQueryableTable implements Transl
          List<Map.Entry<String, RelFieldCollation.Direction>> sort,
          List<String> groupBy,
          List<Map.Entry<String, String>> aggregations,
+         List<Map.Entry<String, String>> mappings,
          Long offset, Long fetch) {
       try {
-        return getTable().find(ops, fields, sort, groupBy, aggregations, offset, fetch);
+        return getTable().find(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch);
       } catch (IOException e) {
         throw new UncheckedIOException("Failed to query " + getTable().indexName, e);
       }
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
index 5e788a8..8a62728 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java
@@ -89,12 +89,15 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
     final Expression aggregations = block.append("aggregations",
         constantArrayList(implementor.aggregations, Pair.class));
 
+    final Expression mappings = block.append("mappings",
+        constantArrayList(implementor.expressionItemMap, Pair.class));
+
     final Expression offset = block.append("offset", Expressions.constant(implementor.offset));
     final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch));
 
     Expression enumerable = block.append("enumerable",
         Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
-            fields, sort, groupBy, aggregations, offset, fetch));
+            fields, sort, groupBy, aggregations, mappings, offset, fetch));
     block.add(Expressions.return_(null, enumerable));
     return relImplementor.result(physType, block.toBlock());
   }
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
index 20d4457..ddbadc2 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java
@@ -31,11 +31,17 @@ import org.junit.Test;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Consumer;
+import java.util.regex.PatternSyntaxException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Checks renaming of fields (also upper, lower cases) during projections
@@ -72,7 +78,8 @@ public class Projection2Test {
             "select _MAP['a'] AS \"a\", "
                 + " _MAP['b.a']  AS \"b.a\", "
                 +  " _MAP['b.b'] AS \"b.b\", "
-                +  " _MAP['b.c.a'] AS \"b.c.a\" "
+                +  " _MAP['b.c.a'] AS \"b.c.a\", "
+                +  " _MAP['_id'] AS \"id\" " // _id field is implicit
                 +  " from \"elastic\".\"%s\"", NAME);
 
         ViewTableMacro macro = ViewTable.viewMacro(root, viewSql,
@@ -102,6 +109,135 @@ public class Projection2Test {
             .returns("EXPR$0=1; EXPR$1=2; EXPR$2=3; EXPR$3=foo; EXPR$4=null; EXPR$5=null\n");
   }
 
+  /**
+   * Test that {@code _id} field is available when queried explicitly.
+   * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html">ID Field</a>
+   */
+  @Test
+  public void projectionWithIdField() {
+
+    final CalciteAssert.AssertThat factory = CalciteAssert.that().with(newConnectionFactory());
+
+    factory
+        .query("select \"id\" from view")
+        .returns(regexMatch("id=\\p{Graph}+"));
+
+    factory
+        .query("select \"id\", \"id\" from view")
+        .returns(regexMatch("id=\\p{Graph}+; id=\\p{Graph}+"));
+
+    factory
+        .query("select \"id\", \"a\" from view")
+        .returns(regexMatch("id=\\p{Graph}+; a=1"));
+
+    factory
+        .query("select \"a\", \"id\" from view")
+        .returns(regexMatch("a=1; id=\\p{Graph}+"));
+
+    // single _id column
+    final String sql1 = String.format(Locale.ROOT, "select _MAP['_id'] "
+        + " from \"elastic\".\"%s\"", NAME);
+    factory
+        .query(sql1)
+        .returns(regexMatch("EXPR$0=\\p{Graph}+"));
+
+    // multiple columns: _id and a
+    final String sql2 = String.format(Locale.ROOT, "select _MAP['_id'], _MAP['a'] "
+        + " from \"elastic\".\"%s\"", NAME);
+    factory
+        .query(sql2)
+        .returns(regexMatch("EXPR$0=\\p{Graph}+; EXPR$1=1"));
+
+    // multiple _id columns
+    final String sql3 = String.format(Locale.ROOT, "select _MAP['_id'], _MAP['_id'] "
+        + " from \"elastic\".\"%s\"", NAME);
+    factory
+        .query(sql3)
+        .returns(regexMatch("EXPR$0=\\p{Graph}+; EXPR$1=\\p{Graph}+"));
+
+    // _id column with same alias
+    final String sql4 = String.format(Locale.ROOT, "select _MAP['_id'] as \"_id\" "
+        + " from \"elastic\".\"%s\"", NAME);
+    factory
+        .query(sql4)
+        .returns(regexMatch("_id=\\p{Graph}+"));
+
+    // _id field not available implicitly
+    final String sql5 = String.format(Locale.ROOT, "select * from \"elastic\".\"%s\"", NAME);
+    factory
+        .query(sql5)
+        .returns(regexMatch("_MAP={a=1, b={a=2, b=3, c={a=foo}}}"));
+  }
+
+  /**
+   * Allows values to contain regular expressions instead of exact values.
+   * <pre>
+   *   {@code
+   *      key1=foo1; key2=\\w+; key4=\\d{3,4}
+   *   }
+   * </pre>
+   * @param lines lines with regexp
+   * @return consumer to be used in {@link org.apache.calcite.test.CalciteAssert.AssertQuery}
+   */
+  private static Consumer<ResultSet> regexMatch(String...lines) {
+    return rset -> {
+      try {
+        final int columnCount = rset.getMetaData().getColumnCount();
+        final StringBuilder actual = new StringBuilder();
+        int processedRows = 0;
+        boolean fail = false;
+        while (rset.next()) {
+          if (processedRows >= lines.length) {
+            fail = true;
+          }
+
+          for (int i = 1; i <= columnCount; i++) {
+            final String name = rset.getMetaData().getColumnName(i);
+            final String value = rset.getString(i);
+            actual.append(name).append('=').append(value);
+            if (i < columnCount) {
+              actual.append("; ");
+            }
+
+            // don't re-check if already failed
+            if (!fail) {
+              // splitting string of type: key1=val1; key2=val2
+              final String keyValue = lines[processedRows].split("; ")[i - 1];
+              final String[] parts = keyValue.split("=", 2);
+              final String expectedName = parts[0];
+              final String expectedValue = parts[1];
+
+              boolean valueMatches = expectedValue.equals(value);
+
+              if (!valueMatches) {
+                // try regex
+                try {
+                  valueMatches = value != null && value.matches(expectedValue);
+                } catch (PatternSyntaxException ignore) {
+                  // probably not a regular expression
+                }
+              }
+
+              fail = !(name.equals(expectedName) && valueMatches);
+            }
+
+          }
+
+          processedRows++;
+        }
+
+        // also fail when the number of rows differs from the number of expected lines
+        fail |= processedRows != lines.length;
+
+        if (fail) {
+          assertEquals(String.join("\n", Arrays.asList(lines)), actual.toString());
+          fail("Should have failed on previous line, but for some reason didn't");
+        }
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
 }
 
 // End Projection2Test.java

Reply via email to