This is an automated email from the ASF dual-hosted git repository.
sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new f6aa7a9 [CALCITE-3335] ElasticSearch adapter. Introduce configuration
parameter "hosts" which deprecates previous "coordinates". (Shikha Somani)
f6aa7a9 is described below
commit f6aa7a927feaa88bd4076f9098203346b89ae80d
Author: Shikha <[email protected]>
AuthorDate: Mon Sep 9 11:45:57 2019 -0400
[CALCITE-3335] ElasticSearch adapter. Introduce configuration parameter
"hosts" which deprecates previous "coordinates". (Shikha Somani)
---
.../elasticsearch/ElasticsearchSchemaFactory.java | 57 +++++++++++++++-------
1 file changed, 39 insertions(+), 18 deletions(-)
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
index 000c4c0..a5a01ea 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
@@ -28,12 +28,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* Factory that creates an {@link ElasticsearchSchema}.
@@ -43,6 +46,8 @@ import java.util.Set;
@SuppressWarnings("UnusedDeclaration")
public class ElasticsearchSchemaFactory implements SchemaFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchSchemaFactory.class);
+
public ElasticsearchSchemaFactory() {
}
@@ -55,16 +60,35 @@ public class ElasticsearchSchemaFactory implements SchemaFactory {
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
try {
- final String coordinatesString = (String) map.get("coordinates");
- Preconditions.checkState(coordinatesString != null,
- "'coordinates' is missing in configuration");
- final Map<String, Integer> coordinates =
mapper.readValue(coordinatesString,
- new TypeReference<Map<String, Integer>>() { });
+ List<HttpHost> hosts;
- // create client
- final RestClient client = connect(coordinates);
+ if (map.containsKey("hosts")) {
+ final List<String> configHosts = mapper.readValue((String)
map.get("hosts"),
+ new TypeReference<List<String>>() { });
+
+ hosts = configHosts
+ .stream()
+ .map(host -> HttpHost.create(host))
+ .collect(Collectors.toList());
+ } else if (map.containsKey("coordinates")) {
+ final Map<String, Integer> coordinates = mapper.readValue((String)
map.get("coordinates"),
+ new TypeReference<Map<String, Integer>>() { });
+
+ hosts = coordinates
+ .entrySet()
+ .stream()
+ .map(entry -> new HttpHost(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ LOGGER.warn("Prefer using hosts, coordinates is deprecated.");
+ } else {
+ throw new IllegalArgumentException
+ ("Both 'coordinates' and 'hosts' is missing in configuration. Provide
one of them.");
+ }
+
+ // create client
+ final RestClient client = connect(hosts);
final String index = (String) map.get("index");
return new ElasticsearchSchema(client, new ObjectMapper(), index);
@@ -75,18 +99,15 @@ public class ElasticsearchSchemaFactory implements SchemaFactory {
/**
* Builds elastic rest client from user configuration
- * @param coordinates list of {@code hostname/port} to connect to
+ * @param hosts list of ES HTTP Hosts to connect to
* @return newly initialized low-level rest http client for ES
*/
- private static RestClient connect(Map<String, Integer> coordinates) {
- Objects.requireNonNull(coordinates, "coordinates");
- Preconditions.checkArgument(!coordinates.isEmpty(), "no ES coordinates
specified");
- final Set<HttpHost> set = new LinkedHashSet<>();
- for (Map.Entry<String, Integer> entry: coordinates.entrySet()) {
- set.add(new HttpHost(entry.getKey(), entry.getValue()));
- }
+ private static RestClient connect(List<HttpHost> hosts) {
+
+ Objects.requireNonNull(hosts, "hosts or coordinates");
+ Preconditions.checkArgument(!hosts.isEmpty(), "no ES hosts specified");
- return RestClient.builder(set.toArray(new HttpHost[0])).build();
+ return RestClient.builder(hosts.toArray(new
HttpHost[hosts.size()])).build();
}
}