Fixing stylecheck problems with storm-hive

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5fc4e9f0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5fc4e9f0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5fc4e9f0

Branch: refs/heads/master
Commit: 5fc4e9f0bdf7a58852f7c27a1f8049e2bb3776a5
Parents: 0e409ec
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 23:19:45 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 02:32:40 2018 -0400

----------------------------------------------------------------------
 external/storm-hive/pom.xml                     |   2 +-
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 139 ++++-----
 .../bolt/mapper/DelimitedRecordHiveMapper.java  |  60 ++--
 .../storm/hive/bolt/mapper/HiveMapper.java      |  30 +-
 .../hive/bolt/mapper/JsonRecordHiveMapper.java  |  61 ++--
 .../apache/storm/hive/common/HiveOptions.java   |  24 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  60 ++--
 .../apache/storm/hive/common/HiveWriter.java    | 255 ++++++++--------
 .../apache/storm/hive/trident/HiveState.java    | 124 ++++----
 .../storm/hive/trident/HiveStateFactory.java    |  31 +-
 .../apache/storm/hive/trident/HiveUpdater.java  |  23 +-
 .../apache/storm/hive/bolt/HiveSetupUtil.java   | 143 +++++----
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 302 +++++++++----------
 .../storm/hive/common/TestHiveWriter.java       | 205 ++++++-------
 14 files changed, 683 insertions(+), 776 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
index 4637e7f..eb790f7 100644
--- a/external/storm-hive/pom.xml
+++ b/external/storm-hive/pom.xml
@@ -202,7 +202,7 @@
         <artifactId>maven-checkstyle-plugin</artifactId>
         <!--Note - the version would be inherited-->
         <configuration>
-          <maxAllowedViolations>259</maxAllowedViolations>
+          <maxAllowedViolations>58</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 7a76888..cfabbd6 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -1,59 +1,52 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.bolt;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.storm.Config;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.storm.hive.common.HiveWriter;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.BatchHelper;
 import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.Config;
-import org.apache.storm.hive.common.HiveWriter;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.hive.common.HiveUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.List;
-import java.io.IOException;
-
 public class HiveBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+    @VisibleForTesting
+    Map<HiveEndPoint, HiveWriter> allWriters;
     private OutputCollector collector;
     private HiveOptions options;
     private ExecutorService callTimeoutPool;
