http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
 
b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index a18b91e..35815fd 100644
--- 
a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ 
b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.bolt;
@@ -21,7 +15,23 @@ package org.apache.storm.hive.bolt;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.storm.Config;
+import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
+import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
+import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.hive.common.HiveWriter;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.task.OutputCollector;
@@ -31,41 +41,21 @@ import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.MockTupleHelpers;
-
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-import static org.mockito.Mockito.*;
-
-import org.junit.Assert;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
-import java.text.SimpleDateFormat;
-
-
-import org.apache.hive.hcatalog.streaming.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 public class TestHiveBolt {
     final static String dbName = "testdb";
@@ -74,87 +64,23 @@ public class TestHiveBolt {
     final static String tblName1 = "test_table1";
     final static String PART1_NAME = "city";
     final static String PART2_NAME = "state";
-    final static String[] partNames = {PART1_NAME, PART2_NAME};
-    final String partitionVals = "sunnyvale,ca";
+    final static String[] partNames = { PART1_NAME, PART2_NAME };
     private static final String COL1 = "id";
     private static final String COL2 = "msg";
-    final String[] colNames = {COL1, COL2};
-    final String[] colNames1 = {COL2, COL1};
-    private String[] colTypes = {serdeConstants.INT_TYPE_NAME, 
serdeConstants.STRING_TYPE_NAME};
-    private final HiveConf conf;
+    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
+    final String partitionVals = "sunnyvale,ca";
+    final String[] colNames = { COL1, COL2 };
+    final String[] colNames1 = { COL2, COL1 };
     final String metaStoreURI;
+    private final HiveConf conf;
+    private String[] colTypes = { serdeConstants.INT_TYPE_NAME, 
serdeConstants.STRING_TYPE_NAME };
     private Config config = new Config();
     private TestingHiveBolt bolt;
-    private ObjectMapper objectMapper = new ObjectMapper();;
-
+    ;
+    private ObjectMapper objectMapper = new ObjectMapper();
     @Mock
     private OutputCollector collector;
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
-
-    private static class TestingHiveBolt extends HiveBolt {
-
-        protected Map<List<String>, List<byte[]>> 
partitionValuesToWrittenRecords = new HashMap<>();
-
-        public TestingHiveBolt(HiveOptions options) {
-            super(options);
-        }
-
-        @Override
-        HiveWriter getOrCreateWriter(final HiveEndPoint endPoint)
-                throws HiveWriter.ConnectFailure, InterruptedException {
-            HiveWriter writer = allWriters.get(endPoint);
-            if (writer == null) {
-                // always provide mocked HiveWriter
-                writer = Mockito.mock(HiveWriter.class);
-                try {
-                    Mockito.doAnswer(new Answer<Void>() {
-                        @Override
-                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                            Object[] arguments = invocation.getArguments();
-                            List<String> partitionVals = 
endPoint.partitionVals;
-                            List<byte[]> writtenRecords = 
partitionValuesToWrittenRecords.get(partitionVals);
-                            if (writtenRecords == null) {
-                                writtenRecords = new ArrayList<>();
-                                
partitionValuesToWrittenRecords.put(partitionVals, writtenRecords);
-                            }
-                            writtenRecords.add((byte[]) arguments[0]);
-                            return null;
-                        }
-                    }).when(writer).write(any(byte[].class));
-                } catch (Exception exc) {
-                    throw new RuntimeException(exc);
-                }
-            }
-            return writer;
-        }
-
-        public Map<List<String>, List<byte[]>> 
getPartitionValuesToWrittenRecords() {
-            return partitionValuesToWrittenRecords;
-        }
-
-        public List<byte[]> getRecordWritten(List<String> partitionValues) {
-            return partitionValuesToWrittenRecords.get(partitionValues);
-        }
-    }
-
-    private static class FlushFailureHiveBolt extends TestingHiveBolt {
-
-        public FlushFailureHiveBolt(HiveOptions options) {
-            super(options);
-        }
-
-        @Override
-        void flushAllWriters(boolean rollToNext) throws 
HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure,
-                InterruptedException {
-            if (rollToNext) {
-                throw new InterruptedException();
-            } else {
-                super.flushAllWriters(false);
-            }
-        }
-    }
-
     public TestHiveBolt() throws Exception {
         //metaStoreURI = 
"jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") 
+"metastore_db;create=true";
         metaStoreURI = null;
@@ -172,13 +98,13 @@ public class TestHiveBolt {
 
     @Test
     public void testWithByteArrayIdandMessage()
-            throws Exception {
+        throws Exception {
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(2);
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, collector);
@@ -206,14 +132,13 @@ public class TestHiveBolt {
         bolt.cleanup();
     }
 
-
     @Test
     public void testWithoutPartitions()
-            throws Exception {
+        throws Exception {
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-                .withColumnFields(new Fields(colNames));
+            .withColumnFields(new Fields(colNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, 
tblName1, mapper)
-                
.withTxnsPerBatch(2).withBatchSize(2).withAutoCreatePartitions(false);
+            
.withTxnsPerBatch(2).withBatchSize(2).withAutoCreatePartitions(false);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, collector);
@@ -245,15 +170,15 @@ public class TestHiveBolt {
 
     @Test
     public void testWithTimeformat()
-            throws Exception {
+        throws Exception {
         String timeFormat = "yyyy/MM/dd";
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withTimeAsPartitionField(timeFormat);
+            .withColumnFields(new Fields(colNames))
+            .withTimeAsPartitionField(timeFormat);
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, 
tblName1, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(1)
-                .withMaxOpenConnections(1);
+            .withTxnsPerBatch(2)
+            .withBatchSize(1)
+            .withMaxOpenConnections(1);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, collector);
@@ -292,13 +217,13 @@ public class TestHiveBolt {
 
     @Test
     public void testData()
-            throws Exception {
+        throws Exception {
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(1);
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
@@ -327,15 +252,15 @@ public class TestHiveBolt {
 
     @Test
     public void testJsonWriter()
-            throws Exception {
+        throws Exception {
         // json record doesn't need columns to be in the same order
         // as table in hive.
         JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
-                .withColumnFields(new Fields(colNames1))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(1);
+            .withTxnsPerBatch(2)
+            .withBatchSize(1);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, collector);
@@ -373,11 +298,11 @@ public class TestHiveBolt {
     @Test
     public void testNoAcksUntilFlushed() {
         JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
-                .withColumnFields(new Fields(colNames1))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(2);
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
@@ -397,11 +322,11 @@ public class TestHiveBolt {
     @Test
     public void testNoAcksIfFlushFails() throws Exception {
         JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
-                .withColumnFields(new Fields(colNames1))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(2);
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
 
         HiveBolt failingBolt = new FlushFailureHiveBolt(hiveOptions);
 
@@ -422,11 +347,11 @@ public class TestHiveBolt {
     @Test
     public void testTickTuple() {
         JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
-                .withColumnFields(new Fields(colNames1))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(2);
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
@@ -452,11 +377,11 @@ public class TestHiveBolt {
     @Test
     public void testNoTickEmptyBatches() throws Exception {
         JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
-                .withColumnFields(new Fields(colNames1))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames1))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(2)
-                .withBatchSize(2);
+            .withTxnsPerBatch(2)
+            .withBatchSize(2);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
@@ -471,13 +396,13 @@ public class TestHiveBolt {
 
     @Test
     public void testMultiPartitionTuples()
-            throws Exception {
+        throws Exception {
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
         HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, 
tblName, mapper)
-                .withTxnsPerBatch(10)
-                .withBatchSize(10);
+            .withTxnsPerBatch(10)
+            .withBatchSize(10);
 
         bolt = new TestingHiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
@@ -517,7 +442,7 @@ public class TestHiveBolt {
     private Tuple generateTestTuple(Object id, Object msg, Object city, Object 
state) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") 
{
+                                                                            
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String 
streamId) {
                 return new Fields("id", "msg", "city", "state");
@@ -535,4 +460,67 @@ public class TestHiveBolt {
         return builder.toString().getBytes();
     }
 
+    private static class TestingHiveBolt extends HiveBolt {
+
+        protected Map<List<String>, List<byte[]>> 
partitionValuesToWrittenRecords = new HashMap<>();
+
+        public TestingHiveBolt(HiveOptions options) {
+            super(options);
+        }
+
+        @Override
+        HiveWriter getOrCreateWriter(final HiveEndPoint endPoint)
+            throws HiveWriter.ConnectFailure, InterruptedException {
+            HiveWriter writer = allWriters.get(endPoint);
+            if (writer == null) {
+                // always provide mocked HiveWriter
+                writer = Mockito.mock(HiveWriter.class);
+                try {
+                    Mockito.doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                            Object[] arguments = invocation.getArguments();
+                            List<String> partitionVals = 
endPoint.partitionVals;
+                            List<byte[]> writtenRecords = 
partitionValuesToWrittenRecords.get(partitionVals);
+                            if (writtenRecords == null) {
+                                writtenRecords = new ArrayList<>();
+                                
partitionValuesToWrittenRecords.put(partitionVals, writtenRecords);
+                            }
+                            writtenRecords.add((byte[]) arguments[0]);
+                            return null;
+                        }
+                    }).when(writer).write(any(byte[].class));
+                } catch (Exception exc) {
+                    throw new RuntimeException(exc);
+                }
+            }
+            return writer;
+        }
+
+        public Map<List<String>, List<byte[]>> 
getPartitionValuesToWrittenRecords() {
+            return partitionValuesToWrittenRecords;
+        }
+
+        public List<byte[]> getRecordWritten(List<String> partitionValues) {
+            return partitionValuesToWrittenRecords.get(partitionValues);
+        }
+    }
+
+    private static class FlushFailureHiveBolt extends TestingHiveBolt {
+
+        public FlushFailureHiveBolt(HiveOptions options) {
+            super(options);
+        }
+
+        @Override
+        void flushAllWriters(boolean rollToNext) throws 
HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure,
+            InterruptedException {
+            if (rollToNext) {
+                throw new InterruptedException();
+            } else {
+                super.flushAllWriters(false);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5fc4e9f0/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
 
b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
index 450342b..03d3383 100644
--- 
a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
+++ 
b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java
@@ -1,140 +1,76 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.hive.common;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import junit.framework.Assert;
-import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.hive.hcatalog.streaming.StreamingConnection;
 import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.storm.Config;
+import org.apache.storm.hive.bolt.HiveSetupUtil;
 import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
 import org.apache.storm.hive.bolt.mapper.HiveMapper;
-import org.apache.storm.hive.bolt.HiveSetupUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import org.apache.storm.Config;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.HashMap;
-
 public class TestHiveWriter {
-    final static String dbName = "testdb";
-    final static String tblName = "test_table2";
-
     public static final String PART1_NAME = "city";
     public static final String PART2_NAME = "state";
     public static final String[] partNames = { PART1_NAME, PART2_NAME };
-    final String[] partitionVals = {"sunnyvale","ca"};
-    final String[] colNames = {"id","msg"};
+    final static String dbName = "testdb";
+    final static String tblName = "test_table2";
+    final String[] partitionVals = { "sunnyvale", "ca" };
+    final String[] colNames = { "id", "msg" };
     private final int port;
     private final String metaStoreURI;
     private final HiveConf conf;
-    private ExecutorService callTimeoutPool;
-    int timeout = 10000; // msec
-    UserGroupInformation ugi = null;
-
     @Rule
     public TemporaryFolder dbFolder = new TemporaryFolder();
-
-    private static class TestingHiveWriter extends HiveWriter {
-
-        private StreamingConnection mockedStreamingConn;
-        private TransactionBatch mockedTxBatch;
-
-        public TestingHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, 
boolean autoCreatePartitions, long callTimeout, ExecutorService 
callTimeoutPool, HiveMapper mapper, UserGroupInformation ugi, boolean 
tokenAuthEnabled) throws InterruptedException, ConnectFailure {
-            super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, 
callTimeoutPool, mapper, ugi, tokenAuthEnabled);
-        }
-
-        @Override
-        synchronized StreamingConnection newConnection(UserGroupInformation 
ugi, boolean tokenAuthEnabled) throws InterruptedException, ConnectFailure {
-            if (mockedStreamingConn == null) {
-                mockedStreamingConn = Mockito.mock(StreamingConnection.class);
-                mockedTxBatch = Mockito.mock(TransactionBatch.class);
-
-                try {
-                    
Mockito.when(mockedStreamingConn.fetchTransactionBatch(Mockito.anyInt(), 
Mockito.any(RecordWriter.class)))
-                            .thenReturn(mockedTxBatch);
-                } catch (StreamingException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            return mockedStreamingConn;
-        }
-
-        public TransactionBatch getMockedTxBatch() {
-            return mockedTxBatch;
-        }
-    }
-
-    private static class MockedDelemiteredRecordHiveMapper extends 
DelimitedRecordHiveMapper {
-        private final RecordWriter mockedRecordWriter;
-
-        public MockedDelemiteredRecordHiveMapper() {
-            this.mockedRecordWriter = Mockito.mock(RecordWriter.class);
-        }
-
-        @Override
-        public RecordWriter createRecordWriter(HiveEndPoint endPoint) throws 
StreamingException, IOException, ClassNotFoundException {
-            return mockedRecordWriter;
-        }
-
-        public RecordWriter getMockedRecordWriter() {
-            return mockedRecordWriter;
-        }
-    }
+    int timeout = 10000; // msec
+    UserGroupInformation ugi = null;
+    private ExecutorService callTimeoutPool;
 
     public TestHiveWriter() throws Exception {
         port = 9083;
         metaStoreURI = null;
         int callTimeoutPoolSize = 1;
         callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
-                new 
ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+                                                       new 
ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
 
         // 1) Start metastore
         conf = HiveSetupUtil.getHiveConf();
         TxnDbUtil.setConfValues(conf);
-        if(metaStoreURI!=null) {
+        if (metaStoreURI != null) {
             conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
         }
     }
@@ -142,23 +78,23 @@ public class TestHiveWriter {
     @Test
     public void testInstantiate() throws Exception {
         DelimitedRecordHiveMapper mapper = new 
MockedDelemiteredRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
         HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, 
tblName, Arrays.asList(partitionVals));
         TestingHiveWriter writer = new TestingHiveWriter(endPoint, 10, true, 
timeout
-                ,callTimeoutPool, mapper, ugi, false);
+            , callTimeoutPool, mapper, ugi, false);
         writer.close();
     }
 
     @Test
     public void testWriteBasic() throws Exception {
         DelimitedRecordHiveMapper mapper = new 
MockedDelemiteredRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
         HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, 
tblName, Arrays.asList(partitionVals));
         TestingHiveWriter writer = new TestingHiveWriter(endPoint, 10, true, 
timeout
-                , callTimeoutPool, mapper, ugi, false);
-        writeTuples(writer,mapper,3);
+            , callTimeoutPool, mapper, ugi, false);
+        writeTuples(writer, mapper, 3);
         writer.flush(false);
         writer.close();
         Mockito.verify(writer.getMockedTxBatch(), 
Mockito.times(3)).write(Mockito.any(byte[].class));
@@ -167,15 +103,15 @@ public class TestHiveWriter {
     @Test
     public void testWriteMultiFlush() throws Exception {
         DelimitedRecordHiveMapper mapper = new 
MockedDelemiteredRecordHiveMapper()
-                .withColumnFields(new Fields(colNames))
-                .withPartitionFields(new Fields(partNames));
+            .withColumnFields(new Fields(colNames))
+            .withPartitionFields(new Fields(partNames));
 
         HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, 
tblName, Arrays.asList(partitionVals));
         TestingHiveWriter writer = new TestingHiveWriter(endPoint, 10, true, 
timeout
-                , callTimeoutPool, mapper, ugi, false);
-        Tuple tuple = generateTestTuple("1","abc");
+            , callTimeoutPool, mapper, ugi, false);
+        Tuple tuple = generateTestTuple("1", "abc");
         writer.write(mapper.mapRecord(tuple));
-        tuple = generateTestTuple("2","def");
+        tuple = generateTestTuple("2", "def");
         writer.write(mapper.mapRecord(tuple));
         Assert.assertEquals(writer.getTotalRecords(), 2);
         Mockito.verify(writer.getMockedTxBatch(), 
Mockito.times(2)).write(Mockito.any(byte[].class));
@@ -184,11 +120,11 @@ public class TestHiveWriter {
         Assert.assertEquals(writer.getTotalRecords(), 0);
         Mockito.verify(writer.getMockedTxBatch(), 
Mockito.atLeastOnce()).commit();
 
-        tuple = generateTestTuple("3","ghi");
+        tuple = generateTestTuple("3", "ghi");
         writer.write(mapper.mapRecord(tuple));
         writer.flush(true);
 
-        tuple = generateTestTuple("4","klm");
+        tuple = generateTestTuple("4", "klm");
         writer.write(mapper.mapRecord(tuple));
         writer.flush(true);
         writer.close();
@@ -198,7 +134,7 @@ public class TestHiveWriter {
     private Tuple generateTestTuple(Object id, Object msg) {
         TopologyBuilder builder = new TopologyBuilder();
         GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(),
-                new Config(), new HashMap(), new HashMap(), new HashMap(), "") 
{
+                                                                            
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
             @Override
             public Fields getComponentOutputFields(String componentId, String 
streamId) {
                 return new Fields("id", "msg");
@@ -208,13 +144,64 @@ public class TestHiveWriter {
     }
 
     private void writeTuples(HiveWriter writer, HiveMapper mapper, int count)
-            throws HiveWriter.WriteFailure, InterruptedException, 
SerializationError {
+        throws HiveWriter.WriteFailure, InterruptedException, 
SerializationError {
         Integer id = 100;
         String msg = "test-123";
         for (int i = 1; i <= count; i++) {
-            Tuple tuple = generateTestTuple(id,msg);
+            Tuple tuple = generateTestTuple(id, msg);
             writer.write(mapper.mapRecord(tuple));
         }
     }
 
+    private static class TestingHiveWriter extends HiveWriter {
+
+        private StreamingConnection mockedStreamingConn;
+        private TransactionBatch mockedTxBatch;
+
+        public TestingHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, 
boolean autoCreatePartitions, long callTimeout,
+                                 ExecutorService callTimeoutPool, HiveMapper 
mapper, UserGroupInformation ugi,
+                                 boolean tokenAuthEnabled) throws 
InterruptedException, ConnectFailure {
+            super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, 
callTimeoutPool, mapper, ugi, tokenAuthEnabled);
+        }
+
+        @Override
+        synchronized StreamingConnection newConnection(UserGroupInformation 
ugi, boolean tokenAuthEnabled) throws InterruptedException,
+            ConnectFailure {
+            if (mockedStreamingConn == null) {
+                mockedStreamingConn = Mockito.mock(StreamingConnection.class);
+                mockedTxBatch = Mockito.mock(TransactionBatch.class);
+
+                try {
+                    
Mockito.when(mockedStreamingConn.fetchTransactionBatch(Mockito.anyInt(), 
Mockito.any(RecordWriter.class)))
+                           .thenReturn(mockedTxBatch);
+                } catch (StreamingException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return mockedStreamingConn;
+        }
+
+        public TransactionBatch getMockedTxBatch() {
+            return mockedTxBatch;
+        }
+    }
+
+    private static class MockedDelemiteredRecordHiveMapper extends 
DelimitedRecordHiveMapper {
+        private final RecordWriter mockedRecordWriter;
+
+        public MockedDelemiteredRecordHiveMapper() {
+            this.mockedRecordWriter = Mockito.mock(RecordWriter.class);
+        }
+
+        @Override
+        public RecordWriter createRecordWriter(HiveEndPoint endPoint) throws 
StreamingException, IOException, ClassNotFoundException {
+            return mockedRecordWriter;
+        }
+
+        public RecordWriter getMockedRecordWriter() {
+            return mockedRecordWriter;
+        }
+    }
+
 }
\ No newline at end of file

Reply via email to