Repository: storm
Updated Branches:
  refs/heads/1.x-branch a7370a62f -> 3399a4a46


[STORM-1714] StatefulBolts ends up as normal bolts while using 
TopologyBuilder.setBolt without parallelism


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b879d62
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b879d62
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b879d62

Branch: refs/heads/1.x-branch
Commit: 3b879d62d68ac75310d12784d8d1ad4213fdd38e
Parents: a7370a6
Author: Arun Mahadevan <[email protected]>
Authored: Fri Apr 15 17:36:07 2016 +0530
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Apr 20 17:18:12 2016 +0900

----------------------------------------------------------------------
 .../topology/CheckpointTupleForwarder.java      |  8 ++++
 .../apache/storm/topology/IStatefulBolt.java    | 19 +++++++-
 .../storm/topology/StatefulBoltExecutor.java    | 20 ++++++--
 .../apache/storm/topology/TopologyBuilder.java  | 48 ++++++++++++++++++++
 4 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java 
b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 11d0384..9d21c33 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -53,6 +53,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     private long lastTxid = Long.MIN_VALUE;
     private AnchoringOutputCollector collector;
 
+    public CheckpointTupleForwarder() {
+        this(null);
+    }
+
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
         transactionRequestCount = new HashMap<>();
@@ -86,6 +90,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         bolt.declareOutputFields(declarer);
+        declareCheckpointStream(declarer);
+    }
+
+    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
         declarer.declareStream(CHECKPOINT_STREAM_ID, new 
Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java 
b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index ed55e1d..ef6c837 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -18,6 +18,11 @@
 package org.apache.storm.topology;
 
 import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
 
 /**
  * A bolt abstraction for supporting stateful computation. The state of the 
bolt is
@@ -27,5 +32,17 @@ import org.apache.storm.state.State;
  * state updates. The stateful bolts are expected to anchor the tuples while 
emitting
  * and ack the input tuples once its processed.</p>
  */
-public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, 
IRichBolt {
+public interface IStatefulBolt<T extends State> extends IStatefulComponent<T> {
+    /**
+     * @see org.apache.storm.task.IBolt#prepare(Map, TopologyContext, 
OutputCollector)
+     */
+    void prepare(Map stormConf, TopologyContext context, OutputCollector 
collector);
+    /**
+     * @see org.apache.storm.task.IBolt#execute(Tuple)
+     */
+    void execute(Tuple input);
+    /**
+     * @see org.apache.storm.task.IBolt#cleanup()
+     */
+    void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java 
b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 237305e..9873084 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +39,6 @@ import static 
org.apache.storm.spout.CheckPointState.Action.COMMIT;
 import static org.apache.storm.spout.CheckPointState.Action.PREPARE;
 import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
 import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
-
 /**
  * Wraps a {@link IStatefulBolt} and manages the state of the bolt.
  */
@@ -54,7 +52,6 @@ public class StatefulBoltExecutor<T extends State> extends 
CheckpointTupleForwar
     private AckTrackingOutputCollector collector;
 
     public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
-        super(bolt);
         this.bolt = bolt;
     }
 
@@ -74,6 +71,23 @@ public class StatefulBoltExecutor<T extends State> extends 
CheckpointTupleForwar
     }
 
     @Override
+    public void cleanup() {
+        bolt.cleanup();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        bolt.declareOutputFields(declarer);
+        super.declareCheckpointStream(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return bolt.getComponentConfiguration();
+    }
+
+
+    @Override
     protected void handleCheckpoint(Tuple checkpointTuple, Action action, long 
txid) {
         LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", 
checkpointTuple, action, txid);
         if (action == PREPARE) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java 
b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 5b7d499..92cad77 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -221,6 +221,20 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other 
components that want to consume this bolt's outputs.
      * @param bolt the windowed bolt
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if the delegated 
{@code setBolt(id, bolt, null)} call rejects its arguments
+     */
+    public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws 
IllegalArgumentException {
+        return setBolt(id, bolt, null);
+    }
+
+    /**
+     * Define a new bolt in this topology. This defines a windowed bolt, 
intended
+     * for windowing operations. The {@link 
IWindowedBolt#execute(TupleWindow)} method
+     * is triggered for each window interval with the list of current events 
in the window.
+     *
+     * @param id the id of this component. This id is referenced by other 
components that want to consume this bolt's outputs.
+     * @param bolt the windowed bolt
      * @param parallelism_hint the number of tasks that should be assigned to 
execute this bolt. Each task will run on a thread in a process somewhere around 
the cluster.
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not 
positive
@@ -240,6 +254,24 @@ public class TopologyBuilder {
      * </p>
      * @param id the id of this component. This id is referenced by other 
components that want to consume this bolt's outputs.
      * @param bolt the stateful bolt
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if the delegated 
{@code setBolt(id, bolt, null)} call rejects its arguments
+     */
+    public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> 
bolt) throws IllegalArgumentException {
+        return setBolt(id, bolt, null);
+    }
+
+    /**
+     * Define a new bolt in this topology. This defines a stateful bolt, that 
requires its
+     * state (of computation) to be saved. When this bolt is initialized, the 
{@link IStatefulBolt#initState(State)} method
+     * is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, 
OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
+     * with its previously saved state.
+     * <p>
+     * The framework provides at-least once guarantee for the state updates. 
Bolts (both stateful and non-stateful) in a stateful topology
+     * are expected to anchor the tuples while emitting and ack the input 
tuples once its processed.
+     * </p>
+     * @param id the id of this component. This id is referenced by other 
components that want to consume this bolt's outputs.
+     * @param bolt the stateful bolt
      * @param parallelism_hint the number of tasks that should be assigned to 
execute this bolt. Each task will run on a thread in a process somewhere around 
the cluster.
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not 
positive
@@ -257,6 +289,22 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other 
components that want to consume this bolt's outputs.
      * @param bolt the stateful windowed bolt
+     * @param <T> the type of the state (e.g. {@link 
org.apache.storm.state.KeyValueState})
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if the delegated 
{@code setBolt(id, bolt, null)} call rejects its arguments
+     */
+    public <T extends State> BoltDeclarer setBolt(String id, 
IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
+        return setBolt(id, bolt, null);
+    }
+
+    /**
+     * Define a new bolt in this topology. This defines a stateful windowed 
bolt, intended for stateful
+     * windowing operations. The {@link 
IStatefulWindowedBolt#execute(TupleWindow)} method is triggered
+     * for each window interval with the list of current events in the window. 
During initialization of
+     * this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked 
with its previously saved state.
+     *
+     * @param id the id of this component. This id is referenced by other 
components that want to consume this bolt's outputs.
+     * @param bolt the stateful windowed bolt
      * @param parallelism_hint the number of tasks that should be assigned to 
execute this bolt. Each task will run on a thread in a process somewhere around 
the cluster.
      * @param <T> the type of the state (e.g. {@link 
org.apache.storm.state.KeyValueState})
      * @return use the returned object to declare the inputs to this component

Reply via email to