STORM-1878: Added a Flux example using a stateful bolt. The example resumes word counting even over topology restarts if the state is persisted in Redis.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f909ce91 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f909ce91 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f909ce91 Branch: refs/heads/1.x-branch Commit: f909ce91520448a399e70c4ba2fba18d5276ef2e Parents: 3722b93 Author: Daniel Klessing <[email protected]> Authored: Fri Jun 3 13:17:29 2016 +0200 Committer: Arun Mahadevan <[email protected]> Committed: Tue Jun 7 12:04:54 2016 +0530 ---------------------------------------------------------------------- external/flux/flux-examples/README.md | 18 ++++++ .../flux/examples/StatefulWordCounter.java | 64 ++++++++++++++++++++ .../resources/simple_stateful_wordcount.yaml | 60 ++++++++++++++++++ 3 files changed, 142 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f909ce91/external/flux/flux-examples/README.md ---------------------------------------------------------------------- diff --git a/external/flux/flux-examples/README.md b/external/flux/flux-examples/README.md index a6afec2..3d610b4 100644 --- a/external/flux/flux-examples/README.md +++ b/external/flux/flux-examples/README.md @@ -39,6 +39,7 @@ Another wordcount example that uses a spout written in JavaScript (node.js), a b written in java. ### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml) + This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`, and `constructor arguments` constructs. 
@@ -64,6 +65,7 @@ To run the `simple_hbase.yaml` example, copy the `hbase_bolt.properties` file to ```bash storm jar ./target/flux-examples-*.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_hbase.yaml --filter my_hbase_bolt.properties ``` + ### [simple_windowing.yaml](src/main/resources/simple_windowing.yaml) This example illustrates how to use Flux to set up a storm topology that contains windowing operations. @@ -73,3 +75,19 @@ To run, ```bash storm jar ./target/flux-examples-*.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_windowing.yaml ``` + +### [simple_stateful_wordcount.yaml](src/main/resources/simple_stateful_wordcount.yaml) + +Flux also supports stateful bolts, as illustrated by this example. It is basically an extension of the basic wordcount example. +The state is periodically saved (checkpointed) and restored when the topology is restarted. + +```bash +storm jar ./target/flux-examples-*.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_stateful_wordcount.yaml +``` + +By default the state is stored in-memory only. As such, you won't see a resumed state unless you configure the topology to use Redis as the state backend. +Ensure that you have Redis running at `localhost:6379` and that `storm-redis-*.jar` is in the classpath. 
+ +```bash +STORM_EXT_CLASSPATH=../../storm-redis/target storm jar ./target/flux-examples-*.jar -c topology.state.provider=org.apache.storm.redis.state.RedisKeyValueStateProvider org.apache.storm.flux.Flux --local ./src/main/resources/simple_stateful_wordcount.yaml +``` http://git-wip-us.apache.org/repos/asf/storm/blob/f909ce91/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java ---------------------------------------------------------------------- diff --git a/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java new file mode 100644 index 0000000..5534888 --- /dev/null +++ b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/StatefulWordCounter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.flux.examples; + +import org.apache.storm.state.KeyValueState; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseStatefulBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +public class StatefulWordCounter extends BaseStatefulBolt<KeyValueState<String, Long>> { + + private KeyValueState<String, Long> wordCounts; + private OutputCollector collector; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void initState(KeyValueState<String, Long> state) { + wordCounts = state; + } + + @Override + public void execute(Tuple tuple) { + String word = tuple.getString(0); + + Long count = wordCounts.get(word, 0L); + count++; + wordCounts.put(word, count); + + collector.emit(tuple, new Values(word, count)); + collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/f909ce91/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml ---------------------------------------------------------------------- diff --git a/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml b/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml new file mode 100644 index 0000000..14b9b3a --- /dev/null +++ b/external/flux/flux-examples/src/main/resources/simple_stateful_wordcount.yaml @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- + +# topology definition +# name to be used when submitting +name: "stateful-wordcount-topology" + +# topology configuration +# this will be passed to the submitter as a map of config options +# +config: + topology.workers: 1 + +# spout definitions +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + +# bolt definitions +bolts: + - id: "bolt-1" + className: "org.apache.storm.flux.examples.StatefulWordCounter" + parallelism: 1 + + - id: "bolt-2" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + +#stream definitions +# stream definitions define connections between spouts and bolts. +# note that such connections can be cyclical +streams: + - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) + from: "spout-1" + to: "bolt-1" + grouping: + type: FIELDS + args: ["word"] + + - name: "bolt-1 --> bolt2" + from: "bolt-1" + to: "bolt-2" + grouping: + type: SHUFFLE