@@ -63,36 +56,33 @@ public class HiveBolt extends BaseRichBolt {
     private BatchHelper batchHelper;
     private boolean tokenAuthEnabled;
 
-    @VisibleForTesting
-    Map<HiveEndPoint, HiveWriter> allWriters;
-
     public HiveBolt(HiveOptions options) {
         this.options = options;
     }
 
     @Override
-    public void prepare(Map<String, Object> conf, TopologyContext 
topologyContext, OutputCollector collector)  {
+    public void prepare(Map<String, Object> conf, TopologyContext 
topologyContext, OutputCollector collector) {
         try {
             tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
             try {
                 ugi = HiveUtils.authenticate(tokenAuthEnabled, 
options.getKerberosKeytab(), options.getKerberosPrincipal());
-            } catch(HiveUtils.AuthenticationFailed ex) {
+            } catch (HiveUtils.AuthenticationFailed ex) {
                 LOG.error("Hive kerberos authentication failed " + 
ex.getMessage(), ex);
                 throw new IllegalArgumentException(ex);
             }
 
             this.collector = collector;
             this.batchHelper = new BatchHelper(options.getBatchSize(), 
collector);
-            allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
+            allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
             String timeoutName = "hive-bolt-%d";
             this.callTimeoutPool = Executors.newFixedThreadPool(1,
-                                new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+                                                                new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
             sendHeartBeat.set(true);
             heartBeatTimer = new Timer();
             setupHeartBeatTimer();
 
-        } catch(Exception e) {
+        } catch (Exception e) {
             LOG.warn("unable to make connection to hive ", e);
         }
     }
@@ -108,7 +98,7 @@ public class HiveBolt extends BaseRichBolt {
                 batchHelper.addBatch(tuple);
             }
 
-            if(batchHelper.shouldFlush()) {
+            if (batchHelper.shouldFlush()) {
                 flushAllWriters(true);
                 LOG.info("acknowledging tuples after writers flushed ");
                 batchHelper.ack();
@@ -116,11 +106,11 @@ public class HiveBolt extends BaseRichBolt {
             if (TupleUtils.isTick(tuple)) {
                 retireIdleWriters();
             }
-        } catch(SerializationError se) {
+        } catch (SerializationError se) {
             LOG.info("Serialization exception occurred, tuple is acknowledged 
but not written to Hive.", tuple);
             this.collector.reportError(se);
             collector.ack(tuple);
-        } catch(Exception e) {
+        } catch (Exception e) {
             batchHelper.fail(e);
             abortAndCloseWriters();
         }
@@ -147,13 +137,13 @@ public class HiveBolt extends BaseRichBolt {
             }
         }
 
-        ExecutorService toShutdown[] = {callTimeoutPool};
+        ExecutorService toShutdown[] = { callTimeoutPool };
         for (ExecutorService execService : toShutdown) {
             execService.shutdown();
             try {
                 while (!execService.isTerminated()) {
                     execService.awaitTermination(
-                                 options.getCallTimeOut(), 
TimeUnit.MILLISECONDS);
+                        options.getCallTimeOut(), TimeUnit.MILLISECONDS);
                 }
             } catch (InterruptedException ex) {
                 LOG.warn("shutdown interrupted on " + execService, ex);
@@ -168,31 +158,33 @@ public class HiveBolt extends BaseRichBolt {
     @Override
     public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> conf = super.getComponentConfiguration();
-        if (conf == null)
+        if (conf == null) {
             conf = new Config();
+        }
 
-        if (options.getTickTupleInterval() > 0)
+        if (options.getTickTupleInterval() > 0) {
             conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 
options.getTickTupleInterval());
+        }
 
         return conf;
     }
 
     private void setupHeartBeatTimer() {
-        if(options.getHeartBeatInterval()>0) {
+        if (options.getHeartBeatInterval() > 0) {
             heartBeatTimer.schedule(new TimerTask() {
-                    @Override
-                    public void run() {
-                        try {
-                            if (sendHeartBeat.get()) {
-                                LOG.debug("Start sending heartbeat on all 
writers");
-                                sendHeartBeatOnAllWriters();
-                                setupHeartBeatTimer();
-                            }
-                        } catch (Exception e) {
-                            LOG.warn("Failed to heartbeat on HiveWriter ", e);
+                @Override
+                public void run() {
+                    try {
+                        if (sendHeartBeat.get()) {
+                            LOG.debug("Start sending heartbeat on all 
writers");
+                            sendHeartBeatOnAllWriters();
+                            setupHeartBeatTimer();
                         }
+                    } catch (Exception e) {
+                        LOG.warn("Failed to heartbeat on HiveWriter ", e);
                     }
-                }, options.getHeartBeatInterval() * 1000);
+                }
+            }, options.getHeartBeatInterval() * 1000);
         }
     }
 
@@ -204,7 +196,7 @@ public class HiveBolt extends BaseRichBolt {
 
     void flushAllWriters(boolean rollToNext)
         throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, 
HiveWriter.TxnFailure, InterruptedException {
-        for(HiveWriter writer: allWriters.values()) {
+        for (HiveWriter writer : allWriters.values()) {
             writer.flush(rollToNext);
         }
     }
@@ -213,7 +205,7 @@ public class HiveBolt extends BaseRichBolt {
         try {
             abortAllWriters();
             closeAllWriters();
-        }  catch(Exception ie) {
+        } catch (Exception ie) {
             LOG.warn("unable to close hive connections. ", ie);
         }
     }
@@ -222,11 +214,11 @@ public class HiveBolt extends BaseRichBolt {
      * Abort current Txn on all writers
      */
     private void abortAllWriters() throws InterruptedException, 
StreamingException, HiveWriter.TxnBatchFailure {
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             try {
                 entry.getValue().abort();
             } catch (Exception e) {
-                LOG.error("Failed to abort hive transaction batch, 
HiveEndPoint " + entry.getValue() +" due to exception ", e);
+                LOG.error("Failed to abort hive transaction batch, 
HiveEndPoint " + entry.getValue() + " due to exception ", e);
             }
         }
     }
@@ -236,10 +228,10 @@ public class HiveBolt extends BaseRichBolt {
      */
     private void closeAllWriters() {
         //1) Retire writers
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             try {
                 entry.getValue().close();
-            } catch(Exception e) {
+            } catch (Exception e) {
                 LOG.warn("unable to close writers. ", e);
             }
         }
@@ -251,14 +243,15 @@ public class HiveBolt extends BaseRichBolt {
     HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
         throws HiveWriter.ConnectFailure, InterruptedException {
         try {
-            HiveWriter writer = allWriters.get( endPoint );
+            HiveWriter writer = allWriters.get(endPoint);
             if (writer == null) {
                 LOG.debug("Creating Writer to Hive end point : " + endPoint);
                 writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, 
ugi, options, tokenAuthEnabled);
                 if (allWriters.size() > (options.getMaxOpenConnections() - 1)) 
{
-                    LOG.info("cached HiveEndPoint size {} exceeded 
maxOpenConnections {} ", allWriters.size(), options.getMaxOpenConnections());
+                    LOG.info("cached HiveEndPoint size {} exceeded 
maxOpenConnections {} ", allWriters.size(),
+                             options.getMaxOpenConnections());
                     int retired = retireIdleWriters();
-                    if(retired==0) {
+                    if (retired == 0) {
                         retireEldestWriter();
                     }
                 }
@@ -279,7 +272,7 @@ public class HiveBolt extends BaseRichBolt {
         LOG.info("Attempting close eldest writers");
         long oldestTimeStamp = System.currentTimeMillis();
         HiveEndPoint eldest = null;
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             if (entry.getValue().getLastUsed() < oldestTimeStamp) {
                 eldest = entry.getKey();
                 oldestTimeStamp = entry.getValue().getLastUsed();
@@ -308,8 +301,8 @@ public class HiveBolt extends BaseRichBolt {
         long now = System.currentTimeMillis();
 
         //1) Find retirement candidates
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-            if(now - entry.getValue().getLastUsed() > 
options.getIdleTimeout()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            if (now - entry.getValue().getLastUsed() > 
options.getIdleTimeout()) {
                 ++count;
                 retire(entry.getKey());
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
index 48cd5ff..d67ec67 100644
--- 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
+++ 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
@@ -1,40 +1,34 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hive.bolt.mapper;
 
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.RecordWriter;
 import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.hive.hcatalog.streaming.TransactionBatch;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Date;
-import java.text.SimpleDateFormat;
-import java.io.IOException;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DelimitedRecordHiveMapper implements HiveMapper {
     private static final Logger LOG = 
LoggerFactory.getLogger(DelimitedRecordHiveMapper.class);
@@ -62,7 +56,7 @@ public class DelimitedRecordHiveMapper implements HiveMapper {
         return this;
     }
 
-    public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter){
+    public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter) {
         this.fieldDelimiter = delimiter;
         return this;
     }
@@ -76,7 +70,7 @@ public class DelimitedRecordHiveMapper implements HiveMapper {
     @Override
     public RecordWriter createRecordWriter(HiveEndPoint endPoint)
         throws StreamingException, IOException, ClassNotFoundException {
-        return new DelimitedInputWriter(columnNames, fieldDelimiter,endPoint);
+        return new DelimitedInputWriter(columnNames, fieldDelimiter, endPoint);
     }
 
     @Override
@@ -88,8 +82,8 @@ public class DelimitedRecordHiveMapper implements HiveMapper {
     @Override
     public List<String> mapPartitions(Tuple tuple) {
         List<String> partitionList = new ArrayList<String>();
-        if(this.partitionFields != null) {
-            for(String field: this.partitionFields) {
+        if (this.partitionFields != null) {
+            for (String field : this.partitionFields) {
                 partitionList.add(tuple.getStringByField(field));
             }
         }
@@ -102,8 +96,8 @@ public class DelimitedRecordHiveMapper implements HiveMapper 
{
     @Override
     public byte[] mapRecord(Tuple tuple) {
         StringBuilder builder = new StringBuilder();
-        if(this.columnFields != null) {
-            for(String field: this.columnFields) {
+        if (this.columnFields != null) {
+            for (String field : this.columnFields) {
                 builder.append(tuple.getValueByField(field));
                 builder.append(fieldDelimiter);
             }
@@ -114,8 +108,8 @@ public class DelimitedRecordHiveMapper implements 
HiveMapper {
     @Override
     public List<String> mapPartitions(TridentTuple tuple) {
         List<String> partitionList = new ArrayList<String>();
-        if(this.partitionFields != null) {
-            for(String field: this.partitionFields) {
+        if (this.partitionFields != null) {
+            for (String field : this.partitionFields) {
                 partitionList.add(tuple.getStringByField(field));
             }
         }
@@ -128,8 +122,8 @@ public class DelimitedRecordHiveMapper implements 
HiveMapper {
     @Override
     public byte[] mapRecord(TridentTuple tuple) {
         StringBuilder builder = new StringBuilder();
-        if(this.columnFields != null) {
-            for(String field: this.columnFields) {
+        if (this.columnFields != null) {
+            for (String field : this.columnFields) {
                 builder.append(tuple.getValueByField(field));
                 builder.append(fieldDelimiter);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
index a7a79fb..cc29392 100644
--- 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
+++ 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
@@ -1,33 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.hive.bolt.mapper;
 
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.trident.tuple.TridentTuple;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.RecordWriter;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
 import org.apache.hive.hcatalog.streaming.StreamingException;
-import java.io.Serializable;
-
-import java.io.IOException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * Maps a <code>org.apache.storm.tuple.Tupe</code> object

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
index 0028b87..3a43b7c 100644
--- 
a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
+++ 
b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
@@ -1,41 +1,34 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.hive.bolt.mapper;
 
+package org.apache.storm.hive.bolt.mapper;
 
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.RecordWriter;
 import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
 import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Date;
-import java.text.SimpleDateFormat;
-import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JsonRecordHiveMapper implements HiveMapper {
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonRecordHiveMapper.class);
@@ -78,8 +71,8 @@ public class JsonRecordHiveMapper implements HiveMapper {
     @Override
     public List<String> mapPartitions(Tuple tuple) {
         List<String> partitionList = new ArrayList<String>();
-        if(this.partitionFields != null) {
-            for(String field: this.partitionFields) {
+        if (this.partitionFields != null) {
+            for (String field : this.partitionFields) {
                 partitionList.add(tuple.getStringByField(field));
             }
         }
@@ -92,9 +85,9 @@ public class JsonRecordHiveMapper implements HiveMapper {
     @Override
     public byte[] mapRecord(Tuple tuple) {
         JSONObject obj = new JSONObject();
-        if(this.columnFields != null) {
-            for(String field: this.columnFields) {
-                obj.put(field,tuple.getValueByField(field));
+        if (this.columnFields != null) {
+            for (String field : this.columnFields) {
+                obj.put(field, tuple.getValueByField(field));
             }
         }
         return obj.toJSONString().getBytes();
@@ -103,8 +96,8 @@ public class JsonRecordHiveMapper implements HiveMapper {
     @Override
     public List<String> mapPartitions(TridentTuple tuple) {
         List<String> partitionList = new ArrayList<String>();
-        if(this.partitionFields != null) {
-            for(String field: this.partitionFields) {
+        if (this.partitionFields != null) {
+            for (String field : this.partitionFields) {
                 partitionList.add(tuple.getStringByField(field));
             }
         }
@@ -117,9 +110,9 @@ public class JsonRecordHiveMapper implements HiveMapper {
     @Override
     public byte[] mapRecord(TridentTuple tuple) {
         JSONObject obj = new JSONObject();
-        if(this.columnFields != null) {
-            for(String field: this.columnFields) {
-                obj.put(field,tuple.getValueByField(field));
+        if (this.columnFields != null) {
+            for (String field : this.columnFields) {
+                obj.put(field, tuple.getValueByField(field));
             }
         }
         return obj.toJSONString().getBytes();

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
 
b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index e80f5d7..4a91da1 100644
--- 
a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ 
b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -1,25 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.common;
 
 import java.io.Serializable;
-
 import org.apache.storm.hive.bolt.mapper.HiveMapper;
 
 public class HiveOptions implements Serializable {
@@ -43,15 +36,14 @@ public class HiveOptions implements Serializable {
     protected String kerberosKeytab;
     protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
 
-    public HiveOptions(String metaStoreURI,String databaseName,String tableName,HiveMapper mapper) {
+    public HiveOptions(String metaStoreURI, String databaseName, String tableName, HiveMapper mapper) {
         this.metaStoreURI = metaStoreURI;
         this.databaseName = databaseName;
         this.tableName = tableName;
         this.mapper = mapper;
     }
 
-    public HiveOptions withTickTupleInterval(Integer tickInterval)
-    {
+    public HiveOptions withTickTupleInterval(Integer tickInterval) {
         this.tickTupleInterval = tickInterval;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
index 26beee0..7681f91 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -1,23 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.common;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.streaming.ConnectionError;
@@ -26,12 +25,6 @@ import org.apache.storm.hive.security.AutoHive;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
 import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
 
 public class HiveUtils {
@@ -44,16 +37,19 @@ public class HiveUtils {
         return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
     }
 
-    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options, boolean tokenAuthEnabled)
-            throws HiveWriter.ConnectFailure, InterruptedException {
+    public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi,
+                                            HiveOptions options, boolean tokenAuthEnabled)
+        throws HiveWriter.ConnectFailure, InterruptedException {
         return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
-                options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi, tokenAuthEnabled);
+                              options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi, tokenAuthEnabled);
     }
 
-    public static synchronized UserGroupInformation authenticate(boolean 
isTokenAuthEnabled, String keytab, String principal) throws 
AuthenticationFailed {
+    public static synchronized UserGroupInformation authenticate(boolean 
isTokenAuthEnabled, String keytab, String principal) throws
+        AuthenticationFailed {
 
-        if (isTokenAuthEnabled)
+        if (isTokenAuthEnabled) {
             return getCurrentUser(principal);
+        }
 
         boolean kerberosEnabled = false;
 
@@ -70,7 +66,7 @@ public class HiveUtils {
 
             if (!(kfile.isFile() && kfile.canRead())) {
                 throw new IllegalArgumentException("The keyTab file: " + 
keytab + " is nonexistent or can't read. "
-                        + "Please specify a readable keytab file for Kerberos 
auth.");
+                                                   + "Please specify a 
readable keytab file for Kerberos auth.");
             }
 
             try {
@@ -91,12 +87,6 @@ public class HiveUtils {
 
     }
 
-    public static class AuthenticationFailed extends Exception {
-        public AuthenticationFailed(String reason, Exception cause) {
-            super("Kerberos Authentication Failed. " + reason, cause);
-        }
-    }
-
     public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> 
allWriters) {
         for (Map.Entry<HiveEndPoint, HiveWriter> entry : 
allWriters.entrySet()) {
             LOG.info("cached writers {} ", entry.getValue());
@@ -104,10 +94,10 @@ public class HiveUtils {
     }
 
     public static boolean isTokenAuthEnabled(Map<String, Object> conf) {
-        return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null && (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName()));
+        return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null &&
+               (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName()));
     }
 
-
     private static UserGroupInformation getCurrentUser(String principal) 
throws AuthenticationFailed {
         try {
             return UserGroupInformation.getCurrentUser();
@@ -115,4 +105,10 @@ public class HiveUtils {
             throw new AuthenticationFailed("Login failed for principal " + 
principal, e);
         }
     }
+
+    public static class AuthenticationFailed extends Exception {
+        public AuthenticationFailed(String reason, Exception cause) {
+            super("Kerberos Authentication Failed. " + reason, cause);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
index e9a30fe..bc0ee75 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -1,23 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.common;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
@@ -26,14 +21,17 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.streaming.*;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
 import org.apache.storm.hive.bolt.mapper.HiveMapper;
 import org.apache.storm.tuple.Tuple;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +48,9 @@ public class HiveWriter {
     private final ExecutorService callTimeoutPool;
     private final long callTimeout;
     private final Object txnBatchLock = new Object();
+    protected boolean closed; // flag indicating HiveWriter was closed
     private TransactionBatch txnBatch;
     private long lastUsed; // time of last flush on this writer
-    protected boolean closed; // flag indicating HiveWriter was closed
     private boolean autoCreatePartitions;
     private UserGroupInformation ugi;
     private int totalRecords = 0;
@@ -74,37 +72,52 @@ public class HiveWriter {
             this.txnBatch = nextTxnBatch(recordWriter);
             this.closed = false;
             this.lastUsed = System.currentTimeMillis();
-        } catch(InterruptedException e) {
+        } catch (InterruptedException e) {
             throw e;
-        } catch(RuntimeException e) {
+        } catch (RuntimeException e) {
             throw e;
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new ConnectFailure(endPoint, e);
         }
     }
 
-    public RecordWriter getRecordWriter(final HiveMapper mapper, final boolean 
tokenAuthEnabled) throws  Exception {
-        if (!tokenAuthEnabled)
-          return mapper.createRecordWriter(endPoint);
+    /**
+     * If the current thread has been interrupted, then throws an
+     * exception.
+     * @throws InterruptedException
+     */
+    private static void checkAndThrowInterruptedException()
+        throws InterruptedException {
+        if (Thread.currentThread().interrupted()) {
+            throw new InterruptedException("Timed out before Hive call was 
made. "
+                                           + "Your callTimeout might be set 
too low or Hive calls are "
+                                           + "taking too long.");
+        }
+    }
+
+    public RecordWriter getRecordWriter(final HiveMapper mapper, final boolean 
tokenAuthEnabled) throws Exception {
+        if (!tokenAuthEnabled) {
+            return mapper.createRecordWriter(endPoint);
+        }
 
         try {
-            return ugi.doAs (
-                    new PrivilegedExceptionAction<RecordWriter>() {
-                        @Override
-                        public RecordWriter run() throws StreamingException, 
IOException, ClassNotFoundException {
-                            return mapper.createRecordWriter(endPoint);
-                        }
+            return ugi.doAs(
+                new PrivilegedExceptionAction<RecordWriter>() {
+                    @Override
+                    public RecordWriter run() throws StreamingException, 
IOException, ClassNotFoundException {
+                        return mapper.createRecordWriter(endPoint);
                     }
+                }
             );
         } catch (Exception e) {
             throw new ConnectFailure(endPoint, e);
         }
     }
 
-
-    private  HiveConf createHiveConf(String metaStoreURI, boolean 
tokenAuthEnabled)  {
-        if (!tokenAuthEnabled)
+    private HiveConf createHiveConf(String metaStoreURI, boolean 
tokenAuthEnabled) {
+        if (!tokenAuthEnabled) {
             return null;
+        }
 
         HiveConf hcatConf = new HiveConf();
         hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
@@ -114,9 +127,9 @@ public class HiveWriter {
 
     @Override
     public String toString() {
-          return "{ "
-              + "endPoint = " + endPoint.toString()
-              + ", TransactionBatch = " + txnBatch.toString() + " }";
+        return "{ "
+               + "endPoint = " + endPoint.toString()
+               + ", TransactionBatch = " + txnBatch.toString() + " }";
     }
 
     /**
@@ -135,18 +148,18 @@ public class HiveWriter {
         try {
             LOG.debug("Writing event to {}", endPoint);
             callWithTimeout(new CallRunner<Void>() {
-                    @Override
-                    public Void call() throws StreamingException, 
InterruptedException {
-                        txnBatch.write(record);
-                        totalRecords++;
-                        return null;
-                    }
-                });
-        } catch(SerializationError se) {
+                @Override
+                public Void call() throws StreamingException, 
InterruptedException {
+                    txnBatch.write(record);
+                    totalRecords++;
+                    return null;
+                }
+            });
+        } catch (SerializationError se) {
             throw new SerializationError(endPoint.toString() + " 
SerializationError", se);
-        } catch(StreamingException e) {
+        } catch (StreamingException e) {
             throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
-        } catch(TimeoutException e) {
+        } catch (TimeoutException e) {
             throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
         }
     }
@@ -161,13 +174,13 @@ public class HiveWriter {
         // if there are no records do not call flush
         if (totalRecords <= 0) return;
         try {
-            synchronized(txnBatchLock) {
+            synchronized (txnBatchLock) {
                 commitTxn();
                 nextTxn(rollToNext);
                 totalRecords = 0;
                 lastUsed = System.currentTimeMillis();
             }
-        } catch(StreamingException e) {
+        } catch (StreamingException e) {
             throw new TxnFailure(txnBatch, e);
         }
     }
@@ -177,24 +190,24 @@ public class HiveWriter {
      */
     public void heartBeat() throws InterruptedException {
         // 1) schedule the heartbeat on one thread in pool
-        synchronized(txnBatchLock) {
+        synchronized (txnBatchLock) {
             try {
                 callWithTimeout(new CallRunner<Void>() {
-                        @Override
-                        public Void call() throws Exception {
-                            try {
-                                LOG.info("Sending heartbeat on batch " + 
txnBatch);
-                                txnBatch.heartbeat();
-                            } catch (StreamingException e) {
-                                LOG.warn("Heartbeat error on batch " + 
txnBatch, e);
-                            }
-                            return null;
+                    @Override
+                    public Void call() throws Exception {
+                        try {
+                            LOG.info("Sending heartbeat on batch " + txnBatch);
+                            txnBatch.heartbeat();
+                        } catch (StreamingException e) {
+                            LOG.warn("Heartbeat error on batch " + txnBatch, 
e);
                         }
-                    });
+                        return null;
+                    }
+                });
             } catch (InterruptedException e) {
                 throw e;
             } catch (Exception e) {
-                LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch,  
e);
+                LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, 
e);
                 // Suppressing exceptions as we don't care for errors on 
heartbeats
             }
         }
@@ -212,10 +225,11 @@ public class HiveWriter {
      * Flush and Close current transactionBatch.
      */
     public void flushAndClose() throws TxnBatchFailure, TxnFailure, 
CommitFailure,
-            IOException, InterruptedException {
+        IOException, InterruptedException {
         flush(false);
         close();
     }
+
     /**
      * Close the Transaction Batch and connection
      * @throws IOException
@@ -231,28 +245,28 @@ public class HiveWriter {
         LOG.info("Closing connection to end point : {}", endPoint);
         try {
             callWithTimeout(new CallRunner<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        connection.close(); // could block
-                        return null;
-                    }
-                });
-        } catch(Exception e) {
+                @Override
+                public Void call() throws Exception {
+                    connection.close(); // could block
+                    return null;
+                }
+            });
+        } catch (Exception e) {
             LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
             // Suppressing exceptions as we don't care for errors on 
connection close
         }
     }
 
     private void commitTxn() throws CommitFailure, InterruptedException {
-        LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId() , 
endPoint);
+        LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId(), 
endPoint);
         try {
             callWithTimeout(new CallRunner<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        txnBatch.commit(); // could block
-                        return null;
-                    }
-                });
+                @Override
+                public Void call() throws Exception {
+                    txnBatch.commit(); // could block
+                    return null;
+                }
+            });
         } catch (StreamingException e) {
             throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
         } catch (TimeoutException e) {
@@ -264,15 +278,16 @@ public class HiveWriter {
     StreamingConnection newConnection(final UserGroupInformation ugi, final 
boolean tokenAuthEnabled)
         throws InterruptedException, ConnectFailure {
         try {
-            return  callWithTimeout(new CallRunner<StreamingConnection>() {
-                    @Override
-                    public StreamingConnection call() throws Exception {
-                        return endPoint.newConnection(autoCreatePartitions, 
createHiveConf(endPoint.metaStoreUri, tokenAuthEnabled) , ugi); // could block
-                    }
-                });
-        } catch(StreamingException e) {
+            return callWithTimeout(new CallRunner<StreamingConnection>() {
+                @Override
+                public StreamingConnection call() throws Exception {
+                    return endPoint
+                        .newConnection(autoCreatePartitions, 
createHiveConf(endPoint.metaStoreUri, tokenAuthEnabled), ugi); // could block
+                }
+            });
+        } catch (StreamingException e) {
             throw new ConnectFailure(endPoint, e);
-        } catch(TimeoutException e) {
+        } catch (TimeoutException e) {
             throw new ConnectFailure(endPoint, e);
         }
     }
@@ -290,30 +305,30 @@ public class HiveWriter {
             });
             batch.beginNextTransaction();
             LOG.debug("Acquired {}. Switching to first txn", batch);
-        } catch(TimeoutException e) {
+        } catch (TimeoutException e) {
             throw new TxnBatchFailure(endPoint, e);
-        } catch(StreamingException e) {
+        } catch (StreamingException e) {
             throw new TxnBatchFailure(endPoint, e);
         }
         return batch;
     }
 
-    private void closeTxnBatch() throws  InterruptedException {
+    private void closeTxnBatch() throws InterruptedException {
         try {
             LOG.debug("Closing Txn Batch {}", txnBatch);
             callWithTimeout(new CallRunner<Void>() {
-                    @Override
-                        public Void call() throws Exception {
-                        if(txnBatch != null) {
-                            txnBatch.close(); // could block
-                        }
-                        return null;
+                @Override
+                public Void call() throws Exception {
+                    if (txnBatch != null) {
+                        txnBatch.close(); // could block
                     }
-                });
-        } catch(InterruptedException e) {
+                    return null;
+                }
+            });
+        } catch (InterruptedException e) {
             throw e;
-        } catch(Exception e) {
-            LOG.warn("Error closing txn batch "+ txnBatch, e);
+        } catch (Exception e) {
+            LOG.warn("Error closing txn batch " + txnBatch, e);
         }
     }
 
@@ -322,13 +337,12 @@ public class HiveWriter {
      * @throws StreamingException if could not get new Transaction Batch, or 
switch to next Txn
      */
     public void abort() throws StreamingException, TxnBatchFailure, 
InterruptedException {
-        synchronized(txnBatchLock) {
+        synchronized (txnBatchLock) {
             abortTxn();
             nextTxn(true); // roll to next
         }
     }
 
-
     /**
      * Aborts current Txn in the txnBatch.
      */
@@ -336,12 +350,12 @@ public class HiveWriter {
         LOG.info("Aborting Txn id {} on End Point {}", 
txnBatch.getCurrentTxnId(), endPoint);
         try {
             callWithTimeout(new CallRunner<Void>() {
-                    @Override
-                        public Void call() throws StreamingException, 
InterruptedException {
-                        txnBatch.abort(); // could block
-                        return null;
-                    }
-                });
+                @Override
+                public Void call() throws StreamingException, 
InterruptedException {
+                    txnBatch.abort(); // could block
+                    return null;
+                }
+            });
         } catch (InterruptedException e) {
             throw e;
         } catch (TimeoutException e) {
@@ -352,40 +366,25 @@ public class HiveWriter {
         }
     }
 
-
     /**
      * if there are remainingTransactions in current txnBatch, begins 
nextTransactions
      * otherwise creates new txnBatch.
      * @param rollToNext
      */
     private void nextTxn(boolean rollToNext) throws StreamingException, 
InterruptedException, TxnBatchFailure {
-        if(txnBatch.remainingTransactions() == 0) {
+        if (txnBatch.remainingTransactions() == 0) {
             closeTxnBatch();
             txnBatch = null;
-            if(rollToNext) {
+            if (rollToNext) {
                 txnBatch = nextTxnBatch(recordWriter);
             }
-        } else if(rollToNext) {
+        } else if (rollToNext) {
             LOG.debug("Switching to next Txn for {}", endPoint);
             txnBatch.beginNextTransaction(); // does not block
         }
     }
 
     /**
-     * If the current thread has been interrupted, then throws an
-     * exception.
-     * @throws InterruptedException
-     */
-    private static void checkAndThrowInterruptedException()
-        throws InterruptedException {
-        if (Thread.currentThread().interrupted()) {
-            throw new InterruptedException("Timed out before Hive call was 
made. "
-                                           + "Your callTimeout might be set 
too low or Hive calls are "
-                                           + "taking too long.");
-        }
-    }
-
-    /**
      * Execute the callable on a separate thread and wait for the completion
      * for the specified amount of time in milliseconds. In case of timeout
      * cancel the callable and throw an IOException
@@ -393,11 +392,11 @@ public class HiveWriter {
     private <T> T callWithTimeout(final CallRunner<T> callRunner)
         throws TimeoutException, StreamingException, InterruptedException {
         Future<T> future = callTimeoutPool.submit(new Callable<T>() {
-                @Override
-                public T call() throws Exception {
-                    return callRunner.call();
-                }
-            });
+            @Override
+            public T call() throws Exception {
+                return callRunner.call();
+            }
+        });
         try {
             if (callTimeout > 0) {
                 return future.get(callTimeout, TimeUnit.MILLISECONDS);
@@ -431,7 +430,7 @@ public class HiveWriter {
 
     private byte[] generateRecord(Tuple tuple) {
         StringBuilder buf = new StringBuilder();
-        for (Object o: tuple.getValues()) {
+        for (Object o : tuple.getValues()) {
             buf.append(o);
             buf.append(",");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index a7685f0..a698e24 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -1,50 +1,43 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.trident;
 
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.hive.common.HiveWriter;
-import org.apache.hive.hcatalog.streaming.*;
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.hive.common.HiveUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.Map.Entry;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.storm.hive.common.HiveOptions;
+import org.apache.storm.hive.common.HiveUtils;
+import org.apache.storm.hive.common.HiveWriter;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HiveState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(HiveState.class);
@@ -73,24 +66,24 @@ public class HiveState implements State {
     public void commit(Long txId) {
     }
 
-    public void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {
+    public void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         try {
             tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
             try {
                 ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal());
-            } catch(HiveUtils.AuthenticationFailed ex) {
+            } catch (HiveUtils.AuthenticationFailed ex) {
                 LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
                 throw new IllegalArgumentException(ex);
             }
 
-            allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>();
+            allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
             String timeoutName = "hive-bolt-%d";
             this.callTimeoutPool = Executors.newFixedThreadPool(1,
                                                                 new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
-            heartBeatTimer= new Timer();
+            heartBeatTimer = new Timer();
             setupHeartBeatTimer();
-        } catch(Exception e) {
-            LOG.warn("unable to make connection to hive ",e);
+        } catch (Exception e) {
+            LOG.warn("unable to make connection to hive ", e);
         }
     }
 
@@ -99,7 +92,7 @@ public class HiveState implements State {
             writeTuples(tuples);
         } catch (Exception e) {
             abortAndCloseWriters();
-            LOG.warn("hive streaming failed.",e);
+            LOG.warn("hive streaming failed.", e);
             throw new FailedException(e);
         }
     }
@@ -112,7 +105,7 @@ public class HiveState implements State {
             HiveWriter writer = getOrCreateWriter(endPoint);
             writer.write(options.getMapper().mapRecord(tuple));
             currentBatchSize++;
-            if(currentBatchSize >= options.getBatchSize()) {
+            if (currentBatchSize >= options.getBatchSize()) {
                 flushAllWriters();
                 currentBatchSize = 0;
             }
@@ -124,7 +117,7 @@ public class HiveState implements State {
             sendHeartBeat = false;
             abortAllWriters();
             closeAllWriters();
-        }  catch(Exception ie) {
+        } catch (Exception ie) {
             LOG.warn("unable to close hive connections. ", ie);
         }
     }
@@ -133,7 +126,7 @@ public class HiveState implements State {
      * Abort current Txn on all writers
      */
     private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             entry.getValue().abort();
         }
     }
@@ -145,7 +138,7 @@ public class HiveState implements State {
      */
     private void closeAllWriters() throws InterruptedException, IOException {
         //1) Retire writers
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
             entry.getValue().close();
         }
         //2) Clear cache
@@ -153,27 +146,27 @@ public class HiveState implements State {
     }
 
     private void setupHeartBeatTimer() {
-        if(options.getHeartBeatInterval()>0) {
+        if (options.getHeartBeatInterval() > 0) {
             heartBeatTimer.schedule(new TimerTask() {
-                    @Override
-                    public void run() {
-                        try {
-                            if (sendHeartBeat) {
-                                LOG.debug("Start sending heartbeat on all writers");
-                                sendHeartBeatOnAllWriters();
-                                setupHeartBeatTimer();
-                            }
-                        } catch (Exception e) {
-                            LOG.warn("Failed to heartbeat on HiveWriter ", e);
+                @Override
+                public void run() {
+                    try {
+                        if (sendHeartBeat) {
+                            LOG.debug("Start sending heartbeat on all writers");
+                            sendHeartBeatOnAllWriters();
+                            setupHeartBeatTimer();
                         }
+                    } catch (Exception e) {
+                        LOG.warn("Failed to heartbeat on HiveWriter ", e);
                     }
-                }, options.getHeartBeatInterval() * 1000);
+                }
+            }, options.getHeartBeatInterval() * 1000);
         }
     }
 
     private void flushAllWriters()
         throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
-        for(HiveWriter writer: allWriters.values()) {
+        for (HiveWriter writer : allWriters.values()) {
             writer.flush(true);
         }
     }
@@ -187,13 +180,13 @@ public class HiveState implements State {
     private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
         throws HiveWriter.ConnectFailure, InterruptedException {
         try {
-            HiveWriter writer = allWriters.get( endPoint );
-            if( writer == null ) {
+            HiveWriter writer = allWriters.get(endPoint);
+            if (writer == null) {
                 LOG.info("Creating Writer to Hive end point : " + endPoint);
                 writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled);
-                if(allWriters.size() > (options.getMaxOpenConnections() - 1)){
+                if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
                     int retired = retireIdleWriters();
-                    if(retired==0) {
+                    if (retired == 0) {
                         retireEldestWriter();
                     }
                 }
@@ -208,15 +201,14 @@ public class HiveState implements State {
     }
 
 
-
     /**
      * Locate writer that has not been used for longest time and retire it
      */
     private void retireEldestWriter() {
         long oldestTimeStamp = System.currentTimeMillis();
         HiveEndPoint eldest = null;
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-            if(entry.getValue().getLastUsed() < oldestTimeStamp) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            if (entry.getValue().getLastUsed() < oldestTimeStamp) {
                 eldest = entry.getKey();
                 oldestTimeStamp = entry.getValue().getLastUsed();
             }
@@ -244,19 +236,19 @@ public class HiveState implements State {
         ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
 
         //1) Find retirement candidates
-        for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-            if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
+        for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+            if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
                 ++count;
                 retirees.add(entry.getKey());
             }
         }
         //2) Retire them
-        for(HiveEndPoint ep : retirees) {
+        for (HiveEndPoint ep : retirees) {
             try {
                 LOG.info("Closing idle Writer to Hive end point : {}", ep);
                 allWriters.remove(ep).flushAndClose();
             } catch (IOException e) {
-                LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e);
+                LOG.warn("Failed to close writer for end point: {}. Error: " + ep, e);
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
                 Thread.currentThread().interrupt();
@@ -285,13 +277,13 @@ public class HiveState implements State {
             }
         }
 
-        ExecutorService toShutdown[] = {callTimeoutPool};
+        ExecutorService toShutdown[] = { callTimeoutPool };
         for (ExecutorService execService : toShutdown) {
             execService.shutdown();
             try {
                 while (!execService.isTerminated()) {
                     execService.awaitTermination(
-                                                 options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                        options.getCallTimeOut(), TimeUnit.MILLISECONDS);
                 }
             } catch (InterruptedException ex) {
                 LOG.warn("shutdown interrupted on " + execService, ex);

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
index 6659825..68722d1 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -1,40 +1,33 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.trident;
 
+import java.util.Map;
+import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.task.IMetricsContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.hive.common.HiveOptions;
-
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class HiveStateFactory implements StateFactory {
     private static final Logger LOG = LoggerFactory.getLogger(HiveStateFactory.class);
     private HiveOptions options;
 
-    public HiveStateFactory(){}
+    public HiveStateFactory() {}
 
-    public HiveStateFactory withOptions(HiveOptions options){
+    public HiveStateFactory withOptions(HiveOptions options) {
         this.options = options;
         return this;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
index 062f7fb..36479d9 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
@@ -1,30 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.trident;
 
+import java.util.List;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseStateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.util.List;
-
-public class HiveUpdater extends BaseStateUpdater<HiveState>{
+public class HiveUpdater extends BaseStateUpdater<HiveState> {
     @Override
     public void updateState(HiveState state, List<TridentTuple> tuples, TridentCollector collector) {
         state.updateState(tuples, collector);

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
index d492819..6646390 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
@@ -1,23 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.bolt;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -42,57 +45,7 @@ import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.thrift.TException;
 
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class HiveSetupUtil {
-    public static class RawFileSystem extends RawLocalFileSystem {
-        private static final URI NAME;
-        static {
-            try {
-                NAME = new URI("raw:///");
-            } catch (URISyntaxException se) {
-                throw new IllegalArgumentException("bad uri", se);
-            }
-        }
-
-        @Override
-        public URI getUri() {
-            return NAME;
-        }
-
-        @Override
-        public FileStatus getFileStatus(Path path) throws IOException {
-            File file = pathToFile(path);
-            if (!file.exists()) {
-                throw new FileNotFoundException("Can't find " + path);
-            }
-            // get close enough
-            short mod = 0;
-            if (file.canRead()) {
-                mod |= 0444;
-            }
-            if (file.canWrite()) {
-                mod |= 0200;
-            }
-            if (file.canExecute()) {
-                mod |= 0111;
-            }
-            ShimLoader.getHadoopShims();
-            return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
-                                  file.lastModified(), file.lastModified(),
-                                  FsPermission.createImmutable(mod), "owen", "users", path);
-        }
-    }
-
     private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
 
     public static HiveConf getHiveConf() {
@@ -126,7 +79,7 @@ public class HiveSetupUtil {
             sd.setCols(getTableColumns(colNames, colTypes));
             sd.setNumBuckets(1);
             sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
-            if(partNames!=null && partNames.length!=0) {
+            if (partNames != null && partNames.length != 0) {
                 tbl.setPartitionKeys(getPartitionKeys(partNames));
             }
 
@@ -146,10 +99,10 @@ public class HiveSetupUtil {
             tbl.setParameters(tableParams);
             client.createTable(tbl);
             try {
-                if(partVals!=null && partVals.size() > 0) {
+                if (partVals != null && partVals.size() > 0) {
                     addPartition(client, tbl, partVals);
                 }
-            } catch(AlreadyExistsException e) {
+            } catch (AlreadyExistsException e) {
             }
         } finally {
             client.close();
@@ -170,7 +123,7 @@ public class HiveSetupUtil {
     }
 
     private static void addPartition(IMetaStoreClient client, Table tbl
-                                     , List<String> partValues)
+        , List<String> partValues)
         throws IOException, TException {
         Partition part = new Partition();
         part.setDbName(tbl.getDbName());
@@ -183,17 +136,17 @@ public class HiveSetupUtil {
     }
 
     private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
-        if(partKeys.size()!=partVals.size()) {
+        if (partKeys.size() != partVals.size()) {
             throw new IllegalArgumentException("Partition values:" + partVals +
-                                               ", does not match the partition Keys in table :" + partKeys );
+                                               ", does not match the partition Keys in table :" + partKeys);
         }
-        StringBuffer buff = new StringBuffer(partKeys.size()*20);
-        int i=0;
-        for(FieldSchema schema : partKeys) {
+        StringBuffer buff = new StringBuffer(partKeys.size() * 20);
+        int i = 0;
+        for (FieldSchema schema : partKeys) {
             buff.append(schema.getName());
             buff.append("=");
             buff.append(partVals.get(i));
-            if(i!=partKeys.size()-1) {
+            if (i != partKeys.size() - 1) {
                 buff.append(Path.SEPARATOR);
             }
             ++i;
@@ -203,7 +156,7 @@ public class HiveSetupUtil {
 
     private static List<FieldSchema> getTableColumns(String[] colNames, String[] colTypes) {
         List<FieldSchema> fields = new ArrayList<FieldSchema>();
-        for (int i=0; i<colNames.length; ++i) {
+        for (int i = 0; i < colNames.length; ++i) {
             fields.add(new FieldSchema(colNames[i], colTypes[i], ""));
         }
         return fields;
@@ -211,10 +164,50 @@ public class HiveSetupUtil {
 
     private static List<FieldSchema> getPartitionKeys(String[] partNames) {
         List<FieldSchema> fields = new ArrayList<FieldSchema>();
-        for (int i=0; i < partNames.length; ++i) {
-           fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, ""));
+        for (int i = 0; i < partNames.length; ++i) {
+            fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, ""));
         }
         return fields;
     }
 
+    public static class RawFileSystem extends RawLocalFileSystem {
+        private static final URI NAME;
+
+        static {
+            try {
+                NAME = new URI("raw:///");
+            } catch (URISyntaxException se) {
+                throw new IllegalArgumentException("bad uri", se);
+            }
+        }
+
+        @Override
+        public URI getUri() {
+            return NAME;
+        }
+
+        @Override
+        public FileStatus getFileStatus(Path path) throws IOException {
+            File file = pathToFile(path);
+            if (!file.exists()) {
+                throw new FileNotFoundException("Can't find " + path);
+            }
+            // get close enough
+            short mod = 0;
+            if (file.canRead()) {
+                mod |= 0444;
+            }
+            if (file.canWrite()) {
+                mod |= 0200;
+            }
+            if (file.canExecute()) {
+                mod |= 0111;
+            }
+            ShimLoader.getHadoopShims();
+            return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+                                  file.lastModified(), file.lastModified(),
+                                  FsPermission.createImmutable(mod), "owen", "users", path);
+        }
+    }
+
 }

Reply via email to