Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2596#discussion_r176908022
--- Diff:
examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
---
@@ -34,52 +39,128 @@
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-public class EsIndexTopology {
+/**
+ * Demonstrates an ElasticSearch Strom topology.
+ * @author unknown
+ */
+public final class EsIndexTopology {
+ /**
+ * The id of the used spout.
+ */
static final String SPOUT_ID = "spout";
+ /**
+ * The id of the used bolt.
+ */
static final String BOLT_ID = "bolt";
+ /**
+ * The name of the used topology.
+ */
static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
+ /**
+ * The number of pending tuples triggering logging.
+ */
+ private static final int PENDING_COUNT_MAX = 1000;
- public static void main(String[] args) throws Exception {
+ /**
+ * The example's main method.
+ * @param args the command line arguments
+ * @throws AlreadyAliveException if the topology is already started
+ * @throws InvalidTopologyException if the topology is invalid
+ * @throws AuthorizationException if the topology authorization fails
+ */
+ public static void main(final String[] args) throws
AlreadyAliveException,
+ InvalidTopologyException,
+ AuthorizationException {
Config config = new Config();
config.setNumWorkers(1);
TopologyBuilder builder = new TopologyBuilder();
UserDataSpout spout = new UserDataSpout();
builder.setSpout(SPOUT_ID, spout, 1);
EsTupleMapper tupleMapper =
EsTestUtil.generateDefaultTupleMapper();
EsConfig esConfig = new EsConfig("http://localhost:9300");
- builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper),
1).shuffleGrouping(SPOUT_ID);
+ builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1)
+ .shuffleGrouping(SPOUT_ID);
EsTestUtil.startEsNode();
- EsTestUtil.waitForSeconds(5);
- StormSubmitter.submitTopology(TOPOLOGY_NAME, config,
builder.createTopology());
+ EsTestUtil.waitForSeconds(EsConstants.WAIT_DEFAULT);
+ StormSubmitter.submitTopology(TOPOLOGY_NAME,
+ config,
+ builder.createTopology());
}
+ /**
+ * The user data spout.
+ */
public static class UserDataSpout extends BaseRichSpout {
+ private static final long serialVersionUID = 1L;
+ /**
+ * The pending values.
+ */
private ConcurrentHashMap<UUID, Values> pending;
+ /**
+ * The collector passed in
+ * {@link #open(java.util.Map,
org.apache.storm.task.TopologyContext,
+ * org.apache.storm.spout.SpoutOutputCollector) }.
+ */
private SpoutOutputCollector collector;
+ /**
+ * The sources.
+ */
private String[] sources = {
- "{\"user\":\"user1\"}",
- "{\"user\":\"user2\"}",
- "{\"user\":\"user3\"}",
- "{\"user\":\"user4\"}"
+ "{\"user\":\"user1\"}",
+ "{\"user\":\"user2\"}",
+ "{\"user\":\"user3\"}",
+ "{\"user\":\"user4\"}"
};
+ /**
+ * The current index.
+ */
private int index = 0;
+ /**
+ * The current count.
+ */
private int count = 0;
+ /**
+ * The total.
+ */
private long total = 0L;
+ /**
+ * The index name.
+ */
private String indexName = "index1";
+ /**
+ * The type name.
+ */
private String typeName = "type1";
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ /**
+ * Declares {@code source}, {@code index}, {@code type} and {@code
id}.
+ * @param declarer the declarer to pass to
+ */
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer
declarer) {
declarer.declare(new Fields("source", "index", "type", "id"));
}
- public void open(Map<String, Object> config, TopologyContext
context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- this.pending = new ConcurrentHashMap<UUID, Values>();
+ /**
+ * Acquires {@code collector} and initializes {@code pending}.
+ * @param config unused
+ * @param context unused
+ * @param collectorArg the collector to acquire
+ */
+ @Override
+ public void open(final Map<String, Object> config,
+ final TopologyContext context,
+ final SpoutOutputCollector collectorArg) {
+ this.collector = collectorArg;
+ this.pending = new ConcurrentHashMap<>();
}
+ /**
+ * Processes the next tuple.
--- End diff --
Nit: nextTuple makes the spout emit the next tuple, if any.
---