Repository: camel Updated Branches: refs/heads/master 1451499b8 -> fba8aa47c
This closes #507. More parameters are available for the Elasticsearch component, including replication type, write consistency level and multiple transport addresses. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fba8aa47 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fba8aa47 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fba8aa47 Branch: refs/heads/master Commit: fba8aa47cc3885cdf71d57dec201a48272ea7577 Parents: 1451499 Author: Mauricio Jost <mauricio.j...@activeeon.com> Authored: Mon Apr 27 16:30:46 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Mon Apr 27 22:05:41 2015 +0200 ---------------------------------------------------------------------- .../ElasticsearchConfiguration.java | 88 ++++++++++++++- .../elasticsearch/ElasticsearchEndpoint.java | 41 ++++--- .../elasticsearch/ElasticsearchProducer.java | 23 ++++ .../ElasticsearchActionRequestConverter.java | 18 ++-- .../ElasticsearchComponentTest.java | 45 ++++++++ .../ElasticsearchConfigurationTest.java | 108 +++++++++++++++++++ 6 files changed, 301 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index 7db78c7..d2568d3 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -18,12 +18,18 @@ package org.apache.camel.component.elasticsearch; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; @@ -44,14 +50,22 @@ public class ElasticsearchConfiguration { public static final String PARAM_DATA = "data"; public static final String PARAM_INDEX_NAME = "indexName"; public static final String PARAM_INDEX_TYPE = "indexType"; + public static final String PARAM_CONSISTENCY_LEVEL = "consistencyLevel"; + public static final String PARAM_REPLICATION_TYPE = "replicationType"; + public static final String TRANSPORT_ADDRESSES = "transportAddresses"; public static final String PROTOCOL = "elasticsearch"; private static final String LOCAL_NAME = "local"; private static final String IP = "ip"; private static final String PORT = "port"; private static final Integer DEFAULT_PORT = 9300; + private static final WriteConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = WriteConsistencyLevel.DEFAULT; + private static final ReplicationType DEFAULT_REPLICATION_TYPE = ReplicationType.DEFAULT; + private static final String TRANSPORT_ADDRESSES_SEPARATOR_REGEX = ","; + private static final String IP_PORT_SEPARATOR_REGEX = ":"; private URI uri; - @UriPath(description = "Name of cluster or use local for local mode") @Metadata(required = "true") + @UriPath(description = "Name of cluster or use local for local mode") + 
@Metadata(required = "true") private String clusterName; @UriParam private String protocolType; @@ -62,6 +76,10 @@ public class ElasticsearchConfiguration { @UriParam private String indexType; @UriParam + private WriteConsistencyLevel consistencyLevel; + @UriParam + private ReplicationType replicationType; + @UriParam private boolean local; @UriParam private Boolean data; @@ -70,6 +88,8 @@ public class ElasticsearchConfiguration { @UriParam private String ip; @UriParam + private List<InetSocketTransportAddress> transportAddresses; + @UriParam private Integer port; public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception { @@ -106,11 +126,53 @@ public class ElasticsearchConfiguration { indexName = (String)parameters.remove(PARAM_INDEX_NAME); indexType = (String)parameters.remove(PARAM_INDEX_TYPE); operation = (String)parameters.remove(PARAM_OPERATION); + consistencyLevel = parseConsistencyLevel(parameters); + replicationType = parseReplicationType(parameters); + ip = (String)parameters.remove(IP); + transportAddresses = parseTransportAddresses((String) parameters.remove(TRANSPORT_ADDRESSES)); + String portParam = (String) parameters.remove(PORT); port = portParam == null ? 
DEFAULT_PORT : Integer.valueOf(portParam); } + private ReplicationType parseReplicationType(Map<String, Object> parameters) { + Object replicationTypeParam = parameters.remove(PARAM_REPLICATION_TYPE); + if (replicationTypeParam != null) { + return ReplicationType.valueOf(replicationTypeParam.toString()); + } else { + return DEFAULT_REPLICATION_TYPE; + } + } + + private WriteConsistencyLevel parseConsistencyLevel(Map<String, Object> parameters) { + Object consistencyLevelParam = parameters.remove(PARAM_CONSISTENCY_LEVEL); + if (consistencyLevelParam != null) { + return WriteConsistencyLevel.valueOf(consistencyLevelParam.toString()); + } else { + return DEFAULT_CONSISTENCY_LEVEL; + } + } + + private List<InetSocketTransportAddress> parseTransportAddresses(String ipsString) { + if (ipsString == null || ipsString.isEmpty()) { + return null; + } + List<String> addressesStr = Arrays.asList(ipsString.split(TRANSPORT_ADDRESSES_SEPARATOR_REGEX)); + List<InetSocketTransportAddress> addressesTrAd = new ArrayList<>(addressesStr.size()); + for (String address : addressesStr) { + String[] split = address.split(IP_PORT_SEPARATOR_REGEX); + String hostname; + if (split.length > 0) + hostname = split[0]; + else + throw new IllegalArgumentException(); + Integer port = (split.length > 1 ? 
Integer.parseInt(split[1]) : DEFAULT_PORT); + addressesTrAd.add(new InetSocketTransportAddress(hostname, port)); + } + return addressesTrAd; + } + protected Boolean toBoolean(Object string) { if ("true".equals(string)) { return true; @@ -217,6 +279,14 @@ public class ElasticsearchConfiguration { this.ip = ip; } + public List<InetSocketTransportAddress> getTransportAddresses() { + return transportAddresses; + } + + public void setTransportAddresses(List<InetSocketTransportAddress> transportAddresses) { + this.transportAddresses = transportAddresses; + } + public Integer getPort() { return port; } @@ -225,4 +295,20 @@ public class ElasticsearchConfiguration { this.port = port; } + public void setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + public WriteConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public void setReplicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + } + + public ReplicationType getReplicationType() { + return replicationType; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java index da49c12..4a582ed 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java @@ -18,6 +18,8 @@ package org.apache.camel.component.elasticsearch; import java.net.URI; import java.util.Map; +import java.util.ArrayList; +import 
java.util.List; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -30,6 +32,7 @@ import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.node.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,27 +76,37 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { LOG.info("Joining ElasticSearch cluster " + configuration.getClusterName()); } if (configuration.getIp() != null) { - LOG.info("REMOTE ELASTICSEARCH: {}", configuration.getIp()); - Settings settings = ImmutableSettings.settingsBuilder() - // setting the classloader here will allow the underlying elasticsearch-java - // class to find its names.txt in an OSGi environment (otherwise the thread - // classloader is used, which won't be able to see the file causing a startup - // exception). 
- .classLoader(Settings.class.getClassLoader()) - .put("cluster.name", configuration.getClusterName()) - .put("client.transport.ignore_cluster_name", false) - .put("node.client", true) - .put("client.transport.sniff", true) - .build(); - Client client = new TransportClient(settings) + this.client = new TransportClient(getSettings()) .addTransportAddress(new InetSocketTransportAddress(configuration.getIp(), configuration.getPort())); - this.client = client; + + } else if (configuration.getTransportAddresses() != null && + !configuration.getTransportAddresses().isEmpty()) { + List<TransportAddress> addresses = new ArrayList<>(configuration.getTransportAddresses().size()); + for (TransportAddress address : configuration.getTransportAddresses()) { + addresses.add(address); + } + this.client = new TransportClient(getSettings()) + .addTransportAddresses(addresses.toArray(new TransportAddress[0])); } else { node = configuration.buildNode(); client = node.client(); } } + private Settings getSettings() { + return ImmutableSettings.settingsBuilder() + // setting the classloader here will allow the underlying elasticsearch-java + // class to find its names.txt in an OSGi environment (otherwise the thread + // classloader is used, which won't be able to see the file causing a startup + // exception). 
+ .classLoader(Settings.class.getClassLoader()) + .put("cluster.name", configuration.getClusterName()) + .put("client.transport.ignore_cluster_name", false) + .put("node.client", true) + .put("client.transport.sniff", true) + .build(); + } + @Override protected void doStop() throws Exception { if (configuration.isLocal()) { http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 2b432d5..0ededde 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -113,6 +113,20 @@ public class ElasticsearchProducer extends DefaultProducer { configIndexType = true; } + boolean configConsistencyLevel = false; + String consistencyLevel = message.getHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, String.class); + if (consistencyLevel == null) { + message.setHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, getEndpoint().getConfig().getConsistencyLevel()); + configConsistencyLevel = true; + } + + boolean configReplicationType = false; + String replicationType = message.getHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, String.class); + if (replicationType == null) { + message.setHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, getEndpoint().getConfig().getReplicationType()); + configReplicationType = true; + } + Client client = getEndpoint().getClient(); if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) { 
IndexRequest indexRequest = message.getBody(IndexRequest.class); @@ -155,5 +169,14 @@ public class ElasticsearchProducer extends DefaultProducer { if (configIndexType) { message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE); } + + if (configConsistencyLevel) { + message.removeHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL); + } + + if (configReplicationType) { + message.removeHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java index 934258f..a64f843 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java @@ -22,11 +22,13 @@ import java.util.Map; import org.apache.camel.Converter; import org.apache.camel.Exchange; import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration; +import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.common.xcontent.XContentBuilder; @Converter @@ -50,13 +52,15 
@@ public final class ElasticsearchActionRequestConverter { return null; } - return indexRequest.index( - exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_NAME, - String.class)).type( - exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_TYPE, - String.class)); + return indexRequest + .consistencyLevel(exchange.getIn().getHeader( + ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class)) + .replicationType(exchange.getIn().getHeader( + ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, ReplicationType.class)) + .index(exchange.getIn().getHeader( + ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class)) + .type(exchange.getIn().getHeader( + ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class)); } @Converter http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index 318bdf5..27fa90f 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -94,6 +94,20 @@ public class ElasticsearchComponentTest extends CamelTestSupport { } @Test + public void testIndexWithReplication() throws Exception { + Map<String, String> map = createIndexedData(); + String indexId = template.requestBody("direct:indexWithReplication", map, String.class); + assertNotNull("indexId should be set", indexId); + } + + @Test + public void testIndexWithWriteConsistency() throws Exception { + 
Map<String, String> map = createIndexedData(); + String indexId = template.requestBody("direct:indexWithWriteConsistency", map, String.class); + assertNotNull("indexId should be set", indexId); + } + + @Test public void testBulkIndex() throws Exception { List<Map<String, String>> documents = new ArrayList<Map<String, String>>(); Map<String, String> document1 = createIndexedData("1"); @@ -215,6 +229,33 @@ public class ElasticsearchComponentTest extends CamelTestSupport { } @Test + @Ignore("need to setup the cluster with multiple nodes for this test") + public void indexWithTransportAddresses() throws Exception { + Map<String, String> map = createIndexedData(); + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + + String indexId = template.requestBodyAndHeaders("direct:indexWithTransportAddresses", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + } + + @Test + @Ignore("need to setup the cluster with multiple nodes for this test") + public void indexWithIpAndTransportAddresses() throws Exception { + Map<String, String> map = createIndexedData(); + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + + //should ignore transport addresses configuration + String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndTransportAddresses", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + } + + @Test public void testGetWithHeaders() throws Exception { //first, INDEX a value Map<String, String> map = 
createIndexedData(); @@ -357,6 +398,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void configure() { from("direct:start").to("elasticsearch://local"); from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); + from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC"); + from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE"); from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet"); from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet"); from("direct:search").to("elasticsearch://local?operation=SEARCH&indexName=twitter&indexType=tweet"); @@ -364,6 +407,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport { from("direct:bulk").to("elasticsearch://local?operation=BULK&indexName=twitter&indexType=tweet"); //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost"); //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300"); + //from("direct:indexWithTransportAddresses").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&transportAddresses=localhost:9300,localhost:9301"); + //from("direct:indexWithIpAndTransportAddresses").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300&transportAddresses=localhost:4444,localhost:5555"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/fba8aa47/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java index f96c164..294cd32 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java @@ -21,6 +21,8 @@ import java.util.Map; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.util.URISupport; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.ReplicationType; import org.junit.Test; @@ -93,4 +95,110 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport { assertNull(conf.getClusterName()); } + @Test + public void writeConsistencyLevelDefaultConfTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertTrue(conf.isLocal()); + assertEquals("INDEX", conf.getOperation()); + assertEquals("twitter", conf.getIndexName()); + assertEquals("tweet", conf.getIndexType()); + assertEquals(WriteConsistencyLevel.DEFAULT, conf.getConsistencyLevel()); + assertNull(conf.getClusterName()); + } + + @Test + public void writeConsistencyLevelConfTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=QUORUM"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertTrue(conf.isLocal()); + assertEquals("INDEX", conf.getOperation()); + 
assertEquals("twitter", conf.getIndexName()); + assertEquals("tweet", conf.getIndexType()); + assertEquals(WriteConsistencyLevel.QUORUM, conf.getConsistencyLevel()); + assertNull(conf.getClusterName()); + } + + @Test + public void replicationTypeConfTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=ASYNC"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertDefaultConfigurationParameters(conf); + assertEquals(ReplicationType.ASYNC, conf.getReplicationType()); + } + + @Test + public void replicationTypeDefaultConfTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertDefaultConfigurationParameters(conf); + assertEquals(ReplicationType.DEFAULT, conf.getReplicationType()); + } + + @Test + public void transportAddressesSimpleHostnameTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" + + "indexType=tweet&transportAddresses=127.0.0.1"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertDefaultConfigurationParameters(conf); + assertEquals(1, conf.getTransportAddresses().size()); + assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); + assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort()); + } + + @Test + public void transportAddressesMultipleHostnameTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" + + "indexType=tweet&transportAddresses=127.0.0.1,127.0.0.2"); + Map<String, Object> 
parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertDefaultConfigurationParameters(conf); + assertEquals(2, conf.getTransportAddresses().size()); + assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); + assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort()); + assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString()); + assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort()); + } + + @Test + public void transportAddressesSimpleHostnameAndPortTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" + + "indexType=tweet&transportAddresses=127.0.0.1:9305"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertDefaultConfigurationParameters(conf); + assertEquals(1, conf.getTransportAddresses().size()); + assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); + assertEquals(9305, conf.getTransportAddresses().get(0).address().getPort()); + } + + @Test + public void transportAddressesMultipleHostnameAndPortTest() throws Exception { + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" + + "indexType=tweet&transportAddresses=127.0.0.1:9400,127.0.0.2,127.0.0.3:9401"); + Map<String, Object> parameters = URISupport.parseParameters(uri); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertDefaultConfigurationParameters(conf); + assertEquals(3, conf.getTransportAddresses().size()); + assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); + assertEquals(9400, conf.getTransportAddresses().get(0).address().getPort()); + assertEquals("127.0.0.2", 
conf.getTransportAddresses().get(1).address().getHostString()); + assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort()); + assertEquals("127.0.0.3", conf.getTransportAddresses().get(2).address().getHostString()); + assertEquals(9401, conf.getTransportAddresses().get(2).address().getPort()); + } + + private void assertDefaultConfigurationParameters(ElasticsearchConfiguration conf) { + assertTrue(conf.isLocal()); + assertEquals("INDEX", conf.getOperation()); + assertEquals("twitter", conf.getIndexName()); + assertEquals("tweet", conf.getIndexType()); + assertNull(conf.getClusterName()); + } + }