STORM-845 Storm ElasticSearch connector
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2002dbd7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2002dbd7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2002dbd7 Branch: refs/heads/master Commit: 2002dbd7cf54c9615c7c826496275d35b7471c5b Parents: c147a4e Author: SEUNGJIN LEE <sweetest...@navercorp.com> Authored: Mon Jun 22 19:33:40 2015 +0900 Committer: SEUNGJIN LEE <sweetest...@navercorp.com> Committed: Thu Jul 23 14:32:17 2015 +0900 ---------------------------------------------------------------------- external/storm-elasticsearch/README.md | 17 ++++------ .../elasticsearch/bolt/AbstractEsBolt.java | 9 ++++-- .../storm/elasticsearch/common/EsConfig.java | 24 +++++--------- .../storm/elasticsearch/trident/EsState.java | 33 ++++++++++++-------- .../elasticsearch/bolt/AbstractEsBoltTest.java | 25 ++++++++------- .../elasticsearch/bolt/EsIndexBoltTest.java | 3 +- .../elasticsearch/bolt/EsIndexTopology.java | 3 +- .../elasticsearch/bolt/EsPercolateBoltTest.java | 3 +- .../trident/TridentEsTopology.java | 3 +- 9 files changed, 57 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/README.md ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md index 562fd6d..20caece 100644 --- a/external/storm-elasticsearch/README.md +++ b/external/storm-elasticsearch/README.md @@ -13,8 +13,7 @@ User should make sure that there are "index","type", and "source" fields declare ```java EsConfig esConfig = new EsConfig(); esConfig.setClusterName(clusterName); -esConfig.setHost(new String[]{"localhost"}); -esConfig.setPort(9300); +esConfig.setNodes(new String[]{"localhost:9300"}); EsIndexBolt indexBolt = new IndexBolt(esConfig); ``` @@ -28,8 +27,7 @@ User should make sure that there are "index","type", and "source" fields declare ```java EsConfig esConfig = new EsConfig(); esConfig.setClusterName(clusterName); -esConfig.setHost(new String[]{"localhost"}); -esConfig.setPort(9300); +esConfig.setNodes(new String[]{"localhost:9300"}); EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig); ``` @@ -40,8 +38,7 @@ Two bolts above takes in EsConfig as a constructor arg. ```java EsConfig esConfig = new EsConfig(); esConfig.setClusterName(clusterName); - esConfig.setHost(new String[]{"localhost"}); - esConfig.setPort(9300); + esConfig.setNodes(new String[]{"localhost:9300"}); ``` EsConfig params @@ -49,8 +46,7 @@ EsConfig params |Arg |Description | Type |--- |--- |--- |clusterName | ElasticSearch cluster name | String (required) | -|host | ElasticSearch host | String array (required) | -|port | ElasticSearch port | int (required) | +|nodes | ElasticSearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) | @@ -61,9 +57,8 @@ ElasticSearch Trident state also follows similar pattern to EsBolts. It takes in ```code EsConfig esConfig = new EsConfig(); esConfig.setClusterName(clusterName); - esConfig.setHost(new String[]{"localhost"}); - esConfig.setPort(9300); - + esConfig.setNodes(new String[]{"localhost:9300"}); + StateFactory factory = new EsStateFactory(esConfig); TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields()); ``` http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java index cd7fc81..1e2d1ed 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/AbstractEsBolt.java @@ -47,7 +47,6 @@ public abstract class AbstractEsBolt extends BaseRichBolt { @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { - System.out.println(this.getClass().getName()); try { this.collector = outputCollector; synchronized (AbstractEsBolt.class) { @@ -56,8 +55,12 @@ public abstract class AbstractEsBolt extends BaseRichBolt { ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName()) .put("client.transport.sniff", "false").build(); List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>(); - for (String host : esConfig.getHost()) { - transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort())); + for (String node : esConfig.getNodes()) { + String[] hostAndPort = node.split(":"); + if(hostAndPort.length != 2){ + throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern"); + } + transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]))); } client = new TransportClient(settings) .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()])); http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java index f2aa48f..c97d77f 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsConfig.java @@ -21,16 +21,14 @@ import java.io.Serializable; public class EsConfig implements Serializable{ private String clusterName; - private String[] host; - private int port; + private String[] nodes; public EsConfig() { } - public EsConfig(String clusterName, String[] host, int port) { + public EsConfig(String clusterName, String[] nodes, int port) { this.clusterName = clusterName; - this.host = host; - this.port = port; + this.nodes = nodes; } public String getClusterName() { @@ -41,19 +39,11 @@ public class EsConfig implements Serializable{ this.clusterName = clusterName; } - public String[] getHost() { - return host; + public String[] getNodes() { + return nodes; } - public void setHost(String[] host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; + public void setNodes(String[] nodes) { + this.nodes = nodes; } } http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java index e753119..ee95355 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -57,20 +57,27 @@ public class EsState implements State { } public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - synchronized (EsState.class) { - if (client == null) { - Settings settings = - ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName()) - .put("client.transport.sniff", "true").build(); - List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>(); - for (String host : esConfig.getHost()) { - transportAddressList.add(new InetSocketTransportAddress(host, esConfig.getPort())); + try { + synchronized (EsState.class) { + if (client == null) { + Settings settings = + ImmutableSettings.settingsBuilder().put("cluster.name", esConfig.getClusterName()) + .put("client.transport.sniff", "true").build(); + List<InetSocketTransportAddress> transportAddressList = new ArrayList<InetSocketTransportAddress>(); + for (String node : esConfig.getNodes()) { + String[] hostAndPort = node.split(":"); + if (hostAndPort.length != 2) { + throw new Exception("incorrect ElasticSearch node format, should follow {host}:{port} pattern"); + } + transportAddressList.add(new InetSocketTransportAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]))); + } + client = new TransportClient(settings) + .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()])); } - client = new TransportClient(settings) - .addTransportAddresses(transportAddressList.toArray(new InetSocketTransportAddress[transportAddressList.size()])); } + } catch (Exception e) { + LOG.warn("unable to initialize EsState ", e); } - } public void updateState(List<TridentTuple> tuples, TridentCollector collector) { http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java index fdf7cd4..ae6b321 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java @@ -19,6 +19,7 @@ package org.apache.storm.elasticsearch.bolt; import backtype.storm.Config; import backtype.storm.task.OutputCollector; +import org.apache.commons.io.FileUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Requests; @@ -31,20 +32,23 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.File; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; public class AbstractEsBoltTest { - protected Config config = new Config(); - protected OutputCollector collector = mock(OutputCollector.class); - protected Node node; + protected static Config config = new Config(); + protected static OutputCollector collector = mock(OutputCollector.class); + protected static Node node; - @Before - public void setup() throws Exception { - System.out.println("setup"); + @BeforeClass + public static void setup() throws Exception { node = NodeBuilder.nodeBuilder().data(true).settings( ImmutableSettings.builder() .put(ClusterName.SETTING, "test-cluster") @@ -59,18 +63,17 @@ public class AbstractEsBoltTest { ensureEsGreen(node); ClusterHealthResponse chr = node.client().admin().cluster() .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForRelocatingShards(0)).actionGet(); - System.out.println(chr.getStatus()); Thread.sleep(1000); } - @After - public void cleanup() throws Exception { - System.out.println("cleanup"); + @AfterClass + public static void cleanup() throws Exception { node.stop(); node.close(); + FileUtils.deleteDirectory(new File("./data")); } - private void ensureEsGreen(Node node) { + private static void ensureEsGreen(Node node) { ClusterHealthResponse chr = node.client().admin().cluster() .health(Requests.clusterHealthRequest().timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); assertThat("cluster status is green", chr.getStatus(), equalTo(ClusterHealthStatus.GREEN)); http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java index e66da19..28b8bf7 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java @@ -35,8 +35,7 @@ public class EsIndexBoltTest extends AbstractEsBoltTest{ throws Exception { EsConfig esConfig = new EsConfig(); esConfig.setClusterName("test-cluster"); - esConfig.setHost(new String[]{"127.0.0.1"}); - esConfig.setPort(9300); + esConfig.setNodes(new String[]{"127.0.0.1:9300"}); bolt = new EsIndexBolt(esConfig); bolt.prepare(config, null, collector); String index = "index1"; http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java index 4a82c63..f5e868a 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java @@ -48,8 +48,7 @@ public class EsIndexTopology { builder.setSpout(SPOUT_ID, spout, 1); EsConfig esConfig = new EsConfig(); esConfig.setClusterName(EsConstants.clusterName); - esConfig.setHost(new String[]{"localhost"}); - esConfig.setPort(9300); + esConfig.setNodes(new String[]{"localhost:9300"}); builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID); EsTestUtil.startEsNode(); http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java index 1bd338f..4520389 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java @@ -36,8 +36,7 @@ public class EsPercolateBoltTest extends AbstractEsBoltTest { throws Exception { EsConfig esConfig = new EsConfig(); esConfig.setClusterName("test-cluster"); - esConfig.setHost(new String[]{"127.0.0.1"}); - esConfig.setPort(9300); + esConfig.setNodes(new String[]{"localhost:9300"}); bolt = new EsPercolateBolt(esConfig); bolt.prepare(config, null, collector); String index = "index1"; http://git-wip-us.apache.org/repos/asf/storm/blob/2002dbd7/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java index b1e62ff..aed06f6 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java @@ -50,8 +50,7 @@ public class TridentEsTopology { Stream stream = topology.newStream("spout", spout); EsConfig esConfig = new EsConfig(); esConfig.setClusterName(EsConstants.clusterName); - esConfig.setHost(new String[]{"localhost"}); - esConfig.setPort(9300); + esConfig.setNodes(new String[]{"localhost:9300"}); Fields esFields = new Fields("index", "type", "source"); StateFactory factory = new EsStateFactory(esConfig); TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());