Updated Branches: refs/heads/flume-1.5 f6bbc5c5a -> 933617039
FLUME-2210. UnresolvedAddressException when using multiple hostNames in Elasticsearch sink configuration (Dib Ghosh via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/93361703 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/93361703 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/93361703 Branch: refs/heads/flume-1.5 Commit: 933617039b81e1ade6593d96ef39508d78d8a518 Parents: f6bbc5c Author: Hari Shreedharan <[email protected]> Authored: Thu Oct 24 17:47:53 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Oct 24 18:23:32 2013 -0700 ---------------------------------------------------------------------- .../sink/elasticsearch/ElasticSearchSink.java | 6 ++-- .../elasticsearch/TestElasticSearchSink.java | 30 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/93361703/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java index 3286412..3d01173 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -241,9 +241,9 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { serverAddresses = new InetSocketTransportAddress[hostNames.length]; for (int i = 0; i < hostNames.length; i++) { - String[] hostPort = hostNames[i].split(":"); - String host = hostPort[0]; - int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1]) + String[] hostPort = hostNames[i].trim().split(":"); + String host = hostPort[0].trim(); + int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim()) : DEFAULT_PORT; serverAddresses[i] = new InetSocketTransportAddress(host, port); } http://git-wip-us.apache.org/repos/asf/flume/blob/93361703/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java index ad40a3c..3f2ec6e 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -212,6 +212,21 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { } @Test + public void shouldParseMultipleHostWithWhitespacesUsingDefaultPorts() { + parameters.put(HOSTNAMES, " 10.5.5.27 , 10.5.5.28 , 10.5.5.29 "); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = { + new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT), + new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT), + new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test public void shouldParseMultipleHostAndPorts() { parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302"); @@ -227,6 +242,21 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { } @Test + public void shouldParseMultipleHostAndPortsWithWhitespaces() { + parameters.put(HOSTNAMES, " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 "); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = { + new InetSocketTransportAddress("10.5.5.27", 9300), + new InetSocketTransportAddress("10.5.5.28", 9301), + new InetSocketTransportAddress("10.5.5.29", 9302) }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory() throws Exception {
