This is an automated email from the ASF dual-hosted git repository.

sereda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new f6aa7a9  [CALCITE-3335] ElasticSearch adapter. Introduce configuration 
parameter "hosts" which deprecates previous "coordinates". (Shikha Somani)
f6aa7a9 is described below

commit f6aa7a927feaa88bd4076f9098203346b89ae80d
Author: Shikha <[email protected]>
AuthorDate: Mon Sep 9 11:45:57 2019 -0400

    [CALCITE-3335] ElasticSearch adapter. Introduce configuration parameter 
"hosts" which deprecates previous "coordinates". (Shikha Somani)
---
 .../elasticsearch/ElasticsearchSchemaFactory.java  | 57 +++++++++++++++-------
 1 file changed, 39 insertions(+), 18 deletions(-)

diff --git 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
index 000c4c0..a5a01ea 100644
--- 
a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
+++ 
b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchSchemaFactory.java
@@ -28,12 +28,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 
 import org.elasticsearch.client.RestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
+import java.util.stream.Collectors;
+
 
 /**
  * Factory that creates an {@link ElasticsearchSchema}.
@@ -43,6 +46,8 @@ import java.util.Set;
 @SuppressWarnings("UnusedDeclaration")
 public class ElasticsearchSchemaFactory implements SchemaFactory {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchSchemaFactory.class);
+
   public ElasticsearchSchemaFactory() {
   }
 
@@ -55,16 +60,35 @@ public class ElasticsearchSchemaFactory implements 
SchemaFactory {
     mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
 
     try {
-      final String coordinatesString = (String) map.get("coordinates");
-      Preconditions.checkState(coordinatesString != null,
-          "'coordinates' is missing in configuration");
 
-      final Map<String, Integer> coordinates = 
mapper.readValue(coordinatesString,
-          new TypeReference<Map<String, Integer>>() { });
+      List<HttpHost> hosts;
 
-      // create client
-      final RestClient client = connect(coordinates);
+      if (map.containsKey("hosts")) {
+        final List<String> configHosts = mapper.readValue((String) 
map.get("hosts"),
+                new TypeReference<List<String>>() { });
+
+        hosts = configHosts
+                .stream()
+                .map(host -> HttpHost.create(host))
+                .collect(Collectors.toList());
+      } else if (map.containsKey("coordinates")) {
+        final Map<String, Integer> coordinates = mapper.readValue((String) 
map.get("coordinates"),
+                new TypeReference<Map<String, Integer>>() { });
+
+        hosts =  coordinates
+                .entrySet()
+                .stream()
+                .map(entry -> new HttpHost(entry.getKey(), entry.getValue()))
+                .collect(Collectors.toList());
 
+        LOGGER.warn("Prefer using hosts, coordinates is deprecated.");
+      } else {
+        throw new IllegalArgumentException
+        ("Both 'coordinates' and 'hosts' is missing in configuration. Provide 
one of them.");
+      }
+
+      // create client
+      final RestClient client = connect(hosts);
       final String index = (String) map.get("index");
 
       return new ElasticsearchSchema(client, new ObjectMapper(), index);
@@ -75,18 +99,15 @@ public class ElasticsearchSchemaFactory implements 
SchemaFactory {
 
   /**
    * Builds elastic rest client from user configuration
-   * @param coordinates list of {@code hostname/port} to connect to
+   * @param hosts list of ES HTTP Hosts to connect to
    * @return newly initialized low-level rest http client for ES
    */
-  private static RestClient connect(Map<String, Integer> coordinates) {
-    Objects.requireNonNull(coordinates, "coordinates");
-    Preconditions.checkArgument(!coordinates.isEmpty(), "no ES coordinates 
specified");
-    final Set<HttpHost> set = new LinkedHashSet<>();
-    for (Map.Entry<String, Integer> entry: coordinates.entrySet()) {
-      set.add(new HttpHost(entry.getKey(), entry.getValue()));
-    }
+  private static RestClient connect(List<HttpHost> hosts) {
+
+    Objects.requireNonNull(hosts, "hosts or coordinates");
+    Preconditions.checkArgument(!hosts.isEmpty(), "no ES hosts specified");
 
-    return RestClient.builder(set.toArray(new HttpHost[0])).build();
+    return RestClient.builder(hosts.toArray(new 
HttpHost[hosts.size()])).build();
   }
 
 }

Reply via email to