This is an automated email from the ASF dual-hosted git repository.

sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f91d2  [CALCITE-3437] ElasticSearch adapter. Support MatchQuery in elasticsearch (Shlok Srivastava)
e6f91d2 is described below

commit e6f91d2e92acbc8dccd375b777b71265677dea16
Author: shlok <[email protected]>
AuthorDate: Fri Oct 25 14:47:30 2019 +0530

    [CALCITE-3437] ElasticSearch adapter. Support MatchQuery in elasticsearch (Shlok Srivastava)
---
 .../org/apache/calcite/test/CalciteAssert.java     |   2 +-
 .../adapter/elasticsearch/PredicateAnalyzer.java   |  19 +-
 .../adapter/elasticsearch/QueryBuilders.java       |  73 +++++++
 .../calcite/adapter/elasticsearch/MatchTest.java   | 217 +++++++++++++++++++++
 .../adapter/elasticsearch/QueryBuildersTest.java   |   9 +
 5 files changed, 318 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
index 151102d..65aec7d 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
@@ -672,7 +672,7 @@ public class CalciteAssert {
   }
 
   /** Converts a {@link ResultSet} to a string. */
-  static String toString(ResultSet resultSet) throws SQLException {
+  public static String toString(ResultSet resultSet) throws SQLException {
     return new ResultSetFormatter().resultSet(resultSet).string();
   }
 
 
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
index f2955d3..fe61b52 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
@@ -166,6 +166,7 @@ class PredicateAnalyzer {
       switch (syntax) {
       case BINARY:
         switch (call.getKind()) {
+        case CONTAINS:
         case AND:
         case OR:
         case LIKE:
@@ -233,6 +234,7 @@ class PredicateAnalyzer {
           return toCastExpression(call);
         case LIKE:
+        case CONTAINS:
           return binary(call);
         default:
           // manually process ITEM($0, 'foo') which in our case will be named attribute
           if (call.getOperator().getName().equalsIgnoreCase("ITEM")) {
@@ -349,6 +351,8 @@ class PredicateAnalyzer {
       }
 
       switch (call.getKind()) {
+      case CONTAINS:
+        return QueryExpression.create(pair.getKey()).contains(pair.getValue());
       case LIKE:
         throw new UnsupportedOperationException("LIKE not yet supported");
       case EQUALS:
@@ -541,6 +545,11 @@ class PredicateAnalyzer {
       return false;
     }
 
+    /**
+     * Translates SQL {@code CONTAINS} with the given literal into a {@code match} query.
+     */
+    public abstract QueryExpression contains(LiteralExpression literal);
+
     /**
      * Negate {@code this} QueryExpression (not the next one).
      */
@@ -641,6 +650,11 @@ class PredicateAnalyzer {
           + "cannot be applied to a compound expression");
     }
 
+    @Override public QueryExpression contains(LiteralExpression literal) {
+      throw new PredicateAnalyzerException("SqlOperatorImpl ['contains'] "
+          + "cannot be applied to a compound expression");
+    }
+
     @Override public QueryExpression notExists() {
       throw new PredicateAnalyzerException("SqlOperatorImpl ['notExists'] "
           + "cannot be applied to a compound expression");
@@ -741,6 +755,11 @@ class PredicateAnalyzer {
       return this;
     }
 
+    @Override public QueryExpression contains(LiteralExpression literal) {
+      builder = QueryBuilders.matchQuery(getFieldReference(), literal.value());
+      return this;
+    }
+
     @Override public QueryExpression notLike(LiteralExpression literal) {
       builder = boolQuery()
               // NOT LIKE should return false when field is NULL
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
index b046cb5..4752325 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
@@ -125,6 +125,26 @@ class QueryBuilders {
    * @param name   The field name
    * @param values The terms
    */
+  static MatchesQueryBuilder matchesQuery(String name, Iterable<?> values) {
+    return new MatchesQueryBuilder(name, values);
+  }
+
+  /**
+   * Creates a full-text {@code match} query on a single field.
+   *
+   * @param name  The name of the field
+   * @param value The value to match
+   */
+  static MatchQueryBuilder matchQuery(String name, Object value) {
+    return new MatchQueryBuilder(name, value);
+  }
+
+  /**
+   * A filter for a field based on several terms matching on any of them.
+   *
+   * @param name   The field name
+   * @param values The terms
+   */
   static TermsQueryBuilder termsQuery(String name, Iterable<?> values) {
     return new TermsQueryBuilder(name, values);
   }
@@ -309,6 +329,56 @@ class QueryBuilders {
     }
   }
 
+  /**
+   * Builds a full-text {@code match} query on a single field.
+   */
+  static class MatchQueryBuilder extends QueryBuilder {
+    private final String fieldName;
+    private final Object value;
+
+    private MatchQueryBuilder(final String fieldName, final Object value) {
+      this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
+      this.value = Objects.requireNonNull(value, "value");
+    }
+
+    @Override void writeJson(final JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeFieldName("match");
+      generator.writeStartObject();
+      generator.writeFieldName(fieldName);
+      writeObject(generator, value);
+      generator.writeEndObject();
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Builds a {@code match} query on a single field, writing the values as a JSON array.
+   */
+  static class MatchesQueryBuilder extends QueryBuilder {
+    private final String fieldName;
+    private final Iterable<?> values;
+
+    private MatchesQueryBuilder(final String fieldName, final Iterable<?> values) {
+      this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
+      this.values = Objects.requireNonNull(values, "values");
+    }
+
+    @Override void writeJson(final JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeFieldName("match");
+      generator.writeStartObject();
+      generator.writeFieldName(fieldName);
+      generator.writeStartArray();
+      for (Object value : values) {
+        writeObject(generator, value);
+      }
+      generator.writeEndArray();
+      generator.writeEndObject();
+      generator.writeEndObject();
+    }
+  }
+
   /**
    * Write usually simple (scalar) value (string, number, boolean or null) to json output.
    * In case of complex objects delegates to jackson serialization.
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/MatchTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/MatchTest.java
new file mode 100644
index 0000000..2f4bfa0
--- /dev/null
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/MatchTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.adapter.elasticsearch;
+
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ViewTable;
+import org.apache.calcite.schema.impl.ViewTableMacro;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelRunner;
+import org.apache.calcite.util.NlsString;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.LineProcessor;
+import com.google.common.io.Resources;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.calcite.test.Matchers.hasTree;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Testing Elasticsearch match query.
+ */
+public class MatchTest {
+
+  @ClassRule // init once for all tests
+  public static final EmbeddedElasticsearchPolicy NODE = EmbeddedElasticsearchPolicy.create();
+
+  /** Default index/type name. */
+  private static final String ZIPS = "zips";
+
+  /**
+   * Used to create {@code zips} index and insert zip data in bulk.
+   * @throws Exception when instance setup failed
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    final Map<String, String> mapping = ImmutableMap.of("city", "text", "state",
+        "keyword", "pop", "long");
+
+    NODE.createIndex(ZIPS, mapping);
+
+    // load records from file
+    final List<ObjectNode> bulk = new ArrayList<>();
+    Resources.readLines(MatchTest.class.getResource("/zips-mini.json"),
+        StandardCharsets.UTF_8, new LineProcessor<Void>() {
+          @Override public boolean processLine(String line) throws IOException {
+            line = line.replaceAll("_id", "id"); // _id is a reserved attribute in ES
+            bulk.add((ObjectNode) NODE.mapper().readTree(line));
+            return true;
+          }
+
+          @Override public Void getResult() {
+            return null;
+          }
+        });
+
+    if (bulk.isEmpty()) {
+      throw new IllegalStateException("No records to index. Empty file ?");
+    }
+
+    NODE.insertBulk(ZIPS, bulk);
+  }
+
+  private CalciteAssert.ConnectionFactory newConnectionFactory() {
+    return new CalciteAssert.ConnectionFactory() {
+      @Override public Connection createConnection() throws SQLException {
+        final Connection connection = DriverManager.getConnection("jdbc:calcite:lex=JAVA");
+        final SchemaPlus root = connection.unwrap(CalciteConnection.class).getRootSchema();
+
+        root.add("elastic", new ElasticsearchSchema(NODE.restClient(), NODE.mapper(), ZIPS));
+
+        // add calcite view programmatically
+        final String viewSql = "select cast(_MAP['city'] AS varchar(20)) AS \"city\", "
+            + " cast(_MAP['loc'][0] AS float) AS \"longitude\",\n"
+            + " cast(_MAP['loc'][1] AS float) AS \"latitude\",\n"
+            + " cast(_MAP['pop'] AS integer) AS \"pop\", "
+            + " cast(_MAP['state'] AS varchar(2)) AS \"state\", "
+            + " cast(_MAP['id'] AS varchar(5)) AS \"id\" "
+            + "from \"elastic\".\"zips\"";
+
+        ViewTableMacro macro = ViewTable.viewMacro(root, viewSql,
+            Collections.singletonList("elastic"), Arrays.asList("elastic", "view"), false);
+        root.add("zips", macro);
+
+        return connection;
+      }
+    };
+  }
+
+  /**
+   * Tests the Elasticsearch match query. The match query is translated from a
+   * {@code CONTAINS} call, which is built using {@link RelBuilder} and
+   * {@link RexBuilder} because in SQL, {@code CONTAINS} is a predicate on
+   * date/period ranges.
+   *
+   * <p>Equivalent SQL query: {@code select * from zips where city contains 'waltham'}
+   *
+   * <p>Elasticsearch query for it:
+   * {"query":{"constant_score":{"filter":{"match":{"city":"waltham"}}}}}
+   */
+  @Test
+  public void testMatchQuery() throws Exception {
+    try (Connection connection = newConnectionFactory().createConnection()) {
+      final CalciteConnection con = connection.unwrap(CalciteConnection.class);
+      final SchemaPlus elasticSchema = con.getRootSchema().getSubSchema("elastic");
+
+      final FrameworkConfig config = Frameworks.newConfigBuilder()
+          .parserConfig(SqlParser.Config.DEFAULT)
+          .defaultSchema(elasticSchema)
+          .build();
+
+      final RelBuilder builder = RelBuilder.create(config);
+      builder.scan(ZIPS);
+
+      final RelDataTypeFactory typeFactory =
+          new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+      final RexBuilder rexBuilder = new RexBuilder(typeFactory);
+
+      // ITEM(_MAP, 'city'): reads the "city" attribute from the document map
+      final RexNode cityRexNode = rexBuilder.makeCall(SqlStdOperatorTable.ITEM,
+          rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.ANY), 0),
+          rexBuilder.makeCharLiteral(
+              new NlsString("city", rexBuilder.getTypeFactory().getDefaultCharset().name(),
+                  SqlCollation.COERCIBLE)));
+
+      final RelDataType mapType = typeFactory.createMapType(
+          typeFactory.createSqlType(SqlTypeName.VARCHAR),
+          typeFactory.createTypeWithNullability(
+              typeFactory.createSqlType(SqlTypeName.ANY), true));
+
+      final List<RexNode> namedList = new ArrayList<>(2);
+      namedList.add(rexBuilder.makeInputRef(mapType, 0));
+      namedList.add(cityRexNode);
+
+      // Project "city" as a named field so that the filter below can reference it
+      builder.projectNamed(namedList, Arrays.asList("_MAP", "city"), true);
+
+      final RexNode filterRexNode = builder
+          .call(SqlStdOperatorTable.CONTAINS, builder.field("city"),
+              builder.literal("waltham"));
+      builder.filter(filterRexNode);
+
+      final String builderExpected = ""
+          + "LogicalFilter(condition=[CONTAINS($1, 'waltham')])\n"
+          + "  LogicalProject(_MAP=[$0], city=[ITEM($0, 'city')])\n"
+          + "    LogicalTableScan(table=[[elastic, zips]])\n";
+
+      final RelNode root = builder.build();
+
+      // Validate the plan before running it
+      assertThat(root, hasTree(builderExpected));
+
+      final RelRunner runner = con.unwrap(RelRunner.class);
+      try (PreparedStatement preparedStatement = runner.prepare(root)) {
+        final String s = CalciteAssert.toString(preparedStatement.executeQuery());
+        final String result = ""
+            + "_MAP={id=02154, city=NORTH WALTHAM, loc=[-71.236497, 42.382492], "
+            + "pop=57871, state=MA}; city=NORTH WALTHAM\n";
+
+        // Validate result returned from ES
+        assertThat(s, is(result));
+      }
+    }
+  }
+}
+
+// End MatchTest.java
diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
index 3f3099a..80bc5fe 100644
--- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
+++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
@@ -172,6 +172,21 @@ public class QueryBuildersTest {
         toJson(QueryBuilders.matchAll()));
   }
 
+  @Test
+  public void match() throws IOException {
+    assertEquals("{\"match\":{\"foo\":\"bar\"}}",
+        toJson(QueryBuilders.matchQuery("foo", "bar")));
+
+    assertEquals("{\"match\":{\"foo\":true}}",
+        toJson(QueryBuilders.matchQuery("foo", true)));
+
+    assertEquals("{\"match\":{\"foo\":[\"bar\"]}}",
+        toJson(QueryBuilders.matchesQuery("foo", Collections.singleton("bar"))));
+
+    assertEquals("{\"match\":{\"foo\":[true]}}",
+        toJson(QueryBuilders.matchesQuery("foo", Collections.singleton(true))));
+  }
+
   private String toJson(QueryBuilders.QueryBuilder builder) throws IOException {
     StringWriter writer = new StringWriter();
     JsonGenerator gen = mapper.getFactory().createGenerator(writer);

Reply via email to