Repository: storm Updated Branches: refs/heads/0.10.x-branch 511869c6f -> 81505f9c7
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java index 535d7b9..77c6ee8 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java @@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Tuple; import org.apache.storm.redis.bolt.AbstractRedisBolt; -import org.apache.storm.redis.util.config.JedisClusterConfig; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.bolt.RedisStoreBolt; +import org.apache.storm.redis.common.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCommands; @@ -36,46 +40,11 @@ import redis.clients.jedis.exceptions.JedisException; public class PersistentWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; - private static final String REDIS_BOLT = "REDIS_BOLT"; + private static final String STORE_BOLT = "STORE_BOLT"; private static final String TEST_REDIS_HOST = "127.0.0.1"; private static final int TEST_REDIS_PORT = 6379; - public static class StoreCountRedisBolt extends AbstractRedisBolt { - private static final Logger LOG = LoggerFactory.getLogger(StoreCountRedisBolt.class); - - public StoreCountRedisBolt(JedisPoolConfig config) { - super(config); - } - - public StoreCountRedisBolt(JedisClusterConfig config) { - super(config); - } - - @Override - public void execute(Tuple input) { - String word = input.getStringByField("word"); - int count = input.getIntegerByField("count"); - - JedisCommands commands = null; - try { - commands = getInstance(); - commands.incrBy(word, count); - } catch (JedisConnectionException e) { - throw new RuntimeException("Unfortunately, this test requires redis-server running", e); - } catch (JedisException e) { - LOG.error("Exception occurred from Jedis/Redis", e); - } finally { - returnInstance(commands); - this.collector.ack(input); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - } - public static void main(String[] args) throws Exception { Config config = new Config(); @@ -92,14 +61,15 @@ public class PersistentWordCount { WordSpout spout = new WordSpout(); WordCounter bolt = new WordCounter(); - StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig); + RedisStoreMapper storeMapper = setupStoreMapper(); + RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); // wordSpout ==> countBolt ==> RedisBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 1); - builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); - builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); + builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word")); + builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT); if (args.length == 2) { LocalCluster cluster = new LocalCluster(); @@ -114,4 +84,33 @@ public class PersistentWordCount { System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)"); } } + + private static RedisStoreMapper setupStoreMapper() { + return new WordCountStoreMapper(); + } + + private static class WordCountStoreMapper implements RedisStoreMapper { + private RedisDataTypeDescription description; + private final String hashKey = "wordCount"; + + public WordCountStoreMapper() { + description = new RedisDataTypeDescription( + RedisDataTypeDescription.RedisDataType.HASH, hashKey); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return description; + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return tuple.getStringByField("word"); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return tuple.getStringByField("count"); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java index 6a0548d..6f25038 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java @@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import com.google.common.collect.Maps; import java.util.Map; import static backtype.storm.utils.Utils.tuple; public class WordCounter implements IBasicBolt { - + private Map<String, Integer> wordCounter = Maps.newHashMap(); @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context) { } - /* - * Just output the word value with a count of 1. - */ public void execute(Tuple input, BasicOutputCollector collector) { - collector.emit(tuple(input.getValues().get(0), 1)); + String word = input.getStringByField("word"); + int count; + if (wordCounter.containsKey(word)) { + count = wordCounter.get(word) + 1; + wordCounter.put(word, wordCounter.get(word) + 1); + } else { + count = 1; + } + + wordCounter.put(word, count); + collector.emit(new Values(word, String.valueOf(count))); } public void cleanup() { http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java index a610f54..eb13399 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java @@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.apache.storm.redis.trident.state.RedisState; import org.apache.storm.redis.trident.state.RedisStateQuerier; import org.apache.storm.redis.trident.state.RedisStateUpdater; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; import storm.trident.Stream; import storm.trident.TridentState; import storm.trident.TridentTopology; -import storm.trident.state.StateFactory; import storm.trident.testing.FixedBatchSpout; public class WordCountTridentRedis { @@ -48,7 +47,7 @@ public class WordCountTridentRedis { JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); - TridentTupleMapper tupleMapper = new WordCountTupleMapper(); + TupleMapper tupleMapper = new WordCountTupleMapper(); RedisState.Factory factory = new RedisState.Factory(poolConfig); TridentTopology topology = new TridentTopology(); http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java index 8bea3ce..8562e77 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java @@ -23,11 +23,11 @@ import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.apache.storm.redis.trident.state.RedisClusterState; import org.apache.storm.redis.trident.state.RedisClusterStateQuerier; import org.apache.storm.redis.trident.state.RedisClusterStateUpdater; -import org.apache.storm.redis.util.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisClusterConfig; import storm.trident.Stream; import storm.trident.TridentState; import storm.trident.TridentTopology; @@ -55,7 +55,7 @@ public class WordCountTridentRedisCluster { } JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes) .build(); - TridentTupleMapper tupleMapper = new WordCountTupleMapper(); + TupleMapper tupleMapper = new WordCountTupleMapper(); RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig); TridentTopology topology = new TridentTopology(); http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java index e9ae54d..de1f252 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java @@ -23,11 +23,9 @@ import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.apache.storm.redis.trident.state.RedisClusterMapState; -import org.apache.storm.redis.trident.state.RedisClusterStateUpdater; -import org.apache.storm.redis.trident.state.RedisStateQuerier; -import org.apache.storm.redis.util.config.JedisClusterConfig; +import org.apache.storm.redis.common.config.JedisClusterConfig; import storm.trident.Stream; import storm.trident.TridentState; import storm.trident.TridentTopology; @@ -58,7 +56,7 @@ public class WordCountTridentRedisClusterMap { } JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes) .build(); - TridentTupleMapper tupleMapper = new WordCountTupleMapper(); + TupleMapper tupleMapper = new WordCountTupleMapper(); StateFactory factory = RedisClusterMapState.transactional(clusterConfig); TridentTopology topology = new TridentTopology(); http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java index b096e55..4d4afe8 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java @@ -23,12 +23,9 @@ import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; +import org.apache.storm.redis.common.mapper.TupleMapper; import org.apache.storm.redis.trident.state.RedisMapState; -import org.apache.storm.redis.trident.state.RedisState; -import org.apache.storm.redis.trident.state.RedisStateQuerier; -import org.apache.storm.redis.trident.state.RedisStateUpdater; -import org.apache.storm.redis.util.config.JedisPoolConfig; +import org.apache.storm.redis.common.config.JedisPoolConfig; import storm.trident.Stream; import storm.trident.TridentState; import storm.trident.TridentTopology; @@ -51,7 +48,7 @@ public class WordCountTridentRedisMap { JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); - TridentTupleMapper tupleMapper = new WordCountTupleMapper(); + TupleMapper tupleMapper = new WordCountTupleMapper(); StateFactory factory = RedisMapState.transactional(poolConfig); TridentTopology topology = new TridentTopology(); http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java index 6454c9e..1e601c9 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java @@ -1,16 +1,16 @@ package org.apache.storm.redis.trident; -import org.apache.storm.redis.trident.mapper.TridentTupleMapper; -import storm.trident.tuple.TridentTuple; +import backtype.storm.tuple.ITuple; +import org.apache.storm.redis.common.mapper.TupleMapper; -public class WordCountTupleMapper implements TridentTupleMapper { +public class WordCountTupleMapper implements TupleMapper { @Override - public String getKeyFromTridentTuple(TridentTuple tuple) { + public String getKeyFromTuple(ITuple tuple) { return tuple.getString(0); } @Override - public String getValueFromTridentTuple(TridentTuple tuple) { + public String getValueFromTuple(ITuple tuple) { return tuple.getInteger(1).toString(); } }