Fixing stylecheck problems with storm-hbase

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/880d14f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/880d14f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/880d14f1

Branch: refs/heads/master
Commit: 880d14f1e7c6d450375b195f3aa5bc4045151fab
Parents: 6ccf6a0
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 23:08:31 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 00:37:54 2018 -0400

----------------------------------------------------------------------
 external/storm-hbase/pom.xml                    |   2 +-
 .../storm/hbase/bolt/AbstractHBaseBolt.java     |  42 ++---
 .../org/apache/storm/hbase/bolt/HBaseBolt.java  |  42 ++---
 .../storm/hbase/bolt/HBaseLookupBolt.java       | 161 +++++++++---------
 .../storm/hbase/bolt/mapper/HBaseMapper.java    |  27 ++-
 .../bolt/mapper/HBaseProjectionCriteria.java    |  75 ++++-----
 .../hbase/bolt/mapper/HBaseValueMapper.java     |  31 ++--
 .../hbase/bolt/mapper/SimpleHBaseMapper.java    |  55 +++---
 .../apache/storm/hbase/common/ColumnList.java   | 166 +++++++++----------
 .../apache/storm/hbase/common/HBaseClient.java  |  42 ++---
 .../org/apache/storm/hbase/common/IColumn.java  |  26 ++-
 .../org/apache/storm/hbase/common/ICounter.java |  25 ++-
 .../org/apache/storm/hbase/common/Utils.java    |  83 +++++-----
 .../storm/hbase/state/HBaseKeyValueState.java   |  36 ++--
 .../hbase/state/HBaseKeyValueStateIterator.java |  42 ++---
 .../hbase/state/HBaseKeyValueStateProvider.java |  31 ++--
 .../mapper/SimpleTridentHBaseMapMapper.java     |   2 +-
 .../mapper/SimpleTridentHBaseMapper.java        |  45 +++--
 .../trident/mapper/TridentHBaseMapper.java      |  34 ++--
 .../hbase/trident/state/HBaseMapState.java      | 142 ++++++++--------
 .../storm/hbase/trident/state/HBaseQuery.java   |  24 +--
 .../storm/hbase/trident/state/HBaseState.java   | 125 +++++++-------
 .../hbase/trident/state/HBaseStateFactory.java  |  24 +--
 .../storm/hbase/trident/state/HBaseUpdater.java |  24 +--
 .../trident/windowing/HBaseWindowsStore.java    |  65 ++++----
 .../windowing/HBaseWindowsStoreFactory.java     |  24 +--
 .../storm/hbase/state/HBaseClientTestUtil.java  |  38 ++---
 .../state/HBaseKeyValueStateIteratorTest.java   |  28 ++--
 .../state/HBaseKeyValueStateProviderTest.java   |  15 +-
 .../hbase/state/HBaseKeyValueStateTest.java     |  35 ++--
 30 files changed, 684 insertions(+), 827 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index 6d96620..e32f3c3 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -131,7 +131,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>371</maxAllowedViolations>
+                    <maxAllowedViolations>100</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
index 9cf1935..0ff60b3 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java
@@ -1,38 +1,32 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.bolt;
 
-import org.apache.storm.Config;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.Config;
 import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
 import org.apache.storm.hbase.common.HBaseClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
 // TODO support more configuration options, for now we're defaulting to the 
hbase-*.xml files found on the classpath
 public abstract class AbstractHBaseBolt extends BaseRichBolt {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractHBaseBolt.class);
