Repository: storm Updated Branches: refs/heads/master e6b57ce4b -> 1677de1a1
[STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8868a58c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8868a58c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8868a58c Branch: refs/heads/master Commit: 8868a58c27b6baee3d8fa1487434fc58509e514d Parents: 97e131d Author: Arun Mahadevan <[email protected]> Authored: Fri Apr 15 17:36:07 2016 +0530 Committer: Arun Mahadevan <[email protected]> Committed: Fri Apr 15 20:53:36 2016 +0530 ---------------------------------------------------------------------- .../topology/CheckpointTupleForwarder.java | 8 ++++ .../apache/storm/topology/IStatefulBolt.java | 19 +++++++- .../storm/topology/StatefulBoltExecutor.java | 20 ++++++-- .../apache/storm/topology/TopologyBuilder.java | 48 ++++++++++++++++++++ 4 files changed, 91 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java index 11d0384..9d21c33 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java +++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java @@ -53,6 +53,10 @@ public class CheckpointTupleForwarder implements IRichBolt { private long lastTxid = Long.MIN_VALUE; private AnchoringOutputCollector collector; + public CheckpointTupleForwarder() { + this(null); + } + public CheckpointTupleForwarder(IRichBolt bolt) { this.bolt = bolt; transactionRequestCount = new HashMap<>(); @@ -86,6 +90,10 @@ public class CheckpointTupleForwarder implements IRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { bolt.declareOutputFields(declarer); + declareCheckpointStream(declarer); + } + + protected void declareCheckpointStream(OutputFieldsDeclarer declarer) { declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION)); } http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java index ed55e1d..ef6c837 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java +++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java @@ -18,6 +18,11 @@ package org.apache.storm.topology; import org.apache.storm.state.State; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; + +import java.util.Map; /** * A bolt abstraction for supporting stateful computation. The state of the bolt is @@ -27,5 +32,17 @@ import org.apache.storm.state.State; * state updates. The stateful bolts are expected to anchor the tuples while emitting * and ack the input tuples once its processed.</p> */ -public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt { +public interface IStatefulBolt<T extends State> extends IStatefulComponent<T> { + /** + * @see org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector) + */ + void prepare(Map stormConf, TopologyContext context, OutputCollector collector); + /** + * @see org.apache.storm.task.IBolt#execute(Tuple) + */ + void execute(Tuple input); + /** + * @see org.apache.storm.task.IBolt#cleanup() + */ + void cleanup(); } http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java index 237305e..9873084 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -40,7 +39,6 @@ import static org.apache.storm.spout.CheckPointState.Action.COMMIT; import static org.apache.storm.spout.CheckPointState.Action.PREPARE; import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK; import static org.apache.storm.spout.CheckPointState.Action.INITSTATE; - /** * Wraps a {@link IStatefulBolt} and manages the state of the bolt. */ @@ -54,7 +52,6 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar private AckTrackingOutputCollector collector; public StatefulBoltExecutor(IStatefulBolt<T> bolt) { - super(bolt); this.bolt = bolt; } @@ -74,6 +71,23 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar } @Override + public void cleanup() { + bolt.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + bolt.declareOutputFields(declarer); + super.declareCheckpointStream(declarer); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return bolt.getComponentConfiguration(); + } + + + @Override protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) { LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid); if (action == PREPARE) { http://git-wip-us.apache.org/repos/asf/storm/blob/8868a58c/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 5b7d499..92cad77 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -221,6 +221,20 @@ public class TopologyBuilder { * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the windowed bolt + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException { + return setBolt(id, bolt, null); + } + + /** + * Define a new bolt in this topology. This defines a windowed bolt, intended + * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method + * is triggered for each window interval with the list of current events in the window. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param bolt the windowed bolt * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster. * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -240,6 +254,24 @@ public class TopologyBuilder { * </p> * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the stateful bolt + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException { + return setBolt(id, bolt, null); + } + + /** + * Define a new bolt in this topology. This defines a stateful bolt, that requires its + * state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method + * is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)} + * with its previously saved state. + * <p> + * The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology + * are expected to anchor the tuples while emitting and ack the input tuples once its processed. + * </p> + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param bolt the stateful bolt * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster. * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -257,6 +289,22 @@ public class TopologyBuilder { * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. * @param bolt the stateful windowed bolt + * @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState}) + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException { + return setBolt(id, bolt, null); + } + + /** + * Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful + * windowing operations. The {@link IStatefulWindowedBolt#execute(TupleWindow)} method is triggered + * for each window interval with the list of current events in the window. During initialization of + * this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved state. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param bolt the stateful windowed bolt * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster. * @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState}) * @return use the returned object to declare the inputs to this component
