Repository: storm
Updated Branches:
  refs/heads/asf-site 23799a9f2 -> 22a243ab7


[STORM-1615] Update state checkpointing doc with the acking behavior


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9fee0172
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9fee0172
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9fee0172

Branch: refs/heads/asf-site
Commit: 9fee0172f90a4f5e18369107e3cf3c71eb9d2f79
Parents: 1b2d1d6
Author: Arun Mahadevan <[email protected]>
Authored: Mon Mar 14 18:32:42 2016 +0530
Committer: Arun Mahadevan <[email protected]>
Committed: Mon Mar 14 18:32:42 2016 +0530

----------------------------------------------------------------------
 documentation/State-checkpointing.md | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9fee0172/documentation/State-checkpointing.md
----------------------------------------------------------------------
diff --git a/documentation/State-checkpointing.md b/documentation/State-checkpointing.md
index c7a81f5..7c59aad 100644
--- a/documentation/State-checkpointing.md
+++ b/documentation/State-checkpointing.md
@@ -25,18 +25,24 @@ last committed by the framework during the previous run.
  ```java
  public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
  private KeyValueState<String, Long> wordCounts;
+ private OutputCollector collector;
  ...
      @Override
+     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+       this.collector = collector;
+     }
+     @Override
      public void initState(KeyValueState<String, Long> state) {
        wordCounts = state;
      }
      @Override
-     public void execute(Tuple tuple, BasicOutputCollector collector) {
+     public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Long count = wordCounts.get(word, 0L);
        count++;
        wordCounts.put(word, count);
-       collector.emit(new Values(word, count));
+       collector.emit(tuple, new Values(word, count));
+       collector.ack(tuple);
      }
  ...
  }
@@ -110,6 +116,8 @@ duplicate state updates during recovery.
 
 The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee.
 
+In order to provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor tuples while emitting and to ack each input tuple once it is processed. For non-stateful bolts, anchoring and acking can be managed automatically by extending `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing, as in the `WordCountBolt` example in the State management section above.
+
 ### IStateful bolt hooks
 The IStateful bolt interface provides hook methods in which stateful bolts can implement custom actions.
 ```java

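The at-least-once behavior described in the documentation change above can be illustrated with a small model that does not use Storm. This is only a sketch under stated assumptions: the class `AtLeastOnceSketch`, the method `run`, and the parameter `failAckAtIndex` are hypothetical names invented for illustration and are not part of the Storm API. It assumes that a tuple whose ack is lost is replayed exactly once, and it shows how a replay can apply the same state update twice.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of at-least-once delivery.
// None of these names are Storm APIs; they exist only for illustration.
public class AtLeastOnceSketch {

    /**
     * Counts words from a queue of pending tuples. The tuple at
     * failAckAtIndex is treated as "processed but not acked" the first
     * time it is seen, so it is replayed once. Pass -1 for no failure.
     */
    public static Map<String, Long> run(String[] words, int failAckAtIndex) {
        Map<String, Long> wordCounts = new HashMap<>();
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < words.length; i++) {
            pending.add(i);
        }
        boolean failureInjected = false;
        while (!pending.isEmpty()) {
            int id = pending.poll();
            // The state update happens before the ack is sent.
            wordCounts.merge(words[id], 1L, Long::sum);
            boolean acked = failureInjected || id != failAckAtIndex;
            if (!acked) {
                // The ack was lost, so the source replays the tuple.
                failureInjected = true;
                pending.add(id);
            }
        }
        return wordCounts;
    }

    public static void main(String[] args) {
        String[] words = {"storm", "state", "storm"};
        System.out.println("no failure:   " + run(words, -1));
        System.out.println("lost one ack: " + run(words, 0));
    }
}
```

In this model every tuple is processed at least once, but the replayed tuple is counted twice. That is the kind of duplicate evaluation the documentation says the state abstraction does not eliminate.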