This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new e6f91d2 [CALCITE-3437] ElasticSearch adapter. Support MatchQuery in
elasticsearch (Shlok Srivastava)
e6f91d2 is described below
commit e6f91d2e92acbc8dccd375b777b71265677dea16
Author: shlok <[email protected]>
AuthorDate: Fri Oct 25 14:47:30 2019 +0530
[CALCITE-3437] ElasticSearch adapter. Support MatchQuery in elasticsearch
(Shlok Srivastava)
---
.../org/apache/calcite/test/CalciteAssert.java | 2 +-
.../adapter/elasticsearch/PredicateAnalyzer.java | 19 +-
.../adapter/elasticsearch/QueryBuilders.java | 73 +++++++
.../calcite/adapter/elasticsearch/MatchTest.java | 217 +++++++++++++++++++++
.../adapter/elasticsearch/QueryBuildersTest.java | 9 +
5 files changed, 318 insertions(+), 2 deletions(-)
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
index 151102d..65aec7d 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
@@ -672,7 +672,7 @@ public class CalciteAssert {
}
/** Converts a {@link ResultSet} to a string. */
- static String toString(ResultSet resultSet) throws SQLException {
+ public static String toString(ResultSet resultSet) throws SQLException {
return new ResultSetFormatter().resultSet(resultSet).string();
}
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
index f2955d3..fe61b52 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/PredicateAnalyzer.java
@@ -51,8 +51,8 @@ import static
org.apache.calcite.adapter.elasticsearch.QueryBuilders.rangeQuery;
import static
org.apache.calcite.adapter.elasticsearch.QueryBuilders.regexpQuery;
import static org.apache.calcite.adapter.elasticsearch.QueryBuilders.termQuery;
-import static java.lang.String.format;
+import static java.lang.String.format;
/**
* Query predicate analyzer. Uses visitor pattern to traverse existing
expression
* and convert it to {@link QueryBuilder}.
@@ -166,6 +166,7 @@ class PredicateAnalyzer {
switch (syntax) {
case BINARY:
switch (call.getKind()) {
+ case CONTAINS:
case AND:
case OR:
case LIKE:
@@ -233,6 +234,8 @@ class PredicateAnalyzer {
return toCastExpression(call);
case LIKE:
return binary(call);
+ case CONTAINS:
+ return binary(call);
default:
// manually process ITEM($0, 'foo') which in our case will be named
attribute
if (call.getOperator().getName().equalsIgnoreCase("ITEM")) {
@@ -349,6 +352,8 @@ class PredicateAnalyzer {
}
switch (call.getKind()) {
+ case CONTAINS:
+ return QueryExpression.create(pair.getKey()).contains(pair.getValue());
case LIKE:
throw new UnsupportedOperationException("LIKE not yet supported");
case EQUALS:
@@ -541,6 +546,8 @@ class PredicateAnalyzer {
return false;
}
+ public abstract QueryExpression contains(LiteralExpression literal);
+
/**
* Negate {@code this} QueryExpression (not the next one).
*/
@@ -641,6 +648,11 @@ class PredicateAnalyzer {
+ "cannot be applied to a compound expression");
}
+ @Override public QueryExpression contains(LiteralExpression literal) {
+ throw new PredicateAnalyzerException("SqlOperatorImpl ['contains'] "
+ + "cannot be applied to a compound expression");
+ }
+
@Override public QueryExpression notExists() {
throw new PredicateAnalyzerException("SqlOperatorImpl ['notExists'] "
+ "cannot be applied to a compound expression");
@@ -741,6 +753,11 @@ class PredicateAnalyzer {
return this;
}
+ @Override public QueryExpression contains(LiteralExpression literal) {
+ builder = QueryBuilders.matchQuery(getFieldReference(), literal.value());
+ return this;
+ }
+
@Override public QueryExpression notLike(LiteralExpression literal) {
builder = boolQuery()
// NOT LIKE should return false when field is NULL
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
index b046cb5..4752325 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/QueryBuilders.java
@@ -125,6 +125,26 @@ class QueryBuilders {
* @param name The field name
* @param values The terms
*/
+ static MatchesQueryBuilder matchesQuery(String name, Iterable<?> values) {
+ return new MatchesQueryBuilder(name, values);
+ }
+
+ /**
+ * A Query that matches documents containing a term.
+ *
+ * @param name The name of the field
+ * @param value The value of the term
+ */
+ static MatchQueryBuilder matchQuery(String name, Object value) {
+ return new MatchQueryBuilder(name, value);
+ }
+
+ /**
+ * A filer for a field based on several terms matching on any of them.
+ *
+ * @param name The field name
+ * @param values The terms
+ */
static TermsQueryBuilder termsQuery(String name, Iterable<?> values) {
return new TermsQueryBuilder(name, values);
}
@@ -309,6 +329,59 @@ class QueryBuilders {
}
}
+
+
+ /**
+ * A Query that matches documents containing a term.
+ */
+ static class MatchQueryBuilder extends QueryBuilder {
+ private final String fieldName;
+ private final Object value;
+
+ private MatchQueryBuilder(final String fieldName, final Object value) {
+ this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
+ this.value = Objects.requireNonNull(value, "value");
+ }
+
+ @Override void writeJson(final JsonGenerator generator) throws IOException
{
+ generator.writeStartObject();
+ generator.writeFieldName("match");
+ generator.writeStartObject();
+ generator.writeFieldName(fieldName);
+ writeObject(generator, value);
+ generator.writeEndObject();
+ generator.writeEndObject();
+ }
+ }
+
+
+ /**
+ * A filter for a field based on several terms matching on any of them.
+ */
+ private static class MatchesQueryBuilder extends QueryBuilder {
+ private final String fieldName;
+ private final Iterable<?> values;
+
+ private MatchesQueryBuilder(final String fieldName, final Iterable<?>
values) {
+ this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
+ this.values = Objects.requireNonNull(values, "values");
+ }
+
+ @Override void writeJson(final JsonGenerator generator) throws IOException
{
+ generator.writeStartObject();
+ generator.writeFieldName("match");
+ generator.writeStartObject();
+ generator.writeFieldName(fieldName);
+ generator.writeStartArray();
+ for (Object value: values) {
+ writeObject(generator, value);
+ }
+ generator.writeEndArray();
+ generator.writeEndObject();
+ generator.writeEndObject();
+ }
+ }
+
/**
* Write usually simple (scalar) value (string, number, boolean or null) to
json output.
* In case of complex objects delegates to jackson serialization.
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/MatchTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/MatchTest.java
new file mode 100644
index 0000000..2f4bfa0
--- /dev/null
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/MatchTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.adapter.elasticsearch;
+
+
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ViewTable;
+import org.apache.calcite.schema.impl.ViewTableMacro;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelRunner;
+import org.apache.calcite.util.NlsString;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.LineProcessor;
+import com.google.common.io.Resources;
+
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.apache.calcite.test.Matchers.hasTree;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+/**
+ * Testing Elasticsearch match query.
+ */
+public class MatchTest {
+
+ @ClassRule //init once for all tests
+ public static final EmbeddedElasticsearchPolicy NODE =
EmbeddedElasticsearchPolicy.create();
+
+ /** Default index/type name */
+ private static final String ZIPS = "zips";
+ private static final int ZIPS_SIZE = 149;
+
+ /**
+ * Used to create {@code zips} index and insert zip data in bulk.
+ * @throws Exception when instance setup failed
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ final Map<String, String> mapping = ImmutableMap.of("city", "text",
"state",
+ "keyword", "pop", "long");
+
+ NODE.createIndex(ZIPS, mapping);
+
+ // load records from file
+ final List<ObjectNode> bulk = new ArrayList<>();
+
Resources.readLines(ElasticSearchAdapterTest.class.getResource("/zips-mini.json"),
+ StandardCharsets.UTF_8, new LineProcessor<Void>() {
+ @Override public boolean processLine(String line) throws IOException
{
+ line = line.replaceAll("_id", "id"); // _id is a reserved
attribute in ES
+ bulk.add((ObjectNode) NODE.mapper().readTree(line));
+ return true;
+ }
+
+ @Override public Void getResult() {
+ return null;
+ }
+ });
+
+ if (bulk.isEmpty()) {
+ throw new IllegalStateException("No records to index. Empty file ?");
+ }
+
+ NODE.insertBulk(ZIPS, bulk);
+ }
+
+ private CalciteAssert.ConnectionFactory newConnectionFactory() {
+ return new CalciteAssert.ConnectionFactory() {
+ @Override public Connection createConnection() throws SQLException {
+ final Connection connection =
DriverManager.getConnection("jdbc:calcite:lex=JAVA");
+ final SchemaPlus root =
connection.unwrap(CalciteConnection.class).getRootSchema();
+
+ root.add("elastic", new ElasticsearchSchema(NODE.restClient(),
NODE.mapper(), ZIPS));
+
+ // add calcite view programmatically
+ final String viewSql = "select cast(_MAP['city'] AS varchar(20)) AS
\"city\", "
+ + " cast(_MAP['loc'][0] AS float) AS \"longitude\",\n"
+ + " cast(_MAP['loc'][1] AS float) AS \"latitude\",\n"
+ + " cast(_MAP['pop'] AS integer) AS \"pop\", "
+ + " cast(_MAP['state'] AS varchar(2)) AS \"state\", "
+ + " cast(_MAP['id'] AS varchar(5)) AS \"id\" "
+ + "from \"elastic\".\"zips\"";
+
+ ViewTableMacro macro = ViewTable.viewMacro(root, viewSql,
+ Collections.singletonList("elastic"), Arrays.asList("elastic",
"view"), false);
+ root.add("zips", macro);
+
+ return connection;
+ }
+ };
+ }
+
+ /**
+ * Test the ElasticSearch match query. The match query is translated from
CONTAINS query which
+ * is build using RelBuilder, RexBuilder because the normal sql query
assumes CONTAINS query
+ * is for date/period range
+ *
+ * Equivalent SQL query: select * from zips where city contains 'waltham'
+ *
+ * ElasticSearch query for it:
+ * {"query":{"constant_score":{"filter":{"match":{"city":"waltham"}}}}}
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMatchQuery() throws Exception {
+
+ CalciteConnection con = (CalciteConnection) newConnectionFactory()
+ .createConnection();
+ SchemaPlus postschema = con.getRootSchema().getSubSchema("elastic");
+
+ FrameworkConfig postConfig = Frameworks.newConfigBuilder()
+ .parserConfig(SqlParser.Config.DEFAULT)
+ .defaultSchema(postschema)
+ .build();
+
+ final RelBuilder builder = RelBuilder.create(postConfig);
+ builder.scan(ZIPS);
+
+ final RelDataTypeFactory relDataTypeFactory = new SqlTypeFactoryImpl(
+ RelDataTypeSystem.DEFAULT);
+ final RexBuilder rexbuilder = new RexBuilder(relDataTypeFactory);
+
+ RexNode nameRexNode = rexbuilder.makeCall(SqlStdOperatorTable.ITEM,
+
rexbuilder.makeInputRef(relDataTypeFactory.createSqlType(SqlTypeName.ANY), 0),
+ rexbuilder.makeCharLiteral(
+ new NlsString("city",
rexbuilder.getTypeFactory().getDefaultCharset().name(),
+ SqlCollation.COERCIBLE)));
+
+ RelDataType mapType = relDataTypeFactory.createMapType(
+ relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR),
+ relDataTypeFactory.createTypeWithNullability(
+ relDataTypeFactory.createSqlType(SqlTypeName.ANY), true));
+
+ ArrayList<RexNode> namedList = new ArrayList<RexNode>(2);
+ namedList.add(rexbuilder.makeInputRef(mapType, 0));
+ namedList.add(nameRexNode);
+
+ //Add fields in builder stack so it is accessible while filter preparation
+ builder.projectNamed(namedList, Arrays.asList("_MAP", "city"), true);
+
+ RexNode filterRexNode = builder
+ .call(SqlStdOperatorTable.CONTAINS, builder.field("city"),
+ builder.literal("waltham"));
+ builder.filter(filterRexNode);
+
+ String builderExpected = ""
+ + "LogicalFilter(condition=[CONTAINS($1, 'waltham')])\n"
+ + " LogicalProject(_MAP=[$0], city=[ITEM($0, 'city')])\n"
+ + " LogicalTableScan(table=[[elastic, zips]])\n";
+
+ RelNode root = builder.build();
+
+ RelRunner ru = (RelRunner)
con.unwrap(Class.forName("org.apache.calcite.tools.RelRunner"));
+ try (PreparedStatement preparedStatement = ru.prepare(root)) {
+
+ String s = CalciteAssert.toString(preparedStatement.executeQuery());
+ final String result = ""
+ + "_MAP={id=02154, city=NORTH WALTHAM, loc=[-71.236497, 42.382492],
pop=57871, state=MA}; city=NORTH WALTHAM\n";
+
+ //Validate query prepared
+ assertThat(root, hasTree(builderExpected));
+
+ //Validate result returned from ES
+ assertThat(s, is(result));
+ }
+ }
+}
+
+// End MatchTest.java
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
index 3f3099a..80bc5fe 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/QueryBuildersTest.java
@@ -172,6 +172,15 @@ public class QueryBuildersTest {
toJson(QueryBuilders.matchAll()));
}
+ @Test
+ public void match() throws IOException {
+ assertEquals("{\"match\":{\"foo\":[\"bar\"]}}",
+ toJson(QueryBuilders.matchesQuery("foo",
Collections.singleton("bar"))));
+
+ assertEquals("{\"match\":{\"foo\":[true]}}",
+ toJson(QueryBuilders.matchesQuery("foo",
Collections.singleton(true))));
+ }
+
private String toJson(QueryBuilders.QueryBuilder builder) throws IOException
{
StringWriter writer = new StringWriter();
JsonGenerator gen = mapper.getFactory().createGenerator(writer);