http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index 3442c46..a65790a 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -1,28 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.redis.trident.state;
 
+import java.io.Serializable;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.trident.state.Serializer;
 
-import java.io.Serializable;
-
 /**
  * Options of State.<br/>
  * It's a data structure (whole things are public) and you can access and 
modify all fields.
@@ -30,7 +23,8 @@ import java.io.Serializable;
  * @param <T> value's type class
  */
 public class Options<T> implements Serializable {
-    private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new 
RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+    private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE =
+        new 
RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
 
     public int localCacheSize = 1000;
     public String globalKey = "$REDIS-MAP-STATE-GLOBAL";

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index ecae18d..c43a18d 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -1,29 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import redis.clients.jedis.JedisCluster;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.OpaqueValue;
 import org.apache.storm.trident.state.Serializer;
 import org.apache.storm.trident.state.State;
@@ -36,9 +30,8 @@ import org.apache.storm.trident.state.map.NonTransactionalMap;
 import org.apache.storm.trident.state.map.OpaqueMap;
 import org.apache.storm.trident.state.map.SnapshottableMap;
 import org.apache.storm.trident.state.map.TransactionalMap;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.tuple.Values;
+import redis.clients.jedis.JedisCluster;
 
 /**
  * IBackingMap implementation for Redis Cluster environment.
@@ -47,6 +40,27 @@ import java.util.Map;
  * @see AbstractRedisMapState
  */
 public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
+    private JedisCluster jedisCluster;
+    private Options options;
+    private Serializer serializer;
+    private KeyFactory keyFactory;
+
+    /**
+     * Constructor
+     *
+     * @param jedisCluster JedisCluster
+     * @param options options of State
+     * @param serializer Serializer
+     * @param keyFactory KeyFactory
+     */
+    public RedisClusterMapState(JedisCluster jedisCluster, Options options,
+                                Serializer<T> serializer, KeyFactory 
keyFactory) {
+        this.jedisCluster = jedisCluster;
+        this.options = options;
+        this.serializer = serializer;
+        this.keyFactory = keyFactory;
+    }
+
     /**
      * Provides StateFactory for opaque transactional.
      *
@@ -189,98 +203,6 @@ public class RedisClusterMapState<T> extends 
AbstractRedisMapState<T> {
     }
 
     /**
-     * RedisClusterMapState.Factory provides Redis Cluster environment version 
of StateFactory.
-     */
-    protected static class Factory implements StateFactory {
-        public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
-        JedisClusterConfig jedisClusterConfig;
-
-        StateType type;
-        Serializer serializer;
-        KeyFactory keyFactory;
-        Options options;
-
-        /**
-         * Constructor
-         *
-         * @param jedisClusterConfig configuration for JedisCluster
-         * @param type StateType
-         * @param options options of State
-         */
-        public Factory(JedisClusterConfig jedisClusterConfig, StateType type, 
Options options) {
-            this.jedisClusterConfig = jedisClusterConfig;
-            this.type = type;
-            this.options = options;
-
-            this.keyFactory = options.keyFactory;
-            if (this.keyFactory == null) {
-                this.keyFactory = new KeyFactory.DefaultKeyFactory();
-            }
-            this.serializer = options.serializer;
-            if (this.serializer == null) {
-                this.serializer = DEFAULT_SERIALIZERS.get(type);
-                if (this.serializer == null) {
-                    throw new RuntimeException("Couldn't find serializer for 
state type: " + type);
-                }
-            }
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-            JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
-                    jedisClusterConfig.getTimeout(),
-                    jedisClusterConfig.getTimeout(),
-                    jedisClusterConfig.getMaxRedirections(),
-                    jedisClusterConfig.getPassword(),
-                    DEFAULT_POOL_CONFIG);
-
-            RedisClusterMapState state = new 
RedisClusterMapState(jedisCluster, options, serializer, keyFactory);
-            CachedMap c = new CachedMap(state, options.localCacheSize);
-
-            MapState ms;
-            if (type == StateType.NON_TRANSACTIONAL) {
-                ms = NonTransactionalMap.build(c);
-
-            } else if (type == StateType.OPAQUE) {
-                ms = OpaqueMap.build(c);
-
-            } else if (type == StateType.TRANSACTIONAL) {
-                ms = TransactionalMap.build(c);
-
-            } else {
-                throw new RuntimeException("Unknown state type: " + type);
-            }
-
-            return new SnapshottableMap(ms, new Values(options.globalKey));
-        }
-    }
-
-    private JedisCluster jedisCluster;
-    private Options options;
-    private Serializer serializer;
-    private KeyFactory keyFactory;
-
-    /**
-     * Constructor
-     *
-     * @param jedisCluster JedisCluster
-     * @param options options of State
-     * @param serializer Serializer
-     * @param keyFactory KeyFactory
-     */
-    public RedisClusterMapState(JedisCluster jedisCluster, Options options,
-                                Serializer<T> serializer, KeyFactory 
keyFactory) {
-        this.jedisCluster = jedisCluster;
-        this.options = options;
-        this.serializer = serializer;
-        this.keyFactory = keyFactory;
-    }
-
-    /**
      * {@inheritDoc}
      */
     @Override
@@ -350,4 +272,75 @@ public class RedisClusterMapState<T> extends 
AbstractRedisMapState<T> {
                 throw new IllegalArgumentException("Cannot process such data 
type: " + description.getDataType());
         }
     }
+
+    /**
+     * RedisClusterMapState.Factory provides Redis Cluster environment version 
of StateFactory.
+     */
+    protected static class Factory implements StateFactory {
+        public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+        JedisClusterConfig jedisClusterConfig;
+
+        StateType type;
+        Serializer serializer;
+        KeyFactory keyFactory;
+        Options options;
+
+        /**
+         * Constructor
+         *
+         * @param jedisClusterConfig configuration for JedisCluster
+         * @param type StateType
+         * @param options options of State
+         */
+        public Factory(JedisClusterConfig jedisClusterConfig, StateType type, 
Options options) {
+            this.jedisClusterConfig = jedisClusterConfig;
+            this.type = type;
+            this.options = options;
+
+            this.keyFactory = options.keyFactory;
+            if (this.keyFactory == null) {
+                this.keyFactory = new KeyFactory.DefaultKeyFactory();
+            }
+            this.serializer = options.serializer;
+            if (this.serializer == null) {
+                this.serializer = DEFAULT_SERIALIZERS.get(type);
+                if (this.serializer == null) {
+                    throw new RuntimeException("Couldn't find serializer for 
state type: " + type);
+                }
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
+            JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
+                                                         
jedisClusterConfig.getTimeout(),
+                                                         
jedisClusterConfig.getTimeout(),
+                                                         
jedisClusterConfig.getMaxRedirections(),
+                                                         
jedisClusterConfig.getPassword(),
+                                                         DEFAULT_POOL_CONFIG);
+
+            RedisClusterMapState state = new 
RedisClusterMapState(jedisCluster, options, serializer, keyFactory);
+            CachedMap c = new CachedMap(state, options.localCacheSize);
+
+            MapState ms;
+            if (type == StateType.NON_TRANSACTIONAL) {
+                ms = NonTransactionalMap.build(c);
+
+            } else if (type == StateType.OPAQUE) {
+                ms = OpaqueMap.build(c);
+
+            } else if (type == StateType.TRANSACTIONAL) {
+                ms = TransactionalMap.build(c);
+
+            } else {
+                throw new RuntimeException("Unknown state type: " + type);
+            }
+
+            return new SnapshottableMap(ms, new Values(options.globalKey));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index 11d8907..7bdd86b 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -1,37 +1,39 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.task.IMetricsContext;
+import java.util.Map;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCluster;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
+import redis.clients.jedis.JedisCluster;
 
 /**
  * Implementation of State for Redis Cluster environment.
  */
 public class RedisClusterState implements State {
+    private JedisCluster jedisCluster;
+
+    /**
+     * Constructor
+     *
+     * @param jedisCluster JedisCluster
+     */
+    public RedisClusterState(JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -47,6 +49,26 @@ public class RedisClusterState implements State {
     }
 
     /**
+     * Borrows JedisCluster instance.
+     * <p/>
+     * Note that you should return borrowed instance when you finish using 
instance.
+     *
+     * @return JedisCluster instance
+     */
+    public JedisCluster getJedisCluster() {
+        return this.jedisCluster;
+    }
+
+    /**
+     * Returns JedisCluster instance to pool.
+     *
+     * @param jedisCluster JedisCluster instance to return to pool
+     */
+    public void returnJedisCluster(JedisCluster jedisCluster) {
+        //do nothing
+    }
+
+    /**
      * RedisClusterState.Factory implements StateFactory for Redis Cluster 
environment.
      *
      * @see StateFactory
@@ -71,45 +93,14 @@ public class RedisClusterState implements State {
         @Override
         public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
             JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
-                                                    
jedisClusterConfig.getTimeout(),
-                                                    
jedisClusterConfig.getTimeout(),
-                                                    
jedisClusterConfig.getMaxRedirections(),
-                                                    
jedisClusterConfig.getPassword(),
-                                                    DEFAULT_POOL_CONFIG);
+                                                         
jedisClusterConfig.getTimeout(),
+                                                         
jedisClusterConfig.getTimeout(),
+                                                         
jedisClusterConfig.getMaxRedirections(),
+                                                         
jedisClusterConfig.getPassword(),
+                                                         DEFAULT_POOL_CONFIG);
 
             return new RedisClusterState(jedisCluster);
         }
     }
 
-    private JedisCluster jedisCluster;
-
-    /**
-     * Constructor
-     *
-     * @param jedisCluster JedisCluster
-     */
-    public RedisClusterState(JedisCluster jedisCluster) {
-        this.jedisCluster = jedisCluster;
-    }
-
-    /**
-     * Borrows JedisCluster instance.
-     * <p/>
-     * Note that you should return borrowed instance when you finish using 
instance.
-     *
-     * @return JedisCluster instance
-     */
-    public JedisCluster getJedisCluster() {
-        return this.jedisCluster;
-    }
-
-    /**
-     * Returns JedisCluster instance to pool.
-     *
-     * @param jedisCluster JedisCluster instance to return to pool
-     */
-    public void returnJedisCluster(JedisCluster jedisCluster) {
-        //do nothing
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 6938409..b7401aa 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -1,28 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.redis.common.mapper.RedisLookupMapper;
-import redis.clients.jedis.JedisCluster;
-
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import redis.clients.jedis.JedisCluster;
 
 /**
  * BaseQueryFunction implementation for Redis Cluster environment.

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 1cc0725..0258ae5 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.redis.trident.state;
 
+import java.util.Map;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import redis.clients.jedis.JedisCluster;
 
-import java.util.Map;
-
 /**
  * BaseStateUpdater implementation for Redis Cluster environment.
  *
@@ -63,25 +57,25 @@ public class RedisClusterStateUpdater extends 
AbstractRedisStateUpdater<RedisClu
                 String value = kvEntry.getValue();
 
                 switch (dataType) {
-                case STRING:
-                    if (this.expireIntervalSec > 0) {
-                        jedisCluster.setex(key, expireIntervalSec, value);
-                    } else {
-                        jedisCluster.set(key, value);
-                    }
-                    break;
-                case HASH:
-                    jedisCluster.hset(additionalKey, key, value);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Cannot process such 
data type: " + dataType);
+                    case STRING:
+                        if (this.expireIntervalSec > 0) {
+                            jedisCluster.setex(key, expireIntervalSec, value);
+                        } else {
+                            jedisCluster.set(key, value);
+                        }
+                        break;
+                    case HASH:
+                        jedisCluster.hset(additionalKey, key, value);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Cannot process 
such data type: " + dataType);
                 }
             }
 
             // send expire command for hash only once
             // it expires key itself entirely, so use it with caution
             if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
-                    this.expireIntervalSec > 0) {
+                this.expireIntervalSec > 0) {
                 jedisCluster.expire(additionalKey, expireIntervalSec);
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index f68b5ef..6febc41 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -1,29 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.tuple.Values;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.Pipeline;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.OpaqueValue;
 import org.apache.storm.trident.state.Serializer;
 import org.apache.storm.trident.state.State;
@@ -36,9 +29,10 @@ import 
org.apache.storm.trident.state.map.NonTransactionalMap;
 import org.apache.storm.trident.state.map.OpaqueMap;
 import org.apache.storm.trident.state.map.SnapshottableMap;
 import org.apache.storm.trident.state.map.TransactionalMap;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.tuple.Values;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
 
 /**
  * IBackingMap implementation for single Redis environment.
@@ -47,6 +41,27 @@ import java.util.Map;
  * @see AbstractRedisMapState
  */
 public class RedisMapState<T> extends AbstractRedisMapState<T> {
+    private JedisPool jedisPool;
+    private Options options;
+    private Serializer serializer;
+    private KeyFactory keyFactory;
+
+    /**
+     * Constructor
+     *
+     * @param jedisPool JedisPool
+     * @param options options of State
+     * @param serializer Serializer
+     * @param keyFactory KeyFactory
+     */
+    public RedisMapState(JedisPool jedisPool, Options options,
+                         Serializer<T> serializer, KeyFactory keyFactory) {
+        this.jedisPool = jedisPool;
+        this.options = options;
+        this.serializer = serializer;
+        this.keyFactory = keyFactory;
+    }
+
     /**
      * Provides StateFactory for opaque transactional.
      *
@@ -189,97 +204,6 @@ public class RedisMapState<T> extends 
AbstractRedisMapState<T> {
     }
 
     /**
-     * RedisMapState.Factory provides single Redis environment version of 
StateFactory.
-     */
-    protected static class Factory implements StateFactory {
-        public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
-        JedisPoolConfig jedisPoolConfig;
-
-        StateType type;
-        Serializer serializer;
-        KeyFactory keyFactory;
-        Options options;
-
-        /**
-         * Constructor
-         *
-         * @param jedisPoolConfig configuration for JedisPool
-         * @param type StateType
-         * @param options options of State
-         */
-        public Factory(JedisPoolConfig jedisPoolConfig, StateType type, 
Options options) {
-            this.jedisPoolConfig = jedisPoolConfig;
-            this.type = type;
-            this.options = options;
-
-            this.keyFactory = options.keyFactory;
-            if (this.keyFactory == null) {
-                this.keyFactory = new KeyFactory.DefaultKeyFactory();
-            }
-            this.serializer = options.serializer;
-            if (this.serializer == null) {
-                this.serializer = DEFAULT_SERIALIZERS.get(type);
-                if (this.serializer == null) {
-                    throw new RuntimeException("Couldn't find serializer for 
state type: " + type);
-                }
-            }
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-            JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
-                                                    jedisPoolConfig.getHost(),
-                                                    jedisPoolConfig.getPort(),
-                                                    
jedisPoolConfig.getTimeout(),
-                                                    
jedisPoolConfig.getPassword(),
-                                                    
jedisPoolConfig.getDatabase());
-            RedisMapState state = new RedisMapState(jedisPool, options, 
serializer, keyFactory);
-            CachedMap c = new CachedMap(state, options.localCacheSize);
-
-            MapState ms;
-            if (type == StateType.NON_TRANSACTIONAL) {
-                ms = NonTransactionalMap.build(c);
-
-            } else if (type == StateType.OPAQUE) {
-                ms = OpaqueMap.build(c);
-
-            } else if (type == StateType.TRANSACTIONAL) {
-                ms = TransactionalMap.build(c);
-
-            } else {
-                throw new RuntimeException("Unknown state type: " + type);
-            }
-
-            return new SnapshottableMap(ms, new Values(options.globalKey));
-        }
-    }
-
-    private JedisPool jedisPool;
-    private Options options;
-    private Serializer serializer;
-    private KeyFactory keyFactory;
-
-    /**
-     * Constructor
-     *
-     * @param jedisPool JedisPool
-     * @param options options of State
-     * @param serializer Serializer
-     * @param keyFactory KeyFactory
-     */
-    public RedisMapState(JedisPool jedisPool, Options options,
-                                            Serializer<T> serializer, 
KeyFactory keyFactory) {
-        this.jedisPool = jedisPool;
-        this.options = options;
-        this.serializer = serializer;
-        this.keyFactory = keyFactory;
-    }
-
-    /**
      * {@inheritDoc}
      */
     @Override
@@ -308,14 +232,14 @@ public class RedisMapState<T> extends 
AbstractRedisMapState<T> {
 
             RedisDataTypeDescription description = 
this.options.dataTypeDescription;
             switch (description.getDataType()) {
-            case STRING:
-                return jedis.mget(stringKeys);
+                case STRING:
+                    return jedis.mget(stringKeys);
 
-            case HASH:
-                return jedis.hmget(description.getAdditionalKey(), stringKeys);
+                case HASH:
+                    return jedis.hmget(description.getAdditionalKey(), 
stringKeys);
 
-            default:
-                throw new IllegalArgumentException("Cannot process such data 
type: " + description.getDataType());
+                default:
+                    throw new IllegalArgumentException("Cannot process such 
data type: " + description.getDataType());
             }
 
         } finally {
@@ -337,27 +261,27 @@ public class RedisMapState<T> extends 
AbstractRedisMapState<T> {
 
             RedisDataTypeDescription description = 
this.options.dataTypeDescription;
             switch (description.getDataType()) {
-            case STRING:
-                String[] keyValue = buildKeyValuesList(keyValues);
-                jedis.mset(keyValue);
-                if(this.options.expireIntervalSec > 0){
-                    Pipeline pipe = jedis.pipelined();
-                    for(int i = 0; i < keyValue.length; i += 2){
-                        pipe.expire(keyValue[i], 
this.options.expireIntervalSec);
+                case STRING:
+                    String[] keyValue = buildKeyValuesList(keyValues);
+                    jedis.mset(keyValue);
+                    if (this.options.expireIntervalSec > 0) {
+                        Pipeline pipe = jedis.pipelined();
+                        for (int i = 0; i < keyValue.length; i += 2) {
+                            pipe.expire(keyValue[i], 
this.options.expireIntervalSec);
+                        }
+                        pipe.sync();
                     }
-                    pipe.sync();
-                }
-                break;
+                    break;
 
-            case HASH:
-                jedis.hmset(description.getAdditionalKey(), keyValues);
-                if (this.options.expireIntervalSec > 0) {
-                    jedis.expire(description.getAdditionalKey(), 
this.options.expireIntervalSec);
-                }
-                break;
+                case HASH:
+                    jedis.hmset(description.getAdditionalKey(), keyValues);
+                    if (this.options.expireIntervalSec > 0) {
+                        jedis.expire(description.getAdditionalKey(), 
this.options.expireIntervalSec);
+                    }
+                    break;
 
-            default:
-                throw new IllegalArgumentException("Cannot process such data 
type: " + description.getDataType());
+                default:
+                    throw new IllegalArgumentException("Cannot process such 
data type: " + description.getDataType());
             }
 
         } finally {
@@ -378,4 +302,74 @@ public class RedisMapState<T> extends 
AbstractRedisMapState<T> {
 
         return keyValueLists;
     }
+
+    /**
+     * RedisMapState.Factory provides single Redis environment version of 
StateFactory.
+     */
+    protected static class Factory implements StateFactory {
+        public static final redis.clients.jedis.JedisPoolConfig 
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+        JedisPoolConfig jedisPoolConfig;
+
+        StateType type;
+        Serializer serializer;
+        KeyFactory keyFactory;
+        Options options;
+
+        /**
+         * Constructor
+         *
+         * @param jedisPoolConfig configuration for JedisPool
+         * @param type StateType
+         * @param options options of State
+         */
+        public Factory(JedisPoolConfig jedisPoolConfig, StateType type, 
Options options) {
+            this.jedisPoolConfig = jedisPoolConfig;
+            this.type = type;
+            this.options = options;
+
+            this.keyFactory = options.keyFactory;
+            if (this.keyFactory == null) {
+                this.keyFactory = new KeyFactory.DefaultKeyFactory();
+            }
+            this.serializer = options.serializer;
+            if (this.serializer == null) {
+                this.serializer = DEFAULT_SERIALIZERS.get(type);
+                if (this.serializer == null) {
+                    throw new RuntimeException("Couldn't find serializer for 
state type: " + type);
+                }
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
+            JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
+                                                jedisPoolConfig.getHost(),
+                                                jedisPoolConfig.getPort(),
+                                                jedisPoolConfig.getTimeout(),
+                                                jedisPoolConfig.getPassword(),
+                                                jedisPoolConfig.getDatabase());
+            RedisMapState state = new RedisMapState(jedisPool, options, 
serializer, keyFactory);
+            CachedMap c = new CachedMap(state, options.localCacheSize);
+
+            MapState ms;
+            if (type == StateType.NON_TRANSACTIONAL) {
+                ms = NonTransactionalMap.build(c);
+
+            } else if (type == StateType.OPAQUE) {
+                ms = OpaqueMap.build(c);
+
+            } else if (type == StateType.TRANSACTIONAL) {
+                ms = TransactionalMap.build(c);
+
+            } else {
+                throw new RuntimeException("Unknown state type: " + type);
+            }
+
+            return new SnapshottableMap(ms, new Values(options.globalKey));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index ec0b679..76b60f7 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -1,37 +1,40 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.task.IMetricsContext;
+import java.util.Map;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
 
 /**
  * Implementation of State for single Redis environment.
  */
 public class RedisState implements State {
+    private JedisPool jedisPool;
+
+    /**
+     * Constructor
+     *
+     * @param jedisPool JedisPool
+     */
+    public RedisState(JedisPool jedisPool) {
+        this.jedisPool = jedisPool;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -47,6 +50,26 @@ public class RedisState implements State {
     }
 
     /**
+     * Borrows Jedis instance from pool.
+     * <p/>
+     * Note that you should return borrowed instance to pool when you finish 
using instance.
+     *
+     * @return Jedis instance
+     */
+    public Jedis getJedis() {
+        return this.jedisPool.getResource();
+    }
+
+    /**
+     * Returns Jedis instance to pool.
+     *
+     * @param jedis Jedis instance to return to pool
+     */
+    public void returnJedis(Jedis jedis) {
+        jedis.close();
+    }
+
+    /**
      * RedisState.Factory implements StateFactory for single Redis environment.
      *
      * @see StateFactory
@@ -81,35 +104,4 @@ public class RedisState implements State {
         }
     }
 
-    private JedisPool jedisPool;
-
-    /**
-     * Constructor
-     *
-     * @param jedisPool JedisPool
-     */
-    public RedisState(JedisPool jedisPool) {
-        this.jedisPool = jedisPool;
-    }
-
-    /**
-     * Borrows Jedis instance from pool.
-     * <p/>
-     * Note that you should return borrowed instance to pool when you finish 
using instance.
-     *
-     * @return Jedis instance
-     */
-    public Jedis getJedis() {
-        return this.jedisPool.getResource();
-    }
-
-    /**
-     * Returns Jedis instance to pool.
-     *
-     * @param jedis Jedis instance to return to pool
-     */
-    public void returnJedis(Jedis jedis) {
-        jedis.close();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index d76fec2..8be00e2 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.redis.trident.state;
 
+import java.util.List;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import redis.clients.jedis.Jedis;
 
-import java.util.List;
-
 /**
  * BaseQueryFunction implementation for single Redis environment.
  *
@@ -49,14 +43,14 @@ public class RedisStateQuerier extends 
AbstractRedisStateQuerier<RedisState> {
 
             String[] keysForRedis = keys.toArray(new String[keys.size()]);
             switch (dataType) {
-            case STRING:
-                redisVals = jedis.mget(keysForRedis);
-                break;
-            case HASH:
-                redisVals = jedis.hmget(additionalKey, keysForRedis);
-                break;
-            default:
-                throw new IllegalArgumentException("Cannot process such data 
type: " + dataType);
+                case STRING:
+                    redisVals = jedis.mget(keysForRedis);
+                    break;
+                case HASH:
+                    redisVals = jedis.hmget(additionalKey, keysForRedis);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such 
data type: " + dataType);
             }
 
             return redisVals;

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index d0507cf..0a5aede 100644
--- 
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ 
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -1,29 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.redis.trident.state;
 
+import java.util.Map;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Pipeline;
 
-import java.util.Map;
-
 /**
  * BaseStateUpdater implementation for single Redis environment.
  *
@@ -65,25 +59,25 @@ public class RedisStateUpdater extends 
AbstractRedisStateUpdater<RedisState> {
                 String value = kvEntry.getValue();
 
                 switch (dataType) {
-                case STRING:
-                    if (this.expireIntervalSec > 0) {
-                        pipeline.setex(key, expireIntervalSec, value);
-                    } else {
-                        pipeline.set(key, value);
-                    }
-                    break;
-                case HASH:
-                    pipeline.hset(additionalKey, key, value);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Cannot process such 
data type: " + dataType);
+                    case STRING:
+                        if (this.expireIntervalSec > 0) {
+                            pipeline.setex(key, expireIntervalSec, value);
+                        } else {
+                            pipeline.set(key, value);
+                        }
+                        break;
+                    case HASH:
+                        pipeline.hset(additionalKey, key, value);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Cannot process 
such data type: " + dataType);
                 }
             }
 
             // send expire command for hash only once
             // it expires key itself entirely, so use it with caution
             if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
-                    this.expireIntervalSec > 0) {
+                this.expireIntervalSec > 0) {
                 pipeline.expire(additionalKey, expireIntervalSec);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
index 565a250..fb0052d 100644
--- 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
@@ -1,24 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.redis.state;
 
 import com.google.common.primitives.UnsignedBytes;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import org.apache.storm.redis.common.adapter.RedisCommandsAdapterJedis;
 import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
 import org.apache.storm.state.DefaultStateEncoder;
@@ -29,11 +27,6 @@ import org.junit.Test;
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -79,15 +72,15 @@ public class RedisKeyValueStateIteratorTest {
         putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), 
"value2".getBytes());
 
         ScanResult<Map.Entry<byte[], byte[]>> scanResultFirst = new 
ScanResult<>(
-                "12345".getBytes(), new ArrayList<>(chunkMap.entrySet()));
+            "12345".getBytes(), new ArrayList<>(chunkMap.entrySet()));
         ScanResult<Map.Entry<byte[], byte[]>> scanResultSecond = new 
ScanResult<>(
-                ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<Map.Entry<byte[], byte[]>>());
+            ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<Map.Entry<byte[], byte[]>>());
         when(mockJedis.hscan(eq(namespace), any(byte[].class), 
any(ScanParams.class)))
-                .thenReturn(scanResultFirst, scanResultSecond);
+            .thenReturn(scanResultFirst, scanResultSecond);
 
         RedisKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes());
 
@@ -112,17 +105,17 @@ public class RedisKeyValueStateIteratorTest {
         putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), 
"value2".getBytes());
 
         ScanResult<Map.Entry<byte[], byte[]>> scanResultFirst = new 
ScanResult<>(
-                "12345".getBytes(), new ArrayList<Map.Entry<byte[], 
byte[]>>());
+            "12345".getBytes(), new ArrayList<Map.Entry<byte[], byte[]>>());
         ScanResult<Map.Entry<byte[], byte[]>> scanResultSecond = new 
ScanResult<>(
-                "23456".getBytes(), new ArrayList<Map.Entry<byte[], 
byte[]>>());
+            "23456".getBytes(), new ArrayList<Map.Entry<byte[], byte[]>>());
         ScanResult<Map.Entry<byte[], byte[]>> scanResultThird = new 
ScanResult<>(
-                ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<>(chunkMap.entrySet()));
+            ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<>(chunkMap.entrySet()));
         when(mockJedis.hscan(eq(namespace), any(byte[].class), 
any(ScanParams.class)))
-                .thenReturn(scanResultFirst, scanResultSecond, 
scanResultThird);
+            .thenReturn(scanResultFirst, scanResultSecond, scanResultThird);
 
         RedisKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes());
 
@@ -152,17 +145,17 @@ public class RedisKeyValueStateIteratorTest {
         putEncodedKeyValueToMap(chunkMap2, "key4".getBytes(), 
"value4".getBytes());
 
         ScanResult<Map.Entry<byte[], byte[]>> scanResultFirst = new 
ScanResult<>(
-                "12345".getBytes(), new ArrayList<>(chunkMap.entrySet()));
+            "12345".getBytes(), new ArrayList<>(chunkMap.entrySet()));
         ScanResult<Map.Entry<byte[], byte[]>> scanResultSecond = new 
ScanResult<>(
-                "23456".getBytes(), new ArrayList<>(chunkMap2.entrySet()));
+            "23456".getBytes(), new ArrayList<>(chunkMap2.entrySet()));
         ScanResult<Map.Entry<byte[], byte[]>> scanResultThird = new 
ScanResult<>(
-                ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<Map.Entry<byte[], byte[]>>());
+            ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<Map.Entry<byte[], byte[]>>());
         when(mockJedis.hscan(eq(namespace), any(byte[].class), 
any(ScanParams.class)))
-                .thenReturn(scanResultFirst, scanResultSecond, 
scanResultThird);
+            .thenReturn(scanResultFirst, scanResultSecond, scanResultThird);
 
         RedisKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         // keys shouldn't appear twice
 
@@ -184,13 +177,13 @@ public class RedisKeyValueStateIteratorTest {
         NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap();
 
         ScanResult<Map.Entry<byte[], byte[]>> scanResult = new ScanResult<>(
-                ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<Map.Entry<byte[], byte[]>>());
+            ScanParams.SCAN_POINTER_START_BINARY, new 
ArrayList<Map.Entry<byte[], byte[]>>());
         when(mockJedis.hscan(eq(namespace), any(byte[].class), 
any(ScanParams.class)))
-                .thenReturn(scanResult);
+            .thenReturn(scanResult);
 
         RedisKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new RedisKeyValueStateIterator<>(namespace, mockContainer, 
pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         assertFalse(kvIterator.hasNext());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java
index 92e76ec..b8de3d3 100644
--- 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateProviderTest.java
@@ -1,32 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.redis.state;
 
-import org.apache.storm.Config;
-import org.apache.storm.state.State;
-import org.junit.Assert;
-import org.junit.Test;
+package org.apache.storm.redis.state;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.Config;
+import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Unit tests for {@link RedisKeyValueStateProvider}
@@ -49,8 +41,8 @@ public class RedisKeyValueStateProviderTest {
         RedisKeyValueStateProvider provider = new RedisKeyValueStateProvider();
         Map<String, Object> topoConf = new HashMap<>();
         topoConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, 
"{\"keyClass\":\"String\", \"valueClass\":\"String\"," +
-                " \"jedisPoolConfig\":" +
-                "{\"host\":\"localhost\", \"port\":1000}}");
+                                                            " 
\"jedisPoolConfig\":" +
+                                                            
"{\"host\":\"localhost\", \"port\":1000}}");
 
         RedisKeyValueStateProvider.StateConfig config = 
provider.getStateConfig(topoConf);
         //System.out.println(config);

http://git-wip-us.apache.org/repos/asf/storm/blob/e3f5b138/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index 480001c..db98a39 100644
--- 
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++ 
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -1,23 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.redis.state;
 
 import com.google.common.primitives.UnsignedBytes;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.storm.redis.common.commands.RedisCommands;
 import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
 import org.apache.storm.state.DefaultStateSerializer;
@@ -29,14 +30,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import redis.clients.util.SafeEncoder;
 
-import java.util.HashMap;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Unit tests for {@link RedisKeyValueState}
@@ -53,7 +48,7 @@ public class RedisKeyValueStateTest {
     @Before
     public void setUp() {
         final NavigableMap<byte[], NavigableMap<byte[], byte[]>> mockMap =
-                new 
ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
+            new 
ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
         mockContainer = Mockito.mock(RedisCommandsInstanceContainer.class);
         mockCommands = Mockito.mock(RedisCommands.class);
         Mockito.when(mockContainer.getInstance()).thenReturn(mockCommands);
@@ -62,78 +57,78 @@ public class RedisKeyValueStateTest {
         ArgumentCaptor<Map> mapArgumentCaptor = 
ArgumentCaptor.forClass(Map.class);
 
         Mockito.when(mockCommands.exists(Mockito.any(byte[].class)))
-                .thenAnswer(new Answer<Boolean>() {
-                    @Override
-                    public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        return exists(mockMap, (byte[]) args[0]);
-                    }
-                });
+               .thenAnswer(new Answer<Boolean>() {
+                   @Override
+                   public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       return exists(mockMap, (byte[]) args[0]);
+                   }
+               });
 
         Mockito.when(mockCommands.del(Mockito.any(byte[].class)))
-                .thenAnswer(new Answer<Long>() {
-                    @Override
-                    public Long answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        return del(mockMap, (byte[]) args[0]);
-                    }
-                });
+               .thenAnswer(new Answer<Long>() {
+                   @Override
+                   public Long answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       return del(mockMap, (byte[]) args[0]);
+                   }
+               });
 
         Mockito.when(mockCommands.hmset(Mockito.any(byte[].class), 
Mockito.anyMap()))
-                .thenAnswer(new Answer<String>() {
-                    @Override
-                    public String answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        return hmset(mockMap, (byte[]) args[0], (Map<byte[], 
byte[]>) args[1]);
-                    }
-                });
+               .thenAnswer(new Answer<String>() {
+                   @Override
+                   public String answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       return hmset(mockMap, (byte[]) args[0], (Map<byte[], 
byte[]>) args[1]);
+                   }
+               });
 
         Mockito.when(mockCommands.hget(Mockito.any(byte[].class), 
Mockito.any(byte[].class)))
-                .thenAnswer(new Answer<byte[]>() {
-                    @Override
-                    public byte[] answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        return hget(mockMap, (byte[]) args[0], (byte[]) 
args[1]);
-                    }
-                });
+               .thenAnswer(new Answer<byte[]>() {
+                   @Override
+                   public byte[] answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       return hget(mockMap, (byte[]) args[0], (byte[]) 
args[1]);
+                   }
+               });
 
         Mockito.when(mockCommands.hdel(Mockito.any(byte[].class), 
Mockito.<byte[]>anyVararg()))
-                .thenAnswer(new Answer<Long>() {
-                    @Override
-                    public Long answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        int argsSize = args.length;
-                        byte[][] fields = Arrays.asList(args).subList(1, 
argsSize).toArray(new byte[argsSize - 1][]);
-                        return hdel(mockMap, (byte[]) args[0], fields);
-                    }
-                });
+               .thenAnswer(new Answer<Long>() {
+                   @Override
+                   public Long answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       int argsSize = args.length;
+                       byte[][] fields = Arrays.asList(args).subList(1, 
argsSize).toArray(new byte[argsSize - 1][]);
+                       return hdel(mockMap, (byte[]) args[0], fields);
+                   }
+               });
 
         Mockito.when(mockCommands.exists(Mockito.anyString()))
-                .thenAnswer(new Answer<Boolean>() {
-                    @Override
-                    public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        return exists(mockMap, (String) args[0]);
-                    }
-                });
+               .thenAnswer(new Answer<Boolean>() {
+                   @Override
+                   public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       return exists(mockMap, (String) args[0]);
+                   }
+               });
 
         Mockito.when(mockCommands.hmset(Mockito.anyString(), Mockito.anyMap()))
-                .thenAnswer(new Answer<String>() {
-                    @Override
-                    public String answer(InvocationOnMock invocation) throws 
Throwable {
-                        Object[] args = invocation.getArguments();
-                        return hmset(mockMap, (String) args[0], (Map<String, 
String>) args[1]);
-                    }
-                });
+               .thenAnswer(new Answer<String>() {
+                   @Override
+                   public String answer(InvocationOnMock invocation) throws 
Throwable {
+                       Object[] args = invocation.getArguments();
+                       return hmset(mockMap, (String) args[0], (Map<String, 
String>) args[1]);
+                   }
+               });
 
         Mockito.when(mockCommands.hgetAll(Mockito.anyString()))
-                .thenAnswer(new Answer<Map<String, String>>() {
-                    @Override
-                    public Map<String, String> answer(InvocationOnMock 
invocation) throws Throwable {
-                        Object[] args = invocation.getArguments();
-                        return hgetAll(mockMap, (String) args[0]);
-                    }
-                });
+               .thenAnswer(new Answer<Map<String, String>>() {
+                   @Override
+                   public Map<String, String> answer(InvocationOnMock 
invocation) throws Throwable {
+                       Object[] args = invocation.getArguments();
+                       return hgetAll(mockMap, (String) args[0]);
+                   }
+               });
 
         keyValueState = new RedisKeyValueState<String, String>("test", 
mockContainer, new DefaultStateSerializer<String>(),
                                                                new 
DefaultStateSerializer<String>());
@@ -167,38 +162,38 @@ public class RedisKeyValueStateTest {
         keyValueState.put("b", "2");
         keyValueState.prepareCommit(1);
         keyValueState.put("c", "3");
-        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
         keyValueState.rollback();
-        assertArrayEquals(new String[]{null, null, null}, getValues());
+        assertArrayEquals(new String[]{ null, null, null }, getValues());
         keyValueState.put("a", "1");
         keyValueState.put("b", "2");
         keyValueState.prepareCommit(1);
         keyValueState.commit(1);
         keyValueState.put("c", "3");
-        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
         keyValueState.rollback();
-        assertArrayEquals(new String[]{"1", "2", null}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", null }, getValues());
         keyValueState.put("c", "3");
         assertEquals("2", keyValueState.delete("b"));
         assertEquals("3", keyValueState.delete("c"));
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
         keyValueState.prepareCommit(2);
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
         keyValueState.commit(2);
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
         keyValueState.put("b", "2");
         keyValueState.prepareCommit(3);
         keyValueState.put("c", "3");
-        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
         keyValueState.rollback();
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
     }
 
     private String[] getValues() {
         return new String[]{
-                keyValueState.get("a"),
-                keyValueState.get("b"),
-                keyValueState.get("c")
+            keyValueState.get("a"),
+            keyValueState.get("b"),
+            keyValueState.get("c")
         };
     }
 
@@ -221,10 +216,11 @@ public class RedisKeyValueStateTest {
     }
 
     private Long del(NavigableMap<byte[], NavigableMap<byte[], byte[]>> 
mockMap, byte[] key) {
-        if (mockMap.remove(key) == null)
+        if (mockMap.remove(key) == null) {
             return 0L;
-        else
+        } else {
             return 1L;
+        }
     }
 
     private byte[] hget(NavigableMap<byte[], NavigableMap<byte[], byte[]>> 
mockMap, byte[] namespace, byte[] key) {
@@ -234,9 +230,9 @@ public class RedisKeyValueStateTest {
         return null;
     }
 
-    private Long hdel(NavigableMap<byte[], NavigableMap<byte[], byte[]>> 
mockMap, byte[] namespace, byte[] ... keys) {
+    private Long hdel(NavigableMap<byte[], NavigableMap<byte[], byte[]>> 
mockMap, byte[] namespace, byte[]... keys) {
         Long count = 0L;
-        for (byte[] key: keys) {
+        for (byte[] key : keys) {
             if (mockMap.get(namespace).remove(key) != null) count++;
         }
         return count;

Reply via email to