http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index 9dbcd80..d2a73d4 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -19,14 +19,12 @@
 package org.apache.storm.hbase.trident.state;
 
 import com.google.common.collect.Maps;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Get;
@@ -61,13 +59,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HBaseMapState<T> implements IBackingMap<T> {
-    private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class);
-
-    private int partitionNum;
-
-
     @SuppressWarnings("rawtypes")
     private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = 
Maps.newHashMap();
+    private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class);
 
     static {
         DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new 
JSONNonTransactionalSerializer());
@@ -75,6 +69,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
         DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
     }
 
+    private int partitionNum;
     private Options<T> options;
     private Serializer<T> serializer;
     private HTable table;
@@ -82,8 +77,8 @@ public class HBaseMapState<T> implements IBackingMap<T> {
     /**
      * Constructor.
      *
-     * @param options HBase State options.
-     * @param map topology config map.
+     * @param options      HBase State options.
+     * @param map          topology config map.
      * @param partitionNum the number of partition.
      */
     public HBaseMapState(final Options<T> options, Map<String, Object> map, 
int partitionNum) {
@@ -92,7 +87,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
         this.partitionNum = partitionNum;
 
         final Configuration hbConfig = HBaseConfiguration.create();
-        Map<String, Object> conf = (Map<String, 
Object>)map.get(options.configKey);
+        Map<String, Object> conf = (Map<String, Object>) 
map.get(options.configKey);
         if (conf == null) {
             LOG.info("HBase configuration not found using key '" + 
options.configKey + "'");
             LOG.info("Using HBase config from first hbase-site.xml found on 
classpath.");
@@ -114,19 +109,6 @@ public class HBaseMapState<T> implements IBackingMap<T> {
 
     }
 
-
-    public static class Options<T> implements Serializable {
-
-        public Serializer<T> serializer = null;
-        public int cacheSize = 5000;
-        public String globalKey = "$HBASE_STATE_GLOBAL$";
-        public String configKey = "hbase.config";
-        public String tableName;
-        public String columnFamily;
-        public TridentHBaseMapMapper mapMapper;
-    }
-
-
     @SuppressWarnings("rawtypes")
     public static StateFactory opaque() {
         Options<OpaqueValue> options = new Options<OpaqueValue>();
@@ -159,57 +141,6 @@ public class HBaseMapState<T> implements IBackingMap<T> {
         return new Factory(StateType.NON_TRANSACTIONAL, opts);
     }
 
-
-    protected static class Factory implements StateFactory {
-        private StateType stateType;
-        private Options options;
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public Factory(StateType stateType, Options options) {
-            this.stateType = stateType;
-            this.options = options;
-
-            if (this.options.serializer == null) {
-                this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
-            }
-
-            if (this.options.serializer == null) {
-                throw new RuntimeException("Serializer should be specified for 
type: " + stateType);
-            }
-
-            if (this.options.mapMapper == null) {
-                throw new RuntimeException("MapMapper should be specified for 
type: " + stateType);
-            }
-        }
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-            LOG.info("Preparing HBase State for partition {} of {}.", 
partitionIndex + 1, numPartitions);
-            IBackingMap state = new HBaseMapState(options, conf, 
partitionIndex);
-
-            if (options.cacheSize > 0) {
-                state = new CachedMap(state, options.cacheSize);
-            }
-
-            MapState mapState;
-            switch (stateType) {
-                case NON_TRANSACTIONAL:
-                    mapState = NonTransactionalMap.build(state);
-                    break;
-                case OPAQUE:
-                    mapState = OpaqueMap.build(state);
-                    break;
-                case TRANSACTIONAL:
-                    mapState = TransactionalMap.build(state);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown state type: " 
+ stateType);
-            }
-            return new SnapshottableMap(mapState, new 
Values(options.globalKey));
-        }
-
-    }
-
     @Override
     public List<T> multiGet(List<List<Object>> keys) {
         List<Get> gets = new ArrayList<Get>();
@@ -249,7 +180,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
             byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));
             String qualifier = this.options.mapMapper.qualifier(keys.get(i));
             LOG.info("Partiton: {}, Key: {}, Value: {}",
-                    new Object[]{this.partitionNum, new String(hbaseKey), new 
String(this.serializer.serialize(values.get(i)))});
+                     new Object[]{ this.partitionNum, new String(hbaseKey), 
new String(this.serializer.serialize(values.get(i))) });
             Put put = new Put(hbaseKey);
             T val = values.get(i);
             put.add(this.options.columnFamily.getBytes(),
@@ -268,4 +199,65 @@ public class HBaseMapState<T> implements IBackingMap<T> {
             throw new FailedException("IOException while writing to HBase", e);
         }
     }
+
+    public static class Options<T> implements Serializable {
+
+        public Serializer<T> serializer = null;
+        public int cacheSize = 5000;
+        public String globalKey = "$HBASE_STATE_GLOBAL$";
+        public String configKey = "hbase.config";
+        public String tableName;
+        public String columnFamily;
+        public TridentHBaseMapMapper mapMapper;
+    }
+
+    protected static class Factory implements StateFactory {
+        private StateType stateType;
+        private Options options;
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public Factory(StateType stateType, Options options) {
+            this.stateType = stateType;
+            this.options = options;
+
+            if (this.options.serializer == null) {
+                this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
+            }
+
+            if (this.options.serializer == null) {
+                throw new RuntimeException("Serializer should be specified for 
type: " + stateType);
+            }
+
+            if (this.options.mapMapper == null) {
+                throw new RuntimeException("MapMapper should be specified for 
type: " + stateType);
+            }
+        }
+
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
+            LOG.info("Preparing HBase State for partition {} of {}.", 
partitionIndex + 1, numPartitions);
+            IBackingMap state = new HBaseMapState(options, conf, 
partitionIndex);
+
+            if (options.cacheSize > 0) {
+                state = new CachedMap(state, options.cacheSize);
+            }
+
+            MapState mapState;
+            switch (stateType) {
+                case NON_TRANSACTIONAL:
+                    mapState = NonTransactionalMap.build(state);
+                    break;
+                case OPAQUE:
+                    mapState = OpaqueMap.build(state);
+                    break;
+                case TRANSACTIONAL:
+                    mapState = TransactionalMap.build(state);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown state type: " 
+ stateType);
+            }
+            return new SnapshottableMap(mapState, new 
Values(options.globalKey));
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
index 4d95eb5..e3c8755 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.state;
 
-import org.apache.storm.tuple.Values;
+import java.util.List;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseQueryFunction;
 import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.List;
+import org.apache.storm.tuple.Values;
 
 public class HBaseQuery extends BaseQueryFunction<HBaseState, List<Values>> {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 7b32eae..a216402 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -1,44 +1,41 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.state;
 
-import org.apache.storm.Config;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.storm.Config;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
 import org.apache.storm.hbase.common.ColumnList;
 import org.apache.storm.hbase.common.HBaseClient;
 import org.apache.storm.hbase.trident.mapper.TridentHBaseMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HBaseState implements State {
 
@@ -57,49 +54,10 @@ public class HBaseState implements State {
         this.numPartitions = numPartitions;
     }
 
-    public static class Options implements Serializable {
-        private TridentHBaseMapper mapper;
-        private Durability durability = Durability.SKIP_WAL;
-        private HBaseProjectionCriteria projectionCriteria;
-        private HBaseValueMapper rowToStormValueMapper;
-        private String configKey;
-        private String tableName;
-
-        public Options withDurability(Durability durability) {
-            this.durability = durability;
-            return this;
-        }
-
-        public Options withProjectionCriteria(HBaseProjectionCriteria 
projectionCriteria) {
-            this.projectionCriteria = projectionCriteria;
-            return this;
-        }
-
-        public Options withConfigKey(String configKey) {
-            this.configKey = configKey;
-            return this;
-        }
-
-        public Options withTableName(String tableName) {
-            this.tableName = tableName;
-            return this;
-        }
-
-        public Options withRowToStormValueMapper(HBaseValueMapper 
rowToStormValueMapper) {
-            this.rowToStormValueMapper = rowToStormValueMapper;
-            return this;
-        }
-
-        public Options withMapper(TridentHBaseMapper mapper) {
-            this.mapper = mapper;
-            return this;
-        }
-    }
-
     protected void prepare() {
         final Configuration hbConfig = HBaseConfiguration.create();
         Map<String, Object> conf = (Map<String, Object>) 
map.get(options.configKey);
-        if(conf == null){
+        if (conf == null) {
             LOG.info("HBase configuration not found using key '" + 
options.configKey + "'");
             LOG.info("Using HBase config from first hbase-site.xml found on 
classpath.");
         } else {
@@ -156,7 +114,7 @@ public class HBaseState implements State {
 
         try {
             Result[] results = hBaseClient.batchGet(gets);
-            for(int i = 0; i < results.length; i++) {
+            for (int i = 0; i < results.length; i++) {
                 Result result = results[i];
                 TridentTuple tuple = tridentTuples.get(i);
                 List<Values> values = 
options.rowToStormValueMapper.toValues(tuple, result);
@@ -168,4 +126,43 @@ public class HBaseState implements State {
         }
         return batchRetrieveResult;
     }
+
+    public static class Options implements Serializable {
+        private TridentHBaseMapper mapper;
+        private Durability durability = Durability.SKIP_WAL;
+        private HBaseProjectionCriteria projectionCriteria;
+        private HBaseValueMapper rowToStormValueMapper;
+        private String configKey;
+        private String tableName;
+
+        public Options withDurability(Durability durability) {
+            this.durability = durability;
+            return this;
+        }
+
+        public Options withProjectionCriteria(HBaseProjectionCriteria 
projectionCriteria) {
+            this.projectionCriteria = projectionCriteria;
+            return this;
+        }
+
+        public Options withConfigKey(String configKey) {
+            this.configKey = configKey;
+            return this;
+        }
+
+        public Options withTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Options withRowToStormValueMapper(HBaseValueMapper 
rowToStormValueMapper) {
+            this.rowToStormValueMapper = rowToStormValueMapper;
+            return this;
+        }
+
+        public Options withMapper(TridentHBaseMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java
index 08e9dc3..99b87b5 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.state;
 
+import java.util.Map;
 import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
 
-import java.util.Map;
-
 public class HBaseStateFactory implements StateFactory {
 
     private HBaseState.Options options;
@@ -33,7 +27,7 @@ public class HBaseStateFactory implements StateFactory {
 
     @Override
     public State makeState(Map<String, Object> map, IMetricsContext 
iMetricsContext, int partitionIndex, int numPartitions) {
-        HBaseState state = new HBaseState(map , partitionIndex, numPartitions, 
options);
+        HBaseState state = new HBaseState(map, partitionIndex, numPartitions, 
options);
         state.prepare();
         return state;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java
index 57ca844..6340abc 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java
@@ -1,29 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.state;
 
+import java.util.List;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseStateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.util.List;
-
-public class HBaseUpdater extends BaseStateUpdater<HBaseState>  {
+public class HBaseUpdater extends BaseStateUpdater<HBaseState> {
 
     @Override
     public void updateState(HBaseState hBaseState, List<TridentTuple> tuples, 
TridentCollector collector) {

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 702a790..086b477 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -1,23 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.windowing;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -31,32 +35,20 @@ import org.apache.storm.trident.windowing.WindowsStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 /**
  * This class stores entries into hbase instance of the given configuration.
- *
  */
 public class HBaseWindowsStore implements WindowsStore {
-    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseWindowsStore.class);
     public static final String UTF_8 = "utf-8";
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseWindowsStore.class);
     private final ThreadLocal<HTable> threadLocalHtable;
     private final ThreadLocal<WindowKryoSerializer> 
threadLocalWindowKryoSerializer;
     private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
 
-    public HBaseWindowsStore(final Map<String, Object> topoConf, final 
Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+    public HBaseWindowsStore(final Map<String, Object> topoConf, final 
Configuration config, final String tableName, byte[] family,
+                             byte[] qualifier) {
         this.family = family;
         this.qualifier = qualifier;
 
@@ -73,7 +65,7 @@ public class HBaseWindowsStore implements WindowsStore {
             }
         };
 
-        threadLocalWindowKryoSerializer = new 
ThreadLocal<WindowKryoSerializer>(){
+        threadLocalWindowKryoSerializer = new 
ThreadLocal<WindowKryoSerializer>() {
             @Override
             protected WindowKryoSerializer initialValue() {
                 return new WindowKryoSerializer(topoConf);
@@ -111,7 +103,7 @@ public class HBaseWindowsStore implements WindowsStore {
             throw new RuntimeException(e);
         }
 
-        if(result.isEmpty()) {
+        if (result.isEmpty()) {
             return null;
         }
 
@@ -136,11 +128,11 @@ public class HBaseWindowsStore implements WindowsStore {
         }
 
         List<Object> values = new ArrayList<>();
-        for (int i=0; i<results.length; i++) {
+        for (int i = 0; i < results.length; i++) {
             Result result = results[i];
-            if(result.isEmpty()) {
+            if (result.isEmpty()) {
                 LOG.error("Got empty result for key [{}]", keys.get(i));
-                throw new RuntimeException("Received empty result for key: 
"+keys.get(i));
+                throw new RuntimeException("Received empty result for key: " + 
keys.get(i));
             }
             Object resultObject = 
windowKryoSerializer().deserialize(result.getValue(family, qualifier));
             values.add(resultObject);
@@ -200,8 +192,8 @@ public class HBaseWindowsStore implements WindowsStore {
         WindowsStore.Entry.nonNullCheckForKey(key);
         WindowsStore.Entry.nonNullCheckForValue(value);
 
-        if(value == null) {
-            throw new IllegalArgumentException("Invalid value of null with 
key: "+key);
+        if (value == null) {
+            throw new IllegalArgumentException("Invalid value of null with 
key: " + key);
         }
         Put put = new Put(effectiveKey(key));
         put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), 
windowKryoSerializer().serializeToByteBuffer(value));
@@ -217,7 +209,8 @@ public class HBaseWindowsStore implements WindowsStore {
         List<Put> list = new ArrayList<>();
         for (Entry entry : entries) {
             Put put = new Put(effectiveKey(entry.key));
-            put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(), 
windowKryoSerializer().serializeToByteBuffer(entry.value));
+            put.addColumn(family, ByteBuffer.wrap(qualifier), 
System.currentTimeMillis(),
+                          
windowKryoSerializer().serializeToByteBuffer(entry.value));
             list.add(put);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index f0c8805..9981811 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -1,33 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.windowing;
 
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.storm.trident.windowing.WindowsStore;
 import org.apache.storm.trident.windowing.WindowsStoreFactory;
 
-import java.util.Map;
-
 /**
  * Factory to create {@link HBaseWindowsStore} instances.
- *
  */
 public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
     private final Map<String, Object> config;

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
index c4fa7c1..24ac8c5 100644
--- 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
+++ 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java
@@ -18,11 +18,7 @@
 
 package org.apache.storm.hbase.state;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-
 import com.google.common.primitives.UnsignedBytes;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,7 +30,6 @@ import java.util.NavigableSet;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
@@ -52,27 +47,30 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+
 public class HBaseClientTestUtil {
     private HBaseClientTestUtil() {
     }
 
     public static HBaseClient mockedHBaseClient() throws Exception {
         return mockedHBaseClient(new ConcurrentSkipListMap<byte[], 
NavigableMap<byte[], NavigableMap<byte[], byte[]>>>(
-                UnsignedBytes.lexicographicalComparator()));
+            UnsignedBytes.lexicographicalComparator()));
     }
 
     public static HBaseClient mockedHBaseClient(
-            ConcurrentNavigableMap<byte[], NavigableMap<byte[], 
NavigableMap<byte[], byte[]>>> internalMap)
-            throws Exception {
+        ConcurrentNavigableMap<byte[], NavigableMap<byte[], 
NavigableMap<byte[], byte[]>>> internalMap)
+        throws Exception {
         HBaseClient mockClient = mock(HBaseClient.class);
 
         Mockito.doNothing().when(mockClient).close();
 
         Mockito.when(mockClient.constructGetRequests(any(byte[].class), 
any(HBaseProjectionCriteria.class)))
-                .thenCallRealMethod();
+               .thenCallRealMethod();
 
         Mockito.when(mockClient.constructMutationReq(any(byte[].class), 
any(ColumnList.class), any(Durability.class)))
-                .thenCallRealMethod();
+               .thenCallRealMethod();
 
         Mockito.when(mockClient.exists(any(Get.class))).thenAnswer(new 
ExistsAnswer(internalMap));
         Mockito.when(mockClient.batchGet(any(List.class))).thenAnswer(new 
BatchGetAnswer(internalMap));
@@ -84,8 +82,8 @@ public class HBaseClientTestUtil {
 
     static class BuildCellsHelper {
         public static void addMatchingColumnFamilies(byte[] rowKey, 
Map<byte[], NavigableSet<byte[]>> familyMap,
-                                               NavigableMap<byte[], 
NavigableMap<byte[], byte[]>> cfToQualifierToValueMap,
-                                               List<Cell> cells) {
+                                                     NavigableMap<byte[], 
NavigableMap<byte[], byte[]>> cfToQualifierToValueMap,
+                                                     List<Cell> cells) {
             for (Map.Entry<byte[], NavigableSet<byte[]>> entry : 
familyMap.entrySet()) {
                 byte[] columnFamily = entry.getKey();
 
@@ -101,9 +99,9 @@ public class HBaseClientTestUtil {
         }
 
         public static void addMatchingQualifiers(byte[] rowKey, byte[] 
columnFamily,
-                                           Map.Entry<byte[], 
NavigableSet<byte[]>> qualifierSet,
-                                           NavigableMap<byte[], byte[]> 
qualifierToValueMap,
-                                           List<Cell> cells) {
+                                                 Map.Entry<byte[], 
NavigableSet<byte[]>> qualifierSet,
+                                                 NavigableMap<byte[], byte[]> 
qualifierToValueMap,
+                                                 List<Cell> cells) {
             for (byte[] qualifier : qualifierSet.getValue()) {
                 byte[] value = qualifierToValueMap.get(qualifier);
                 if (value != null) {
@@ -113,7 +111,7 @@ public class HBaseClientTestUtil {
         }
 
         public static void addAllColumnFamilies(byte[] rowKey, 
NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap,
-                                          List<Cell> cells) {
+                                                List<Cell> cells) {
             for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> entry : 
cfToQualifierToValueMap.entrySet()) {
                 byte[] columnFamily = entry.getKey();
                 addAllQualifiers(rowKey, columnFamily, entry.getValue(), 
cells);
@@ -121,7 +119,7 @@ public class HBaseClientTestUtil {
         }
 
         public static void addAllQualifiers(byte[] rowKey, byte[] columnFamily,
-                                      NavigableMap<byte[], byte[]> 
qualifierToValueMap, List<Cell> cells) {
+                                            NavigableMap<byte[], byte[]> 
qualifierToValueMap, List<Cell> cells) {
             for (Map.Entry<byte[], byte[]> entry2 : 
qualifierToValueMap.entrySet()) {
                 byte[] qualifier = entry2.getKey();
                 byte[] value = entry2.getValue();
@@ -149,7 +147,7 @@ public class HBaseClientTestUtil {
                 byte[] rowKey = get.getRow();
 
                 NavigableMap<byte[], NavigableMap<byte[], byte[]>> 
cfToQualifierToValueMap =
-                        mockMap.get(rowKey);
+                    mockMap.get(rowKey);
 
                 if (cfToQualifierToValueMap != null) {
                     Map<byte[], NavigableSet<byte[]>> familyMap = 
get.getFamilyMap();
@@ -310,7 +308,7 @@ public class HBaseClientTestUtil {
             byte[] endKey = (byte[]) args[1];
 
             final ConcurrentNavigableMap<byte[], NavigableMap<byte[], 
NavigableMap<byte[], byte[]>>> subMap =
-                    mockMap.subMap(startKey, true, endKey, false);
+                mockMap.subMap(startKey, true, endKey, false);
 
             final List<Result> results = buildResults(subMap);
 
@@ -353,7 +351,7 @@ public class HBaseClientTestUtil {
             @Override
             public Result[] next(int nbRows) throws IOException {
                 List<Result> bulkResult = new ArrayList<>();
-                for (int i = 0 ; i < nbRows ; i++) {
+                for (int i = 0; i < nbRows; i++) {
                     Result result = next();
                     if (result == null) {
                         break;

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
index aa965d1..250e337 100644
--- 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
+++ 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java
@@ -16,13 +16,7 @@
 
 package org.apache.storm.hbase.state;
 
-import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import com.google.common.primitives.UnsignedBytes;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -42,6 +35,11 @@ import org.apache.storm.state.Serializer;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for HBaseKeyValueStateIterator.
  */
@@ -81,8 +79,8 @@ public class HBaseKeyValueStateIteratorTest {
         applyPendingStateToHBase(chunkMap);
 
         HBaseKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
mockHBaseClient, pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
mockHBaseClient, pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes());
 
@@ -111,8 +109,8 @@ public class HBaseKeyValueStateIteratorTest {
         applyPendingStateToHBase(chunkMap);
 
         HBaseKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
mockHBaseClient, pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
mockHBaseClient, pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         // keys shouldn't appear twice
 
@@ -134,8 +132,8 @@ public class HBaseKeyValueStateIteratorTest {
         NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap();
 
         HBaseKeyValueStateIterator<byte[], byte[]> kvIterator =
-                new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
mockHBaseClient, pendingPrepare.entrySet().iterator(),
-                        pendingCommit.entrySet().iterator(), chunkSize, 
keySerializer, valueSerializer);
+            new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
mockHBaseClient, pendingPrepare.entrySet().iterator(),
+                                             
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
 
         assertFalse(kvIterator.hasNext());
     }
@@ -170,7 +168,7 @@ public class HBaseKeyValueStateIteratorTest {
                 mutations.add(new Delete(getRowKeyForStateKey(rowKey)));
             } else {
                 List<Mutation> mutationsForRow = 
prepareMutateRow(getRowKeyForStateKey(rowKey), columnFamily,
-                        Collections.singletonMap(STATE_QUALIFIER, value));
+                                                                  
Collections.singletonMap(STATE_QUALIFIER, value));
                 mutations.addAll(mutationsForRow);
             }
         }
@@ -204,7 +202,7 @@ public class HBaseKeyValueStateIteratorTest {
     }
 
     private void mutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], 
byte[]> map)
-            throws Exception {
+        throws Exception {
         mutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
index bd38e57..b7c562f 100644
--- 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
+++ 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.storm.hbase.state;
 
-import org.apache.storm.Config;
-import org.junit.Test;
-
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.Config;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -38,7 +37,7 @@ public class HBaseKeyValueStateProviderTest {
         HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider();
         Map<String, String> stormConf = new HashMap<>();
         stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, 
"{\"keyClass\":\"String\", \"valueClass\":\"String\"," +
-                " \"tableName\": \"table\", \"columnFamily\": \"cf\"}");
+                                                             " \"tableName\": 
\"table\", \"columnFamily\": \"cf\"}");
 
         try {
             HBaseKeyValueStateProvider.StateConfig config = 
provider.getStateConfig(stormConf);
@@ -53,7 +52,7 @@ public class HBaseKeyValueStateProviderTest {
         HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider();
         Map<String, String> stormConf = new HashMap<>();
         stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, 
"{\"keyClass\":\"String\", \"valueClass\":\"String\"," +
-                " \"hbaseConfigKey\": \"hbaseConfKey\", \"columnFamily\": 
\"cf\"}");
+                                                             " 
\"hbaseConfigKey\": \"hbaseConfKey\", \"columnFamily\": \"cf\"}");
 
         try {
             HBaseKeyValueStateProvider.StateConfig config = 
provider.getStateConfig(stormConf);
@@ -68,7 +67,7 @@ public class HBaseKeyValueStateProviderTest {
         HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider();
         Map<String, String> stormConf = new HashMap<>();
         stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, 
"{\"keyClass\":\"String\", \"valueClass\":\"String\"," +
-                " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": 
\"table\"}");
+                                                             " 
\"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"}");
 
         try {
             HBaseKeyValueStateProvider.StateConfig config = 
provider.getStateConfig(stormConf);
@@ -83,8 +82,8 @@ public class HBaseKeyValueStateProviderTest {
         HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider();
         Map<String, String> stormConf = new HashMap<>();
         stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, 
"{\"keyClass\":\"String\", \"valueClass\":\"String\"," +
-                " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": 
\"table\"," +
-                " \"columnFamily\": \"columnFamily\"}");
+                                                             " 
\"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"," +
+                                                             " 
\"columnFamily\": \"columnFamily\"}");
 
         HBaseKeyValueStateProvider.StateConfig config = 
provider.getStateConfig(stormConf);
         assertEquals("String", config.keyClass);

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
index 7827283..bf40409 100644
--- 
a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
+++ 
b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java
@@ -19,17 +19,14 @@
 package org.apache.storm.hbase.state;
 
 import com.google.common.primitives.UnsignedBytes;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.storm.hbase.common.HBaseClient;
 import org.apache.storm.state.DefaultStateSerializer;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -49,7 +46,7 @@ public class HBaseKeyValueStateTest {
         mockMap = new 
ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());
         mockClient = HBaseClientTestUtil.mockedHBaseClient(mockMap);
         keyValueState = new HBaseKeyValueState<>(mockClient, COLUMN_FAMILY, 
NAMESPACE,
-                new DefaultStateSerializer<String>(), new 
DefaultStateSerializer<String>());
+                                                 new 
DefaultStateSerializer<String>(), new DefaultStateSerializer<String>());
     }
 
     @Test
@@ -80,38 +77,38 @@ public class HBaseKeyValueStateTest {
         keyValueState.put("b", "2");
         keyValueState.prepareCommit(1);
         keyValueState.put("c", "3");
-        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
         keyValueState.rollback();
-        assertArrayEquals(new String[]{null, null, null}, getValues());
+        assertArrayEquals(new String[]{ null, null, null }, getValues());
         keyValueState.put("a", "1");
         keyValueState.put("b", "2");
         keyValueState.prepareCommit(1);
         keyValueState.commit(1);
         keyValueState.put("c", "3");
-        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
         keyValueState.rollback();
-        assertArrayEquals(new String[]{"1", "2", null}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", null }, getValues());
         keyValueState.put("c", "3");
         assertEquals("2", keyValueState.delete("b"));
         assertEquals("3", keyValueState.delete("c"));
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
         keyValueState.prepareCommit(2);
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
         keyValueState.commit(2);
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
         keyValueState.put("b", "2");
         keyValueState.prepareCommit(3);
         keyValueState.put("c", "3");
-        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        assertArrayEquals(new String[]{ "1", "2", "3" }, getValues());
         keyValueState.rollback();
-        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        assertArrayEquals(new String[]{ "1", null, null }, getValues());
     }
 
     private String[] getValues() {
         return new String[]{
-                keyValueState.get("a"),
-                keyValueState.get("b"),
-                keyValueState.get("c")
+            keyValueState.get("a"),
+            keyValueState.get("b"),
+            keyValueState.get("c")
         };
     }
 }
\ No newline at end of file

Reply via email to