@@ -55,15 +49,15 @@ public abstract class AbstractHBaseBolt extends 
BaseRichBolt {
         this.collector = collector;
         final Configuration hbConfig = HBaseConfiguration.create();
 
-        Map<String, Object> conf = (Map<String, 
Object>)topoConf.get(this.configKey);
-        if(conf == null) {
+        Map<String, Object> conf = (Map<String, Object>) 
topoConf.get(this.configKey);
+        if (conf == null) {
             throw new IllegalArgumentException("HBase configuration not found 
using key '" + this.configKey + "'");
         }
 
-        if(conf.get("hbase.rootdir") == null) {
+        if (conf.get("hbase.rootdir") == null) {
             LOG.warn("No 'hbase.rootdir' value found in configuration! Using 
HBase defaults.");
         }
-        for(String key : conf.keySet()) {
+        for (String key : conf.keySet()) {
             hbConfig.set(key, String.valueOf(conf.get(key)));
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index bae7aeb..fa6acb5 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -1,46 +1,39 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.bolt;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
+import org.apache.storm.hbase.common.ColumnList;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.BatchHelper;
 import org.apache.storm.utils.TupleUtils;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.List;
-import java.util.LinkedList;
-
 /**
  * Basic bolt for writing to HBase.
  *
  * Note: Each HBaseBolt defined in a topology is tied to a specific table.
- *
  */
-public class HBaseBolt  extends AbstractHBaseBolt {
+public class HBaseBolt extends AbstractHBaseBolt {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class);
     private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
 
@@ -87,7 +80,8 @@ public class HBaseBolt  extends AbstractHBaseBolt {
             if (batchHelper.shouldHandle(tuple)) {
                 byte[] rowKey = this.mapper.rowKey(tuple);
                 ColumnList cols = this.mapper.columns(tuple);
-                List<Mutation> mutations = 
hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL 
: Durability.SKIP_WAL);
+                List<Mutation> mutations =
+                    hBaseClient.constructMutationReq(rowKey, cols, writeToWAL 
? Durability.SYNC_WAL : Durability.SKIP_WAL);
                 batchMutations.addAll(mutations);
                 batchHelper.addBatch(tuple);
             }
@@ -98,7 +92,7 @@ public class HBaseBolt  extends AbstractHBaseBolt {
                 batchHelper.ack();
                 batchMutations.clear();
             }
-        } catch(Exception e){
+        } catch (Exception e) {
             batchHelper.fail(e);
             batchMutations.clear();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index d20aab3..538d896 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -1,35 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.hbase.bolt;
 
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.TupleUtils;
+package org.apache.storm.hbase.bolt;
 
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Lists;
-
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
@@ -38,6 +26,10 @@ import 
org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,82 +37,81 @@ import org.slf4j.LoggerFactory;
  * Basic bolt for querying from HBase.
  *
  * Note: Each HBaseBolt defined in a topology is tied to a specific table.
- *
  */
 public class HBaseLookupBolt extends AbstractHBaseBolt {
-     private static final Logger LOG = 
LoggerFactory.getLogger(HBaseLookupBolt.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseLookupBolt.class);
 
-     private HBaseValueMapper rowToTupleMapper;
-     private HBaseProjectionCriteria projectionCriteria;
-     private transient LoadingCache<byte[], Result> cache;
-     private transient boolean cacheEnabled;
+    private HBaseValueMapper rowToTupleMapper;
+    private HBaseProjectionCriteria projectionCriteria;
+    private transient LoadingCache<byte[], Result> cache;
+    private transient boolean cacheEnabled;
 
-     public HBaseLookupBolt(String tableName, HBaseMapper mapper, 
HBaseValueMapper rowToTupleMapper) {
-          super(tableName, mapper);
-          Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be 
null");
-          this.rowToTupleMapper = rowToTupleMapper;
-     }
+    public HBaseLookupBolt(String tableName, HBaseMapper mapper, 
HBaseValueMapper rowToTupleMapper) {
+        super(tableName, mapper);
+        Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");
+        this.rowToTupleMapper = rowToTupleMapper;
+    }
 
-     public HBaseLookupBolt withConfigKey(String configKey) {
-          this.configKey = configKey;
-          return this;
-     }
+    public HBaseLookupBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
+    }
 
-     public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria 
projectionCriteria) {
-          this.projectionCriteria = projectionCriteria;
-          return this;
-     }
+    public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria 
projectionCriteria) {
+        this.projectionCriteria = projectionCriteria;
+        return this;
+    }
 
-     @Override
-     public void prepare(Map<String, Object> config, TopologyContext 
topologyContext, OutputCollector collector) {
-          super.prepare(config, topologyContext, collector);
-          cacheEnabled = 
Boolean.parseBoolean(config.getOrDefault("hbase.cache.enable", 
"false").toString());
-          int cacheTTL = 
Integer.parseInt(config.getOrDefault("hbase.cache.ttl.seconds", 
"300").toString());
-          int maxCacheSize = 
Integer.parseInt(config.getOrDefault("hbase.cache.size", "1000").toString());
-          if (cacheEnabled) {
-               cache = 
Caffeine.newBuilder().maximumSize(maxCacheSize).expireAfterWrite(cacheTTL, 
TimeUnit.SECONDS)
-                         .build(new CacheLoader<byte[], Result>() {
+    @Override
+    public void prepare(Map<String, Object> config, TopologyContext 
topologyContext, OutputCollector collector) {
+        super.prepare(config, topologyContext, collector);
+        cacheEnabled = 
Boolean.parseBoolean(config.getOrDefault("hbase.cache.enable", 
"false").toString());
+        int cacheTTL = 
Integer.parseInt(config.getOrDefault("hbase.cache.ttl.seconds", 
"300").toString());
+        int maxCacheSize = 
Integer.parseInt(config.getOrDefault("hbase.cache.size", "1000").toString());
+        if (cacheEnabled) {
+            cache = 
Caffeine.newBuilder().maximumSize(maxCacheSize).expireAfterWrite(cacheTTL, 
TimeUnit.SECONDS)
+                            .build(new CacheLoader<byte[], Result>() {
 
-                              @Override
-                              public Result load(byte[] rowKey) throws 
Exception {
-                                   Get get = 
hBaseClient.constructGetRequests(rowKey, projectionCriteria);
-                                   if (LOG.isDebugEnabled()) {
+                                @Override
+                                public Result load(byte[] rowKey) throws 
Exception {
+                                    Get get = 
hBaseClient.constructGetRequests(rowKey, projectionCriteria);
+                                    if (LOG.isDebugEnabled()) {
                                         LOG.debug("Cache miss for key:" + new 
String(rowKey));
-                                   }
-                                   return 
hBaseClient.batchGet(Lists.newArrayList(get))[0];
-                              }
+                                    }
+                                    return 
hBaseClient.batchGet(Lists.newArrayList(get))[0];
+                                }
 
-                         });
-          }
-     }
+                            });
+        }
+    }
 
-     @Override
-     public void execute(Tuple tuple) {
-          if (TupleUtils.isTick(tuple)) {
-               collector.ack(tuple);
-               return;
-          }
-          byte[] rowKey = this.mapper.rowKey(tuple);
-          Result result = null;
-          try {
-               if (cacheEnabled) {
-                    result = cache.get(rowKey);
-               } else {
-                    Get get = hBaseClient.constructGetRequests(rowKey, 
projectionCriteria);
-                    result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
-               }
-               for (Values values : rowToTupleMapper.toValues(tuple, result)) {
-                    this.collector.emit(tuple, values);
-               }
-               this.collector.ack(tuple);
-          } catch (Exception e) {
-               this.collector.reportError(e);
-               this.collector.fail(tuple);
-          }
-     }
+    @Override
+    public void execute(Tuple tuple) {
+        if (TupleUtils.isTick(tuple)) {
+            collector.ack(tuple);
+            return;
+        }
+        byte[] rowKey = this.mapper.rowKey(tuple);
+        Result result = null;
+        try {
+            if (cacheEnabled) {
+                result = cache.get(rowKey);
+            } else {
+                Get get = hBaseClient.constructGetRequests(rowKey, 
projectionCriteria);
+                result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
+            }
+            for (Values values : rowToTupleMapper.toValues(tuple, result)) {
+                this.collector.emit(tuple, values);
+            }
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+        }
+    }
 
-     @Override
-     public void declareOutputFields(OutputFieldsDeclarer 
outputFieldsDeclarer) {
-          rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);
-     }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 
{
+        rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
index 70016c5..bf3eca6 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java
@@ -1,31 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.hbase.bolt.mapper;
 
+package org.apache.storm.hbase.bolt.mapper;
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.hbase.common.ColumnList;
 
 import java.io.Serializable;
+import org.apache.storm.hbase.common.ColumnList;
+import org.apache.storm.tuple.Tuple;
 
 /**
- * Maps a <code>org.apache.storm.tuple.Tuple</code> object
- * to a row in an HBase table.
+ * Maps a <code>org.apache.storm.tuple.Tuple</code> object to a row in an 
HBase table.
  */
 public interface HBaseMapper extends Serializable {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
index 7325c62..18322c6 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java
@@ -1,61 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.bolt.mapper;
 
 import com.google.common.collect.Lists;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
 /**
- * Allows the user to specify the projection criteria.
- * If only columnFamily is specified all columns from that family will be 
returned.
- * If a column is specified only that column from that family will be returned.
-
+ * Allows the user to specify the projection criteria. If only columnFamily is 
specified all columns from that family will be returned. If a
+ * column is specified only that column from that family will be returned.
  */
 public class HBaseProjectionCriteria implements Serializable {
     private List<byte[]> columnFamilies;
     private List<ColumnMetaData> columns;
 
-    public static class ColumnMetaData implements Serializable {
-        private byte[]  columnFamily;
-        private byte[] qualifier;
-
-        public ColumnMetaData(String columnFamily, String qualifier) {
-            this.columnFamily = columnFamily.getBytes();
-            this.qualifier = qualifier.getBytes();
-        }
-
-        public ColumnMetaData(byte[] columnFamily, byte[] qualifier) {
-            this.columnFamily = Arrays.copyOf(columnFamily, 
columnFamily.length);
-            this.qualifier = Arrays.copyOf(qualifier, qualifier.length);
-        }
-
-        public byte[] getColumnFamily() {
-            return columnFamily;
-        }
-
-        public byte[] getQualifier() {
-            return qualifier;
-        }
-    }
-
     public HBaseProjectionCriteria() {
         columnFamilies = Lists.newArrayList();
         columns = Lists.newArrayList();
@@ -63,6 +32,7 @@ public class HBaseProjectionCriteria implements Serializable {
 
     /**
      * all columns from this family will be included as result of HBase lookup.
+     *
      * @param columnFamily
      * @return
      */
@@ -73,6 +43,7 @@ public class HBaseProjectionCriteria implements Serializable {
 
     /**
      * all columns from this family will be included as result of HBase lookup.
+     *
      * @param columnFamily
      * @return
      */
@@ -83,6 +54,7 @@ public class HBaseProjectionCriteria implements Serializable {
 
     /**
      * Only this column from the the columnFamily will be included as result 
of HBase lookup.
+     *
      * @param column
      * @return
      */
@@ -98,4 +70,27 @@ public class HBaseProjectionCriteria implements Serializable 
{
     public List<byte[]> getColumnFamilies() {
         return columnFamilies;
     }
+
+    public static class ColumnMetaData implements Serializable {
+        private byte[] columnFamily;
+        private byte[] qualifier;
+
+        public ColumnMetaData(String columnFamily, String qualifier) {
+            this.columnFamily = columnFamily.getBytes();
+            this.qualifier = qualifier.getBytes();
+        }
+
+        public ColumnMetaData(byte[] columnFamily, byte[] qualifier) {
+            this.columnFamily = Arrays.copyOf(columnFamily, 
columnFamily.length);
+            this.qualifier = Arrays.copyOf(qualifier, qualifier.length);
+        }
+
+        public byte[] getColumnFamily() {
+            return columnFamily;
+        }
+
+        public byte[] getQualifier() {
+            return qualifier;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
index 38e879f..004cf03 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java
@@ -1,42 +1,37 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.bolt.mapper;
 
+import java.io.Serializable;
+import java.util.List;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.ITuple;
 import org.apache.storm.tuple.Values;
-import org.apache.hadoop.hbase.client.Result;
-
-import java.io.Serializable;
-import java.util.List;
 
 public interface HBaseValueMapper extends Serializable {
     /**
-     *
-     * @param input tuple.
+     * @param input  tuple.
      * @param result HBase lookup result instance.
      * @return list of values that should be emitted by the lookup bolt.
+     *
      * @throws Exception
      */
     public List<Values> toValues(ITuple input, Result result) throws Exception;
 
     /**
      * declares the output fields for the lookup bolt.
+     *
      * @param declarer
      */
     void declareOutputFields(OutputFieldsDeclarer declarer);

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
index 8747405..f37995c 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java
@@ -1,69 +1,66 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.bolt.mapper;
 
+import org.apache.storm.hbase.common.ColumnList;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.hbase.common.ColumnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.storm.hbase.common.Utils.*;
+
+import static org.apache.storm.hbase.common.Utils.toBytes;
+import static org.apache.storm.hbase.common.Utils.toLong;
 
 /**
  *
  */
 public class SimpleHBaseMapper implements HBaseMapper {
     private static final Logger LOG = 
LoggerFactory.getLogger(SimpleHBaseMapper.class);
-    
+
     private String rowKeyField;
-//    private String timestampField;
+    //    private String timestampField;
     private byte[] columnFamily;
     private Fields columnFields;
     private Fields counterFields;
 
-    public SimpleHBaseMapper(){
+    public SimpleHBaseMapper() {
     }
 
 
-    public SimpleHBaseMapper withRowKeyField(String rowKeyField){
+    public SimpleHBaseMapper withRowKeyField(String rowKeyField) {
         this.rowKeyField = rowKeyField;
         return this;
     }
 
-    public SimpleHBaseMapper withColumnFields(Fields columnFields){
+    public SimpleHBaseMapper withColumnFields(Fields columnFields) {
         this.columnFields = columnFields;
         return this;
     }
 
-    public SimpleHBaseMapper withCounterFields(Fields counterFields){
+    public SimpleHBaseMapper withCounterFields(Fields counterFields) {
         this.counterFields = counterFields;
         return this;
     }
 
-    public SimpleHBaseMapper withColumnFamily(String columnFamily){
+    public SimpleHBaseMapper withColumnFamily(String columnFamily) {
         this.columnFamily = columnFamily.getBytes();
         return this;
     }
 
-//    public SimpleTridentHBaseMapper withTimestampField(String 
timestampField){
-//        this.timestampField = timestampField;
-//        return this;
-//    }
+    //    public SimpleTridentHBaseMapper withTimestampField(String 
timestampField){
+    //        this.timestampField = timestampField;
+    //        return this;
+    //    }
 
     @Override
     public byte[] rowKey(Tuple tuple) {
@@ -74,14 +71,14 @@ public class SimpleHBaseMapper implements HBaseMapper {
     @Override
     public ColumnList columns(Tuple tuple) {
         ColumnList cols = new ColumnList();
-        if(this.columnFields != null){
+        if (this.columnFields != null) {
             // TODO timestamps
-            for(String field : this.columnFields){
+            for (String field : this.columnFields) {
                 cols.addColumn(this.columnFamily, field.getBytes(), 
toBytes(tuple.getValueByField(field)));
             }
         }
-        if(this.counterFields != null){
-            for(String field : this.counterFields){
+        if (this.counterFields != null) {
+            for (String field : this.counterFields) {
                 cols.addCounter(this.columnFamily, field.getBytes(), 
toLong(tuple.getValueByField(field)));
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
index 0abe1ad..045b2e5 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.common;
 
 import java.util.ArrayList;
@@ -28,85 +23,33 @@ import java.util.List;
  * Standard columns have <i>column family</i> (required), <i>qualifier</i> 
(optional),
  * <i>timestamp</i> (optional), and a <i>value</i> (optional) values.
  *
- * Counter columns have <i>column family</i> (required), <i>qualifier</i> 
(optional),
- * and an <i>increment</i> (optional, but recommended) values.
- *
- * Inserts/Updates can be added via the <code>addColumn()</code> and 
<code>addCounter()</code>
- * methods.
+ * Counter columns have <i>column family</i> (required), <i>qualifier</i> 
(optional), and an <i>increment</i> (optional, but recommended)
+ * values.
  *
+ * Inserts/Updates can be added via the <code>addColumn()</code> and 
<code>addCounter()</code> methods.
  */
 public class ColumnList {
 
-    public static abstract class AbstractColumn {
-        byte[] family, qualifier;
-
-        AbstractColumn(byte[] family, byte[] qualifier){
-            this.family = family;
-            this.qualifier = qualifier;
-        }
-
-        public byte[] getFamily() {
-            return family;
-        }
-
-        public byte[] getQualifier() {
-            return qualifier;
-        }
-
-    }
-
-    public static class Column extends AbstractColumn {
-        byte[] value;
-        long ts = -1;
-        Column(byte[] family, byte[] qualifier, long ts, byte[] value){
-            super(family, qualifier);
-            this.value = value;
-            this.ts = ts;
-        }
-
-        public byte[] getValue() {
-            return value;
-        }
-
-        public long getTs() {
-            return ts;
-        }
-    }
-
-    public static class Counter extends AbstractColumn {
-        long incr = 0;
-        Counter(byte[] family, byte[] qualifier, long incr){
-            super(family, qualifier);
-            this.incr = incr;
-        }
-
-        public long getIncrement() {
-            return incr;
-        }
-    }
-
-
     private ArrayList<Column> columns;
     private ArrayList<Column> columnsToDelete;
     private ArrayList<Counter> counters;
 
-
-    private ArrayList<Column> columns(){
-        if(this.columns == null){
+    private ArrayList<Column> columns() {
+        if (this.columns == null) {
             this.columns = new ArrayList<Column>();
         }
         return this.columns;
     }
 
-    private ArrayList<Column> columnsToDelete(){
-        if(this.columnsToDelete == null){
+    private ArrayList<Column> columnsToDelete() {
+        if (this.columnsToDelete == null) {
             this.columnsToDelete = new ArrayList<Column>();
         }
         return this.columnsToDelete;
     }
 
-    private ArrayList<Counter> counters(){
-        if(this.counters == null){
+    private ArrayList<Counter> counters() {
+        if (this.counters == null) {
             this.counters = new ArrayList<Counter>();
         }
         return this.counters;
@@ -121,30 +64,31 @@ public class ColumnList {
      * @param value
      * @return
      */
-    public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, 
byte[] value){
+    public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, 
byte[] value) {
         columns().add(new Column(family, qualifier, ts, value));
         return this;
     }
 
     /**
      * Add a standard HBase column
+     *
      * @param family
      * @param qualifier
      * @param value
      * @return
      */
-    public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value){
+    public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value) 
{
         columns().add(new Column(family, qualifier, -1, value));
         return this;
     }
 
     /**
-     * Add a standard HBase column given an instance of a class that implements
-     * the <code>IColumn</code> interface.
+     * Add a standard HBase column given an instance of a class that 
implements the <code>IColumn</code> interface.
+     *
      * @param column
      * @return
      */
-    public ColumnList addColumn(IColumn column){
+    public ColumnList addColumn(IColumn column) {
         return this.addColumn(column.family(), column.qualifier(), 
column.timestamp(), column.value());
     }
 
@@ -156,7 +100,7 @@ public class ColumnList {
      * @param incr
      * @return
      */
-    public ColumnList addCounter(byte[] family, byte[] qualifier, long incr){
+    public ColumnList addCounter(byte[] family, byte[] qualifier, long incr) {
         counters().add(new Counter(family, qualifier, incr));
         return this;
     }
@@ -164,10 +108,11 @@ public class ColumnList {
     /**
      * Add an HBase counter column given an instance of a class that 
implements the
      * <code>ICounter</code> interface.
+     *
      * @param counter
      * @return
      */
-    public ColumnList addCounter(ICounter counter){
+    public ColumnList addCounter(ICounter counter) {
         return this.addCounter(counter.family(), counter.qualifier(), 
counter.increment());
     }
 
@@ -188,7 +133,7 @@ public class ColumnList {
      *
      * @return
      */
-    public boolean hasColumns(){
+    public boolean hasColumns() {
         return this.columns != null;
     }
 
@@ -197,7 +142,7 @@ public class ColumnList {
      *
      * @return
      */
-    public boolean hasColumnsToDelete(){
+    public boolean hasColumnsToDelete() {
         return this.columnsToDelete != null;
     }
 
@@ -206,7 +151,7 @@ public class ColumnList {
      *
      * @return
      */
-    public boolean hasCounters(){
+    public boolean hasCounters() {
         return this.counters != null;
     }
 
@@ -215,7 +160,7 @@ public class ColumnList {
      *
      * @return
      */
-    public List<Column> getColumns(){
+    public List<Column> getColumns() {
         return this.columns;
     }
 
@@ -230,10 +175,61 @@ public class ColumnList {
 
     /**
      * Get the list of counter definitions.
+     *
      * @return
      */
-    public List<Counter> getCounters(){
+    public List<Counter> getCounters() {
         return this.counters;
     }
 
+    public static abstract class AbstractColumn {
+        byte[] family, qualifier;
+
+        AbstractColumn(byte[] family, byte[] qualifier) {
+            this.family = family;
+            this.qualifier = qualifier;
+        }
+
+        public byte[] getFamily() {
+            return family;
+        }
+
+        public byte[] getQualifier() {
+            return qualifier;
+        }
+
+    }
+
+    public static class Column extends AbstractColumn {
+        byte[] value;
+        long ts = -1;
+
+        Column(byte[] family, byte[] qualifier, long ts, byte[] value) {
+            super(family, qualifier);
+            this.value = value;
+            this.ts = ts;
+        }
+
+        public byte[] getValue() {
+            return value;
+        }
+
+        public long getTs() {
+            return ts;
+        }
+    }
+
+    public static class Counter extends AbstractColumn {
+        long incr = 0;
+
+        Counter(byte[] family, byte[] qualifier, long incr) {
+            super(family, qualifier);
+            this.incr = incr;
+        }
+
+        public long getIncrement() {
+            return incr;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
index ad5160c..208e622 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
@@ -1,30 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hbase.common;
 
 import com.google.common.collect.Lists;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
@@ -51,7 +43,7 @@ public class HBaseClient implements Closeable {
 
     private HTable table;
 
-    public HBaseClient(Map<String, Object> map , final Configuration 
configuration, final String tableName) {
+    public HBaseClient(Map<String, Object> map, final Configuration 
configuration, final String tableName) {
         try {
             UserProvider provider = HBaseSecurityUtil.login(map, 
configuration);
             this.table = Utils.getTable(provider, configuration, tableName);
@@ -69,16 +61,16 @@ public class HBaseClient implements Closeable {
             for (ColumnList.Column col : cols.getColumns()) {
                 if (col.getTs() > 0) {
                     put.add(
-                            col.getFamily(),
-                            col.getQualifier(),
-                            col.getTs(),
-                            col.getValue()
+                        col.getFamily(),
+                        col.getQualifier(),
+                        col.getTs(),
+                        col.getValue()
                     );
                 } else {
                     put.add(
-                            col.getFamily(),
-                            col.getQualifier(),
-                            col.getValue()
+                        col.getFamily(),
+                        col.getQualifier(),
+                        col.getValue()
                     );
                 }
             }
@@ -90,9 +82,9 @@ public class HBaseClient implements Closeable {
             inc.setDurability(durability);
             for (ColumnList.Counter cnt : cols.getCounters()) {
                 inc.addColumn(
-                        cnt.getFamily(),
-                        cnt.getQualifier(),
-                        cnt.getIncrement()
+                    cnt.getFamily(),
+                    cnt.getQualifier(),
+                    cnt.getIncrement()
                 );
             }
             mutations.add(inc);

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
index 36f7e96..8f64c2e 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java
@@ -1,30 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.common;
 
 /**
- * Interface definition for classes that support being written to HBase as
- * a regular column.
- *
+ * Interface definition for classes that support being written to HBase as a 
regular column.
  */
 public interface IColumn {
     byte[] family();
+
     byte[] qualifier();
+
     byte[] value();
+
     long timestamp();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
index 43e3f60..ce9d84f 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java
@@ -1,29 +1,24 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.common;
 
 /**
- * Interface definition for classes that support being written to HBase as
- * a counter column.
- *
+ * Interface definition for classes that support being written to HBase as a 
counter column.
  */
 public interface ICounter {
     byte[] family();
+
     byte[] qualifier();
+
     long increment();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
index 981d4ff..0dca16e 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
@@ -1,22 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.common;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.security.PrivilegedExceptionAction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -27,18 +25,14 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.security.PrivilegedExceptionAction;
-
 public class Utils {
-    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String TOKEN_KIND_HBASE_AUTH_TOKEN = 
"HBASE_AUTH_TOKEN";
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
-    private Utils(){}
+    private Utils() {}
 
     public static HTable getTable(UserProvider provider, Configuration config, 
String tableName)
-            throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
         UserGroupInformation ugi;
         if (provider != null) {
             ugi = provider.getCurrent().getUGI();
@@ -51,7 +45,7 @@ public class Utils {
             boolean foundHBaseAuthToken = false;
             for (Token<? extends TokenIdentifier> token : ugi.getTokens()) {
                 LOG.debug("Token in UGI (delegation token): {} / {}", 
token.toString(),
-                        token.decodeIdentifier().getUser());
+                          token.decodeIdentifier().getUser());
 
                 // token.getKind() = Text, Text is annotated by @Stringable
                 // which ensures toString() implementation
@@ -66,7 +60,7 @@ public class Utils {
                         foundHBaseAuthToken = true;
                     } else {
                         LOG.warn("Found multiple HBASE_AUTH_TOKEN - will use 
already found token. " +
-                                "Please enable DEBUG log level to track 
delegation tokens.");
+                                 "Please enable DEBUG log level to track 
delegation tokens.");
                     }
                 }
             }
@@ -78,18 +72,19 @@ public class Utils {
         }
 
         return ugi.doAs(new PrivilegedExceptionAction<HTable>() {
-            @Override public HTable run() throws IOException {
+            @Override
+            public HTable run() throws IOException {
                 return new HTable(config, tableName);
             }
         });
     }
 
-    public static long toLong(Object obj){
+    public static long toLong(Object obj) {
         long l = 0;
-        if(obj != null){
-            if(obj instanceof Number){
-                l = ((Number)obj).longValue();
-            } else{
+        if (obj != null) {
+            if (obj instanceof Number) {
+                l = ((Number) obj).longValue();
+            } else {
                 LOG.warn("Could not coerce {} to Long", 
obj.getClass().getName());
             }
         }
@@ -97,24 +92,24 @@ public class Utils {
     }
 
     public static byte[] toBytes(Object obj) {
-        if(obj == null) {
-          return null;
-        } else if(obj instanceof String){
-            return ((String)obj).getBytes();
-        } else if (obj instanceof Integer){
+        if (obj == null) {
+            return null;
+        } else if (obj instanceof String) {
+            return ((String) obj).getBytes();
+        } else if (obj instanceof Integer) {
             return Bytes.toBytes((Integer) obj);
-        } else if (obj instanceof Long){
-            return Bytes.toBytes((Long)obj);
-        } else if (obj instanceof Short){
-            return Bytes.toBytes((Short)obj);
-        } else if (obj instanceof Float){
-            return Bytes.toBytes((Float)obj);
-        } else if (obj instanceof Double){
-            return Bytes.toBytes((Double)obj);
-        } else if (obj instanceof Boolean){
-            return Bytes.toBytes((Boolean)obj);
-        } else if (obj instanceof BigDecimal){
-            return Bytes.toBytes((BigDecimal)obj);
+        } else if (obj instanceof Long) {
+            return Bytes.toBytes((Long) obj);
+        } else if (obj instanceof Short) {
+            return Bytes.toBytes((Short) obj);
+        } else if (obj instanceof Float) {
+            return Bytes.toBytes((Float) obj);
+        } else if (obj instanceof Double) {
+            return Bytes.toBytes((Double) obj);
+        } else if (obj instanceof Boolean) {
+            return Bytes.toBytes((Boolean) obj);
+        } else if (obj instanceof BigDecimal) {
+            return Bytes.toBytes((BigDecimal) obj);
         } else {
             LOG.error("Can't convert class to byte array: " + 
obj.getClass().getName());
             return new byte[0];

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
index dcbaf65..0da456c 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java
@@ -20,7 +20,6 @@ package org.apache.storm.hbase.state;
 
 import com.google.common.collect.Maps;
 import com.google.common.primitives.UnsignedBytes;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,7 +30,6 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -51,14 +49,11 @@ import org.slf4j.LoggerFactory;
  * A Hbase based implementation that persists the state in HBase.
  */
 public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseKeyValueState.class);
-
-    public static byte[] STATE_QUALIFIER = "s".getBytes();
     public static final int ITERATOR_CHUNK_SIZE = 1000;
-
     public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP 
= Maps.unmodifiableNavigableMap(
-            new TreeMap<byte[], 
byte[]>(UnsignedBytes.lexicographicalComparator()));
-
+        new TreeMap<byte[], 
byte[]>(UnsignedBytes.lexicographicalComparator()));
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseKeyValueState.class);
+    public static byte[] STATE_QUALIFIER = "s".getBytes();
     private static byte[] COMMIT_TXID_KEY = "commit".getBytes();
     private static byte[] PREPARE_TXID_KEY = "prepare".getBytes();
 
@@ -79,22 +74,22 @@ public class HBaseKeyValueState<K, V> implements 
KeyValueState<K, V> {
     /**
      * Constructor.
      *
-     * @param hbaseClient HBaseClient instance
+     * @param hbaseClient  HBaseClient instance
      * @param columnFamily column family to store State
-     * @param namespace namespace
+     * @param namespace    namespace
      */
     public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, 
String namespace) {
         this(hbaseClient, columnFamily, namespace, new 
DefaultStateSerializer<K>(),
-                new DefaultStateSerializer<V>());
+             new DefaultStateSerializer<V>());
     }
 
     /**
      * Constructor.
      *
-     * @param hbaseClient HBaseClient instance
-     * @param columnFamily column family to store State
-     * @param namespace namespace
-     * @param keySerializer key serializer
+     * @param hbaseClient     HBaseClient instance
+     * @param columnFamily    column family to store State
+     * @param namespace       namespace
+     * @param keySerializer   key serializer
      * @param valueSerializer value serializer
      */
     public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, 
String namespace,
@@ -173,7 +168,7 @@ public class HBaseKeyValueState<K, V> implements 
KeyValueState<K, V> {
         } else {
             HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
             HBaseProjectionCriteria.ColumnMetaData column = new 
HBaseProjectionCriteria.ColumnMetaData(columnFamily,
-                    STATE_QUALIFIER);
+                                                                               
                        STATE_QUALIFIER);
             criteria.addColumn(column);
             Get get = 
hbaseClient.constructGetRequests(getRowKeyForStateKey(columnKey), criteria);
             try {
@@ -210,7 +205,8 @@ public class HBaseKeyValueState<K, V> implements 
KeyValueState<K, V> {
     @Override
     public Iterator<Map.Entry<K, V>> iterator() {
         return new HBaseKeyValueStateIterator<>(namespace, columnFamily, 
hbaseClient, pendingPrepare.entrySet().iterator(),
-                pendingCommit.entrySet().iterator(), ITERATOR_CHUNK_SIZE, 
encoder.getKeySerializer(), encoder.getValueSerializer());
+                                                
pendingCommit.entrySet().iterator(), ITERATOR_CHUNK_SIZE, 
encoder.getKeySerializer(),
+                                                encoder.getValueSerializer());
     }
 
     @Override
@@ -316,7 +312,7 @@ public class HBaseKeyValueState<K, V> implements 
KeyValueState<K, V> {
         if (committedTxid != null) {
             if (txid <= committedTxid) {
                 throw new RuntimeException("Invalid txid '" + txid + "' for 
prepare. Txid '" + committedTxid
-                        + "' is already committed");
+                                           + "' is already committed");
             }
         }
     }
@@ -375,7 +371,7 @@ public class HBaseKeyValueState<K, V> implements 
KeyValueState<K, V> {
                 mutations.add(new Delete(getRowKeyForStateKey(rowKey)));
             } else {
                 List<Mutation> mutationsForRow = 
prepareMutateRow(getRowKeyForStateKey(rowKey), columnFamily,
-                        Collections.singletonMap(STATE_QUALIFIER, value));
+                                                                  
Collections.singletonMap(STATE_QUALIFIER, value));
                 mutations.addAll(mutationsForRow);
             }
         }
@@ -402,7 +398,7 @@ public class HBaseKeyValueState<K, V> implements 
KeyValueState<K, V> {
     }
 
     private void mutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], 
byte[]> map)
-            throws Exception {
+        throws Exception {
         mutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
index 9b69aec..dbbd6d0 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java
@@ -1,67 +1,57 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hbase.state;
 
-import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
-
 import com.google.common.primitives.UnsignedBytes;
-
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
-
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.storm.hbase.common.HBaseClient;
 import org.apache.storm.state.BaseBinaryStateIterator;
 import org.apache.storm.state.DefaultStateEncoder;
 import org.apache.storm.state.Serializer;
-
 import org.apache.storm.state.StateEncoder;
 
+import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER;
+
 /**
  * An iterator over {@link HBaseKeyValueState}.
  */
 public class HBaseKeyValueStateIterator<K, V> extends 
BaseBinaryStateIterator<K, V> {
 
     private final byte[] keyNamespace;
-    private byte[] cursorKey;
     private final byte[] endScanKey;
     private final byte[] columnFamily;
     private final HBaseClient hbaseClient;
     private final int chunkSize;
     private final StateEncoder<K, V, byte[], byte[]> encoder;
-
+    private byte[] cursorKey;
     private Iterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
 
     /**
      * Constructor.
      *
-     * @param namespace The namespace of State
-     * @param columnFamily The column family of state
-     * @param hbaseClient The instance of HBaseClient
+     * @param namespace              The namespace of State
+     * @param columnFamily           The column family of state
+     * @param hbaseClient            The instance of HBaseClient
      * @param pendingPrepareIterator The iterator of pendingPrepare
-     * @param pendingCommitIterator The iterator of pendingCommit
-     * @param chunkSize The size of chunk to get entries from HBase
-     * @param keySerializer The serializer of key
-     * @param valueSerializer The serializer of value
+     * @param pendingCommitIterator  The iterator of pendingCommit
+     * @param chunkSize              The size of chunk to get entries from 
HBase
+     * @param keySerializer          The serializer of key
+     * @param valueSerializer        The serializer of value
      */
     public HBaseKeyValueStateIterator(String namespace, byte[] columnFamily, 
HBaseClient hbaseClient,
                                       Iterator<Map.Entry<byte[], byte[]>> 
pendingPrepareIterator,

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
index 763c387..89adae3 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java
@@ -21,6 +21,9 @@ package org.apache.storm.hbase.state;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -34,10 +37,6 @@ import org.apache.storm.task.TopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Provides {@link HBaseKeyValueState}.
  */
@@ -86,12 +85,12 @@ public class HBaseKeyValueStateProvider implements 
StateProvider {
         HBaseClient hbaseClient = new HBaseClient(hbaseConfMap, hbConfig, 
config.tableName);
 
         return new HBaseKeyValueState(hbaseClient, config.columnFamily, 
namespace,
-                getKeySerializer(stormConf, context, config), 
getValueSerializer(stormConf, context, config));
+                                      getKeySerializer(stormConf, context, 
config), getValueSerializer(stormConf, context, config));
     }
 
     private Configuration getHBaseConfigurationInstance(Map<String, Object> 
conf) {
         final Configuration hbConfig = HBaseConfiguration.create();
-        for(String key : conf.keySet()) {
+        for (String key : conf.keySet()) {
             hbConfig.set(key, String.valueOf(conf.get(key)));
         }
         return hbConfig;
@@ -99,11 +98,11 @@ public class HBaseKeyValueStateProvider implements 
StateProvider {
 
     private Map<String, Object> getHBaseConfigMap(Map<String, Object> 
stormConfMap, String hbaseConfigKey) {
         Map<String, Object> conf = (Map<String, Object>) 
stormConfMap.get(hbaseConfigKey);
-        if(conf == null) {
+        if (conf == null) {
             throw new IllegalArgumentException("HBase configuration not found 
using key '" + hbaseConfigKey + "'");
         }
 
-        if(conf.get("hbase.rootdir") == null) {
+        if (conf.get("hbase.rootdir") == null) {
             LOG.warn("No 'hbase.rootdir' value found in configuration! Using 
HBase defaults.");
         }
         return conf;
@@ -153,14 +152,14 @@ public class HBaseKeyValueStateProvider implements 
StateProvider {
         @Override
         public String toString() {
             return "StateConfig{" +
-                    "keyClass='" + keyClass + '\'' +
-                    ", valueClass='" + valueClass + '\'' +
-                    ", keySerializerClass='" + keySerializerClass + '\'' +
-                    ", valueSerializerClass='" + valueSerializerClass + '\'' +
-                    ", hbaseConfigKey='" + hbaseConfigKey + '\'' +
-                    ", tableName='" + tableName + '\'' +
-                    ", columnFamily='" + columnFamily + '\'' +
-                    '}';
+                   "keyClass='" + keyClass + '\'' +
+                   ", valueClass='" + valueClass + '\'' +
+                   ", keySerializerClass='" + keySerializerClass + '\'' +
+                   ", valueSerializerClass='" + valueSerializerClass + '\'' +
+                   ", hbaseConfigKey='" + hbaseConfigKey + '\'' +
+                   ", tableName='" + tableName + '\'' +
+                   ", columnFamily='" + columnFamily + '\'' +
+                   '}';
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
index 2262a79..86f38d7 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java
@@ -37,7 +37,7 @@ public class SimpleTridentHBaseMapMapper implements 
TridentHBaseMapMapper {
                 bos.write(String.valueOf(key).getBytes());
             }
             bos.close();
-        } catch (IOException e){
+        } catch (IOException e) {
             throw new RuntimeException("IOException creating HBase row key.", 
e);
         }
         return bos.toByteArray();

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
index eda9a32..f7ec2c7 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java
@@ -1,29 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hbase.trident.mapper;
 
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
 import org.apache.storm.hbase.common.ColumnList;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.storm.trident.tuple.TridentTuple;
 
 import static org.apache.storm.hbase.common.Utils.toBytes;
 import static org.apache.storm.hbase.common.Utils.toLong;
@@ -33,32 +26,32 @@ import static org.apache.storm.hbase.common.Utils.toLong;
  */
 public class SimpleTridentHBaseMapper implements TridentHBaseMapper {
     private static final Logger LOG = 
LoggerFactory.getLogger(SimpleTridentHBaseMapper.class);
-    
+
     private String rowKeyField;
     private byte[] columnFamily;
     private Fields columnFields;
     private Fields counterFields;
 
-    public SimpleTridentHBaseMapper(){
+    public SimpleTridentHBaseMapper() {
     }
 
 
-    public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField){
+    public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField) {
         this.rowKeyField = rowKeyField;
         return this;
     }
 
-    public SimpleTridentHBaseMapper withColumnFields(Fields columnFields){
+    public SimpleTridentHBaseMapper withColumnFields(Fields columnFields) {
         this.columnFields = columnFields;
         return this;
     }
 
-    public SimpleTridentHBaseMapper withCounterFields(Fields counterFields){
+    public SimpleTridentHBaseMapper withCounterFields(Fields counterFields) {
         this.counterFields = counterFields;
         return this;
     }
 
-    public SimpleTridentHBaseMapper withColumnFamily(String columnFamily){
+    public SimpleTridentHBaseMapper withColumnFamily(String columnFamily) {
         this.columnFamily = columnFamily.getBytes();
         return this;
     }
@@ -73,14 +66,14 @@ public class SimpleTridentHBaseMapper implements 
TridentHBaseMapper {
     @Override
     public ColumnList columns(TridentTuple tuple) {
         ColumnList cols = new ColumnList();
-        if(this.columnFields != null){
+        if (this.columnFields != null) {
             // TODO timestamps
-            for(String field : this.columnFields){
+            for (String field : this.columnFields) {
                 cols.addColumn(this.columnFamily, field.getBytes(), 
toBytes(tuple.getValueByField(field)));
             }
         }
-        if(this.counterFields != null){
-            for(String field : this.counterFields){
+        if (this.counterFields != null) {
+            for (String field : this.counterFields) {
                 cols.addCounter(this.columnFamily, field.getBytes(), 
toLong(tuple.getValueByField(field)));
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
index bb95497..dfdc2b2 100644
--- 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
+++ 
b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java
@@ -19,31 +19,29 @@
 package org.apache.storm.hbase.trident.mapper;
 
 
-import org.apache.storm.tuple.Tuple;
+import java.io.Serializable;
 import org.apache.storm.hbase.common.ColumnList;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.io.Serializable;
 /**
- * Maps a <code>org.apache.storm.trident.tuple.TridentTuple</code> object
- * to a row in an HBase table.
+ * Maps a <code>org.apache.storm.trident.tuple.TridentTuple</code> object to a 
row in an HBase table.
  */
 public interface TridentHBaseMapper extends Serializable {
 
 
-        /**
-         * Given a tuple, return the HBase rowkey.
-         *
-         * @param tuple
-         * @return
-         */
-        byte[] rowKey(TridentTuple tuple);
+    /**
+     * Given a tuple, return the HBase rowkey.
+     *
+     * @param tuple
+     * @return
+     */
+    byte[] rowKey(TridentTuple tuple);
 
-        /**
-         * Given a tuple, return a list of HBase columns to insert.
-         *
-         * @param tuple
-         * @return
-         */
-        ColumnList columns(TridentTuple tuple);
+    /**
+     * Given a tuple, return a list of HBase columns to insert.
+     *
+     * @param tuple
+     * @return
+     */
+    ColumnList columns(TridentTuple tuple);
 }

Reply via email to