Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch 511869c6f -> 81505f9c7


http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 535d7b9..77c6ee8 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCommands;
@@ -36,46 +40,11 @@ import redis.clients.jedis.exceptions.JedisException;
 public class PersistentWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String REDIS_BOLT = "REDIS_BOLT";
+    private static final String STORE_BOLT = "STORE_BOLT";
 
     private static final String TEST_REDIS_HOST = "127.0.0.1";
     private static final int TEST_REDIS_PORT = 6379;
 
-    public static class StoreCountRedisBolt extends AbstractRedisBolt {
-        private static final Logger LOG = LoggerFactory.getLogger(StoreCountRedisBolt.class);
-
-        public StoreCountRedisBolt(JedisPoolConfig config) {
-            super(config);
-        }
-
-        public StoreCountRedisBolt(JedisClusterConfig config) {
-            super(config);
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            String word = input.getStringByField("word");
-            int count = input.getIntegerByField("count");
-
-            JedisCommands commands = null;
-            try {
-                commands = getInstance();
-                commands.incrBy(word, count);
-            } catch (JedisConnectionException e) {
-                throw new RuntimeException("Unfortunately, this test requires redis-server running", e);
-            } catch (JedisException e) {
-                LOG.error("Exception occurred from Jedis/Redis", e);
-            } finally {
-                returnInstance(commands);
-                this.collector.ack(input);
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Config config = new Config();
 
@@ -92,14 +61,15 @@ public class PersistentWordCount {
 
         WordSpout spout = new WordSpout();
         WordCounter bolt = new WordCounter();
-        StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig);
+        RedisStoreMapper storeMapper = setupStoreMapper();
+        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
 
         // wordSpout ==> countBolt ==> RedisBolt
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+        builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+        builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
 
         if (args.length == 2) {
             LocalCluster cluster = new LocalCluster();
@@ -114,4 +84,33 @@ public class PersistentWordCount {
             System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
         }
     }
+
+    private static RedisStoreMapper setupStoreMapper() {
+        return new WordCountStoreMapper();
+    }
+
+    private static class WordCountStoreMapper implements RedisStoreMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountStoreMapper() {
+            description = new RedisDataTypeDescription(
+                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return tuple.getStringByField("count");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
index 6a0548d..6f25038 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
@@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
 
 import java.util.Map;
 
 import static backtype.storm.utils.Utils.tuple;
 
 public class WordCounter implements IBasicBolt {
-
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
 
     @SuppressWarnings("rawtypes")
     public void prepare(Map stormConf, TopologyContext context) {
     }
 
-    /*
-     * Just output the word value with a count of 1.
-     */
     public void execute(Tuple input, BasicOutputCollector collector) {
-        collector.emit(tuple(input.getValues().get(0), 1));
+        String word = input.getStringByField("word");
+        int count;
+        if (wordCounter.containsKey(word)) {
+            count = wordCounter.get(word) + 1;
+            wordCounter.put(word, wordCounter.get(word) + 1);
+        } else {
+            count = 1;
+        }
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, String.valueOf(count)));
     }
 
     public void cleanup() {

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index a610f54..eb13399 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateQuerier;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
 import storm.trident.testing.FixedBatchSpout;
 
 public class WordCountTridentRedis {
@@ -48,7 +47,7 @@ public class WordCountTridentRedis {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         RedisState.Factory factory = new RedisState.Factory(poolConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 8bea3ce..8562e77 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -23,11 +23,11 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterState;
 import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
 import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -55,7 +55,7 @@ public class WordCountTridentRedisCluster {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index e9ae54d..de1f252 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,11 +23,9 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterMapState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -58,7 +56,7 @@ public class WordCountTridentRedisClusterMap {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index b096e55..4d4afe8 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,12 +23,9 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisMapState;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -51,7 +48,7 @@ public class WordCountTridentRedisMap {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         StateFactory factory = RedisMapState.transactional(poolConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
index 6454c9e..1e601c9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
@@ -1,16 +1,16 @@
 package org.apache.storm.redis.trident;
 
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
-import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 
-public class WordCountTupleMapper implements TridentTupleMapper {
+public class WordCountTupleMapper implements TupleMapper {
     @Override
-    public String getKeyFromTridentTuple(TridentTuple tuple) {
+    public String getKeyFromTuple(ITuple tuple) {
         return tuple.getString(0);
     }
 
     @Override
-    public String getValueFromTridentTuple(TridentTuple tuple) {
+    public String getValueFromTuple(ITuple tuple) {
         return tuple.getInteger(1).toString();
     }
 }

Reply via email to