This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 61b858d [CALCITE-3023] Upgrade elastic search to 7.x (Takako
Shimamoto)
61b858d is described below
commit 61b858db13c2e1997e92172a09f38c639ffdceee
Author: shimamoto <[email protected]>
AuthorDate: Tue May 21 11:06:30 2019 +0900
[CALCITE-3023] Upgrade elastic search to 7.x (Takako Shimamoto)
---
.../adapter/elasticsearch/ElasticsearchJson.java | 55 +++++++++++++++++-
.../elasticsearch/ElasticsearchMapping.java | 15 ++---
.../adapter/elasticsearch/ElasticsearchSchema.java | 66 ++++++++--------------
.../elasticsearch/ElasticsearchSchemaFactory.java | 2 +-
.../adapter/elasticsearch/ElasticsearchTable.java | 23 ++++----
.../elasticsearch/ElasticsearchTransport.java | 20 +++----
.../elasticsearch/ElasticsearchVersion.java | 20 +++++--
.../adapter/elasticsearch/AggregationTest.java | 12 ++--
.../elasticsearch/ElasticSearchAdapterTest.java | 5 +-
.../elasticsearch/EmbeddedElasticsearchNode.java | 9 ++-
.../elasticsearch/EmbeddedElasticsearchPolicy.java | 27 +++++----
.../adapter/elasticsearch/ScrollingTest.java | 5 +-
pom.xml | 2 +-
site/_docs/elasticsearch_adapter.md | 34 ++++++-----
sqlline | 2 +-
sqlline.bat | 2 +-
16 files changed, 174 insertions(+), 125 deletions(-)
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
index d67aed7..beb85f1 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java
@@ -251,11 +251,11 @@ final class ElasticsearchJson {
@JsonIgnoreProperties(ignoreUnknown = true)
static class SearchHits {
- private final long total;
+ private final SearchTotal total;
private final List<SearchHit> hits;
@JsonCreator
- SearchHits(@JsonProperty("total")final long total,
+ SearchHits(@JsonProperty("total")final SearchTotal total,
@JsonProperty("hits") final List<SearchHit> hits) {
this.total = total;
this.hits = Objects.requireNonNull(hits, "hits");
@@ -265,13 +265,62 @@ final class ElasticsearchJson {
return this.hits;
}
- public long total() {
+ public SearchTotal total() {
return total;
}
}
/**
+ * Container for the total number of documents matching a query.
+ */
+ @JsonDeserialize(using = SearchTotalDeserializer.class)
+ static class SearchTotal {
+
+ private final long value;
+
+ SearchTotal(final long value) {
+ this.value = value;
+ }
+
+ public long value() {
+ return value;
+ }
+
+ }
+
+ /**
+ * Deserializes total hits from either a plain number or an object with a {@code value} field.
+ */
+ static class SearchTotalDeserializer extends StdDeserializer<SearchTotal> {
+
+ SearchTotalDeserializer() {
+ super(SearchTotal.class);
+ }
+
+ @Override public SearchTotal deserialize(final JsonParser parser,
+ final DeserializationContext ctxt)
+ throws IOException {
+
+ JsonNode node = parser.getCodec().readTree(parser);
+ return parseSearchTotal(node);
+ }
+
+ private static SearchTotal parseSearchTotal(JsonNode node) {
+
+ final Number value;
+ if (node.isNumber()) {
+ value = node.numberValue();
+ } else {
+ value = node.get("value").numberValue();
+ }
+
+ return new SearchTotal(value.longValue());
+ }
+
+ }
+
+ /**
* Concrete result record which matched the query. Similar to {@code
SearchHit} in ES.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
index 93a8049..9195df1 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMapping.java
@@ -34,8 +34,8 @@ import javax.annotation.Nullable;
/**
* Stores Elasticsearch
* <a
href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html">
- * mapping</a> information for particular index/type. This information is
- * extracted from {@code /$index/$type/_mapping} endpoint.
+ * mapping</a> information for particular index. This information is
+ * extracted from {@code /$index/_mapping} endpoint.
*
* <p>Instances of this class are immutable.
*/
@@ -43,14 +43,11 @@ class ElasticsearchMapping {
private final String index;
- private final String type;
-
private final Map<String, Datatype> mapping;
- ElasticsearchMapping(final String index, final String type,
+ ElasticsearchMapping(final String index,
final Map<String, String> mapping) {
this.index = Objects.requireNonNull(index, "index");
- this.type = Objects.requireNonNull(type, "type");
Objects.requireNonNull(mapping, "mapping");
final Map<String, Datatype> transformed = mapping.entrySet().stream()
@@ -83,7 +80,7 @@ class ElasticsearchMapping {
Optional<JsonNode> missingValueFor(String fieldName) {
if (!mapping().containsKey(fieldName)) {
final String message = String.format(Locale.ROOT,
- "Field %s not defined for %s/%s", fieldName, index, type);
+ "Field %s not defined for %s", fieldName, index);
throw new IllegalArgumentException(message);
}
@@ -94,10 +91,6 @@ class ElasticsearchMapping {
return this.index;
}
- String type() {
- return this.type;
- }
-
/**
* Represents elastic data-type, like {@code long}, {@code keyword},
* {@code date} etc.
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
index 888593b..e6c22e7 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchema.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
@@ -39,14 +40,10 @@ import java.util.Objects;
import java.util.Set;
/**
- * Schema mapped onto an index of ELASTICSEARCH types.
- *
- * <p>Each table in the schema is an ELASTICSEARCH type in that index.
+ * Each table in the schema is an ELASTICSEARCH index.
*/
public class ElasticsearchSchema extends AbstractSchema {
- private final String index;
-
private final RestClient client;
private final ObjectMapper mapper;
@@ -60,91 +57,74 @@ public class ElasticsearchSchema extends AbstractSchema {
/**
* Allows schema to be instantiated from existing elastic search client.
- * This constructor is used in tests.
+ *
* @param client existing client instance
* @param mapper mapper for JSON (de)serialization
* @param index name of ES index
*/
public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String
index) {
- this(client, mapper, index, null);
- }
-
- public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String
index, String type) {
- this(client, mapper, index, type,
ElasticsearchTransport.DEFAULT_FETCH_SIZE);
+ this(client, mapper, index, ElasticsearchTransport.DEFAULT_FETCH_SIZE);
}
@VisibleForTesting
ElasticsearchSchema(RestClient client, ObjectMapper mapper,
- String index, String type,
- int fetchSize) {
+ String index, int fetchSize) {
super();
this.client = Objects.requireNonNull(client, "client");
this.mapper = Objects.requireNonNull(mapper, "mapper");
- this.index = Objects.requireNonNull(index, "index");
Preconditions.checkArgument(fetchSize > 0,
"invalid fetch size. Expected %s > 0", fetchSize);
this.fetchSize = fetchSize;
- if (type == null) {
+
+ if (index == null) {
try {
- this.tableMap = createTables(listTypesFromElastic());
+ this.tableMap = createTables(indicesFromElastic());
} catch (IOException e) {
- throw new UncheckedIOException("Couldn't get types for " + index, e);
+ throw new UncheckedIOException("Couldn't get indices", e);
}
} else {
- this.tableMap = createTables(Collections.singleton(type));
+ this.tableMap = createTables(Collections.singleton(index));
}
-
}
@Override protected Map<String, Table> getTableMap() {
return tableMap;
}
- private Map<String, Table> createTables(Iterable<String> types) {
+ private Map<String, Table> createTables(Iterable<String> indices) {
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
- for (String type : types) {
+ for (String index : indices) {
final ElasticsearchTransport transport = new
ElasticsearchTransport(client, mapper,
- index, type, fetchSize);
- builder.put(type, new ElasticsearchTable(transport));
+ index, fetchSize);
+ builder.put(index, new ElasticsearchTable(transport));
}
return builder.build();
}
/**
- * Queries {@code _mapping} definition to automatically detect all types for
an index
+ * Queries the {@code /_alias} endpoint to automatically detect all indices.
*
- * @return list of types associated with this index
+ * @return list of indices
* @throws IOException for any IO related issues
* @throws IllegalStateException if reply is not understood
*/
- private Set<String> listTypesFromElastic() throws IOException {
- final String endpoint = "/" + index + "/_mapping";
- final Response response = client.performRequest("GET", endpoint);
+ private Set<String> indicesFromElastic() throws IOException {
+ final String endpoint = "/_alias";
+ final Response response = client.performRequest(new Request("GET",
endpoint));
try (InputStream is = response.getEntity().getContent()) {
final JsonNode root = mapper.readTree(is);
- if (!root.isObject() || root.size() != 1) {
+ if (!(root.isObject() && root.size() > 0)) {
final String message = String.format(Locale.ROOT, "Invalid response
for %s/%s "
- + "Expected object of size 1 got %s (of size %d)",
response.getHost(),
+ + "Expected object of at least size 1 got %s (of size %d)",
response.getHost(),
response.getRequestLine(), root.getNodeType(), root.size());
throw new IllegalStateException(message);
}
- JsonNode mappings = root.iterator().next().get("mappings");
- if (mappings == null || mappings.size() == 0) {
- final String message = String.format(Locale.ROOT, "Index %s does not
have any types",
- index);
- throw new IllegalStateException(message);
- }
-
- Set<String> types = Sets.newHashSet(mappings.fieldNames());
- types.remove("_default_");
- return types;
+ Set<String> indices = Sets.newHashSet(root.fieldNames());
+ return indices;
}
}
- public String getIndex() {
- return index;
- }
}
// End ElasticsearchSchema.java
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
index 7939457..000c4c0 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
@@ -66,7 +66,7 @@ public class ElasticsearchSchemaFactory implements
SchemaFactory {
final RestClient client = connect(coordinates);
final String index = (String) map.get("index");
- Preconditions.checkState(index != null, "'index' is missing in
configuration");
+
return new ElasticsearchSchema(client, new ObjectMapper(), index);
} catch (IOException e) {
throw new RuntimeException("Cannot parse values from json", e);
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
index 29dc62a..bc9499d 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java
@@ -59,7 +59,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
- * Table based on an Elasticsearch type.
+ * Table based on an Elasticsearch index.
*/
public class ElasticsearchTable extends AbstractQueryableTable implements
TranslatableTable {
@@ -72,7 +72,6 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
private final ElasticsearchVersion version;
private final String indexName;
- private final String typeName;
final ObjectMapper mapper;
final ElasticsearchTransport transport;
@@ -84,7 +83,6 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
this.transport = Objects.requireNonNull(transport, "transport");
this.version = transport.version;
this.indexName = transport.indexName;
- this.typeName = transport.typeName;
this.mapper = transport.mapper();
}
@@ -105,11 +103,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
}
/**
- * Executes a "find" operation on the underlying type.
- *
- * <p>For example,
- * <code>client.prepareSearch(index).setTypes(type)
- * .setSource("{\"fields\" : [\"state\"]}")</code></p>
+ * Executes a "find" operation on the underlying index.
*
* @param ops List of operations represented as Json strings.
* @param fields List of fields to project; or null to return map
@@ -263,6 +257,13 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
// cleanup query. remove empty AGGREGATIONS element (if empty)
emptyAggRemover.accept(query);
+ // This must be set to true, or else 7.x clusters and mixed 6.x/7.x clusters
+ // will return lower-bounded count values instead of an accurate count.
+ if (groupBy.isEmpty()
+ && version.elasticVersionMajor() >=
ElasticsearchVersion.ES6.elasticVersionMajor()) {
+ query.put("track_total_hits", true);
+ }
+
ElasticsearchJson.Result res =
transport.search(Collections.emptyMap()).apply(query);
final List<Map<String, Object>> result = new ArrayList<>();
@@ -283,7 +284,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
// elastic exposes total number of documents matching a query in
"/hits/total" path
// this can be used for simple "select count(*) from table"
- final long total = res.searchHits().total();
+ final long total = res.searchHits().total().value();
if (groupBy.isEmpty()) {
// put totals automatically for count(*) expression(s), unless they
contain group by
@@ -296,7 +297,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mapping));
ElasticsearchJson.SearchHits hits =
- new ElasticsearchJson.SearchHits(total, result.stream()
+ new ElasticsearchJson.SearchHits(res.searchHits().total(),
result.stream()
.map(r -> new ElasticsearchJson.SearchHit("_id", r, null))
.collect(Collectors.toList()));
@@ -313,7 +314,7 @@ public class ElasticsearchTable extends
AbstractQueryableTable implements Transl
}
@Override public String toString() {
- return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
+ return "ElasticsearchTable{" + indexName + "}";
}
@Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
SchemaPlus schema,
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
index 0c9c06a..8e52e98 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTransport.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
@@ -69,7 +70,6 @@ final class ElasticsearchTransport {
private final RestClient restClient;
final String indexName;
- final String typeName;
final ElasticsearchVersion version;
@@ -84,12 +84,10 @@ final class ElasticsearchTransport {
ElasticsearchTransport(final RestClient restClient,
final ObjectMapper mapper,
final String indexName,
- final String typeName,
final int fetchSize) {
this.mapper = Objects.requireNonNull(mapper, "mapper");
this.restClient = Objects.requireNonNull(restClient, "restClient");
this.indexName = Objects.requireNonNull(indexName, "indexName");
- this.typeName = Objects.requireNonNull(typeName, "typeName");
this.fetchSize = fetchSize;
this.version = version(); // cache version
this.mapping = fetchAndCreateMapping(); // cache mapping
@@ -120,13 +118,13 @@ final class ElasticsearchTransport {
* Build index mapping returning new instance of {@link
ElasticsearchMapping}.
*/
private ElasticsearchMapping fetchAndCreateMapping() {
- final String uri = String.format(Locale.ROOT, "/%s/%s/_mapping",
indexName, typeName);
+ final String uri = String.format(Locale.ROOT, "/%s/_mapping", indexName);
final ObjectNode root = rawHttp(ObjectNode.class).apply(new HttpGet(uri));
- ObjectNode properties = (ObjectNode)
root.elements().next().get("mappings").elements().next();
+ ObjectNode properties = (ObjectNode)
root.elements().next().get("mappings");
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
ElasticsearchJson.visitMappingProperties(properties, builder::put);
- return new ElasticsearchMapping(indexName, typeName, builder.build());
+ return new ElasticsearchMapping(indexName, builder.build());
}
ObjectMapper mapper() {
@@ -206,7 +204,7 @@ final class ElasticsearchTransport {
Objects.requireNonNull(httpParams, "httpParams");
return query -> {
Hook.QUERY_PLAN.run(query);
- String path = String.format(Locale.ROOT, "/%s/%s/_search", indexName,
typeName);
+ String path = String.format(Locale.ROOT, "/%s/_search", indexName);
final HttpPost post;
try {
URIBuilder builder = new URIBuilder(path);
@@ -275,11 +273,11 @@ final class ElasticsearchTransport {
final HttpEntity entity = request instanceof HttpEntityEnclosingRequest
? ((HttpEntityEnclosingRequest) request).getEntity() : null;
- final Response response = restClient.performRequest(
+ final Request r = new Request(
request.getRequestLine().getMethod(),
- request.getRequestLine().getUri(),
- Collections.emptyMap(),
- entity);
+ request.getRequestLine().getUri());
+ r.setEntity(entity);
+ final Response response = restClient.performRequest(r);
final String payload = entity != null && entity.isRepeatable()
? EntityUtils.toString(entity) : "<empty>";
diff --git
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
index 3d774dd..5c7ff0b 100644
---
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
+++
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchVersion.java
@@ -25,11 +25,21 @@ import java.util.Objects;
*/
enum ElasticsearchVersion {
- ES2,
- ES5,
- ES6,
- ES7,
- UNKNOWN;
+ ES2(2),
+ ES5(5),
+ ES6(6),
+ ES7(7),
+ UNKNOWN(0);
+
+ private final int elasticVersionMajor;
+
+ ElasticsearchVersion(final int elasticVersionMajor) {
+ this.elasticVersionMajor = elasticVersionMajor;
+ }
+
+ public int elasticVersionMajor() {
+ return elasticVersionMajor;
+ }
static ElasticsearchVersion fromString(String version) {
Objects.requireNonNull(version, "version");
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
index 25fcd17..2be4da2 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/AggregationTest.java
@@ -116,7 +116,8 @@ public class AggregationTest {
.with(newConnectionFactory())
.query("select count(*) from view")
.queryContains(
- ElasticsearchChecker.elasticsearchChecker("_source:false, size:0"))
+ ElasticsearchChecker.elasticsearchChecker(
+ "_source:false, size:0, track_total_hits:true"))
.returns("EXPR$0=3\n");
CalciteAssert.that()
@@ -136,7 +137,8 @@ public class AggregationTest {
.with(newConnectionFactory())
.query("select count(*), sum(val1), sum(val2) from view")
.queryContains(
- ElasticsearchChecker.elasticsearchChecker("_source:false, size:0",
+ ElasticsearchChecker.elasticsearchChecker(
+ "_source:false, size:0, track_total_hits:true",
"aggregations:{'EXPR$0.value_count.field': '_id'",
"'EXPR$1.sum.field': 'val1'",
"'EXPR$2.sum.field': 'val2'}"))
@@ -146,7 +148,8 @@ public class AggregationTest {
.with(newConnectionFactory())
.query("select min(val1), max(val2), count(*) from view")
.queryContains(
- ElasticsearchChecker.elasticsearchChecker("_source:false, size:0",
+ ElasticsearchChecker.elasticsearchChecker(
+ "_source:false, size:0, track_total_hits:true",
"aggregations:{'EXPR$0.min.field': 'val1'",
"'EXPR$1.max.field': 'val2'",
"'EXPR$2.value_count.field': '_id'}"))
@@ -364,7 +367,8 @@ public class AggregationTest {
String.format(Locale.ROOT, "select max(cast(_MAP['val1'] as
integer)) as v1, "
+ "min(cast(_MAP['val2'] as integer)) as v2 from elastic.%s",
NAME))
.queryContains(
- ElasticsearchChecker.elasticsearchChecker("_source:false, size:0",
+ ElasticsearchChecker.elasticsearchChecker(
+ "_source:false, size:0, track_total_hits:true",
"aggregations:{'v1.max.field': 'val1'",
"'v2.min.field': 'val2'}"))
.returnsUnordered("v1=7; v2=5");
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
index 0c028e2..2592a67 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ElasticSearchAdapterTest.java
@@ -514,7 +514,7 @@ public class ElasticSearchAdapterTest {
.query("select count(*) from zips")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
- "size:0"))
+ "size:0", "track_total_hits:true"))
.returns("EXPR$0=149\n");
// check with limit (should still return correct result).
@@ -526,7 +526,7 @@ public class ElasticSearchAdapterTest {
.query("select count(*) as cnt from zips")
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
- "size:0"))
+ "size:0", "track_total_hits:true"))
.returns("cnt=149\n");
calciteAssert()
@@ -534,6 +534,7 @@ public class ElasticSearchAdapterTest {
.queryContains(
ElasticsearchChecker.elasticsearchChecker("'_source':false",
"size:0",
+ "track_total_hits:true",
"aggregations:{'EXPR$0':{min:{field:'pop'}},'EXPR$1':{max:"
+ "{field:'pop'}}}"))
.returns("EXPR$0=21; EXPR$1=112047\n");
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchNode.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchNode.java
index 93cf652..c5a56fc 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchNode.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchNode.java
@@ -38,6 +38,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
+import static java.util.Collections.emptyMap;
+
/**
* Represents a single elastic search node which can run embedded in a java
application.
*
@@ -154,8 +156,11 @@ class EmbeddedElasticsearchNode implements AutoCloseable {
private static class LocalNode extends Node {
private LocalNode(Settings settings, Collection<Class<? extends Plugin>>
classpathPlugins) {
- super(InternalSettingsPreparer.prepareEnvironment(settings, null),
- classpathPlugins);
+ super(
+ InternalSettingsPreparer.prepareEnvironment(settings, emptyMap(),
+ null, () -> "default_node_name"),
+ classpathPlugins,
+ false);
}
}
}
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
index 8089057..1a2ca5f 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/EmbeddedElasticsearchPolicy.java
@@ -26,13 +26,13 @@ import org.apache.http.entity.StringEntity;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.transport.TransportAddress;
import org.junit.rules.ExternalResource;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -114,7 +114,7 @@ class EmbeddedElasticsearchPolicy extends ExternalResource {
ObjectNode mappings = mapper().createObjectNode();
- ObjectNode properties =
mappings.with("mappings").with(index).with("properties");
+ ObjectNode properties = mappings.with("mappings").with("properties");
for (Map.Entry<String, String> entry: mapping.entrySet()) {
applyMapping(properties, entry.getKey(), entry.getValue());
}
@@ -122,7 +122,9 @@ class EmbeddedElasticsearchPolicy extends ExternalResource {
// create index and mapping
final HttpEntity entity = new
StringEntity(mapper().writeValueAsString(mappings),
ContentType.APPLICATION_JSON);
- restClient().performRequest("PUT", "/" + index, Collections.emptyMap(),
entity);
+ final Request r = new Request("PUT", "/" + index);
+ r.setEntity(entity);
+ restClient().performRequest(r);
}
/**
@@ -147,13 +149,12 @@ class EmbeddedElasticsearchPolicy extends
ExternalResource {
Objects.requireNonNull(index, "index");
Objects.requireNonNull(document, "document");
String uri = String.format(Locale.ROOT,
- "/%s/%s/?refresh", index, index);
+ "/%s/_doc?refresh", index);
StringEntity entity = new
StringEntity(mapper().writeValueAsString(document),
ContentType.APPLICATION_JSON);
-
- restClient().performRequest("POST", uri,
- Collections.emptyMap(),
- entity);
+ final Request r = new Request("POST", uri);
+ r.setEntity(entity);
+ restClient().performRequest(r);
}
void insertBulk(String index, List<ObjectNode> documents) throws IOException
{
@@ -167,18 +168,16 @@ class EmbeddedElasticsearchPolicy extends
ExternalResource {
List<String> bulk = new ArrayList<>(documents.size() * 2);
for (ObjectNode doc: documents) {
- bulk.add("{\"index\": {} }"); // index/type will be derived from _bulk
URI
+ bulk.add(String.format(Locale.ROOT, "{\"index\": {\"_index\":\"%s\"}}",
index));
bulk.add(mapper().writeValueAsString(doc));
}
final StringEntity entity = new StringEntity(String.join("\n", bulk) +
"\n",
ContentType.APPLICATION_JSON);
- final String uri = String.format(Locale.ROOT, "/%s/%s/_bulk?refresh",
index, index);
-
- restClient().performRequest("POST", uri,
- Collections.emptyMap(),
- entity);
+ final Request r = new Request("POST", "/_bulk?refresh");
+ r.setEntity(entity);
+ restClient().performRequest(r);
}
/**
diff --git
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
index b792e20..77eb4ef 100644
---
a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
+++
b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/ScrollingTest.java
@@ -23,6 +23,7 @@ import org.apache.calcite.test.CalciteAssert;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -69,7 +70,7 @@ public class ScrollingTest {
final Connection connection =
DriverManager.getConnection("jdbc:calcite:");
final SchemaPlus root =
connection.unwrap(CalciteConnection.class).getRootSchema();
ElasticsearchSchema schema = new
ElasticsearchSchema(NODE.restClient(), NODE.mapper(),
- NAME, null, fetchSize);
+ NAME, fetchSize);
root.add("elastic", schema);
return connection;
}
@@ -100,7 +101,7 @@ public class ScrollingTest {
private void assertNoActiveScrolls() throws IOException {
// get node stats
final Response response = NODE.restClient()
- .performRequest("GET", "/_nodes/stats/indices/search");
+ .performRequest(new Request("GET", "/_nodes/stats/indices/search"));
try (InputStream is = response.getEntity().getContent()) {
final ObjectNode node = NODE.mapper().readValue(is, ObjectNode.class);
diff --git a/pom.xml b/pom.xml
index e48816c..b65e95d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@ limitations under the License.
<commons-dbcp2.version>2.5.0</commons-dbcp2.version>
<commons-lang3.version>3.8</commons-lang3.version>
<commons-pool2.version>2.6.0</commons-pool2.version>
- <elasticsearch.version>6.2.4</elasticsearch.version>
+ <elasticsearch.version>7.0.1</elasticsearch.version>
<esri-geometry-api.version>2.2.0</esri-geometry-api.version>
<findbugs.version>3.0.1</findbugs.version>
<drill-fmpp-maven-plugin.version>1.14.0</drill-fmpp-maven-plugin.version>
diff --git a/site/_docs/elasticsearch_adapter.md
b/site/_docs/elasticsearch_adapter.md
index 45f2bed..1c3a634 100644
--- a/site/_docs/elasticsearch_adapter.md
+++ b/site/_docs/elasticsearch_adapter.md
@@ -33,9 +33,7 @@ of the Elasticsearch adapter. The models can contain
definitions of
[materializations]({{ site.baseurl }}/docs/model.html#materialization).
The name of the tables defined in the model definition corresponds to
-[types](https://www.elastic.co/blog/what-is-an-elasticsearch-index) in
-Elasticsearch. The schema/database is represented by the `index` parameter
-in the model definition.
+indices in Elasticsearch.
A basic example of a model file is given below:
@@ -49,8 +47,7 @@ A basic example of a model file is given below:
"name": "elasticsearch",
"factory":
"org.apache.calcite.adapter.elasticsearch.ElasticsearchSchemaFactory",
"operand": {
- "coordinates": "{'127.0.0.1': 9300}",
- "index": "usa"
+ "coordinates": "{'127.0.0.1': 9200}"
}
}
]
@@ -66,19 +63,29 @@ $ ./sqlline
sqlline> !connect jdbc:calcite:model=model.json admin admin
{% endhighlight %}
-`sqlline` will now accept SQL queries which access your Elasticsearch types.
+You can also restrict the schema to a single index by giving its name in the `index`
parameter of the model definition:
+
+{% highlight json %}
+...
+
+ "operand": {
+ "coordinates": "{'127.0.0.1': 9200}",
+ "index": "usa"
+ }
+
+...
+{% endhighlight %}
+
+`sqlline` will now accept SQL queries which access your Elasticsearch indices.
The purpose of this adapter is to compile the query into the most efficient
Elasticsearch SEARCH JSON possible by exploiting filtering and sorting directly
in Elasticsearch where possible.
-For example, in the example dataset there is an Elasticsearch type
-named `zips` under index named `usa`.
-
We can issue a simple query to fetch the names of all the states
-stored in the type `zips`.
+stored in the index `usa`.
{% highlight sql %}
-sqlline> SELECT * from "zips";
+sqlline> SELECT * from "usa";
{% endhighlight %}
{% highlight json %}
@@ -133,7 +140,7 @@ The final source json given to Elasticsearch is below:
You can also query elastic search index without prior view definition:
{% highlight sql %}
-sqlline> SELECT _MAP['city'], _MAP['state'] from "elasticsearch"."zips" order
by _MAP['state'];
+sqlline> SELECT _MAP['city'], _MAP['state'] from "elasticsearch"."usa" order
by _MAP['state'];
{% endhighlight %}
### Use of Scrolling API
@@ -145,6 +152,7 @@ scroll is automatically cleared (removed) when all query
resuts are consumed.
### Supported versions
-Currently this adapter supports ElasticSearch versions 2.x (or newer).
Generally
+Currently this adapter supports ElasticSearch versions 6.x (or newer).
Generally
we try to follow official [support
schedule](https://www.elastic.co/support/eol).
+Also, types are not supported (this adapter only supports indices).
diff --git a/sqlline b/sqlline
index c26f001..fc60c90 100755
--- a/sqlline
+++ b/sqlline
@@ -37,7 +37,7 @@ if [ ! -f target/fullclasspath.txt ]; then
fi
CP=
-for module in core cassandra druid elasticsearch2 elasticsearch5 file mongodb
server spark splunk geode example/csv example/function; do
+for module in core cassandra druid elasticsearch file mongodb server spark
splunk geode example/csv example/function; do
CP=${CP}${module}/target/classes:
CP=${CP}${module}/target/test-classes:
done
diff --git a/sqlline.bat b/sqlline.bat
index b163e6f..d338981 100644
--- a/sqlline.bat
+++ b/sqlline.bat
@@ -23,6 +23,6 @@
:: Copy dependency jars on first call. (To force jar refresh, remove
target\dependencies)
if not exist target\dependencies (call mvn -B dependency:copy-dependencies
-DoverWriteReleases=false -DoverWriteSnapshots=false -DoverWriteIfNewer=true
-DoutputDirectory=target\dependencies)
-java -Xmx1G -cp
".\target\dependencies\*;core\target\dependencies\*;cassandra\target\dependencies\*;druid\target\dependencies\*;elasticsearch2\target\dependencies\*;elasticsearch5\target\dependencies\*;geode\target\dependencies\*;file\target\dependencies\*;mongodb\target\dependencies\*;server\target\dependencies\*;spark\target\dependencies\*;splunk\target\dependencies\*"
sqlline.SqlLine --verbose=true %*
+java -Xmx1G -cp
".\target\dependencies\*;core\target\dependencies\*;cassandra\target\dependencies\*;druid\target\dependencies\*;elasticsearch\target\dependencies\*;geode\target\dependencies\*;file\target\dependencies\*;mongodb\target\dependencies\*;server\target\dependencies\*;spark\target\dependencies\*;splunk\target\dependencies\*"
sqlline.SqlLine --verbose=true %*
:: End sqlline.bat