Repository: storm
Updated Branches:
  refs/heads/asf-site 23799a9f2 -> 22a243ab7

[STORM-1615] Update state checkpointing doc with the acking behavior

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9fee0172
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9fee0172
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9fee0172

Branch: refs/heads/asf-site
Commit: 9fee0172f90a4f5e18369107e3cf3c71eb9d2f79
Parents: 1b2d1d6
Author: Arun Mahadevan <[email protected]>
Authored: Mon Mar 14 18:32:42 2016 +0530
Committer: Arun Mahadevan <[email protected]>
Committed: Mon Mar 14 18:32:42 2016 +0530

----------------------------------------------------------------------
 documentation/State-checkpointing.md | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9fee0172/documentation/State-checkpointing.md
----------------------------------------------------------------------
diff --git a/documentation/State-checkpointing.md b/documentation/State-checkpointing.md
index c7a81f5..7c59aad 100644
--- a/documentation/State-checkpointing.md
+++ b/documentation/State-checkpointing.md
@@ -25,18 +25,24 @@ last committed by the framework during the previous run.
 ```java
 public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
     private KeyValueState<String, Long> wordCounts;
+    private OutputCollector collector;
 ...
     @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+    @Override
     public void initState(KeyValueState<String, Long> state) {
         wordCounts = state;
     }
     @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
+    public void execute(Tuple tuple) {
         String word = tuple.getString(0);
         Integer count = wordCounts.get(word, 0);
         count++;
         wordCounts.put(word, count);
-        collector.emit(new Values(word, count));
+        collector.emit(tuple, new Values(word, count));
+        collector.ack(tuple);
     }
 ...
 }
@@ -110,6 +116,8 @@ duplicate state updates during recovery.
 
 The state abstraction does not eliminate duplicate evaluations and currently provides only at-least once guarantee.
 
+In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing like in the `WordCountBolt` example in the State management section above.
+
 ### IStateful bolt hooks
 IStateful bolt interface provides hook methods where in the stateful bolts could implement some custom actions.
 ```java
