This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fb7717f9d6 HIVE-28351: TestStreaming to run on Tez (#6145)
4fb7717f9d6 is described below

commit 4fb7717f9d636de86247da52c1ef5f4dfffe4964
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Oct 23 00:35:28 2025 +0200

    HIVE-28351: TestStreaming to run on Tez (#6145)
---
 .../org/apache/hive/streaming/TestStreaming.java   | 761 +++++++++------------
 1 file changed, 336 insertions(+), 425 deletions(-)
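
For reference, a minimal sketch (not taken from the commit itself) of the Tez-oriented test
configuration pattern that the diff below introduces in the TestStreaming constructor. It only
uses the classes and calls visible in the diff (HiveConfForTest, TestTxnDbUtil,
HIVE_SUPPORT_CONCURRENCY); the wrapper class and method names are illustrative.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConfForTest;
    import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;

    // Sketch only: mirrors the constructor changes in the diff below.
    final class TezTestConfSketch {
      static HiveConf newTezTestConf() throws Exception {
        HiveConf conf = new HiveConfForTest(TezTestConfSketch.class);
        // Tiny split-grouping sizes force Tez to create several mappers (and thus
        // several bucket files) even for the small data sets written by the tests.
        conf.set("tez.grouping.max-size", "10");
        conf.set("tez.grouping.min-size", "1");
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        TestTxnDbUtil.setConfValues(conf); // point txn handling at the test metastore DB
        TestTxnDbUtil.prepDb(conf);        // create the transactional metadata tables
        return conf;
      }
    }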

diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 5ee780005a6..3177a171283 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -18,12 +18,9 @@
 
 package org.apache.hive.streaming;
 
-import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -39,6 +36,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +55,7 @@
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfForTest;
 import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -71,23 +70,18 @@
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
-import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -100,29 +94,22 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore("HIVE-24219")
 public class TestStreaming {
   private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
 
-  public static class RawFileSystem extends RawLocalFileSystem {
+  private static final class RawFileSystem extends RawLocalFileSystem {
     private static final URI NAME;
 
     static {
@@ -147,7 +134,7 @@ public String getScheme() {
     public FileStatus getFileStatus(Path path) throws IOException {
       File file = pathToFile(path);
       if (!file.exists()) {
-        throw new FileNotFoundException("Can'table find " + path);
+        throw new FileNotFoundException("Can't find " + path);
       }
       // get close enough
       short mod = 0;
@@ -177,16 +164,12 @@ public FileStatus getFileStatus(Path path) throws IOException {
   // partitioned table
   private final static String dbName = "testing";
   private final static String tblName = "alerts";
-  private final static String[] fieldNames = new String[]{COL1, COL2};
   static List<String> partitionVals;
   private static Path partLoc;
-  private static Path partLoc2;
 
   // unpartitioned table
   private final static String dbName2 = "testing2";
   private final static String tblName2 = "alerts";
-  private final static String[] fieldNames2 = new String[]{COL1, COL2};
-
 
   // for bucket join testing
   private final static String dbName3 = "testing3";
@@ -195,38 +178,39 @@ public FileStatus getFileStatus(Path path) throws IOException {
   private final static String tblName4 = "factTable";
   List<String> partitionVals2;
 
-
   private final String PART1_CONTINENT = "Asia";
   private final String PART1_COUNTRY = "India";
 
   @Rule
   public TemporaryFolder dbFolder = new TemporaryFolder();
 
-
   public TestStreaming() throws Exception {
-    partitionVals = new ArrayList<String>(2);
+    partitionVals = new ArrayList<>(2);
     partitionVals.add(PART1_CONTINENT);
     partitionVals.add(PART1_COUNTRY);
 
-    partitionVals2 = new ArrayList<String>(1);
+    partitionVals2 = new ArrayList<>(1);
     partitionVals2.add(PART1_COUNTRY);
 
+    conf = new HiveConfForTest(this.getClass());
+    // Multiple tests require more than one bucket per write. Use a very small grouping size to create
+    // multiple mapper instances with FileSinkOperators. The number of buckets depends on the size of the
+    // data written and the grouping size. Most test cases expect 2 buckets.
+    conf.set("tez.grouping.max-size", "10");
+    conf.set("tez.grouping.min-size", "1");
 
-    conf = new HiveConf(this.getClass());
     conf.set("fs.raw.impl", RawFileSystem.class.getName());
     conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
       
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-    TestTxnDbUtil.setConfValues(conf);
     conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+
     dbFolder.create();
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, "raw://" + 
dbFolder.newFolder("warehouse"));
 
-    //1) Start from a clean slate (metastore)
-    TestTxnDbUtil.cleanDb(conf);
+    TestTxnDbUtil.setConfValues(conf);
     TestTxnDbUtil.prepDb(conf);
 
-    //2) obtain metastore clients
     msClient = new HiveMetaStoreClient(conf);
   }
 
@@ -235,6 +219,7 @@ public void setup() throws Exception {
     SessionState.start(new CliSessionState(conf));
     driver = DriverFactory.newDriver(conf);
     driver.setMaxRows(200002);//make sure Driver returns all results
+
     // drop and recreate the necessary databases and tables
     dropDB(msClient, dbName);
 
@@ -248,34 +233,39 @@ public void setup() throws Exception {
 
     dropDB(msClient, dbName2);
     String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
-    partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, 
colTypes, bucketCols, null, loc2, 2);
+    createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, 
bucketCols, null, loc2, 2);
 
+    dropDB(msClient, "testing5");
     String loc3 = dbFolder.newFolder("testing5.db").toString();
     createStoreSales("testing5", loc3);
 
-    runDDL(driver, "drop table testBucketing3.streamedtable");
-    runDDL(driver, "drop table testBucketing3.finaltable");
-    runDDL(driver, "drop table testBucketing3.nobucket");
+    dropTables(driver,
+        "testBucketing3.streamedtable",
+        "testBucketing3.finaltable",
+        "testBucketing3.nobucket");
   }
 
   @After
-  public void cleanup() {
-    msClient.close();
-    driver.close();
+  public void cleanup() throws Exception {
+    try {
+      msClient.close();
+      driver.close();
+    } finally {
+      TestTxnDbUtil.cleanDb(conf);
+    }
   }
 
-  private void createStoreSales(String dbName, String loc) throws Exception {
+  private void createStoreSales(String dbName, String loc) {
     String dbUri = "raw://" + new Path(loc).toUri().toString();
     String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
 
-    boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName 
+ " location '" + dbUri + "'");
+    boolean success = runQuery(driver, "create database IF NOT EXISTS " + 
dbName + " location '" + dbUri + "'");
     Assert.assertTrue(success);
-    success = runDDL(driver, "use " + dbName);
+    success = runQuery(driver, "use " + dbName);
     Assert.assertTrue(success);
 
-    success = runDDL(driver, "drop table if exists store_sales");
-    Assert.assertTrue(success);
-    success = runDDL(driver, "create table store_sales\n" +
+    dropTables(driver, "store_sales");
+    success = runQuery(driver, "create table store_sales\n" +
       "(\n" +
       "    ss_sold_date_sk           int,\n" +
       "    ss_sold_time_sk           int,\n" +
@@ -307,26 +297,24 @@ private void createStoreSales(String dbName, String loc) 
throws Exception {
       "  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
     Assert.assertTrue(success);
 
-    success = runDDL(driver, "alter table store_sales add 
partition(dt='2015')");
+    success = runQuery(driver, "alter table store_sales add 
partition(dt='2015')");
     Assert.assertTrue(success);
   }
 
   /**
    * make sure it works with table where bucket col is not 1st col
-   *
-   * @throws Exception
    */
   @Test
   public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
-    List<String> partitionVals = new ArrayList<String>();
-    partitionVals.add("2015");
+    List<String> newPartVals = new ArrayList<>();
+    newPartVals.add("2015");
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
       .build();
     HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
       .withDatabase("testing5")
       .withTable("store_sales")
-      .withStaticPartitionValues(partitionVals)
+      .withStaticPartitionValues(newPartVals)
       .withAgentInfo("UT_" + Thread.currentThread().getName())
       .withRecordWriter(writer)
       .withHiveConf(conf)
@@ -347,7 +335,7 @@ public void testBucketingWhereBucketColIsNotFirstCol() 
throws Exception {
     connection.commitTransaction();
     connection.close();
 
-    ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * 
from testing5.store_sales");
+    List<String> res = queryTable(driver, "select row__id.bucketid, * from 
testing5.store_sales");
     for (String re : res) {
       System.out.println(re);
     }
@@ -358,13 +346,13 @@ public void testBucketingWhereBucketColIsNotFirstCol() 
throws Exception {
    */
   @Test
   public void testNoBuckets() throws Exception {
-    queryTable(driver, "drop table if exists default.streamingnobuckets");
-    queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
+    dropTables(driver, "default.streamingnobuckets");
+    runQuery(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
       "TBLPROPERTIES('transactional'='true')");
-    queryTable(driver, "insert into default.streamingnobuckets 
values('foo','bar')");
+    runQuery(driver, "insert into default.streamingnobuckets 
values('foo','bar')");
     List<String> rs = queryTable(driver, "select * from 
default.streamingnobuckets");
     Assert.assertEquals(1, rs.size());
-    Assert.assertEquals("foo\tbar", rs.get(0));
+    Assert.assertEquals("foo\tbar", rs.getFirst());
     StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
       .build();
@@ -382,9 +370,9 @@ public void testNoBuckets() throws Exception {
     connection.write("a3,b4".getBytes());
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
-    Assert.assertEquals(resp.getLocksSize(), 1);
-    Assert.assertEquals("streamingnobuckets", 
resp.getLocks().get(0).getTablename());
-    Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
+    Assert.assertEquals(1, resp.getLocksSize());
+    Assert.assertEquals("streamingnobuckets", 
resp.getLocks().getFirst().getTablename());
+    Assert.assertEquals("default", resp.getLocks().getFirst().getDbname());
     connection.commitTransaction();
     connection.beginTransaction();
     connection.write("a5,b6".getBytes());
@@ -406,27 +394,27 @@ public void testNoBuckets() throws Exception {
     Assert.assertTrue(rs.get(4), 
rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
     Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
 
-    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a7'");
-    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+    runQuery(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a7'");
+    runQuery(driver, "delete from default.streamingnobuckets where a='a1'");
     rs = queryTable(driver, "select a, b from default.streamingnobuckets order 
by a, b");
-    int row = 0;
-    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
 
-    queryTable(driver, "alter table default.streamingnobuckets compact 
'major'");
+    Assert.assertEquals("at row=" + 0, "0\t0", rs.get(0));
+    Assert.assertEquals("at row=" + 1, "a3\tb4", rs.get(1));
+    Assert.assertEquals("at row=" + 2, "a5\tb6", rs.get(2));
+    Assert.assertEquals("at row=" + 3, "foo\tbar", rs.get(3));
+
+    runQuery(driver, "alter table default.streamingnobuckets compact 'major'");
     runWorker(conf);
     rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
     Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
     Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870913,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000024/bucket_00000"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000016/bucket_00000"));
   }
 
   @Test
@@ -448,14 +436,14 @@ public void testGetDeltaPath() throws Exception {
 
   @Test
   public void testCommitWithKeyValue() throws Exception {
-    queryTable(driver, "drop table if exists default.keyvalue");
-    queryTable(driver, "create table default.keyvalue (a string, b string) 
stored as orc " +
+    dropTables(driver, "default.keyvalue");
+    runQuery(driver, "create table default.keyvalue (a string, b string) 
stored as orc " +
         "TBLPROPERTIES('transactional'='true')");
-    queryTable(driver, "insert into default.keyvalue values('foo','bar')");
-    queryTable(driver, "ALTER TABLE default.keyvalue SET 
TBLPROPERTIES('_metamykey' = 'myvalue')");
+    runQuery(driver, "insert into default.keyvalue values('foo','bar')");
+    runQuery(driver, "ALTER TABLE default.keyvalue SET 
TBLPROPERTIES('_metamykey' = 'myvalue')");
     List<String> rs = queryTable(driver, "select * from default.keyvalue");
     Assert.assertEquals(1, rs.size());
-    Assert.assertEquals("foo\tbar", rs.get(0));
+    Assert.assertEquals("foo\tbar", rs.getFirst());
     StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
         .withFieldDelimiter(',')
         .build();
@@ -485,14 +473,14 @@ public void testCommitWithKeyValue() throws Exception {
 
   @Test
   public void testConnectionWithWriteId() throws Exception {
-    queryTable(driver, "drop table if exists default.writeidconnection");
-    queryTable(driver, "create table default.writeidconnection (a string, b 
string) stored as orc " +
+    dropTables(driver, "default.writeidconnection");
+    runQuery(driver, "create table default.writeidconnection (a string, b 
string) stored as orc " +
         "TBLPROPERTIES('transactional'='true')");
-    queryTable(driver, "insert into default.writeidconnection 
values('a0','bar')");
+    runQuery(driver, "insert into default.writeidconnection 
values('a0','bar')");
 
     List<String> rs = queryTable(driver, "select * from 
default.writeidconnection");
     Assert.assertEquals(1, rs.size());
-    Assert.assertEquals("a0\tbar", rs.get(0));
+    Assert.assertEquals("a0\tbar", rs.getFirst());
 
     StrictDelimitedInputWriter writerT = 
StrictDelimitedInputWriter.newBuilder()
         .withFieldDelimiter(',')
@@ -556,8 +544,7 @@ public void testConnectionWithWriteId() throws Exception {
       connectionOne.beginTransaction();
       Assert.fail("second beginTransaction should have thrown a "
           + "StreamingException");
-    } catch (StreamingException e) {
-
+    } catch (StreamingException ignored) {
     }
 
     connectionOne.close();
@@ -585,8 +572,8 @@ public void testConnectionWithWriteId() throws Exception {
 
   @Test
   public void testAllTypesDelimitedWriter() throws Exception {
-    queryTable(driver, "drop table if exists default.alltypes");
-    queryTable(driver,
+    dropTables(driver, "default.alltypes");
+    runQuery(driver,
       "create table if not exists default.alltypes ( bo boolean, ti tinyint, 
si smallint, i int, bi bigint, " +
         "f float, d double, de decimal(10,3), ts timestamp, da date, s string, 
c char(5), vc varchar(5), " +
         "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " 
+
@@ -618,7 +605,7 @@ public void testAllTypesDelimitedWriter() throws Exception {
     List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, 
f, d, de, ts, da, s, c, vc, m, l, st," +
       " INPUT__FILE__NAME from default.alltypes order by ROW__ID");
     Assert.assertEquals(2, rs.size());
-    String gotRow1 = rs.get(0);
+    String gotRow1 = rs.getFirst();
     String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
       "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 
15:59:58.174\t1970-01-01\tstring" +
       "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
@@ -636,8 +623,8 @@ public void testAllTypesDelimitedWriter() throws Exception {
 
   @Test
   public void testAllTypesDelimitedWriterInputStream() throws Exception {
-    queryTable(driver, "drop table if exists default.alltypes");
-    queryTable(driver,
+    dropTables(driver, "default.alltypes");
+    runQuery(driver,
       "create table if not exists default.alltypes ( bo boolean, ti tinyint, 
si smallint, i int, bi bigint, " +
         "f float, d double, de decimal(10,3), ts timestamp, da date, s string, 
c char(5), vc varchar(5), " +
         "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " 
+
@@ -672,7 +659,7 @@ public void testAllTypesDelimitedWriterInputStream() throws 
Exception {
     List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, 
f, d, de, ts, da, s, c, vc, m, l, st," +
       " INPUT__FILE__NAME from default.alltypes order by ROW__ID");
     Assert.assertEquals(2, rs.size());
-    String gotRow1 = rs.get(0);
+    String gotRow1 = rs.getFirst();
     String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
       "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 
15:59:58.174\t1970-01-01\tstring" +
       "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
@@ -690,13 +677,13 @@ public void testAllTypesDelimitedWriterInputStream() 
throws Exception {
 
   @Test
   public void testAutoRollTransactionBatch() throws Exception {
-    queryTable(driver, "drop table if exists default.streamingnobuckets");
-    queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
+    dropTables(driver, "default.streamingnobuckets");
+    runQuery(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
       "TBLPROPERTIES('transactional'='true')");
-    queryTable(driver, "insert into default.streamingnobuckets 
values('foo','bar')");
+    runQuery(driver, "insert into default.streamingnobuckets 
values('foo','bar')");
     List<String> rs = queryTable(driver, "select * from 
default.streamingnobuckets");
     Assert.assertEquals(1, rs.size());
-    Assert.assertEquals("foo\tbar", rs.get(0));
+    Assert.assertEquals("foo\tbar", rs.getFirst());
     StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
       .build();
@@ -751,36 +738,36 @@ public void testAutoRollTransactionBatch() throws 
Exception {
     Assert.assertTrue(rs.get(8), 
rs.get(8).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16"));
     Assert.assertTrue(rs.get(8), 
rs.get(8).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
 
-    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a7'");
-    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
-    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a15'");
-    queryTable(driver, "delete from default.streamingnobuckets where a='a9'");
+    runQuery(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a7'");
+    runQuery(driver, "delete from default.streamingnobuckets where a='a1'");
+    runQuery(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a15'");
+    runQuery(driver, "delete from default.streamingnobuckets where a='a9'");
     rs = queryTable(driver, "select a, b from default.streamingnobuckets order 
by a, b");
-    int row = 0;
-    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "a11\tb12", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "a13\tb14", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
-    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
-
-    queryTable(driver, "alter table default.streamingnobuckets compact 
'major'");
+
+    Assert.assertEquals("at row=" + 0, "0\t0", rs.get(0));
+    Assert.assertEquals("at row=" + 1, "0\t0", rs.get(1));
+    Assert.assertEquals("at row=" + 2, "a11\tb12", rs.get(2));
+    Assert.assertEquals("at row=" + 3, "a13\tb14", rs.get(3));
+    Assert.assertEquals("at row=" + 4, "a3\tb4", rs.get(4));
+    Assert.assertEquals("at row=" + 5, "a5\tb6", rs.get(5));
+    Assert.assertEquals("at row=" + 6, "foo\tbar", rs.get(6));
+
+    runQuery(driver, "alter table default.streamingnobuckets compact 'major'");
     runWorker(conf);
     rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
     Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
     Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
     Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
-    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
     Assert.assertTrue(rs.get(4), 
rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
-    Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00000"));
+    Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00000"));
     Assert.assertTrue(rs.get(5), 
rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536936449,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(5), 
rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000028/bucket_00001"));
+    Assert.assertTrue(rs.get(5), 
rs.get(5).endsWith("streamingnobuckets/base_0000009_v0000020/bucket_00001"));
   }
 
   /**
@@ -805,19 +792,19 @@ public void testStreamBucketingMatchesRegularBucketing() 
throws Exception {
     String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
     String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
 
-    // disabling vectorization as this test yields incorrect results with 
vectorization
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
     try (IDriver driver = DriverFactory.newDriver(conf)) {
-      runDDL(driver, "create database testBucketing3");
-      runDDL(driver, "use testBucketing3");
-      runDDL(driver, "create table streamedtable ( key1 string,key2 int,data 
string ) clustered by ( key1,key2 ) into "
-        + bucketCount + " buckets  stored as orc  location " + tableLoc + " 
TBLPROPERTIES ('transactional'='true')");
-      //  In 'nobucket' table we capture bucketid from streamedtable to 
workaround a hive bug that prevents joins two identically bucketed tables
-      runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 
int,data string ) location " + tableLoc3);
-      runDDL(driver,
-        "create table finaltable ( bucketid int, key1 string,key2 int,data 
string ) clustered by ( key1,key2 ) into "
-          + bucketCount + " buckets  stored as orc location " + tableLoc2 + " 
TBLPROPERTIES ('transactional'='true')");
-
+      runQuery(driver, "create database testBucketing3");
+      runQuery(driver, "use testBucketing3");
+      runQuery(driver, "create table streamedtable (key1 string, key2 int, 
data string)"
+        + " clustered by (key1, key2) into " + bucketCount + " buckets"
+        + " stored as orc  location " + tableLoc + " TBLPROPERTIES 
('transactional'='true')");
+      //  In the 'nobucket' table we capture the bucketid from streamedtable to work around a Hive bug
+      //  that prevents joining two identically bucketed tables
+      runQuery(driver, "create table nobucket (bucketid int, key1 string, key2 
int, data string)"
+        + " location " + tableLoc3);
+      runQuery(driver, "create table finaltable (bucketid int, key1 string, 
key2 int, data string)"
+        + " clustered by (key1, key2) into " + bucketCount + " buckets  "
+        + " stored as orc location " + tableLoc2 + " TBLPROPERTIES 
('transactional'='true')");
 
       String[] records = new String[]{
         "PSFAHYLZVC,29,EPNMA",
@@ -862,25 +849,22 @@ public void testStreamBucketingMatchesRegularBucketing() 
throws Exception {
       connection.commitTransaction();
       connection.close();
 
-      ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * 
from streamedtable order by key2");
+      List<String> res1 = queryTable(driver, "select row__id.bucketid, * from 
streamedtable order by key2");
       for (String re : res1) {
         LOG.error(re);
       }
 
       driver.run("insert into nobucket select row__id.bucketid,* from 
streamedtable");
-      runDDL(driver, "insert into finaltable select * from nobucket");
-      ArrayList<String> res2 = queryTable(driver,
+      runQuery(driver, "insert into finaltable select * from nobucket");
+      List<String> res2 = queryTable(driver,
         "select row__id.bucketid,* from finaltable where 
row__id.bucketid<>bucketid");
       for (String s : res2) {
         LOG.error(s);
       }
       Assert.assertTrue(res2.isEmpty());
-    } finally {
-      conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
     }
   }
 
-
   @Test
   public void testTableValidation() throws Exception {
     int bucketCount = 100;
@@ -892,13 +876,13 @@ public void testTableValidation() throws Exception {
     String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
     String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
 
-    runDDL(driver, "create database testBucketing3");
-    runDDL(driver, "use testBucketing3");
+    runQuery(driver, "create database testBucketing3");
+    runQuery(driver, "use testBucketing3");
 
-    runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
+    runQuery(driver, "create table " + tbl1 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
       + bucketCount + " buckets  stored as orc  location " + tableLoc + " 
TBLPROPERTIES ('transactional'='false')");
 
-    runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
+    runQuery(driver, "create table " + tbl2 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
       + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " 
TBLPROPERTIES ('transactional'='false')");
 
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
@@ -913,7 +897,7 @@ public void testTableValidation() throws Exception {
         .withRecordWriter(writer)
         .withHiveConf(conf)
         .connect();
-      Assert.assertTrue("InvalidTable exception was not thrown", false);
+      Assert.fail("InvalidTable exception was not thrown");
     } catch (InvalidTable e) {
       // expecting this exception
     } finally {
@@ -929,7 +913,7 @@ public void testTableValidation() throws Exception {
         .withRecordWriter(writer)
         .withHiveConf(conf)
         .connect();
-      Assert.assertTrue("InvalidTable exception was not thrown", false);
+      Assert.fail("InvalidTable exception was not thrown");
     } catch (InvalidTable e) {
       // expecting this exception
     } finally {
@@ -939,71 +923,14 @@ public void testTableValidation() throws Exception {
     }
   }
 
-  /**
-   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, 
boolean, String...)} -
-   * there is little value in using InputFormat directly
-   */
-  @Deprecated
-  private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, 
int buckets, int numExpectedFiles,
-    String... records) throws Exception {
-    ValidWriteIdList writeIds = getTransactionContext(conf);
-    AcidDirectory dir = AcidUtils.getAcidState(null, partitionPath, conf, 
writeIds, null, false);
-    Assert.assertEquals(0, dir.getObsolete().size());
-    Assert.assertEquals(0, dir.getOriginalFiles().size());
-    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
-    System.out.println("Files found: ");
-    for (AcidUtils.ParsedDelta pd : current) {
-      System.out.println(pd.getPath().toString());
-    }
-    Assert.assertEquals(numExpectedFiles, current.size());
-
-    // find the absolute minimum transaction
-    long min = Long.MAX_VALUE;
-    long max = Long.MIN_VALUE;
-    for (AcidUtils.ParsedDelta pd : current) {
-      if (pd.getMaxWriteId() > max) {
-        max = pd.getMaxWriteId();
-      }
-      if (pd.getMinWriteId() < min) {
-        min = pd.getMinWriteId();
-      }
-    }
-    // We are doing +1, as DDL operation will also advance the write Id now.
-    Assert.assertEquals(minTxn + 1, min);
-    Assert.assertEquals(maxTxn + 1, max);
-
-    InputFormat inf = new OrcInputFormat();
-    JobConf job = new JobConf();
-    job.set("mapred.input.dir", partitionPath.toString());
-    job.set(BUCKET_COUNT, Integer.toString(buckets));
-    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
-    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    AcidUtils.setAcidOperationalProperties(job, true, null);
-    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.writeToString());
-    job.set(ValidTxnList.VALID_TXNS_KEY, 
conf.get(ValidTxnList.VALID_TXNS_KEY));
-    InputSplit[] splits = inf.getSplits(job, buckets);
-    Assert.assertEquals(numExpectedFiles, splits.length);
-    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
-      inf.getRecordReader(splits[0], job, Reporter.NULL);
-
-    NullWritable key = rr.createKey();
-    OrcStruct value = rr.createValue();
-    for (String record : records) {
-      Assert.assertEquals(true, rr.next(key, value));
-      Assert.assertEquals(record, value.toString());
-    }
-    Assert.assertEquals(false, rr.next(key, value));
-  }
-
   /**
    * @param validationQuery query to read from table to compare data against 
{@code records}
    * @param records         expected data.  each row is CVS list of values
    */
-  private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, 
int numExpectedFiles,
-    String validationQuery, boolean vectorize, String... records) throws 
Exception {
-    AcidDirectory dir =
-        AcidUtils.getAcidState(null, partitionPath, conf, 
getTransactionContext(conf), null, false);
+  private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, 
int numExpectedFiles,
+        String validationQuery, String... records) throws Exception {
+    ValidWriteIdList writeIds = getTransactionContext(conf);
+    AcidDirectory dir = AcidUtils.getAcidState(null, partitionPath, conf, 
writeIds, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1024,13 +951,9 @@ private void checkDataWritten2(Path partitionPath, long 
minTxn, long maxTxn, int
         min = pd.getMinWriteId();
       }
     }
-    // We are doing +1, as DDL operation will also advance the write Id now.
+    // We are doing +1, as DDL operation will also advance the writeId now.
     Assert.assertEquals(minTxn + 1, min);
     Assert.assertEquals(maxTxn + 1, max);
-    boolean isVectorizationEnabled = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
-    if (vectorize) {
-      conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
-    }
 
     String currStrategy = 
conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
     for (String strategy : ((Validator.StringSet) 
HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator())
@@ -1044,7 +967,6 @@ private void checkDataWritten2(Path partitionPath, long 
minTxn, long maxTxn, int
       }
     }
     conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, 
isVectorizationEnabled);
   }
 
   private ValidWriteIdList getTransactionContext(Configuration conf) throws 
Exception {
@@ -1052,8 +974,9 @@ private ValidWriteIdList 
getTransactionContext(Configuration conf) throws Except
     conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
     List<TableValidWriteIds> v = msClient.getValidWriteIds(Collections
         .singletonList(TableName.getDbTable(dbName, tblName)), 
validTxnList.writeToString());
-    return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
+    return TxnCommonUtils.createValidReaderWriteIdList(v.getFirst());
   }
+
   private void checkNothingWritten(Path partitionPath) throws Exception {
     AcidDirectory dir =
         AcidUtils.getAcidState(null, partitionPath, conf, 
getTransactionContext(conf), null, false);
@@ -1099,7 +1022,7 @@ public void testEndpointConnection() throws Exception {
         .withRecordWriter(writer)
         .withHiveConf(conf)
         .connect();
-      Assert.assertTrue("ConnectionError was not thrown", false);
+      Assert.fail("ConnectionError was not thrown");
       connection.close();
     } catch (ConnectionError e) {
       // expecting this exception
@@ -1110,7 +1033,7 @@ public void testEndpointConnection() throws Exception {
 
   @Test
   public void testAddPartition() throws Exception {
-    List<String> newPartVals = new ArrayList<String>(2);
+    List<String> newPartVals = new ArrayList<>(2);
     newPartVals.add(PART1_CONTINENT);
     newPartVals.add("Nepal");
 
@@ -1140,7 +1063,7 @@ public void testAddPartition() throws Exception {
 
   @Test
   public void testAddPartitionWithWriteId() throws Exception {
-    List<String> newPartVals = new ArrayList<String>(2);
+    List<String> newPartVals = new ArrayList<>(2);
     newPartVals.add("WriteId_continent");
     newPartVals.add("WriteId_country");
 
@@ -1190,7 +1113,8 @@ public void testAddPartitionWithWriteId() throws 
Exception {
     try {
       msClient.getPartition(dbName, tblName, newPartVals);
       Assert.fail("Partition shouldn't exist so a NoSuchObjectException should 
have been raised");
-    } catch (NoSuchObjectException e) {}
+    } catch (NoSuchObjectException ignored) {
+    }
 
     transactionConnection.commitTransaction(partitions);
 
@@ -1201,8 +1125,8 @@ public void testAddPartitionWithWriteId() throws 
Exception {
 
   @Test
   public void testAddDynamicPartitionWithWriteId() throws Exception {
-    queryTable(driver, "drop table if exists default.writeiddynamic");
-    queryTable(driver, "create table default.writeiddynamic (a"
+    dropTables(driver, "default.writeiddynamic");
+    runQuery(driver, "create table default.writeiddynamic (a"
         + " string, b string) partitioned by (c string, d string)"
         + " stored as orc TBLPROPERTIES('transactional'='true')");
 
@@ -1265,11 +1189,12 @@ public void testAddDynamicPartitionWithWriteId() throws 
Exception {
       msClient.getPartition("default", "writeiddynamic", partitionName);
       Assert.fail(
           "Partition shouldn't exist so a NoSuchObjectException should have 
been raised");
-    } catch (NoSuchObjectException e) {
+    } catch (NoSuchObjectException ignored) {
     }
 
-    partitionsOne.addAll(partitionsTwo);
-    Set<String> allPartitions = partitionsOne;
+    Set<String> allPartitions = new HashSet<>(partitionsOne);
+    allPartitions.addAll(partitionsTwo);
+
     transactionConnection.commitTransaction(allPartitions);
 
     // Ensure partition is present
@@ -1351,8 +1276,6 @@ public void testTransactionBatchSizeValidation() throws 
Exception {
 
   /**
    * check that transactions that have not heartbeated and timedout get 
properly aborted
-   *
-   * @throws Exception
    */
   @Test
   public void testTimeOutReaper() throws Exception {
@@ -1366,7 +1289,7 @@ public void testTimeOutReaper() throws Exception {
       .withRecordWriter(writer)
       .withHiveConf(conf)
       .connect();
-    
+
     HiveConf houseKeeperConf = new HiveConf(conf);
     //ensure txn timesout
     houseKeeperConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 100, 
TimeUnit.MILLISECONDS);
@@ -1426,15 +1349,15 @@ public void testHeartbeat() throws Exception {
       request.setTablename(tblName2);
       ShowLocksResponse response = msClient.showLocks(request);
       Assert.assertEquals("Wrong number of locks: " + response, 1, 
response.getLocks().size());
-      ShowLocksResponseElement lock = response.getLocks().get(0);
+      ShowLocksResponseElement lock = response.getLocks().getFirst();
       long acquiredAt = lock.getAcquiredat();
       long heartbeatAt = lock.getLastheartbeat();
       response = msClient.showLocks(request);
       Assert.assertEquals("Wrong number of locks2: " + response, 1, 
response.getLocks().size());
-      lock = response.getLocks().get(0);
+      lock = response.getLocks().getFirst();
       Assert.assertEquals("Acquired timestamp didn'table match", acquiredAt, 
lock.getAcquiredat());
-      Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
-        ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() == 
heartbeatAt);
+      Assert.assertEquals("Expected new heartbeat (" + lock.getLastheartbeat() 
+
+          ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat(), 
heartbeatAt);
       for (int i = 0; i < transactionBatch * 3; i++) {
         connection.beginTransaction();
         if (i % 10 == 0) {
@@ -1513,7 +1436,9 @@ public void testTransactionBatchCommitDelimited() throws 
Exception {
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming");
 
     Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
         connection.getCurrentTransactionState());
@@ -1525,12 +1450,14 @@ public void testTransactionBatchCommitDelimited() 
throws Exception {
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming");
 
     connection.commitTransaction();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-      "{2, Welcome to streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming");
 
     connection.close();
 
@@ -1584,7 +1511,9 @@ public void testTransactionBatchCommitRegex() throws 
Exception {
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming");
 
     Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
         connection.getCurrentTransactionState());
@@ -1596,12 +1525,14 @@ public void testTransactionBatchCommitRegex() throws 
Exception {
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming");
 
     connection.commitTransaction();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-      "{2, Welcome to streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming");
 
     connection.close();
     Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
@@ -1686,7 +1617,9 @@ public void testTransactionBatchCommitJson() throws 
Exception {
     connection.write(rec1.getBytes());
     connection.commitTransaction();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming");
 
     Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
         connection.getCurrentTransactionState());
@@ -1826,31 +1759,29 @@ public void testTransactionBatchAbort() throws 
Exception {
     connection.close();
 
     checkNothingWritten(partLoc);
-
   }
 
   @Test(expected = ClassCastException.class)
   public void testFileSystemError() throws Exception {
     // Bad file system object, ClassCastException should occur during record 
writer init
-    conf.set("fs.raw.impl", Object.class.getName());
+    conf.set("fs.raw.impl", TestStreaming.class.getName());
 
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
-            .withFieldDelimiter(',')
-            .build();
+      .withFieldDelimiter(',')
+      .build();
 
     HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
-            .withDatabase(dbName)
-            .withTable(tblName)
-            .withStaticPartitionValues(partitionVals)
-            .withAgentInfo("UT_" + Thread.currentThread().getName())
-            .withRecordWriter(writer)
-            .withHiveConf(conf)
-            .connect();
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
 
     connection.beginTransaction();
   }
 
-
   @Test
   public void testTransactionBatchAbortAndCommit() throws Exception {
     String agentInfo = "UT_" + Thread.currentThread().getName();
@@ -1873,9 +1804,9 @@ public void testTransactionBatchAbortAndCommit() throws 
Exception {
     connection.write("2,Welcome to streaming".getBytes());
     ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
     Assert.assertEquals("LockCount", 1, resp.getLocksSize());
-    Assert.assertEquals("LockType", LockType.SHARED_READ, 
resp.getLocks().get(0).getType());
-    Assert.assertEquals("LockState", LockState.ACQUIRED, 
resp.getLocks().get(0).getState());
-    Assert.assertEquals("AgentInfo", agentInfo, 
resp.getLocks().get(0).getAgentInfo());
+    Assert.assertEquals("LockType", LockType.SHARED_WRITE, 
resp.getLocks().getFirst().getType());
+    Assert.assertEquals("LockState", LockState.ACQUIRED, 
resp.getLocks().getFirst().getState());
+    Assert.assertEquals("AgentInfo", agentInfo, 
resp.getLocks().getFirst().getAgentInfo());
     connection.abortTransaction();
 
     checkNothingWritten(partLoc);
@@ -1888,8 +1819,10 @@ public void testTransactionBatchAbortAndCommit() throws 
Exception {
     connection.write("2,Welcome to streaming".getBytes());
     connection.commitTransaction();
 
-    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-      "{2, Welcome to streaming}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming");
 
     connection.close();
   }
@@ -1914,14 +1847,16 @@ public void testMultipleTransactionBatchCommits() 
throws Exception {
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
     String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
-    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello 
streaming");
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming");
 
     connection.beginTransaction();
     connection.write("2,Welcome to streaming".getBytes());
     connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello 
streaming",
-      "2\tWelcome to streaming");
+    checkDataWritten(partLoc, 1, 10, 1, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming");
 
     connection.close();
 
@@ -1939,16 +1874,20 @@ public void testMultipleTransactionBatchCommits() 
throws Exception {
     connection.write("3,Hello streaming - once again".getBytes());
     connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello 
streaming",
-      "2\tWelcome to streaming", "3\tHello streaming - once again");
+    checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again");
 
     connection.beginTransaction();
     connection.write("4,Welcome to streaming - once again".getBytes());
     connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello 
streaming",
-      "2\tWelcome to streaming", "3\tHello streaming - once again",
-      "4\tWelcome to streaming - once again");
+    checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again",
+        "4\tWelcome to streaming - once again");
 
     Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
         connection.getCurrentTransactionState());
@@ -1999,8 +1938,8 @@ public void testInterleavedTransactionBatchCommits() 
throws Exception {
     connection2.commitTransaction();
 
     String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
-    checkDataWritten2(partLoc, 11, 20, 1,
-      validationQuery, true, "3\tHello streaming - once again");
+    checkDataWritten(partLoc, 11, 20, 1, validationQuery,
+        "3\tHello streaming - once again");
 
     connection.commitTransaction();
     /*now both batches have committed (but not closed) so we for each primary 
file we expect a side
@@ -2016,11 +1955,12 @@ public void testInterleavedTransactionBatchCommits() 
throws Exception {
           lengthFileSize, lengthFileSize > 0);
         long logicalLength = AcidUtils.getLogicalLength(fs, stat);
         long actualLength = stat.getLen();
-        Assert.assertTrue("", logicalLength == actualLength);
+        Assert.assertEquals("", logicalLength, actualLength);
       }
     }
-    checkDataWritten2(partLoc, 1, 20, 2,
-      validationQuery, false, "1\tHello streaming", "3\tHello streaming - once 
again");
+    checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+        "1\tHello streaming",
+        "3\tHello streaming - once again");
 
     connection.beginTransaction();
     connection.write("2,Welcome to streaming".getBytes());
@@ -2030,7 +1970,7 @@ public void testInterleavedTransactionBatchCommits() 
throws Exception {
     //here each batch has written data and committed (to bucket0 since table 
only has 1 bucket)
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  
Furthermore, each bucket0
     //has now received more data(logically - it's buffered) but it is not yet 
committed.
-    //lets check that side files exist, etc
+    //let's check that side files exist, etc
     dir = AcidUtils.getAcidState(fs, partLoc, conf, 
getTransactionContext(conf), null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), 
AcidUtils.bucketFileFilter)) {
@@ -2044,23 +1984,24 @@ public void testInterleavedTransactionBatchCommits() 
throws Exception {
         Assert.assertTrue("", logicalLength <= actualLength);
       }
     }
-    checkDataWritten2(partLoc, 1, 20, 2,
-      validationQuery, true, "1\tHello streaming", "3\tHello streaming - once 
again");
+    checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+        "1\tHello streaming",
+        "3\tHello streaming - once again");
 
     connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 20, 2,
-      validationQuery, false, "1\tHello streaming",
-      "2\tWelcome to streaming",
-      "3\tHello streaming - once again");
+    checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again");
 
     connection2.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 20, 2,
-      validationQuery, true, "1\tHello streaming",
-      "2\tWelcome to streaming",
-      "3\tHello streaming - once again",
-      "4\tWelcome to streaming - once again");
+    checkDataWritten(partLoc, 1, 20, 2, validationQuery,
+        "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again",
+        "4\tWelcome to streaming - once again");
 
     Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
         connection.getCurrentTransactionState());
@@ -2094,7 +2035,7 @@ private static class WriterThd extends Thread {
       setUncaughtExceptionHandler((thread, throwable) -> {
         error = throwable;
         LOG.error(connection.toTransactionString());
-        LOG.error("Thread " + thread.getName() + " died: " + 
throwable.getMessage(), throwable);
+        LOG.error("Thread {} died: {}", thread.getName(), 
throwable.getMessage(), throwable);
       });
     }
 
@@ -2114,7 +2055,7 @@ public void run() {
           try {
             conn.close();
           } catch (Exception e) {
-            LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
+            LOG.error("txnBatch.close() failed: {}", e.getMessage(), e);
           }
         }
       }
@@ -2123,7 +2064,7 @@ public void run() {
 
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
-    List<WriterThd> writers = new ArrayList<WriterThd>(3);
+    List<WriterThd> writers = new ArrayList<>(3);
     writers.add(new WriterThd("1,Matrix"));
     writers.add(new WriterThd("2,Gandhi"));
     writers.add(new WriterThd("3,Silence"));
@@ -2136,16 +2077,15 @@ public void testConcurrentTransactionBatchCommits() 
throws Exception {
     }
     for (WriterThd w : writers) {
       if (w.error != null) {
-        Assert.assertFalse("Writer thread" + w.getName() + " died: " + 
w.error.getMessage() +
-          " See log file for stack trace", true);
+        Assert.fail("Writer thread" + w.getName() + " died: " + 
w.error.getMessage() +
+            " See log file for stack trace");
       }
     }
   }
 
-
   private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
     org.apache.hadoop.fs.FileSystem fs = 
org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
-    ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+    ArrayList<SampleRec> result = new ArrayList<>();
     try (Reader reader = OrcFile.createReader(orcFile, 
OrcFile.readerOptions(conf).filesystem(fs))) {
       RecordReader rows = reader.rows();
       StructObjectInspector inspector = (StructObjectInspector) reader
@@ -2222,7 +2162,6 @@ public void testBucketing() throws Exception {
     createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, 
bucketNames2
       , null, dbLocation2, bucketCount);
 
-
     // 2) Insert data into both tables
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
@@ -2244,7 +2183,6 @@ public void testBucketing() throws Exception {
     connection.commitTransaction();
     connection.close();
 
-
     StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
       .build();
@@ -2257,7 +2195,6 @@ public void testBucketing() throws Exception {
       .withHiveConf(conf)
       .connect();
 
-
     connection2.beginTransaction();
 
     connection2.write("name5,2,fact3".getBytes());  // bucket 0
@@ -2269,27 +2206,26 @@ public void testBucketing() throws Exception {
     connection2.close();
     // 3 Check data distribution in  buckets
 
-    HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
-    HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+    Map<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+    Map<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
     System.err.println("\n  Table 1");
     System.err.println(actual1);
     System.err.println("\n  Table 2");
     System.err.println(actual2);
 
     // assert bucket listing is as expected
-    Assert.assertEquals("number of buckets does not match expectation", 
actual1.values().size(), 3);
-    Assert.assertTrue("bucket 0 shouldn't have been created", actual1.get(0) 
== null);
-    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(1).size(), 1);
-    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(2).size(), 2);
-    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(3).size(), 1);
+    Assert.assertEquals("number of buckets does not match expectation", 3, 
actual1.size());
+    Assert.assertNull("bucket 0 shouldn't have been created", actual1.get(0));
+    Assert.assertEquals("records in bucket does not match expectation", 1, 
actual1.get(1).size());
+    Assert.assertEquals("records in bucket does not match expectation", 2, 
actual1.get(2).size());
+    Assert.assertEquals("records in bucket does not match expectation", 1, 
actual1.get(3).size());
   }
 
   private void runCmdOnDriver(String cmd) {
-    boolean t = runDDL(driver, cmd);
+    boolean t = runQuery(driver, cmd);
     Assert.assertTrue(cmd + " failed", t);
   }
 
-
   @Test
   public void testFileDump() throws Exception {
     String agentInfo = "UT_" + Thread.currentThread().getName();
@@ -2343,12 +2279,12 @@ public void testFileDump() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    String errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    String errDump = myErr.toString();
+    Assert.assertFalse(errDump.contains("file(s) are corrupted"));
     // since this test runs on local file system which does not have an API to tell if files or
     // open or not, we are testing for negative case even though the bucket files are still open
     // for writes (transaction batch not closed yet)
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
     StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
       .withFieldDelimiter(',')
@@ -2382,10 +2318,10 @@ public void testFileDump() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    errDump = myErr.toString();
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertFalse(errDump.contains("file(s) are corrupted"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
   }
 
   @Test
@@ -2442,23 +2378,23 @@ public void testFileDumpDeltaFilesWithStreamingOptimizations() throws Exception
     System.out.flush();
     System.setOut(origOut);
 
-    String outDump = new String(myOut.toByteArray());
+    String outDump = myOut.toString();
     // make sure delta files are written with no indexes and no dictionary
-    Assert.assertEquals(true, outDump.contains("Compression: ZLIB"));
+    Assert.assertTrue(outDump.contains("Compression: ZLIB"));
     // no stats/indexes
-    Assert.assertEquals(true, outDump.contains("Column 0: count: 0 hasNull: 
false"));
-    Assert.assertEquals(true, outDump.contains("Column 1: count: 0 hasNull: 
false bytesOnDisk: 15 sum: 0"));
-    Assert.assertEquals(true, outDump.contains("Column 2: count: 0 hasNull: 
false bytesOnDisk: 15 sum: 0"));
-    Assert.assertEquals(true, outDump.contains("Column 3: count: 0 hasNull: 
false bytesOnDisk: 19 sum: 0"));
-    Assert.assertEquals(true, outDump.contains("Column 4: count: 0 hasNull: 
false bytesOnDisk: 17 sum: 0"));
-    Assert.assertEquals(true, outDump.contains("Column 5: count: 0 hasNull: 
false bytesOnDisk: 15 sum: 0"));
-    Assert.assertEquals(true, outDump.contains("Column 6: count: 0 hasNull: 
false"));
-    Assert.assertEquals(true, outDump.contains("Column 7: count: 0 hasNull: 
false bytesOnDisk: 3929"));
-    Assert.assertEquals(true, outDump.contains("Column 8: count: 0 hasNull: 
false bytesOnDisk: 1484 sum: 0"));
-    Assert.assertEquals(true, outDump.contains("Column 9: count: 0 hasNull: 
false bytesOnDisk: 816"));
+    Assert.assertTrue(outDump.contains("Column 0: count: 0 hasNull: false"));
+    Assert.assertTrue(outDump.contains("Column 1: count: 0 hasNull: false 
bytesOnDisk: 15 sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 2: count: 0 hasNull: false 
bytesOnDisk: 15 sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 3: count: 0 hasNull: false 
bytesOnDisk: 19 sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 4: count: 0 hasNull: false 
bytesOnDisk: 17 sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 5: count: 0 hasNull: false 
bytesOnDisk: 15 sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 6: count: 0 hasNull: false"));
+    Assert.assertTrue(outDump.contains("Column 7: count: 0 hasNull: false 
bytesOnDisk: 3929"));
+    Assert.assertTrue(outDump.contains("Column 8: count: 0 hasNull: false 
bytesOnDisk: 1484 sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 9: count: 0 hasNull: false 
bytesOnDisk: 816"));
     // no dictionary
-    Assert.assertEquals(true, outDump.contains("Encoding column 7: 
DIRECT_V2"));
-    Assert.assertEquals(true, outDump.contains("Encoding column 9: 
DIRECT_V2"));
+    Assert.assertTrue(outDump.contains("Encoding column 7: DIRECT_V2"));
+    Assert.assertTrue(outDump.contains("Encoding column 9: DIRECT_V2"));
   }
 
   @Test
@@ -2514,9 +2450,9 @@ public void testFileDumpDeltaFilesWithoutStreamingOptimizations() throws Excepti
     System.out.flush();
     System.setOut(origOut);
 
-    String outDump = new String(myOut.toByteArray());
-    Assert.assertEquals(true, outDump.contains("Compression: ZLIB"));
-    Assert.assertEquals(true, outDump.contains("Encoding column 9: 
DICTIONARY"));
+    String outDump = myOut.toString();
+    Assert.assertTrue(outDump.contains("Compression: ZLIB"));
+    Assert.assertTrue(outDump.contains("Encoding column 9: DICTIONARY"));
   }
 
   @Test
@@ -2579,10 +2515,10 @@ public void testFileDumpCorruptDataFiles() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    String errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    String errDump = myErr.toString();
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertTrue(errDump.contains("3 file(s) are corrupted"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
     origErr = System.err;
     myErr = new ByteArrayOutputStream();
@@ -2593,13 +2529,13 @@ public void testFileDumpCorruptDataFiles() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(true, errDump.contains("bucket_00001 recovered 
successfully!"));
-    Assert.assertEquals(true, errDump.contains("No readable footers found. 
Creating empty orc file."));
-    Assert.assertEquals(true, errDump.contains("bucket_00002 recovered 
successfully!"));
-    Assert.assertEquals(true, errDump.contains("bucket_00003 recovered 
successfully!"));
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    errDump = myErr.toString();
+    Assert.assertTrue(errDump.contains("bucket_00001 recovered 
successfully!"));
+    Assert.assertTrue(errDump.contains("No readable footers found. Creating 
empty orc file."));
+    Assert.assertTrue(errDump.contains("bucket_00002 recovered 
successfully!"));
+    Assert.assertTrue(errDump.contains("bucket_00003 recovered 
successfully!"));
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
     // test after recovery
     origErr = System.err;
@@ -2611,15 +2547,15 @@ public void testFileDumpCorruptDataFiles() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    errDump = myErr.toString();
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertFalse(errDump.contains("file(s) are corrupted"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
-    // after recovery there shouldn'table be any *_flush_length files
+    // after recovery there shouldn't be any *_flush_length files
     files = FileDump.getAllFilesInPath(path, conf);
     for (String file : files) {
-      Assert.assertEquals(false, file.contains("_flush_length"));
+      Assert.assertFalse(file.contains("_flush_length"));
     }
 
     connection.close();
@@ -2679,7 +2615,7 @@ public void testFileDumpCorruptSideFiles() throws Exception {
     connection.write("name6,3,aHello streaming".getBytes());
     connection.commitTransaction();
 
-    Map<String, List<Long>> offsetMap = new HashMap<String, List<Long>>();
+    Map<String, List<Long>> offsetMap = new HashMap<>();
     recordOffsets(conf, dbLocation, offsetMap);
 
     connection.beginTransaction();
@@ -2726,14 +2662,14 @@ public void testFileDumpCorruptSideFiles() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    String errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length 
[length: 11"));
-    Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length 
[length: 0"));
-    Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length 
[length: 24"));
-    Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length 
[length: 80"));
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    String errDump = myErr.toString();
+    Assert.assertTrue(errDump.contains("bucket_00000_flush_length [length: 
11"));
+    Assert.assertTrue(errDump.contains("bucket_00001_flush_length [length: 
0"));
+    Assert.assertTrue(errDump.contains("bucket_00002_flush_length [length: 
24"));
+    Assert.assertTrue(errDump.contains("bucket_00003_flush_length [length: 
80"));
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertTrue(errDump.contains("4 file(s) are corrupted"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
     origErr = System.err;
     myErr = new ByteArrayOutputStream();
@@ -2744,21 +2680,21 @@ public void testFileDumpCorruptSideFiles() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(true, errDump.contains("bucket_00000 recovered 
successfully!"));
-    Assert.assertEquals(true, errDump.contains("bucket_00001 recovered 
successfully!"));
-    Assert.assertEquals(true, errDump.contains("bucket_00002 recovered 
successfully!"));
-    Assert.assertEquals(true, errDump.contains("bucket_00003 recovered 
successfully!"));
+    errDump = myErr.toString();
+    Assert.assertTrue(errDump.contains("bucket_00000 recovered 
successfully!"));
+    Assert.assertTrue(errDump.contains("bucket_00001 recovered 
successfully!"));
+    Assert.assertTrue(errDump.contains("bucket_00002 recovered 
successfully!"));
+    Assert.assertTrue(errDump.contains("bucket_00003 recovered 
successfully!"));
     List<Long> offsets = offsetMap.get("bucket_00000");
-    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
+    Assert.assertTrue(errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
     offsets = offsetMap.get("bucket_00001");
-    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
+    Assert.assertTrue(errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
     offsets = offsetMap.get("bucket_00002");
-    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
+    Assert.assertTrue(errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
     offsets = offsetMap.get("bucket_00003");
-    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    Assert.assertTrue(errDump.contains("Readable footerOffsets: " + 
offsets.toString()));
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
     // test after recovery
     origErr = System.err;
@@ -2770,15 +2706,15 @@ public void testFileDumpCorruptSideFiles() throws Exception {
     System.err.flush();
     System.setErr(origErr);
 
-    errDump = new String(myErr.toByteArray());
-    Assert.assertEquals(false, errDump.contains("Exception"));
-    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
-    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    errDump = myErr.toString();
+    Assert.assertFalse(errDump.contains("Exception"));
+    Assert.assertFalse(errDump.contains("file(s) are corrupted"));
+    Assert.assertFalse(errDump.contains("is still open for writes."));
 
-    // after recovery there shouldn'table be any *_flush_length files
+    // after recovery there shouldn't be any *_flush_length files
     files = FileDump.getAllFilesInPath(path, conf);
     for (String file : files) {
-      Assert.assertEquals(false, file.contains("_flush_length"));
+      Assert.assertFalse(file.contains("_flush_length"));
     }
 
     connection.close();
@@ -2792,7 +2728,7 @@ private void corruptSideFile(final String file, final HiveConf conf,
     Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
     FileSystem fs = sideFilePath.getFileSystem(conf);
     List<Long> offsets = offsetMap.get(key);
-    long lastOffset = offsets.get(offsets.size() - 1);
+    long lastOffset = offsets.getLast();
     FSDataOutputStream fdos = fs.create(cPath, true);
     // corrupt last entry
     if (numEntries < 0) {
@@ -2838,45 +2774,21 @@ private void recordOffsets(final HiveConf conf, final String dbLocation,
       long len = fileStatus.getLen();
 
       if (file.contains("bucket_00000")) {
-        if (offsetMap.containsKey("bucket_00000")) {
-          List<Long> offsets = offsetMap.get("bucket_00000");
-          offsets.add(len);
-          offsetMap.put("bucket_00000", offsets);
-        } else {
-          List<Long> offsets = new ArrayList<Long>();
-          offsets.add(len);
-          offsetMap.put("bucket_00000", offsets);
-        }
+        List<Long> offsets = offsetMap.computeIfAbsent("bucket_00000", k -> 
new ArrayList<>());
+        offsets.add(len);
+        offsetMap.put("bucket_00000", offsets);
       } else if (file.contains("bucket_00001")) {
-        if (offsetMap.containsKey("bucket_00001")) {
-          List<Long> offsets = offsetMap.get("bucket_00001");
-          offsets.add(len);
-          offsetMap.put("bucket_00001", offsets);
-        } else {
-          List<Long> offsets = new ArrayList<Long>();
-          offsets.add(len);
-          offsetMap.put("bucket_00001", offsets);
-        }
+        List<Long> offsets = offsetMap.computeIfAbsent("bucket_00001", k -> 
new ArrayList<>());
+        offsets.add(len);
+        offsetMap.put("bucket_00001", offsets);
       } else if (file.contains("bucket_00002")) {
-        if (offsetMap.containsKey("bucket_00002")) {
-          List<Long> offsets = offsetMap.get("bucket_00002");
-          offsets.add(len);
-          offsetMap.put("bucket_00002", offsets);
-        } else {
-          List<Long> offsets = new ArrayList<Long>();
-          offsets.add(len);
-          offsetMap.put("bucket_00002", offsets);
-        }
+        List<Long> offsets = offsetMap.computeIfAbsent("bucket_00002", k -> 
new ArrayList<>());
+        offsets.add(len);
+        offsetMap.put("bucket_00002", offsets);
       } else if (file.contains("bucket_00003")) {
-        if (offsetMap.containsKey("bucket_00003")) {
-          List<Long> offsets = offsetMap.get("bucket_00003");
-          offsets.add(len);
-          offsetMap.put("bucket_00003", offsets);
-        } else {
-          List<Long> offsets = new ArrayList<Long>();
-          offsets.add(len);
-          offsetMap.put("bucket_00003", offsets);
-        }
+        List<Long> offsets = offsetMap.computeIfAbsent("bucket_00003", k -> 
new ArrayList<>());
+        offsets.add(len);
+        offsetMap.put("bucket_00003", offsets);
       }
     }
   }
@@ -2908,7 +2820,7 @@ public void testErrorHandling()
     connection.close();
     Exception expectedEx = null;
     GetOpenTxnsInfoResponse r = msClient.showTxns();
-    Assert.assertEquals("HWM didn'table match", 17, 
r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 9, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)",
         org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
@@ -2995,7 +2907,7 @@ public void testErrorHandling()
     Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
 
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 11, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(0)",
         org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
@@ -3031,7 +2943,7 @@ public void testErrorHandling()
       expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
 
     r = msClient.showTxns();
-    Assert.assertEquals("HWM didn'table match", 21, 
r.getTxn_high_water_mark());
+    Assert.assertEquals("HWM didn't match", 13, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
     Assert.assertEquals("wrong status ti(3)",
         org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
@@ -3043,20 +2955,17 @@ public void testErrorHandling()
 
   // assumes un partitioned table
   // returns a map<bucketNum, list<record> >
-  private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
-    throws IOException {
-    HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
+  private Map<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
+      throws IOException {
+    Map<Integer, ArrayList<SampleRec>> result = new HashMap<>();
 
     for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
       if (!deltaDir.getName().startsWith("delta")) {
         continue;
       }
-      File[] bucketFiles = deltaDir.listFiles(new FileFilter() {
-        @Override
-        public boolean accept(File pathname) {
-          String name = pathname.getName();
-          return !name.startsWith("_") && !name.startsWith(".");
-        }
+      File[] bucketFiles = deltaDir.listFiles(pathname -> {
+        String name = pathname.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
       });
       for (File bucketFile : bucketFiles) {
         if (bucketFile.toString().endsWith("length")) {
@@ -3074,7 +2983,7 @@ public boolean accept(File pathname) {
   private Integer getBucketNumber(File bucketFile) {
     String fname = bucketFile.getName();
     int start = fname.indexOf('_');
-    String number = fname.substring(start + 1, fname.length());
+    String number = fname.substring(start + 1);
     return Integer.parseInt(number);
   }
 
@@ -3085,12 +2994,10 @@ public static void dropDB(IMetaStoreClient client, String databaseName) {
         client.dropTable(databaseName, table, true, true);
       }
       client.dropDatabase(databaseName);
-    } catch (TException e) {
+    } catch (TException ignored) {
     }
-
   }
 
-
   ///////// -------- UTILS ------- /////////
   // returns Path of the partition created (if any) else Path of table
   private static Path createDbAndTable(IDriver driver, String databaseName,
@@ -3103,8 +3010,8 @@ private static Path createDbAndTable(IDriver driver, String databaseName,
     String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
     String tableLoc = dbUri + Path.SEPARATOR + tableName;
 
-    runDDL(driver, "create database IF NOT EXISTS " + databaseName + " 
location '" + dbUri + "'");
-    runDDL(driver, "use " + databaseName);
+    runQuery(driver, "create database IF NOT EXISTS " + databaseName + " 
location '" + dbUri + "'");
+    runQuery(driver, "use " + databaseName);
     String crtTbl = "create table " + tableName +
       " ( " + getTableColumnsStr(colNames, colTypes) + " )" +
       getPartitionStmtStr(partNames) +
@@ -3113,7 +3020,7 @@ private static Path createDbAndTable(IDriver driver, String databaseName,
       " stored as orc " +
       " location '" + tableLoc + "'" +
       " TBLPROPERTIES ('transactional'='true') ";
-    runDDL(driver, crtTbl);
+    runQuery(driver, crtTbl);
     if (partNames != null && partNames.length != 0) {
       return addPartition(driver, tableName, partVals, partNames);
     }
@@ -3124,13 +3031,13 @@ private static Path addPartition(IDriver driver, String tableName, List<String>
     throws Exception {
     String partSpec = getPartsSpec(partNames, partVals);
     String addPart = "alter table " + tableName + " add partition ( " + partSpec + " )";
-    runDDL(driver, addPart);
+    runQuery(driver, addPart);
     return getPartitionPath(driver, tableName, partSpec);
   }
 
   private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
-    ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
-    String partInfo = res.get(res.size() - 1);
+    List<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
+    String partInfo = res.getLast();
     int start = partInfo.indexOf("location:") + "location:".length();
     int end = partInfo.indexOf(",", start);
     return new Path(partInfo.substring(start, end));
@@ -3201,27 +3108,33 @@ private static String getPartitionStmtStr(String[] partNames) {
     return " partitioned by (" + getTablePartsStr(partNames) + " )";
   }
 
-  private static boolean runDDL(IDriver driver, String sql) {
+  private static boolean runQuery(IDriver driver, String sql) {
     LOG.debug(sql);
-    System.out.println(sql);
-    //LOG.debug("Running Hive Query: "+ sql);
     try {
       driver.run(sql);
       return true;
     } catch (CommandProcessorException e) {
-      LOG.error("Statement: " + sql + " failed: " + e);
+      LOG.error("Statement: {} failed: ", sql, e);
       return false;
     }
   }
 
+  private static void dropTables(IDriver driver, String... tables) {
+    HiveConf queryConf = driver.getQueryState().getConf();
+    queryConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    for (String table : tables) {
+      runQuery(driver, "drop table if exists " + table);
+    }
+    queryConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+  }
 
-  private static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
+  private static List<String> queryTable(IDriver driver, String query) throws IOException {
     try {
       driver.run(query);
     } catch (CommandProcessorException e) {
       throw new RuntimeException(query + " failed: " + e);
     }
-    ArrayList<String> res = new ArrayList<String>();
+    List<String> res = new ArrayList<>();
     driver.getResults(res);
     return res;
   }
@@ -3251,10 +3164,10 @@ public boolean equals(Object o) {
       if (field2 != that.field2) {
         return false;
       }
-      if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) {
+      if (!Objects.equals(field1, that.field1)) {
         return false;
       }
-      return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
+      return Objects.equals(field3, that.field3);
 
     }
 
@@ -3325,8 +3238,6 @@ public Set<String> getPartitions() {
 
     /**
      * allows testing of "unexpected" errors
-     *
-     * @throws StreamingIOFailure
      */
     private void produceFault() throws StreamingIOFailure {
       if (shouldThrow) {
@@ -3345,7 +3256,7 @@ void disableErrors() {
     @Override
     public Path getDeltaFileLocation(List<String> partitionValues,
         Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
-        Table table) throws StreamingException {
+        Table table) {
       return null;
     }
   }
