http://git-wip-us.apache.org/repos/asf/hive/blob/61c6a702/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 0ec3048..a6fdd66 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -35,6 +35,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -53,12 +54,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
@@ -90,13 +89,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
-import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
@@ -115,6 +112,7 @@ public class TestStreaming {
 
   public static class RawFileSystem extends RawLocalFileSystem {
     private static final URI NAME;
+
     static {
       try {
         NAME = new URI("raw:///");
@@ -128,12 +126,16 @@ public class TestStreaming {
       return NAME;
     }
 
+    @Override
+    public String getScheme() {
+      return "raw";
+    }
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
       File file = pathToFile(path);
       if (!file.exists()) {
-        throw new FileNotFoundException("Can't find " + path);
+        throw new FileNotFoundException("Can't find " + path);
       }
       // get close enough
       short mod = 0;
@@ -147,32 +149,30 @@ public class TestStreaming {
         mod |= 0111;
       }
       return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
-          file.lastModified(), file.lastModified(),
-          FsPermission.createImmutable(mod), "owen", "users", path);
+        file.lastModified(), file.lastModified(),
+        FsPermission.createImmutable(mod), "owen", "users", path);
     }
   }
 
   private static final String COL1 = "id";
   private static final String COL2 = "msg";
 
-  private final HiveConf conf;
+  private static HiveConf conf = null;
   private IDriver driver;
   private final IMetaStoreClient msClient;
 
-  final String metaStoreURI = null;
-
   // partitioned table
   private final static String dbName = "testing";
   private final static String tblName = "alerts";
-  private final static String[] fieldNames = new String[]{COL1,COL2};
-  List<String> partitionVals;
+  private final static String[] fieldNames = new String[]{COL1, COL2};
+  static List<String> partitionVals;
   private static Path partLoc;
   private static Path partLoc2;
 
   // unpartitioned table
   private final static String dbName2 = "testing2";
   private final static String tblName2 = "alerts";
-  private final static String[] fieldNames2 = new String[]{COL1,COL2};
+  private final static String[] fieldNames2 = new String[]{COL1, COL2};
 
 
   // for bucket join testing
@@ -201,13 +201,9 @@ public class TestStreaming {
 
     conf = new HiveConf(this.getClass());
     conf.set("fs.raw.impl", RawFileSystem.class.getName());
-    conf
-    .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+      "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     TxnDbUtil.setConfValues(conf);
-    if (metaStoreURI!=null) {
-      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
-    }
     conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
     dbFolder.create();
@@ -229,12 +225,13 @@ public class TestStreaming {
     // drop and recreate the necessary databases and tables
     dropDB(msClient, dbName);
 
-    String[] colNames = new String[] {COL1, COL2};
-    String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
-    String[] bucketCols = new String[] {COL1};
+    String[] colNames = new String[]{COL1, COL2};
+    String[] colTypes = new String[]{serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+    String[] bucketCols = new String[]{COL1};
     String loc1 = dbFolder.newFolder(dbName + ".db").toString();
     String[] partNames = new String[]{"Continent", "Country"};
-    partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1);
+    partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1,
+      1);
 
     dropDB(msClient, dbName2);
     String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
@@ -249,19 +246,11 @@ public class TestStreaming {
   }
 
   @After
-  public void cleanup() throws Exception {
+  public void cleanup() {
     msClient.close();
     driver.close();
   }
 
-  private static List<FieldSchema> getPartitionKeys() {
-    List<FieldSchema> fields = new ArrayList<FieldSchema>();
-    // Defining partition names in unsorted order
-    fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, 
""));
-    fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, 
""));
-    return fields;
-  }
-
   private void createStoreSales(String dbName, String loc) throws Exception {
     String dbUri = "raw://" + new Path(loc).toUri().toString();
     String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
@@ -301,43 +290,48 @@ public class TestStreaming {
       ")\n" +
       " partitioned by (dt string)\n" +
       "clustered by (ss_store_sk, ss_promo_sk)\n" +
-      "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc +  "'" + "  
TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+      "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" +
+      "  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
     Assert.assertTrue(success);
 
     success = runDDL(driver, "alter table store_sales add 
partition(dt='2015')");
     Assert.assertTrue(success);
   }
+
   /**
    * make sure it works with table where bucket col is not 1st col
+   *
    * @throws Exception
    */
   @Test
   public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
     List<String> partitionVals = new ArrayList<String>();
     partitionVals.add("2015");
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", 
"store_sales", partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] 
{"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
-      "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", 
"ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
-      "ss_wholesale_cost", "ss_list_price", "ss_sales_price", 
"ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
-      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", 
"ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
-    txnBatch.beginNextTransaction();
-
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase("testing5")
+      .withTable("store_sales")
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
     StringBuilder row = new StringBuilder();
-    for(int i = 0; i < 10; i++) {
-      for(int ints = 0; ints < 11; ints++) {
+    for (int i = 0; i < 10; i++) {
+      for (int ints = 0; ints < 11; ints++) {
         row.append(ints).append(',');
       }
-      for(int decs = 0; decs < 12; decs++) {
+      for (int decs = 0; decs < 12; decs++) {
         row.append(i + 0.1).append(',');
       }
       row.setLength(row.length() - 1);
-      txnBatch.write(row.toString().getBytes());
+      connection.write(row.toString().getBytes());
     }
-    txnBatch.commit();
-    txnBatch.close();
+    connection.commitTransaction();
     connection.close();
 
     ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * 
from testing5.store_sales");
@@ -352,35 +346,41 @@ public class TestStreaming {
   @Test
   public void testNoBuckets() throws Exception {
     queryTable(driver, "drop table if exists default.streamingnobuckets");
-    //todo: why does it need transactional_properties?
-    queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc TBLPROPERTIES('transactional'='true', 
'transactional_properties'='default')");
+    queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
+      "TBLPROPERTIES('transactional'='true')");
     queryTable(driver, "insert into default.streamingnobuckets 
values('foo','bar')");
     List<String> rs = queryTable(driver, "select * from 
default.streamingnobuckets");
     Assert.assertEquals(1, rs.size());
     Assert.assertEquals("foo\tbar", rs.get(0));
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", 
"streamingNoBuckets", null);
-    String[] colNames1 = new String[] { "a", "b" };
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, 
connection);
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
-    txnBatch.beginNextTransaction();
-    txnBatch.write("a1,b2".getBytes());
-    txnBatch.write("a3,b4".getBytes());
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase("Default")
+      .withTable("streamingNoBuckets")
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withTransactionBatchSize(2)
+      .withRecordWriter(wr)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+    connection.write("a1,b2".getBytes());
+    connection.write("a3,b4".getBytes());
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
     Assert.assertEquals(resp.getLocksSize(), 1);
     Assert.assertEquals("streamingnobuckets", 
resp.getLocks().get(0).getTablename());
     Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
-    txnBatch.commit();
-    txnBatch.beginNextTransaction();
-    txnBatch.write("a5,b6".getBytes());
-    txnBatch.write("a7,b8".getBytes());
-    txnBatch.commit();
-    txnBatch.close();
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("a5,b6".getBytes());
+    connection.write("a7,b8".getBytes());
+    connection.commitTransaction();
+    connection.close();
 
     Assert.assertEquals("", 0, 
BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
-    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
+    rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
     Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
@@ -404,7 +404,7 @@ public class TestStreaming {
 
     queryTable(driver, "alter table default.streamingnobuckets compact 
'major'");
     runWorker(conf);
-    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
+    rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
     Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
@@ -416,6 +416,152 @@ public class TestStreaming {
     Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
   }
 
+  @Test
+  public void testAllTypesDelimitedWriter() throws Exception {
+    queryTable(driver, "drop table if exists default.alltypes");
+    queryTable(driver,
+      "create table if not exists default.alltypes ( bo boolean, ti tinyint, 
si smallint, i int, bi bigint, " +
+        "f float, d double, de decimal(10,3), ts timestamp, da date, s string, 
c char(5), vc varchar(5), " +
+        "m map<string, string>, l array<int>, st struct<c1:int, c2:string> ) " 
+
+        "stored as orc TBLPROPERTIES('transactional'='true')");
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter('|')
+      .withCollectionDelimiter(',')
+      .withMapKeyDelimiter(':')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase("default")
+      .withTable("alltypes")
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withTransactionBatchSize(2)
+      .withRecordWriter(wr)
+      .withHiveConf(conf)
+      .connect();
+
+    String row1 = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 " +
+      "15:59:58.174|1970-01-01|string|hello|hello|k1:v1|100,200|10,foo";
+    String row2 = "false|20|200|2000|20000|8.0|40.0|2.2222|1970-12-31 
15:59:58.174|1971-01-01|abcd|world|world|" +
+      "k4:v4|200,300|20,bar";
+    connection.beginTransaction();
+    connection.write(row1.getBytes());
+    connection.write(row2.getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    List<String> rs = queryTable(driver, "select ROW__ID, bo, ti, si, i, bi, 
f, d, de, ts, da, s, c, vc, m, l, st," +
+      " INPUT__FILE__NAME from default.alltypes order by ROW__ID");
+    Assert.assertEquals(2, rs.size());
+    String gotRow1 = rs.get(0);
+    String expectedPrefixRow1 = "{\"writeid\":1,\"bucketid\":536870912," +
+      "\"rowid\":0}\ttrue\t10\t100\t1000\t10000\t4.0\t20.0\t4.222\t1969-12-31 
15:59:58.174\t1970-01-01\tstring" +
+      "\thello\thello\t{\"k1\":\"v1\"}\t[100,200]\t{\"c1\":10,\"c2\":\"foo\"}";
+    String expectedSuffixRow1 = "alltypes/delta_0000001_0000002/bucket_00000";
+    String gotRow2 = rs.get(1);
+    String expectedPrefixRow2 = "{\"writeid\":1,\"bucketid\":536870912," +
+      "\"rowid\":1}\tfalse\t20\t200\t2000\t20000\t8.0\t40.0\t2.222\t1970-12-31 
15:59:58.174\t1971-01-01\tabcd" +
+      "\tworld\tworld\t{\"k4\":\"v4\"}\t[200,300]\t{\"c1\":20,\"c2\":\"bar\"}";
+    String expectedSuffixRow2 = "alltypes/delta_0000001_0000002/bucket_00000";
+    Assert.assertTrue(gotRow1, gotRow1.startsWith(expectedPrefixRow1));
+    Assert.assertTrue(gotRow1, gotRow1.endsWith(expectedSuffixRow1));
+    Assert.assertTrue(gotRow2, gotRow2.startsWith(expectedPrefixRow2));
+    Assert.assertTrue(gotRow2, gotRow2.endsWith(expectedSuffixRow2));
+  }
+
+  @Test
+  public void testAutoRollTransactionBatch() throws Exception {
+    queryTable(driver, "drop table if exists default.streamingnobuckets");
+    queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc " +
+      "TBLPROPERTIES('transactional'='true')");
+    queryTable(driver, "insert into default.streamingnobuckets 
values('foo','bar')");
+    List<String> rs = queryTable(driver, "select * from 
default.streamingnobuckets");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("foo\tbar", rs.get(0));
+    StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase("default")
+      .withTable("streamingnobuckets")
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(wr)
+      .withHiveConf(conf)
+      .withTransactionBatchSize(2)
+      .connect();
+
+    connection.beginTransaction();
+    connection.write("a1,b2".getBytes());
+    connection.write("a3,b4".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("a5,b6".getBytes());
+    connection.write("a7,b8".getBytes());
+    connection.commitTransaction();
+    // should have rolled over to next transaction batch
+    connection.beginTransaction();
+    connection.write("a9,b10".getBytes());
+    connection.write("a11,b12".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("a13,b14".getBytes());
+    connection.write("a15,b16".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    Assert.assertEquals("", 0, 
BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(4), 
rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+    Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+
+    Assert.assertTrue(rs.get(5), 
rs.get(5).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\ta9\tb10"));
+    Assert.assertTrue(rs.get(5), 
rs.get(5).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(6), 
rs.get(6).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
+    Assert.assertTrue(rs.get(6), 
rs.get(6).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(7), 
rs.get(7).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
+    Assert.assertTrue(rs.get(7), 
rs.get(7).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(8), 
rs.get(8).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\ta15\tb16"));
+    Assert.assertTrue(rs.get(8), 
rs.get(8).endsWith("streamingnobuckets/delta_0000004_0000005/bucket_00000"));
+
+    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a7'");
+    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where 
a='a15'");
+    queryTable(driver, "delete from default.streamingnobuckets where a='a9'");
+    rs = queryTable(driver, "select a, b from default.streamingnobuckets order 
by a, b");
+    int row = 0;
+    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a11\tb12", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a13\tb14", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+    queryTable(driver, "alter table default.streamingnobuckets compact 
'major'");
+    runWorker(conf);
+    rs = queryTable(driver, "select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), 
rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), 
rs.get(0).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(2), 
rs.get(2).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\ta11\tb12"));
+    Assert.assertTrue(rs.get(3), 
rs.get(3).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(4), 
rs.get(4).startsWith("{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\ta13\tb14"));
+    Assert.assertTrue(rs.get(4), 
rs.get(4).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+    Assert.assertTrue(rs.get(5), 
rs.get(5).startsWith("{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+    Assert.assertTrue(rs.get(5), 
rs.get(5).endsWith("streamingnobuckets/base_0000009/bucket_00000"));
+  }
+
   /**
    * this is a clone from TestTxnStatement2....
    */
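
The new testAllTypesDelimitedWriter above drives StrictDelimitedInputWriter with three delimiters: field ('|'), collection (','), and map key (':'). A short sketch of how a row with map, array and struct columns is composed for such a writer, mirroring the delimiters and column layout of that test; the wrapper class, agent string and connection settings are illustrative:

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.StrictDelimitedInputWriter;

    public class AllTypesRowSketch {
      public static void main(String[] args) throws Exception {
        // '|' separates columns, ',' separates collection items, ':' separates map keys from values.
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
          .withFieldDelimiter('|')
          .withCollectionDelimiter(',')
          .withMapKeyDelimiter(':')
          .build();

        HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
          .withDatabase("default")
          .withTable("alltypes")          // illustrative; the test creates default.alltypes
          .withAgentInfo("sketch-agent")
          .withRecordWriter(writer)
          .withHiveConf(new HiveConf())
          .connect();

        // map<string,string> -> k1:v1, array<int> -> 100,200, struct<c1:int,c2:string> -> 10,foo
        String row = "true|10|100|1000|10000|4.0|20.0|4.2222|1969-12-31 15:59:58.174|1970-01-01"
          + "|string|hello|hello|k1:v1|100,200|10,foo";

        try {
          connection.beginTransaction();
          connection.write(row.getBytes(StandardCharsets.UTF_8));
          connection.commitTransaction();
        } finally {
          connection.close();
        }
      }
    }
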
@@ -436,71 +582,83 @@ public class TestStreaming {
     int bucketCount = 100;
 
     String dbUri = "raw://" + new 
Path(dbFolder.newFolder().toString()).toUri().toString();
-    String tableLoc  = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+    String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
     String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
     String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
 
-    runDDL(driver, "create database testBucketing3");
-    runDDL(driver, "use testBucketing3");
-    runDDL(driver, "create table streamedtable ( key1 string,key2 int,data 
string ) clustered by ( key1,key2 ) into "
-            + bucketCount + " buckets  stored as orc  location " + tableLoc + 
" TBLPROPERTIES ('transactional'='true')") ;
-//  In 'nobucket' table we capture bucketid from streamedtable to workaround a 
hive bug that prevents joins two identically bucketed tables
-    runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 
int,data string ) location " + tableLoc3) ;
-    runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 
int,data string ) clustered by ( key1,key2 ) into "
-            + bucketCount + " buckets  stored as orc location " + tableLoc2 + 
" TBLPROPERTIES ('transactional'='true')");
-
-
-    String[] records = new String[] {
-    "PSFAHYLZVC,29,EPNMA",
-    "PPPRKWAYAU,96,VUTEE",
-    "MIAOFERCHI,3,WBDSI",
-    "CEGQAZOWVN,0,WCUZL",
-    "XWAKMNSVQF,28,YJVHU",
-    "XBWTSAJWME,2,KDQFO",
-    "FUVLQTAXAY,5,LDSDG",
-    "QTQMDJMGJH,6,QBOMA",
-    "EFLOTLWJWN,71,GHWPS",
-    "PEQNAOJHCM,82,CAAFI",
-    "MOEKQLGZCP,41,RUACR",
-    "QZXMCOPTID,37,LFLWE",
-    "EYALVWICRD,13,JEZLC",
-    "VYWLZAYTXX,16,DMVZX",
-    "OSALYSQIXR,47,HNZVE",
-    "JGKVHKCEGQ,25,KSCJB",
-    "WQFMMYDHET,12,DTRWA",
-    "AJOVAYZKZQ,15,YBKFO",
-    "YAQONWCUAU,31,QJNHZ",
-    "DJBXUEUOEB,35,IYCBL"
-    };
-
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", 
"streamedtable", null);
-    String[] colNames1 = new String[] { "key1", "key2", "data" };
-    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
-    txnBatch.beginNextTransaction();
-
-    for (String record : records) {
-      txnBatch.write(record.toString().getBytes());
-    }
+    // disabling vectorization as this test yields incorrect results with vectorization
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    try (IDriver driver = DriverFactory.newDriver(conf)) {
+      runDDL(driver, "create database testBucketing3");
+      runDDL(driver, "use testBucketing3");
+      runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+        + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='true')");
+      //  In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables
+      runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3);
+      runDDL(driver,
+        "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+          + bucketCount + " buckets  stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
+
+
+      String[] records = new String[]{
+        "PSFAHYLZVC,29,EPNMA",
+        "PPPRKWAYAU,96,VUTEE",
+        "MIAOFERCHI,3,WBDSI",
+        "CEGQAZOWVN,0,WCUZL",
+        "XWAKMNSVQF,28,YJVHU",
+        "XBWTSAJWME,2,KDQFO",
+        "FUVLQTAXAY,5,LDSDG",
+        "QTQMDJMGJH,6,QBOMA",
+        "EFLOTLWJWN,71,GHWPS",
+        "PEQNAOJHCM,82,CAAFI",
+        "MOEKQLGZCP,41,RUACR",
+        "QZXMCOPTID,37,LFLWE",
+        "EYALVWICRD,13,JEZLC",
+        "VYWLZAYTXX,16,DMVZX",
+        "OSALYSQIXR,47,HNZVE",
+        "JGKVHKCEGQ,25,KSCJB",
+        "WQFMMYDHET,12,DTRWA",
+        "AJOVAYZKZQ,15,YBKFO",
+        "YAQONWCUAU,31,QJNHZ",
+        "DJBXUEUOEB,35,IYCBL"
+      };
+
+      StrictDelimitedInputWriter wr = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+      HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase("testBucketing3")
+        .withTable("streamedtable")
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(wr)
+        .withHiveConf(conf)
+        .connect();
+
+      connection.beginTransaction();
+
+      for (String record : records) {
+        connection.write(record.getBytes());
+      }
 
-    txnBatch.commit();
-    txnBatch.close();
-    connection.close();
+      connection.commitTransaction();
+      connection.close();
 
-    ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * 
from streamedtable order by key2");
-    for (String re : res1) {
-      System.out.println(re);
-    }
+      ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * 
from streamedtable order by key2");
+      for (String re : res1) {
+        LOG.error(re);
+      }
 
-    driver.run("insert into nobucket select row__id.bucketid,* from 
streamedtable");
-    runDDL(driver, " insert into finaltable select * from nobucket");
-    ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* 
from finaltable where row__id.bucketid<>bucketid");
-    for (String s : res2) {
-      LOG.error(s);
+      driver.run("insert into nobucket select row__id.bucketid,* from 
streamedtable");
+      runDDL(driver, "insert into finaltable select * from nobucket");
+      ArrayList<String> res2 = queryTable(driver,
+        "select row__id.bucketid,* from finaltable where 
row__id.bucketid<>bucketid");
+      for (String s : res2) {
+        LOG.error(s);
+      }
+      Assert.assertTrue(res2.isEmpty());
+    } finally {
+      conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
     }
-    Assert.assertTrue(res2.isEmpty());
   }
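
The rewritten bucketing test above turns vectorization off before running its queries and clears the override again in a finally block. A compact sketch of that toggle pattern on its own; the config key and the unset() call are the ones used in the patch, while the work inside the try block is a placeholder:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class VectorizationToggleSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();

        // Temporarily force non-vectorized execution for queries driven off this conf.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
        try {
          // ... run the queries that misbehave with vectorization here (placeholder) ...
          System.out.println("vectorization enabled: "
            + conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED));
        } finally {
          // Drop the override so later tests see the default again.
          conf.unset(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname);
        }
      }
    }
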
 
 
@@ -512,32 +670,53 @@ public class TestStreaming {
     String tbl1 = "validation1";
     String tbl2 = "validation2";
 
-    String tableLoc  = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+    String tableLoc = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
     String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
 
     runDDL(driver, "create database testBucketing3");
     runDDL(driver, "use testBucketing3");
 
     runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
-            + bucketCount + " buckets  stored as orc  location " + tableLoc + 
" TBLPROPERTIES ('transactional'='false')") ;
+      + bucketCount + " buckets  stored as orc  location " + tableLoc + " 
TBLPROPERTIES ('transactional'='false')");
 
     runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
-            + bucketCount + " buckets  stored as orc  location " + tableLoc2 + 
" TBLPROPERTIES ('transactional'='false')") ;
-
+      + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " 
TBLPROPERTIES ('transactional'='false')");
 
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = null;
     try {
-      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", 
"validation1", null);
-      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      connection = HiveStreamingConnection.newBuilder()
+        .withDatabase("testBucketing3")
+        .withTable("validation2")
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
       Assert.assertTrue("InvalidTable exception was not thrown", false);
     } catch (InvalidTable e) {
       // expecting this exception
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
     }
     try {
-      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", 
"validation2", null);
-      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      connection = HiveStreamingConnection.newBuilder()
+        .withDatabase("testBucketing3")
+        .withTable("validation2")
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
       Assert.assertTrue("InvalidTable exception was not thrown", false);
     } catch (InvalidTable e) {
       // expecting this exception
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
     }
   }
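
The validation test above now expects connect() itself to reject a non-transactional table, and it always closes whatever connection object it got back. A sketch of that expected-failure pattern, assuming tables created with 'transactional'='false' as in the test setup; the wrapper class, agent string and assertion style are illustrative:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hive.streaming.HiveStreamingConnection;
    import org.apache.hive.streaming.InvalidTable;
    import org.apache.hive.streaming.StrictDelimitedInputWriter;

    public class NonTransactionalTableSketch {
      public static void main(String[] args) throws Exception {
        StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
          .withFieldDelimiter(',')
          .build();

        HiveStreamingConnection connection = null;
        try {
          // Streaming requires an ACID (transactional) table; this connect() is expected to fail.
          connection = HiveStreamingConnection.newBuilder()
            .withDatabase("testBucketing3")      // names mirror the test setup
            .withTable("validation1")            // created with 'transactional'='false'
            .withAgentInfo("sketch-agent")
            .withRecordWriter(writer)
            .withHiveConf(new HiveConf())
            .connect();
          throw new AssertionError("InvalidTable exception was not thrown");
        } catch (InvalidTable expected) {
          // expected: the table is not transactional
        } finally {
          if (connection != null) {
            connection.close();
          }
        }
      }
    }
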
 
@@ -547,7 +726,7 @@ public class TestStreaming {
    */
   @Deprecated
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, 
int buckets, int numExpectedFiles,
-                                String... records) throws Exception {
+    String... records) throws Exception {
     ValidWriteIdList writeIds = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
     AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
@@ -585,7 +764,7 @@ public class TestStreaming {
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
-            inf.getRecordReader(splits[0], job, Reporter.NULL);
+      inf.getRecordReader(splits[0], job, Reporter.NULL);
 
     NullWritable key = rr.createKey();
     OrcStruct value = rr.createValue();
@@ -595,12 +774,13 @@ public class TestStreaming {
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
+
   /**
    * @param validationQuery query to read from table to compare data against 
{@code records}
-   * @param records expected data.  each row is CVS list of values
+   * @param records         expected data.  each row is CVS list of values
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, 
int numExpectedFiles,
-                                String validationQuery, boolean vectorize, 
String... records) throws Exception {
+    String validationQuery, boolean vectorize, String... records) throws 
Exception {
     ValidWriteIdList txns = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
     AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
txns);
     Assert.assertEquals(0, dir.getObsolete().size());
@@ -626,12 +806,13 @@ public class TestStreaming {
     Assert.assertEquals(minTxn, min);
     Assert.assertEquals(maxTxn, max);
     boolean isVectorizationEnabled = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
-    if(vectorize) {
+    if (vectorize) {
       conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
     }
 
     String currStrategy = 
conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
-    for(String strategy : 
((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected())
 {
+    for (String strategy : ((Validator.StringSet) 
HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator())
+      .getExpected()) {
       //run it with each split strategy - make sure there are differences
       conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, 
strategy.toUpperCase());
       List<String> actualResult = queryTable(driver, validationQuery);
@@ -656,35 +837,44 @@ public class TestStreaming {
   @Test
   public void testEndpointConnection() throws Exception {
     // For partitioned table, partitionVals are specified
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, 
partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName()); //shouldn't throw
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
     connection.close();
 
     // For unpartitioned table, partitionVals are not specified
-    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName()).close(); // should not throw
-
-    // For partitioned table, partitionVals are not specified
-    try {
-      endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
-      connection = endPt.newConnection(true, "UT_" + 
Thread.currentThread().getName());
-      Assert.assertTrue("ConnectionError was not thrown", false);
-      connection.close();
-    } catch (ConnectionError e) {
-      // expecting this exception
-      String errMsg = "doesn't specify any partitions for partitioned table";
-      Assert.assertTrue(e.toString().endsWith(errMsg));
-    }
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+    connection.close();
 
     // For unpartitioned table, partition values are specified
     try {
-      endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
-      connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
+      connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName2)
+        .withTable(tblName2)
+        .withStaticPartitionValues(partitionVals)
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
       Assert.assertTrue("ConnectionError was not thrown", false);
       connection.close();
     } catch (ConnectionError e) {
       // expecting this exception
-      String errMsg = "specifies partitions for unpartitioned table";
+      String errMsg = "specifies partitions for un-partitioned table";
       Assert.assertTrue(e.toString().endsWith(errMsg));
     }
   }
@@ -695,417 +885,474 @@ public class TestStreaming {
     newPartVals.add(PART1_CONTINENT);
     newPartVals.add("Nepal");
 
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
-            , newPartVals);
-
-    // Ensure partition is absent
-    try {
-      msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
-      Assert.assertTrue("Partition already exists", false);
-    } catch (NoSuchObjectException e) {
-      // expect this exception
-    }
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(newPartVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
 
     // Create partition
-    Assert.assertNotNull(endPt.newConnection(true, "UT_" + 
Thread.currentThread().getName()));
+    Assert.assertNotNull(connection);
 
     // Ensure partition is present
-    Partition p = msClient.getPartition(endPt.database, endPt.table, 
endPt.partitionVals);
+    Partition p = msClient.getPartition(dbName, tblName, partitionVals);
     Assert.assertNotNull("Did not find added partition", p);
   }
 
   @Test
   public void testTransactionBatchEmptyCommit() throws Exception {
     // 1)  to partitioned table
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, connection);
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-
-    txnBatch.beginNextTransaction();
-    txnBatch.commit();
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
-    txnBatch.close();
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+    connection.beginTransaction();
+    connection.commitTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
     connection.close();
 
     // 2) To unpartitioned table
-    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    writer = new DelimitedInputWriter(fieldNames2,",", endPt);
-    connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.commit();
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
-    txnBatch.close();
+    writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+    connection.commitTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
     connection.close();
   }
 
   /**
    * check that transactions that have not heartbeated and timedout get 
properly aborted
+   *
    * @throws Exception
    */
   @Test
   public void testTimeOutReaper() throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, 
null);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", 
endPt);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
-    txnBatch.beginNextTransaction();
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
     conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, 
TimeUnit.SECONDS);
     //ensure txn timesout
-    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, 
TimeUnit.MILLISECONDS);
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, 
TimeUnit.MILLISECONDS);
     AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
     houseKeeperService.setConf(conf);
     houseKeeperService.run();
     try {
       //should fail because the TransactionBatch timed out
-      txnBatch.commit();
-    }
-    catch(TransactionError e) {
+      connection.commitTransaction();
+    } catch (TransactionError e) {
       Assert.assertTrue("Expected aborted transaction", e.getCause() 
instanceof TxnAbortedException);
     }
-    txnBatch.close();
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.commit();
-    txnBatch.beginNextTransaction();
+    connection.close();
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+    connection.beginTransaction();
+    connection.commitTransaction();
+    connection.beginTransaction();
     houseKeeperService.run();
     try {
       //should fail because the TransactionBatch timed out
-      txnBatch.commit();
-    }
-    catch(TransactionError e) {
+      connection.commitTransaction();
+    } catch (TransactionError e) {
       Assert.assertTrue("Expected aborted transaction", e.getCause() 
instanceof TxnAbortedException);
     }
-    txnBatch.close();
     connection.close();
   }
 
   @Test
   public void testHeartbeat() throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, 
null);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName());
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", 
endPt, connection);
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
-    txnBatch.beginNextTransaction();
-    //todo: this should ideally check Transaction heartbeat as well, but 
heartbeat
-    //timestamp is not reported yet
-    //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
-    ShowLocksRequest request = new ShowLocksRequest();
-    request.setDbname(dbName2);
-    request.setTablename(tblName2);
-    ShowLocksResponse response = msClient.showLocks(request);
-    Assert.assertEquals("Wrong nubmer of locks: " + response, 1, 
response.getLocks().size());
-    ShowLocksResponseElement lock = response.getLocks().get(0);
-    long acquiredAt = lock.getAcquiredat();
-    long heartbeatAt = lock.getLastheartbeat();
-    txnBatch.heartbeat();
-    response = msClient.showLocks(request);
-    Assert.assertEquals("Wrong number of locks2: " + response, 1, 
response.getLocks().size());
-    lock = response.getLocks().get(0);
-    Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, 
lock.getAcquiredat());
-    Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
-      ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == 
heartbeatAt);
-    txnBatch.close();
-    int txnBatchSize = 200;
-    txnBatch = connection.fetchTransactionBatch(txnBatchSize, writer);
-    for(int i = 0; i < txnBatchSize; i++) {
-      txnBatch.beginNextTransaction();
-      if(i % 47 == 0) {
-        txnBatch.heartbeat();
-      }
-      if(i % 10 == 0) {
-        txnBatch.abort();
-      }
-      else {
-        txnBatch.commit();
-      }
-      if(i % 37 == 0) {
-        txnBatch.heartbeat();
+    int transactionBatch = 20;
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 200, 
TimeUnit.MILLISECONDS);
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withTransactionBatchSize(transactionBatch)
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+    try {
+      connection.beginTransaction();
+      ShowLocksRequest request = new ShowLocksRequest();
+      request.setDbname(dbName2);
+      request.setTablename(tblName2);
+      ShowLocksResponse response = msClient.showLocks(request);
+      Assert.assertEquals("Wrong number of locks: " + response, 1, 
response.getLocks().size());
+      ShowLocksResponseElement lock = response.getLocks().get(0);
+      long acquiredAt = lock.getAcquiredat();
+      long heartbeatAt = lock.getLastheartbeat();
+      response = msClient.showLocks(request);
+      Assert.assertEquals("Wrong number of locks2: " + response, 1, 
response.getLocks().size());
+      lock = response.getLocks().get(0);
+      Assert.assertEquals("Acquired timestamp didn'table match", acquiredAt, 
lock.getAcquiredat());
+      Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+        ") == old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() == 
heartbeatAt);
+      for (int i = 0; i < transactionBatch * 3; i++) {
+        connection.beginTransaction();
+        if (i % 10 == 0) {
+          connection.abortTransaction();
+        } else {
+          connection.commitTransaction();
+        }
+        Thread.sleep(10);
       }
+    } finally {
+      conf.unset(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.varname);
+      connection.close();
     }
-
   }
+
   @Test
   public void testTransactionBatchEmptyAbort() throws Exception {
     // 1) to partitioned table
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + 
Thread.currentThread().getName());
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, connection);
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.abort();
-    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
-            , txnBatch.getCurrentTransactionState());
-    txnBatch.close();
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+    connection.abortTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
+      , connection.getCurrentTransactionState());
     connection.close();
 
     // 2) to unpartitioned table
-    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    connection = endPt.newConnection(true, "UT_" + 
Thread.currentThread().getName());
-
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.abort();
-    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
-            , txnBatch.getCurrentTransactionState());
-    txnBatch.close();
+    writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+    connection.abortTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
+      , connection.getCurrentTransactionState());
     connection.close();
   }
 
   @Test
-  public void testTransactionBatchCommit_Delimited() throws Exception {
-    testTransactionBatchCommit_Delimited(null);
-  }
-  @Test
-  public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
-    testTransactionBatchCommit_Delimited(Utils.getUGI());
-  }
-  private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) 
throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-      partitionVals);
-    StreamingConnection connection = endPt.newConnection(true, conf, ugi, 
"UT_" + Thread.currentThread().getName());
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, conf, connection);
+  public void testTransactionBatchCommitDelimited() throws Exception {
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .withTransactionBatchSize(10)
+      .connect();
 
     // 1st Txn
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.commit();
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
+    connection.write("1,Hello streaming".getBytes());
+    connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-      , txnBatch.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
 
     // 2nd Txn
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("2,Welcome to streaming".getBytes());
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
+    connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    txnBatch.commit();
+    connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
-    txnBatch.close();
-    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-      , txnBatch.getCurrentTransactionState());
-
-
     connection.close();
 
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
+      , connection.getCurrentTransactionState());
 
-    // To Unpartitioned table
-    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    connection = endPt.newConnection(true, conf, ugi, "UT_" + 
Thread.currentThread().getName());
-    writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
 
+    // To Unpartitioned table
+    writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .connect();
     // 1st Txn
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.commit();
-
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-      , txnBatch.getCurrentTransactionState());
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
+    connection.write("1,Hello streaming".getBytes());
+    connection.commitTransaction();
+
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
     connection.close();
   }
 
   @Test
-  public void testTransactionBatchCommit_Regex() throws Exception {
-    testTransactionBatchCommit_Regex(null);
-  }
-  @Test
-  public void testTransactionBatchCommit_RegexUGI() throws Exception {
-    testTransactionBatchCommit_Regex(Utils.getUGI());
-  }
-  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) 
throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-      partitionVals);
-    StreamingConnection connection = endPt.newConnection(true, conf, ugi, 
"UT_" + Thread.currentThread().getName());
+  public void testTransactionBatchCommitRegex() throws Exception {
     String regex = "([^,]*),(.*)";
-    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+    StrictRegexWriter writer = StrictRegexWriter.newBuilder()
+      .withRegex(regex)
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .withTransactionBatchSize(10)
+      .connect();
 
     // 1st Txn
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.commit();
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
+    connection.write("1,Hello streaming".getBytes());
+    connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-      , txnBatch.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
 
     // 2nd Txn
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("2,Welcome to streaming".getBytes());
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
+    connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    txnBatch.commit();
+    connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
-    txnBatch.close();
-    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-      , txnBatch.getCurrentTransactionState());
-
-
     connection.close();
-
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
+      , connection.getCurrentTransactionState());
 
     // To Unpartitioned table
-    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
-    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
     regex = "([^:]*):(.*)";
-    writer = new StrictRegexWriter(regex, endPt, conf, connection);
+    writer = StrictRegexWriter.newBuilder()
+      .withRegex(regex)
+      .build();
+
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName2)
+      .withTable(tblName2)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withHiveConf(conf)
+      .withRecordWriter(writer)
+      .connect();
 
     // 1st Txn
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-      , txnBatch.getCurrentTransactionState());
-    txnBatch.write("1:Hello streaming".getBytes());
-    txnBatch.commit();
-
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-      , txnBatch.getCurrentTransactionState());
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
+    connection.write("1:Hello streaming".getBytes());
+    connection.commitTransaction();
+
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
     connection.close();
   }
 
   @Test
-  public void testTransactionBatchCommit_Json() throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
-    StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
+  public void testTransactionBatchCommitJson() throws Exception {
+    StrictJsonWriter writer = StrictJsonWriter.newBuilder()
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .withTransactionBatchSize(10)
+      .connect();
 
     // 1st Txn
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    Assert.assertEquals(TransactionBatch.TxnState.OPEN
-            , txnBatch.getCurrentTransactionState());
+    connection.beginTransaction();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+      , connection.getCurrentTransactionState());
     String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
-    txnBatch.write(rec1.getBytes());
-    txnBatch.commit();
+    connection.write(rec1.getBytes());
+    connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
-
-    txnBatch.close();
-    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-            , txnBatch.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
 
     connection.close();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
+      , connection.getCurrentTransactionState());
+
     List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
     Assert.assertEquals(1, rs.size());
   }
 
   @Test
   public void testRemainingTransactions() throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
-
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+    connection.beginTransaction();
     // 1) test with txn.Commit()
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    int batch=0;
-    int initialCount = txnBatch.remainingTransactions();
-    while (txnBatch.remainingTransactions()>0) {
-      txnBatch.beginNextTransaction();
-      Assert.assertEquals(--initialCount, txnBatch.remainingTransactions());
-      for (int rec=0; rec<2; ++rec) {
-        Assert.assertEquals(TransactionBatch.TxnState.OPEN
-                , txnBatch.getCurrentTransactionState());
-        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+    int batch = 0;
+    int initialCount = connection.remainingTransactions();
+    while (connection.remainingTransactions() > 0) {
+      connection.beginTransaction();
+      Assert.assertEquals(--initialCount, connection.remainingTransactions());
+      for (int rec = 0; rec < 2; ++rec) {
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+          , connection.getCurrentTransactionState());
+        connection.write((batch * rec + ",Hello streaming").getBytes());
       }
-      txnBatch.commit();
-      Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-              , txnBatch.getCurrentTransactionState());
+      connection.commitTransaction();
+      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+        , connection.getCurrentTransactionState());
       ++batch;
     }
-    Assert.assertEquals(0, txnBatch.remainingTransactions());
-    txnBatch.close();
-
-    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-            , txnBatch.getCurrentTransactionState());
+    Assert.assertEquals(0, connection.remainingTransactions());
+    connection.close();
 
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
+      , connection.getCurrentTransactionState());
+
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
     // 2) test with txn.Abort()
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    batch=0;
-    initialCount = txnBatch.remainingTransactions();
-    while (txnBatch.remainingTransactions()>0) {
-      txnBatch.beginNextTransaction();
-      Assert.assertEquals(--initialCount,txnBatch.remainingTransactions());
-      for (int rec=0; rec<2; ++rec) {
-        Assert.assertEquals(TransactionBatch.TxnState.OPEN
-                , txnBatch.getCurrentTransactionState());
-        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+    connection.beginTransaction();
+    batch = 0;
+    initialCount = connection.remainingTransactions();
+    while (connection.remainingTransactions() > 0) {
+      connection.beginTransaction();
+      Assert.assertEquals(--initialCount, connection.remainingTransactions());
+      for (int rec = 0; rec < 2; ++rec) {
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
+          , connection.getCurrentTransactionState());
+        connection.write((batch * rec + ",Hello streaming").getBytes());
       }
-      txnBatch.abort();
-      Assert.assertEquals(TransactionBatch.TxnState.ABORTED
-              , txnBatch.getCurrentTransactionState());
+      connection.abortTransaction();
+      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
+        , connection.getCurrentTransactionState());
       ++batch;
     }
-    Assert.assertEquals(0, txnBatch.remainingTransactions());
-    txnBatch.close();
-
-    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
-            , txnBatch.getCurrentTransactionState());
-
+    Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
+
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
+      , connection.getCurrentTransactionState());
   }
 
   @Test
   public void testTransactionBatchAbort() throws Exception {
-
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
-
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.write("2,Welcome to streaming".getBytes());
-    txnBatch.abort();
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+    connection.write("1,Hello streaming".getBytes());
+    connection.write("2,Welcome to streaming".getBytes());
+    connection.abortTransaction();
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
-            , txnBatch.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
+      , connection.getCurrentTransactionState());
 
-    txnBatch.close();
     connection.close();
 
     checkNothingWritten(partLoc);
@@ -1116,123 +1363,162 @@ public class TestStreaming {
   @Test
   public void testTransactionBatchAbortAndCommit() throws Exception {
     String agentInfo = "UT_" + Thread.currentThread().getName();
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    StreamingConnection connection = endPt.newConnection(false, agentInfo);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.write("2,Welcome to streaming".getBytes());
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo(agentInfo)
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .withTransactionBatchSize(10)
+      .connect();
+
+    connection.beginTransaction();
+    connection.write("1,Hello streaming".getBytes());
+    connection.write("2,Welcome to streaming".getBytes());
     ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
     Assert.assertEquals("LockCount", 1, resp.getLocksSize());
     Assert.assertEquals("LockType", LockType.SHARED_READ, 
resp.getLocks().get(0).getType());
     Assert.assertEquals("LockState", LockState.ACQUIRED, 
resp.getLocks().get(0).getState());
     Assert.assertEquals("AgentInfo", agentInfo, 
resp.getLocks().get(0).getAgentInfo());
-    txnBatch.abort();
+    connection.abortTransaction();
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
-            , txnBatch.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
+      , connection.getCurrentTransactionState());
 
-    txnBatch.beginNextTransaction();
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.write("2,Welcome to streaming".getBytes());
-    txnBatch.commit();
+    connection.beginTransaction();
+    connection.write("1,Hello streaming".getBytes());
+    connection.write("2,Welcome to streaming".getBytes());
+    connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
-            "{2, Welcome to streaming}");
+      "{2, Welcome to streaming}");
 
-    txnBatch.close();
     connection.close();
   }
 
   @Test
   public void testMultipleTransactionBatchCommits() throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
-    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
-
-    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.write("1,Hello streaming".getBytes());
-    txnBatch.commit();
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withTransactionBatchSize(10)
+      .withHiveConf(conf)
+      .connect();
+
+    connection.beginTransaction();
+    connection.write("1,Hello streaming".getBytes());
+    connection.commitTransaction();
     String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
     checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello 
streaming");
 
-    txnBatch.beginNextTransaction();
-    txnBatch.write("2,Welcome to streaming".getBytes());
-    txnBatch.commit();
+    connection.beginTransaction();
+    connection.write("2,Welcome to streaming".getBytes());
+    connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 10,  1, validationQuery, true, "1\tHello streaming",
-            "2\tWelcome to streaming");
+    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, true, "1\tHello streaming",
+      "2\tWelcome to streaming");
 
-    txnBatch.close();
+    connection.close();
 
+    connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withTransactionBatchSize(10)
+      .withHiveConf(conf)
+      .connect();
     // 2nd Txn Batch
-    txnBatch =  connection.fetchTransactionBatch(10, writer);
-    txnBatch.beginNextTransaction();
-    txnBatch.write("3,Hello streaming - once again".getBytes());
-    txnBatch.commit();
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, false, "1\tHello streaming",
-            "2\tWelcome to streaming", "3\tHello streaming - once again");
+    checkDataWritten2(partLoc, 1, 20, 2, validationQuery, false, "1\tHello streaming",
+      "2\tWelcome to streaming", "3\tHello streaming - once again");
 
-    txnBatch.beginNextTransaction();
-    txnBatch.write("4,Welcome to streaming - once again".getBytes());
-    txnBatch.commit();
+    connection.beginTransaction();
+    connection.write("4,Welcome to streaming - once again".getBytes());
+    connection.commitTransaction();
 
-    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, true, "1\tHello streaming",
-            "2\tWelcome to streaming", "3\tHello streaming - once again",
-            "4\tWelcome to streaming - once again");
+    checkDataWritten2(partLoc, 1, 20, 2, validationQuery, true, "1\tHello streaming",
+      "2\tWelcome to streaming", "3\tHello streaming - once again",
+      "4\tWelcome to streaming - once again");
 
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch.getCurrentTransactionState());
-
-    txnBatch.close();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
 
     connection.close();
   }
 
   @Test
   public void testInterleavedTransactionBatchCommits() throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-            partitionVals);
-    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
-    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer)
+      .withHiveConf(conf)
+      .withTransactionBatchSize(10)
+      .connect();
 
     // Acquire 1st Txn Batch
-    TransactionBatch txnBatch1 =  connection.fetchTransactionBatch(10, writer);
-    txnBatch1.beginNextTransaction();
+    connection.beginTransaction();
 
     // Acquire 2nd Txn Batch
-    DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
-    TransactionBatch txnBatch2 =  connection.fetchTransactionBatch(10, writer2);
-    txnBatch2.beginNextTransaction();
+    StrictDelimitedInputWriter writer2 = StrictDelimitedInputWriter.newBuilder()
+      .withFieldDelimiter(',')
+      .build();
+    HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
+      .withDatabase(dbName)
+      .withTable(tblName)
+      .withStaticPartitionValues(partitionVals)
+      .withAgentInfo("UT_" + Thread.currentThread().getName())
+      .withRecordWriter(writer2)
+      .withHiveConf(conf)
+      .withTransactionBatchSize(10)
+      .connect();
+    connection2.beginTransaction();
 
     // Interleaved writes to both batches
-    txnBatch1.write("1,Hello streaming".getBytes());
-    txnBatch2.write("3,Hello streaming - once again".getBytes());
+    connection.write("1,Hello streaming".getBytes());
+    connection2.write("3,Hello streaming - once again".getBytes());
 
     checkNothingWritten(partLoc);
 
-    txnBatch2.commit();
+    connection2.commitTransaction();
 
     String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
     checkDataWritten2(partLoc, 11, 20, 1,
       validationQuery, true, "3\tHello streaming - once again");
 
-    txnBatch1.commit();
+    connection.commitTransaction();
     /*now both batches have committed (but not closed) so we for each primary file we expect a side
     file to exist and indicate the true length of primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
     AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
-            msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
-    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
-      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+      msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
         Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
         long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
@@ -1244,20 +1530,20 @@ public class TestStreaming {
       }
     }
     checkDataWritten2(partLoc, 1, 20, 2,
-      validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
+      validationQuery, false, "1\tHello streaming", "3\tHello streaming - once again");
 
-    txnBatch1.beginNextTransaction();
-    txnBatch1.write("2,Welcome to streaming".getBytes());
+    connection.beginTransaction();
+    connection.write("2,Welcome to streaming".getBytes());
 
-    txnBatch2.beginNextTransaction();
-    txnBatch2.write("4,Welcome to streaming - once again".getBytes());
+    connection2.beginTransaction();
+    connection2.write("4,Welcome to streaming - once again".getBytes());
     //here each batch has written data and committed (to bucket0 since table only has 1 bucket)
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
     //has now received more data(logically - it's buffered) but it is not yet committed.
     //lets check that side files exist, etc
     dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
-    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
-      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+    for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
         Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
         long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
@@ -1269,103 +1555,97 @@ public class TestStreaming {
       }
     }
     checkDataWritten2(partLoc, 1, 20, 2,
-      validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
+      validationQuery, true, "1\tHello streaming", "3\tHello streaming - once again");
 
-    txnBatch1.commit();
+    connection.commitTransaction();
 
     checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, false, "1\tHello streaming",
-        "2\tWelcome to streaming",
-        "3\tHello streaming - once again");
+      "2\tWelcome to streaming",
+      "3\tHello streaming - once again");
 
-    txnBatch2.commit();
+    connection2.commitTransaction();
 
     checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, true, "1\tHello streaming",
-        "2\tWelcome to streaming",
-        "3\tHello streaming - once again",
-        "4\tWelcome to streaming - once again");
-
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch1.getCurrentTransactionState());
-    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
-            , txnBatch2.getCurrentTransactionState());
+      "2\tWelcome to streaming",
+      "3\tHello streaming - once again",
+      "4\tWelcome to streaming - once again");
 
-    txnBatch1.close();
-    txnBatch2.close();
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
+      , connection2.getCurrentTransactionState());
 
     connection.close();
+    connection2.close();
   }
 
   private static class WriterThd extends Thread {
 
     private final StreamingConnection conn;
-    private final DelimitedInputWriter writer;
     private final String data;
     private Throwable error;
 
-    WriterThd(HiveEndPoint ep, String data) throws Exception {
+    WriterThd(String data) throws Exception {
       super("Writer_" + data);
-      writer = new DelimitedInputWriter(fieldNames, ",", ep);
-      conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
+      RecordWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+      HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(partitionVals)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
+      this.conn = connection;
       this.data = data;
-      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-        @Override
-        public void uncaughtException(Thread thread, Throwable throwable) {
-          error = throwable;
-          LOG.error("Thread " + thread.getName() + " died: " + 
throwable.getMessage(), throwable);
-        }
+      setUncaughtExceptionHandler((thread, throwable) -> {
+        error = throwable;
+        LOG.error(connection.toTransactionString());
+        LOG.error("Thread " + thread.getName() + " died: " + 
throwable.getMessage(), throwable);
       });
     }
 
     @Override
     public void run() {
-      TransactionBatch txnBatch = null;
       try {
-        txnBatch =  conn.fetchTransactionBatch(10, writer);
-        while (txnBatch.remainingTransactions() > 0) {
-          txnBatch.beginNextTransaction();
-          txnBatch.write(data.getBytes());
-          txnBatch.write(data.getBytes());
-          txnBatch.commit();
+        for (int i = 0; i < 10; i++) {
+          conn.beginTransaction();
+          conn.write(data.getBytes());
+          conn.write(data.getBytes());
+          conn.commitTransaction();
         } // while
       } catch (Exception e) {
         throw new RuntimeException(e);
       } finally {
-        if (txnBatch != null) {
+        if (conn != null) {
           try {
-            txnBatch.close();
+            conn.close();
           } catch (Exception e) {
             LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
-            conn.close();
           }
         }
-        try {
-          conn.close();
-        } catch (Exception e) {
-          LOG.error("conn.close() failed: " + e.getMessage(), e);
-        }
-
       }
     }
   }
 
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
-    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
     List<WriterThd> writers = new ArrayList<WriterThd>(3);
-    writers.add(new WriterThd(ep, "1,Matrix"));
-    writers.add(new WriterThd(ep, "2,Gandhi"));
-    writers.add(new WriterThd(ep, "3,Silence"));
+    writers.add(new WriterThd("1,Matrix"));
+    writers.add(new WriterThd("2,Gandhi"));
+    writers.add(new WriterThd("3,Silence"));
 
-    for(WriterThd w : writers) {
+    for (WriterThd w : writers) {
       w.start();
     }
-    for(WriterThd w : writers) {
+    for (WriterThd w : writers) {
       w.join();
     }
-    for(WriterThd w : writers) {
-      if(w.error != null) {
+    for (WriterThd w : writers) {
+      if (w.error != null) {
         Assert.assertFalse("Writer thread" + w.getName() + " died: " + 
w.error.getMessage() +
           " See log file for stack trace", true);
       }
@@ -1376,11 +1656,11 @@ public class TestStreaming {
   private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
     org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
     Reader reader = OrcFile.createReader(orcFile,
-            OrcFile.readerOptions(conf).filesystem(fs));
+      OrcFile.readerOptions(conf).filesystem(fs));
 
     RecordReader rows = reader.rows();
     StructObjectInspector inspector = (StructObjectInspector) reader
-            .getObjectInspector();
+      .getObjectInspector();
 
     System.out.format("Found Bucket File : %s \n", orcFile.getName());
     ArrayList<SampleRec> result = new ArrayList<SampleRec>();
@@ -1402,7 +1682,7 @@ public class TestStreaming {
     WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
     WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
     WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
-    WritableLongObjectInspector f4ins = (WritableLongObjectInspector)  fields.get(4).getFieldObjectInspector();
+    WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector();
     StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
 
     int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
@@ -1412,7 +1692,7 @@ public class TestStreaming {
     long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
     SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
 
-    return new Object[] {f0, f1, f2, f3, f4, f5};
+    return new Object[]{f0, f1, f2, f3, f4, f5};
   }
 
   // Assumes row schema => string,int,string
@@ -1437,49 +1717,67 @@ public class TestStreaming {
 
     // 1) Create two bucketed tables
     String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
-    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
     String[] colNames = "key1,key2,data".split(",");
     String[] colTypes = "string,int,string".split(",");
     String[] bucketNames = "key1,key2".split(",");
     int bucketCount = 4;
     createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
-            , null, dbLocation, bucketCount);
+      , null, dbLocation, bucketCount);
 
     String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
-    dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+    dbLocation2 = dbLocation2.replaceAll("\\\\", "/"); // for windows paths
     String[] colNames2 = "key3,key4,data2".split(",");
     String[] colTypes2 = "string,int,string".split(",");
     String[] bucketNames2 = "key3,key4".split(",");
     createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
-            , null, dbLocation2, bucketCount);
+      , null, dbLocation2, bucketCount);
 
 
     // 2) Insert data into both tables
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, db

<TRUNCATED>
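
The hunks above migrate every test from the removed HiveEndPoint/TransactionBatch pair to the builder-based HiveStreamingConnection API, with transaction control moving onto the connection itself (beginTransaction, write, commitTransaction, abortTransaction, close). The following is a minimal, self-contained sketch of that lifecycle as the updated tests exercise it; the database name, table name, and bare HiveConf are illustrative placeholders, not values taken from this commit.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class StreamingLifecycleSketch {
  public static void main(String[] args) throws StreamingException {
    HiveConf conf = new HiveConf();                 // assumes hive-site.xml with metastore URIs on the classpath
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
      .withFieldDelimiter(',')
      .build();
    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
      .withDatabase("testing")                      // hypothetical database
      .withTable("alerts")                          // hypothetical ACID table with columns (id, msg)
      .withAgentInfo("sketch_" + Thread.currentThread().getName())
      .withRecordWriter(writer)
      .withHiveConf(conf)
      .withTransactionBatchSize(10)
      .connect();
    try {
      // The connection now carries the transaction state the old TransactionBatch used to hold.
      connection.beginTransaction();
      connection.write("1,Hello streaming".getBytes());
      connection.commitTransaction();

      connection.beginTransaction();
      connection.write("2,Welcome to streaming".getBytes());
      connection.abortTransaction();                // nothing from the aborted txn becomes visible
    } finally {
      connection.close();                           // closes the remaining transaction batch
    }
  }
}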

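The inline comments in testInterleavedTransactionBatchCommits note that once a transaction has committed while its writer is still open, each bucket file in a delta directory should be accompanied by a _flush_length side file recording the readable length of the primary file. Below is a hypothetical standalone helper capturing that check, built only from calls visible in the diff (AcidUtils.getAcidState, AcidUtils.bucketFileFilter, OrcAcidUtils.getSideFile); the class and method names are invented, and the non-empty assertion is an assumption since the hunk containing the test's exact length assertions is cut off above.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.orc.impl.OrcAcidUtils;
import org.junit.Assert;

final class SideFileChecks {

  // For every current delta directory, assert that each bucket file has a
  // _flush_length side file and that the side file is non-empty (assumed check;
  // the test's exact assertions on the recorded length are outside the quoted hunks).
  static void assertSideFilesPresent(FileSystem fs, AcidUtils.Directory dir) throws IOException {
    for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
      for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
        Assert.assertTrue(lengthFile + " is empty", fs.getFileStatus(lengthFile).getLen() > 0);
      }
    }
  }
}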