+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+import static 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class TestStreaming {
+  private static final Logger LOG = 
+  /**
+   * {@link RawLocalFileSystem} variant that reports the fixed URI {@code raw:///}
+   * and builds its {@link FileStatus} from {@link File} checks, using an
+   * approximate permission mask and a hard-coded owner and group.
+   */
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME = rawUri();
+    /** Builds the constant URI returned by {@link #getUri()}. */
+    private static URI rawUri() {
+      try {
+        return new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+    /**
+     * Describes the local file behind {@code path}.
+     *
+     * @param path path to inspect
+     * @return status carrying the file's length, directory flag and last-modified
+     *         time (used for both timestamps), replication 1 and block size 1024
+     * @throws FileNotFoundException if the local file does not exist
+     */
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      final File localFile = pathToFile(path);
+      if (!localFile.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // get close enough: File only answers can-read/write/execute for this process
+      int bits = 0;
+      bits |= localFile.canRead() ? 0444 : 0;
+      bits |= localFile.canWrite() ? 0200 : 0;
+      bits |= localFile.canExecute() ? 0111 : 0;
+      final FsPermission permission = FsPermission.createImmutable((short) bits);
+      return new FileStatus(localFile.length(), localFile.isDirectory(), 1, 1024,
+          localFile.lastModified(), localFile.lastModified(), permission,
+          "owen", "users", path);
+    }
+  }
+  private static final String COL1 = "id"; // name of the first test column
+  private static final String COL2 = "msg"; // name of the second test column
+  private final HiveConf conf; // assigned once in the constructor
+  private IDriver driver; // replaced in setup() before every test
+  private final IMetaStoreClient msClient; // opened in the constructor, closed in cleanup()
+  final String metaStoreURI = null; // null: the constructor leaves METASTOREURIS untouched
+  // partitioned table: created in setup() with partition values partitionVals
+  private final static String dbName = "testing";
+  private final static String tblName = "alerts";
+  private final static String[] fieldNames = new String[]{COL1,COL2};
+  List<String> partitionVals;
+  private static Path partLoc; // value returned by createDbAndTable for dbName.tblName in setup()
+  private static Path partLoc2; // value returned by createDbAndTable for dbName2.tblName2 in setup()
+  // unpartitioned table: created in setup() with null partition values
+  private final static String dbName2 = "testing2";
+  private final static String tblName2 = "alerts";
+  private final static String[] fieldNames2 = new String[]{COL1,COL2};
+  // for bucket join testing
+  private final static String dbName3 = "testing3";
+  private final static String tblName3 = "dimensionTable";
+  private final static String dbName4 = "testing4";
+  private final static String tblName4 = "factTable";
+  List<String> partitionVals2;
+  private final String PART1_CONTINENT = "Asia"; // first entry of partitionVals
+  private final String PART1_COUNTRY = "India"; // second entry of partitionVals, only entry of partitionVals2
+  @Rule
+  public TemporaryFolder dbFolder = new TemporaryFolder(); // parent of the per-database folders made in setup()
+  public TestStreaming() throws Exception { // prepares partition values, conf, a clean txn metastore DB and the metastore client
+    partitionVals = new ArrayList<String>(2);
+    partitionVals.add(PART1_CONTINENT);
+    partitionVals.add(PART1_COUNTRY);
+    partitionVals2 = new ArrayList<String>(1);
+    partitionVals2.add(PART1_COUNTRY);
+    conf = new HiveConf(this.getClass());
+    conf.set("fs.raw.impl", RawFileSystem.class.getName()); // registers RawFileSystem for the raw:// scheme
+    conf
+    .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, // NOTE(review): the value argument is cut off in this copy of the file
+    TxnDbUtil.setConfValues(conf);
+    if (metaStoreURI!=null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+    }
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    dbFolder.create(); // created explicitly; presumably because the @Rule has not run yet at construction time
+    //1) Start from a clean slate (metastore)
+    TxnDbUtil.cleanDb(conf);
+    TxnDbUtil.prepDb(conf);
+    //2) obtain metastore clients
+    msClient = new HiveMetaStoreClient(conf);
+  }
+  @Before // per-test fixture: new session and driver, then the databases and tables the tests rely on
+  public void setup() throws Exception {
+    SessionState.start(new CliSessionState(conf));
+    driver = DriverFactory.newDriver(conf);
+    driver.setMaxRows(200002);//make sure Driver returns all results
+    // drop and recreate the necessary databases and tables
+    dropDB(msClient, dbName);
+    String[] colNames = new String[] {COL1, COL2};
+    String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, 
+    String[] bucketCols = new String[] {COL1};
+    String loc1 = dbFolder.newFolder(dbName + ".db").toString();
+    String[] partNames = new String[]{"Continent", "Country"};
+    partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, 
colNames, colTypes, bucketCols, partNames, loc1, 1);
+    dropDB(msClient, dbName2);
+    String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
+    partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, 
colTypes, bucketCols, null, loc2, 2);
+    String loc3 = dbFolder.newFolder("testing5.db").toString();
+    createStoreSales("testing5", loc3); // table read by testBucketingWhereBucketColIsNotFirstCol
+    runDDL(driver, "drop table testBucketing3.streamedtable"); // tables created by testStreamBucketingMatchesRegularBucketing
+    runDDL(driver, "drop table testBucketing3.finaltable");
+    runDDL(driver, "drop table testBucketing3.nobucket");
+  }
+  /**
+   * Releases the metastore client and the driver after each test.
+   * The driver is closed in a {@code finally} block so that a failure while
+   * closing the client cannot leak it. A driver that was never created
+   * (because {@link #setup()} failed before assigning it) is skipped instead
+   * of raising a {@link NullPointerException} that would hide the original
+   * failure.
+   *
+   * @throws Exception if closing either resource fails
+   */
+  @After
+  public void cleanup() throws Exception {
+    try {
+      msClient.close();
+    } finally {
+      if (driver != null) {
+        driver.close();
+      }
+    }
+  }
+  private static List<FieldSchema> getPartitionKeys() { // returns the string-typed partition columns "continent" then "country"
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    // Defining partition names in unsorted order
+    fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, 
+    fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, 
+    return fields;
+  }
+  private void createStoreSales(String dbName, String loc) throws Exception { // creates database dbName under raw://loc with a transactional, partitioned, 4-bucket ORC table store_sales; each DDL step must succeed
+    String dbUri = "raw://" + new Path(loc).toUri().toString();
+    String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
+    boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName 
+ " location '" + dbUri + "'");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "use " + dbName);
+    Assert.assertTrue(success);
+    success = runDDL(driver, "drop table if exists store_sales");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "create table store_sales\n" +
+      "(\n" +
+      "    ss_sold_date_sk           int,\n" +
+      "    ss_sold_time_sk           int,\n" +
+      "    ss_item_sk                int,\n" +
+      "    ss_customer_sk            int,\n" +
+      "    ss_cdemo_sk               int,\n" +
+      "    ss_hdemo_sk               int,\n" +
+      "    ss_addr_sk                int,\n" +
+      "    ss_store_sk               int,\n" +
+      "    ss_promo_sk               int,\n" +
+      "    ss_ticket_number          int,\n" +
+      "    ss_quantity               int,\n" +
+      "    ss_wholesale_cost         decimal(7,2),\n" +
+      "    ss_list_price             decimal(7,2),\n" +
+      "    ss_sales_price            decimal(7,2),\n" +
+      "    ss_ext_discount_amt       decimal(7,2),\n" +
+      "    ss_ext_sales_price        decimal(7,2),\n" +
+      "    ss_ext_wholesale_cost     decimal(7,2),\n" +
+      "    ss_ext_list_price         decimal(7,2),\n" +
+      "    ss_ext_tax                decimal(7,2),\n" +
+      "    ss_coupon_amt             decimal(7,2),\n" +
+      "    ss_net_paid               decimal(7,2),\n" +
+      "    ss_net_paid_inc_tax       decimal(7,2),\n" +
+      "    ss_net_profit             decimal(7,2)\n" +
+      ")\n" +
+      " partitioned by (dt string)\n" +
+      "clustered by (ss_store_sk, ss_promo_sk)\n" +
+      "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc +  "'" + "  
TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "alter table store_sales add 
+    Assert.assertTrue(success);
+  }
+  /**
+   * Streams ten delimited rows into {@code testing5.store_sales} and prints the
+   * stored rows together with their bucket ids.
+   * Makes sure streaming works with a table where the bucket column is not the
+   * first column (the table is clustered by ss_store_sk, ss_promo_sk).
+   * @throws Exception if streaming or the follow-up query fails
+   */
+  @Test
+  public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
+    List<String> partitionVals = new ArrayList<String>();
+    partitionVals.add("2015");
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", 
"store_sales", partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] 
{"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
+      "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", 
"ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+      "ss_wholesale_cost", "ss_list_price", "ss_sales_price", 
"ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
+      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", 
"ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    StringBuilder row = new StringBuilder(); // NOTE(review): never reset inside the loop, so every written row still starts with all previously written text
+    for(int i = 0; i < 10; i++) {
+      for(int ints = 0; ints < 11; ints++) { // the 11 int columns
+        row.append(ints).append(',');
+      }
+      for(int decs = 0; decs < 12; decs++) { // the 12 decimal columns
+        row.append(i + 0.1).append(',');
+      }
+      row.setLength(row.length() - 1); // drop the trailing comma
+      txnBatch.write(row.toString().getBytes());
+    }
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+    ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * 
from testing5.store_sales");
+    for (String re : res) { // printed only; nothing is asserted about the rows
+      System.out.println(re);
+    }
+  }
+  /**
+   * Test that streaming can write to unbucketed table.
+   * Flow: create {@code default.streamingnobuckets} and insert one row through
+   * SQL, stream four more rows in two committed transactions, check ROW__ID and
+   * file name of each row, run an update and a delete, request a compaction via
+   * {@link #runWorker(HiveConf)} and check the rows again.
+   */
+  @Test
+  public void testNoBuckets() throws Exception {
+    queryTable(driver, "drop table if exists default.streamingnobuckets");
+    //todo: why does it need transactional_properties?
+    queryTable(driver, "create table default.streamingnobuckets (a string, b 
string) stored as orc TBLPROPERTIES('transactional'='true', 
+    queryTable(driver, "insert into default.streamingnobuckets 
+    List<String> rs = queryTable(driver, "select * from 
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("foo\tbar", rs.get(0));
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", 
"streamingnobuckets", null);
+    String[] colNames1 = new String[] { "a", "b" };
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, 
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction(); // first streamed transaction: rows a1/b2 and a3/b4
+    txnBatch.write("a1,b2".getBytes());
+    txnBatch.write("a3,b4".getBytes());
+    txnBatch.commit();
+    txnBatch.beginNextTransaction(); // second streamed transaction: rows a5/b6 and a7/b8
+    txnBatch.write("a5,b6".getBytes());
+    txnBatch.write("a7,b8".getBytes());
+    txnBatch.commit();
+    txnBatch.close(); // NOTE(review): no connection.close() is visible in this method
+    Assert.assertEquals("", 0, 
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
+    Assert.assertTrue(rs.get(0), 
+    Assert.assertTrue(rs.get(0), 
+    Assert.assertTrue(rs.get(1), 
+    Assert.assertTrue(rs.get(1), 
+    Assert.assertTrue(rs.get(2), 
+    Assert.assertTrue(rs.get(2), 
+    Assert.assertTrue(rs.get(3), 
+    Assert.assertTrue(rs.get(3), 
+    Assert.assertTrue(rs.get(4), 
+    Assert.assertTrue(rs.get(4), 
+    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where 
+    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+    rs = queryTable(driver, "select a, b from default.streamingnobuckets order 
by a, b");
+    int row = 0; // the message is built before row++ runs, so it names the row being checked
+    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+    queryTable(driver, "alter table default.streamingnobuckets compact 
+    runWorker(conf);
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from 
default.streamingnobuckets order by ROW__ID");
+    Assert.assertTrue(rs.get(0), 
+    Assert.assertTrue(rs.get(0), 
+    Assert.assertTrue(rs.get(1), 
+    Assert.assertTrue(rs.get(1), 
+    Assert.assertTrue(rs.get(2), 
+    Assert.assertTrue(rs.get(2), 
+    Assert.assertTrue(rs.get(3), 
+    Assert.assertTrue(rs.get(3), 
+  }
+  /**
+   * Runs the compaction {@link Worker} synchronously in the calling thread.
+   * this is a clone from TestTxnStatement2....
+   * <p>
+   * The worker is configured with {@code hiveConf}, initialised with a
+   * {@code stop} flag that is already {@code true} (presumably so that it
+   * exits after one pass - confirm against {@code Worker}) and then executed
+   * through {@code run()}. Without that call the worker was only set up and
+   * never processed the compaction requested by the caller.
+   *
+   * @param hiveConf configuration handed to the worker
+   * @throws MetaException declared by the worker set-up calls
+   */
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    // run(), not start(): callers query the table right after this method returns
+    t.run();
+  }
+  // stream data into streaming table with N buckets, then copy the data into 
another bucketed table
+  // check if bucketing in both was done in the same way
+  @Test // passes when no finaltable row has a row__id.bucketid different from the bucketid captured from streamedtable
+  public void testStreamBucketingMatchesRegularBucketing() throws Exception {
+    int bucketCount = 100;
+    String dbUri = "raw://" + new 
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
+    String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+    runDDL(driver, "create table streamedtable ( key1 string,key2 int,data 
string ) clustered by ( key1,key2 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc + 
" TBLPROPERTIES ('transactional'='true')") ;
+//  In 'nobucket' table we capture bucketid from streamedtable to workaround a 
hive bug that prevents joins two identically bucketed tables
+    runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 
int,data string ) location " + tableLoc3) ;
+    runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 
int,data string ) clustered by ( key1,key2 ) into "
+            + bucketCount + " buckets  stored as orc location " + tableLoc2 + 
" TBLPROPERTIES ('transactional'='true')");
+    String[] records = new String[] {
+    }; // NOTE(review): the array is empty in this copy, so nothing is streamed and the final assertion cannot fail
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", 
"streamedtable", null);
+    String[] colNames1 = new String[] { "key1", "key2", "data" };
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction();
+    for (String record : records) { // all records go into a single transaction
+      txnBatch.write(record.toString().getBytes());
+    }
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+    ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * 
from streamedtable order by key2");
+    for (String re : res1) { // printed for inspection only
+      System.out.println(re);
+    }
+"insert into nobucket select row__id.bucketid,* from 
+    runDDL(driver, " insert into finaltable select * from nobucket");
+    ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* 
from finaltable where row__id.bucketid<>bucketid");
+    for (String s : res2) { // every row logged here was bucketed differently by the two write paths
+      LOG.error(s);
+    }
+    Assert.assertTrue(res2.isEmpty());
+  }
+  @Test // expects newConnection to throw InvalidTable for tables created with 'transactional'='false'
+  public void testTableValidation() throws Exception {
+    int bucketCount = 100;
+    String dbUri = "raw://" + new 
+    String tbl1 = "validation1";
+    String tbl2 = "validation2";
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+    runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc + 
" TBLPROPERTIES ('transactional'='false')") ;
+    runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) 
clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc2 + 
" TBLPROPERTIES ('transactional'='false')") ;
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", 
"validation1", null);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      Assert.assertTrue("InvalidTable exception was not thrown", false); // reached only when no exception was thrown
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", 
"validation2", null);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      Assert.assertTrue("InvalidTable exception was not thrown", false); // reached only when no exception was thrown
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+  }
+  /**
+   * Verifies the ACID state under {@code partitionPath}: no obsolete and no
+   * original files, exactly {@code numExpectedFiles} current deltas whose
+   * smallest and largest write ids equal {@code minTxn} and {@code maxTxn}.
+   * It then reads the first split through {@code OrcInputFormat} and compares
+   * each row's {@code toString()} with the corresponding entry of
+   * {@code records}.
+   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String,
+   * boolean, String...)} -
+   * there is little value in using InputFormat directly
+   */
+  @Deprecated
+  private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, 
int buckets, int numExpectedFiles,
+                                String... records) throws Exception {
+    ValidWriteIdList writeIds = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) {
+      System.out.println(pd.getPath().toString());
+    }
+    Assert.assertEquals(numExpectedFiles, current.size());
+    // find the absolute minimum transaction
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
+      }
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
+      }
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", partitionPath.toString());
+    job.set(BUCKET_COUNT, Integer.toString(buckets));
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
+    AcidUtils.setAcidOperationalProperties(job, true, null);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
+    InputSplit[] splits = inf.getSplits(job, buckets);
+    Assert.assertEquals(numExpectedFiles, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+            inf.getRecordReader(splits[0], job, Reporter.NULL); // only the first split is read
+    NullWritable key = rr.createKey();
+    OrcStruct value = rr.createValue();
+    for (String record : records) {
+      Assert.assertEquals(true,, value));
+      Assert.assertEquals(record, value.toString());
+    }
+    Assert.assertEquals(false,, value));
+  }
+  /**
+   * Verifies the ACID state under {@code partitionPath} (no obsolete and no
+   * original files, {@code numExpectedFiles} current deltas spanning write ids
+   * {@code minTxn} to {@code maxTxn}), then runs {@code validationQuery} once
+   * per split strategy and compares the returned rows with {@code records}.
+   * The previous split strategy is put back into {@code conf} afterwards.
+   * @param validationQuery query to read from table to compare data against
+   * {@code records}
+   * @param vectorize when true, HIVE_VECTORIZATION_ENABLED is switched on
+   * before the queries run
+   * @param records expected data.  each row is CSV list of values
+   */
+  private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, 
int numExpectedFiles,
+                                String validationQuery, boolean vectorize, 
String... records) throws Exception {
+    ValidWriteIdList txns = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) {
+      System.out.println(pd.getPath().toString());
+    }
+    Assert.assertEquals(numExpectedFiles, current.size());
+    // find the absolute minimum and maximum write id across the current deltas
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
+      }
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
+      }
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+    boolean isVectorizationEnabled = 
+    if(vectorize) {
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    }
+    String currStrategy = 
+    for(String strategy : 
+      //run it with each split strategy - make sure the results do not differ
+      conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, 
+      List<String> actualResult = queryTable(driver, validationQuery);
+      for (int i = 0; i < actualResult.size(); i++) { // NOTE(review): bounded by the actual rows, so expected rows missing from the result are not detected
+        Assert.assertEquals("diff at [" + i + "].  actual=" + actualResult + " 
expected=" +
+          Arrays.toString(records), records[i], actualResult.get(i));
+      }
+    }
+    conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, 
+  }
+  private void checkNothingWritten(Path partitionPath) throws Exception { // asserts that partitionPath holds no obsolete, original or current delta entries
+    ValidWriteIdList writeIds = 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, 
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    Assert.assertEquals(0, current.size());
+  }
+  @Test // valid endpoint/partition combinations connect; mismatched ones must raise ConnectionError with a specific message suffix
+  public void testEndpointConnection() throws Exception {
+    // For partitioned table, partitionVals are specified
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, 
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName()); //shouldn't throw
+    connection.close();
+    // For unpartitioned table, partitionVals are not specified
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    endPt.newConnection(false, "UT_" + 
Thread.currentThread().getName()).close(); // should not throw
+    // For partitioned table, partitionVals are not specified
+    try {
+      endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+      connection = endPt.newConnection(true, "UT_" + 
+      Assert.assertTrue("ConnectionError was not thrown", false); // always fails when reached, so the close() below never runs
+      connection.close();
+    } catch (ConnectionError e) {
+      // expecting this exception
+      String errMsg = "doesn't specify any partitions for partitioned table";
+      Assert.assertTrue(e.toString().endsWith(errMsg));
+    }
+    // For unpartitioned table, partition values are specified
+    try {
+      endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
+      connection = endPt.newConnection(false, "UT_" + 
+      Assert.assertTrue("ConnectionError was not thrown", false); // always fails when reached, so the close() below never runs
+      connection.close();
+    } catch (ConnectionError e) {
+      // expecting this exception
+      String errMsg = "specifies partitions for unpartitioned table";
+      Assert.assertTrue(e.toString().endsWith(errMsg));
+    }
+  }
+  @Test // a partition that is absent from the metastore must exist after newConnection(true, ...)
+  public void testAddPartition() throws Exception {
+    List<String> newPartVals = new ArrayList<String>(2);
+    newPartVals.add(PART1_CONTINENT);
+    newPartVals.add("Nepal");
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+            , newPartVals);
+    // Ensure partition is absent
+    try {
+      msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+      Assert.assertTrue("Partition already exists", false); // reached only when the lookup unexpectedly succeeded
+    } catch (NoSuchObjectException e) {
+      // expect this exception
+    }
+    // Create partition
+    Assert.assertNotNull(endPt.newConnection(true, "UT_" + 
+    // Ensure partition is present
+    Partition p = msClient.getPartition(endPt.database, endPt.table, 
+    Assert.assertNotNull("Did not find added partition", p);
+  }
+  @Test // committing a transaction with no writes must leave the batch in state COMMITTED
+  public void testTransactionBatchEmptyCommit() throws Exception {
+    // 1)  to partitioned table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+    // 2) To unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+    connection = endPt.newConnection(false, "UT_" + 
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+  /**
+   * Checks that transactions that have not heartbeated and have timed out get
+   * properly aborted.
+   * @throws Exception on any unexpected streaming or metastore failure
+   */
+  @Test
+  public void testTimeOutReaper() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, 
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", 
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, 
+    //ensure the txn times out
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, 
+    AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
+    houseKeeperService.setConf(conf); // NOTE(review): no call that runs the service is visible in this copy
+    try {
+      //should fail because the TransactionBatch timed out
+      txnBatch.commit(); // NOTE(review): nothing fails the test when this commit succeeds
+    }
+    catch(TransactionError e) {
+      Assert.assertTrue("Expected aborted transaction", e.getCause() 
instanceof TxnAbortedException);
+    }
+    txnBatch.close();
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    txnBatch.beginNextTransaction();
+    try {
+      //should fail because the TransactionBatch timed out
+      txnBatch.commit(); // NOTE(review): nothing fails the test when this commit succeeds
+    }
+    catch(TransactionError e) {
+      Assert.assertTrue("Expected aborted transaction", e.getCause() 
instanceof TxnAbortedException);
+    }
+    txnBatch.close();
+    connection.close();
+  }
+  @Test // inspects the lock on dbName2.tblName2 before and after heartbeat(), then mixes heartbeat/abort/commit over a 200-transaction batch
+  public void testHeartbeat() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, 
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    //todo: this should ideally check Transaction heartbeat as well, but 
+    //timestamp is not reported yet
+    //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+    ShowLocksRequest request = new ShowLocksRequest();
+    request.setDbname(dbName2);
+    request.setTablename(tblName2);
+    ShowLocksResponse response = msClient.showLocks(request);
+    Assert.assertEquals("Wrong nubmer of locks: " + response, 1, 
+    ShowLocksResponseElement lock = response.getLocks().get(0);
+    long acquiredAt = lock.getAcquiredat(); // values captured before the explicit heartbeat
+    long heartbeatAt = lock.getLastheartbeat();
+    txnBatch.heartbeat();
+    response = msClient.showLocks(request);
+    Assert.assertEquals("Wrong number of locks2: " + response, 1, 
+    lock = response.getLocks().get(0);
+    Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, 
+    Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+      ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == 
+    txnBatch.close();
+    int txnBatchSize = 200;
+    txnBatch = connection.fetchTransactionBatch(txnBatchSize, writer);
+    for(int i = 0; i < txnBatchSize; i++) { // no assertions in this loop; it passes as long as nothing throws
+      txnBatch.beginNextTransaction();
+      if(i % 47 == 0) { // heartbeat while the transaction is still open
+        txnBatch.heartbeat();
+      }
+      if(i % 10 == 0) {
+        txnBatch.abort();
+      }
+      else {
+        txnBatch.commit();
+      }
+      if(i % 37 == 0) { // heartbeat after the transaction was committed or aborted
+        txnBatch.heartbeat();
+      }
+    }
+  } // NOTE(review): the second batch and the connection are not closed in the visible code
+  @Test // Aborts a transaction that wrote nothing and checks the batch reports ABORTED, once per table.
+  public void testTransactionBatchEmptyAbort() throws Exception { // NOTE(review): some statements below are cut off at the line end in this copy of the patch
+    // 1) to partitioned table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort(); // abort immediately, with no write() in between
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+    // 2) to unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames,",", endPt); // built before the connection and without it, unlike part 1. NOTE(review): testHeartbeat pairs tblName2 with fieldNames2 - confirm fieldNames is intended
+    connection = endPt.newConnection(true, "UT_" + 
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+  /** Runs the delimited-writer commit scenario with a null UserGroupInformation. */
+  @Test
+  public void testTransactionBatchCommit_Delimited() throws Exception {
+    final UserGroupInformation noUgi = null;
+    testTransactionBatchCommit_Delimited(noUgi);
+  }
+  /** Runs the delimited-writer commit scenario with the UserGroupInformation returned by Utils.getUGI(). */
+  @Test
+  public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+    final UserGroupInformation currentUgi = Utils.getUGI();
+    testTransactionBatchCommit_Delimited(currentUgi);
+  }
+  private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) 
throws Exception { // Shared body of the two Delimited tests: commits comma-delimited records over a connection opened with ugi, first to the partitioned table, then to the unpartitioned one.
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, 
"UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, conf, connection);
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    // data should not be visible
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); // still only the first record: the second write is not committed yet
+    txnBatch.commit();
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+    connection.close();
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + 
+    writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    connection.close(); // NOTE(review): unlike the first part, txnBatch.close() is not called before the connection is closed
+  }
+  /** Runs the regex-writer commit scenario with a null UserGroupInformation. */
+  @Test
+  public void testTransactionBatchCommit_Regex() throws Exception {
+    final UserGroupInformation noUgi = null;
+    testTransactionBatchCommit_Regex(noUgi);
+  }
+  /** Runs the regex-writer commit scenario with the UserGroupInformation returned by Utils.getUGI(). */
+  @Test
+  public void testTransactionBatchCommit_RegexUGI() throws Exception {
+    final UserGroupInformation currentUgi = Utils.getUGI();
+    testTransactionBatchCommit_Regex(currentUgi);
+  }
+  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) 
throws Exception { // Shared body of the two Regex tests: same flow as the Delimited variant, but records are parsed by a StrictRegexWriter.
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, 
"UT_" + Thread.currentThread().getName());
+    String regex = "([^,]*),(.*)"; // two capture groups, split at the first comma
+    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, // NOTE(review): remaining arguments are cut off in this copy of the patch
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    // data should not be visible
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); // still only the first record: the second write is not committed yet
+    txnBatch.commit();
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+    connection.close();
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + 
+    regex = "([^:]*):(.*)"; // same two-group shape, colon-separated to match the record written below
+    writer = new StrictRegexWriter(regex, endPt, conf, connection);
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1:Hello streaming".getBytes());
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    connection.close(); // NOTE(review): unlike the first part, txnBatch.close() is not called before the connection is closed
+  }
+  @Test // Commits one JSON record through a StrictJsonWriter to the partitioned table and checks it on disk and via a query.
+  public void testTransactionBatchCommit_Json() throws Exception { // NOTE(review): some statements below are cut off at the line end in this copy of the patch
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + 
+    StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+            , txnBatch.getCurrentTransactionState());
+    String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
+    txnBatch.write(rec1.getBytes());
+    txnBatch.commit();
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + 
+    Assert.assertEquals(1, rs.size()); // exactly one row is expected back from the query above
+  }
+  @Test // Checks that remainingTransactions() drops by one per beginNextTransaction() down to 0, on both the commit and the abort path.
+  public void testRemainingTransactions() throws Exception { // NOTE(review): some statements below are cut off at the line end in this copy of the patch
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + 
+    // 1) test with txn.Commit()
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    int batch=0;
+    int initialCount = txnBatch.remainingTransactions(); // local countdown compared against the batch after every begin
+    while (txnBatch.remainingTransactions()>0) {
+      txnBatch.beginNextTransaction();
+      Assert.assertEquals(--initialCount, txnBatch.remainingTransactions());
+      for (int rec=0; rec<2; ++rec) {
+        Assert.assertEquals(TransactionBatch.TxnState.OPEN
+                , txnBatch.getCurrentTransactionState());
+        txnBatch.write((batch * rec + ",Hello streaming").getBytes()); // id is batch*rec: 0 for the first record, batch for the second
+      }
+      txnBatch.commit();
+      Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+              , txnBatch.getCurrentTransactionState());
+      ++batch;
+    }
+    Assert.assertEquals(0, txnBatch.remainingTransactions());
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+    // 2) test with txn.Abort()
+    txnBatch =  connection.fetchTransactionBatch(10, writer); // fresh batch on the same connection and writer
+    batch=0;
+    initialCount = txnBatch.remainingTransactions();
+    while (txnBatch.remainingTransactions()>0) {
+      txnBatch.beginNextTransaction();
+      Assert.assertEquals(--initialCount,txnBatch.remainingTransactions());
+      for (int rec=0; rec<2; ++rec) {
+        Assert.assertEquals(TransactionBatch.TxnState.OPEN
+                , txnBatch.getCurrentTransactionState());
+        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+      }
+      txnBatch.abort();
+      Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+              , txnBatch.getCurrentTransactionState());
+      ++batch;
+    }
+    Assert.assertEquals(0, txnBatch.remainingTransactions());
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+  @Test // Writes two records, aborts the transaction, and checks nothing was written, both before and after closing.
+  public void testTransactionBatchAbort() throws Exception { // NOTE(review): the newConnection statement below is cut off at the line end in this copy of the patch
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.abort();
+    checkNothingWritten(partLoc); // first check: right after the abort
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+    checkNothingWritten(partLoc); // second check: after batch and connection are closed
+  }
+  @Test // Aborts one transaction, then commits the same two records in the next transaction of the same batch.
+  public void testTransactionBatchAbortAndCommit() throws Exception { // NOTE(review): the three lock assertions below are cut off at the line end in this copy of the patch
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest()); // request has no db/table set, unlike the one in testHeartbeat
+    Assert.assertEquals("LockCount", 1, resp.getLocksSize()); // exactly one lock while the transaction is open
+    Assert.assertEquals("LockType", LockType.SHARED_READ, 
+    Assert.assertEquals("LockState", LockState.ACQUIRED, 
+    Assert.assertEquals("AgentInfo", agentInfo, 
+    txnBatch.abort();
+    checkNothingWritten(partLoc);
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.beginNextTransaction(); // next transaction of the same batch
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+            "{2, Welcome to streaming}");
+    txnBatch.close();
+    connection.close();
+  }
+  @Test // Commits two transactions in each of two consecutive batches on one connection and checks the data after every commit.
+  public void testMultipleTransactionBatchCommits() throws Exception { // NOTE(review): several statements below, including string literals, are cut off at the line end in this copy of the patch
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + 
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg"; // query handed to every checkDataWritten2 call below
+    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello 
+    txnBatch.beginNextTransaction();
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+    checkDataWritten2(partLoc, 1, 10,  1, validationQuery, true, "1\tHello 
+            "2\tWelcome to streaming");
+    txnBatch.close();
+    // 2nd Txn Batch
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("3,Hello streaming - once again".getBytes());
+    txnBatch.commit();
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, false, "1\tHello 
+            "2\tWelcome to streaming", "3\tHello streaming - once again");
+    txnBatch.beginNextTransaction();
+    txnBatch.write("4,Welcome to streaming - once again".getBytes());
+    txnBatch.commit();
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, true, "1\tHello 
+            "2\tWelcome to streaming", "3\tHello streaming - once again",
+            "4\tWelcome to streaming - once again");
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+  @Test // Keeps two transaction batches open on one connection, interleaves their writes and commits, and checks data and side files at each step.
+  public void testInterleavedTransactionBatchCommits() throws Exception { // NOTE(review): several statements below, including string literals, are cut off at the line end in this copy of the patch
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", 
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    // Acquire 1st Txn Batch
+    TransactionBatch txnBatch1 =  connection.fetchTransactionBatch(10, writer);
+    txnBatch1.beginNextTransaction();
+    // Acquire 2nd Txn Batch
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", 
+    TransactionBatch txnBatch2 =  connection.fetchTransactionBatch(10, 
+    txnBatch2.beginNextTransaction();
+    // Interleaved writes to both batches
+    txnBatch1.write("1,Hello streaming".getBytes());
+    txnBatch2.write("3,Hello streaming - once again".getBytes());
+    checkNothingWritten(partLoc); // neither batch has committed yet
+    txnBatch2.commit();
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + 
" order by id, msg";
+    checkDataWritten2(partLoc, 11, 20, 1, // only txnBatch2 has committed at this point
+      validationQuery, true, "3\tHello streaming - once again");
+    txnBatch1.commit();
+    /*now both batches have committed (but not closed) so we for each primary 
file we expect a side
+    file to exist and indicate the true length of primary file*/
+    FileSystem fs = partLoc.getFileSystem(conf);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
+            msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, 
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), 
AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. 
lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength == actualLength); // after both commits the logical length must equal the file length exactly
+      }
+    }
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, false,"1\tHello streaming", "3\tHello streaming - once 
+    txnBatch1.beginNextTransaction();
+    txnBatch1.write("2,Welcome to streaming".getBytes());
+    txnBatch2.beginNextTransaction();
+    txnBatch2.write("4,Welcome to streaming - once again".getBytes());
+    //here each batch has written data and committed (to bucket0 since table 
only has 1 bucket)
+    //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  
Furthermore, each bucket0
+    //has now received more data(logically - it's buffered) but it is not yet 
+    //lets check that side files exist, etc
+    dir = AcidUtils.getAcidState(partLoc, conf, 
msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), 
AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. 
lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength <= actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, true,"1\tHello streaming", "3\tHello streaming - once 
+    txnBatch1.commit();
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, false, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again");
+    txnBatch2.commit();
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, true, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again",
+        "4\tWelcome to streaming - once again");
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch1.getCurrentTransactionState());
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch2.getCurrentTransactionState());
+    txnBatch1.close();
+    txnBatch2.close();
+    connection.close();
+  }
+  private static class WriterThd extends Thread {
+    private final StreamingConnection conn;
+    private final DelimitedInputWriter writer;
+    private final String data;
+    private Throwable error;
+    WriterThd(HiveEndPoint ep, String data) throws Exception {
+      super("Writer_" + data);
+      writer = new DelimitedInputWriter(fieldNames, ",", ep);
+      conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
+ = data;
+      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread thread, Throwable throwable) {
+          error = throwable;
+          LOG.error("Thread " + thread.getName() + " died: " + 
throwable.getMessage(), throwable);
+        }
+      });
+    }
+    @Override
+    public void run() {
+      TransactionBatch txnBatch = null;
+      try {
+        txnBatch =  conn.fetchTransactionBatch(10, writer);
+        while (txnBatch.remainingTransactions() > 0) {
+          txnBatch.beginNextTransaction();
+          txnBatch.write(data.getBytes());
+          txnBatch.write(data.getBytes());
+          txnBatch.commit();
+        } // while
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (txnBatch != null) {
+          try {
+            txnBatch.close();
+          } catch (Exception e) {
+            LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
+            conn.close();
+          }
+        }
+        try {
+          conn.close();
+        } catch (Exception e) {
+          LOG.error("conn.close() failed: " + e.getMessage(), e);
+        }
+      }
+    }
+  }
+  @Test // Runs three WriterThd workers against the same end point and fails if any of them recorded an error.
+  public void testConcurrentTransactionBatchCommits() throws Exception {
+    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, // NOTE(review): remaining arguments are cut off in this copy of the patch
+    List<WriterThd> writers = new ArrayList<WriterThd>(3);
+    writers.add(new WriterThd(ep, "1,Matrix"));
+    writers.add(new WriterThd(ep, "2,Gandhi"));
+    writers.add(new WriterThd(ep, "3,Silence"));
+    for(WriterThd w : writers) {
+      w.start();
+    }
+    for(WriterThd w : writers) { // wait for every writer before looking at the error fields
+      w.join();
+    }
+    for(WriterThd w : writers) {
+      if(w.error != null) {
+        Assert.assertFalse("Writer thread" + w.getName() + " died: " + 
w.error.getMessage() +
+          " See log file for stack trace", true); // assertFalse on a constant true: an unconditional failure carrying the message
+      }
+    }
+  }
+  private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException { // Reads every row of the given ORC file and returns the SampleRec payload of each row, in read order.
+    org.apache.hadoop.fs.FileSystem fs = 
org.apache.hadoop.fs.FileSystem.getLocal(new Configuration()); // local file system built from a fresh Configuration, not from conf
+    Reader reader = OrcFile.createReader(orcFile,
+            OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    StructObjectInspector inspector = (StructObjectInspector) reader
+            .getObjectInspector();
+    System.out.format("Found Bucket File : %s \n", orcFile.getName());
+    ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+    while (rows.hasNext()) {
+      Object row =; // NOTE(review): the right-hand side is missing in this copy of the patch; presumably the next row read from rows - restore it from the original
+      SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5]; // element 5 is the user row, see deserializeDeltaFileRow
+      result.add(rec);
+    }
+    return result; // NOTE(review): rows is not closed anywhere in this method
+  }
+  // Assumes stored data schema = [acid fields],string,int,string
+  // return array of 6 fields, where the last field has the actual data
+  private static Object[] deserializeDeltaFileRow(Object row, 
StructObjectInspector inspector) {
+    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+    WritableIntObjectInspector f0ins = (WritableIntObjectInspector) // NOTE(review): the initializers of f0ins..f5ins are cut off after the cast in this copy of the patch
+    WritableLongObjectInspector f1ins = (WritableLongObjectInspector) 
+    WritableIntObjectInspector f2ins = (WritableIntObjectInspector) 
+    WritableLongObjectInspector f3ins = (WritableLongObjectInspector) 
+    WritableLongObjectInspector f4ins = (WritableLongObjectInspector)  
+    StructObjectInspector f5ins = (StructObjectInspector) 
+    int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0))); // f0..f4: the five leading fields, read as int, long, int, long, long
+    long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+    int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
+    long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
+    long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
+    SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, 
fields.get(5)), f5ins); // field 5 is a nested struct, decoded into a SampleRec
+    return new Object[] {f0, f1, f2, f3, f4, f5}; // primitives are boxed; index 5 holds the SampleRec
+  }
+  // Assumes row schema => string,int,string
+  private static SampleRec deserializeInner(Object row, StructObjectInspector 
inspector) {
+    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+    WritableStringObjectInspector f0ins = (WritableStringObjectInspector) // NOTE(review): the initializers of f0ins..f2ins and the ends of the f0/f2 reads are cut off in this copy of the patch
+    WritableIntObjectInspector f1ins = (WritableIntObjectInspector) 
+    WritableStringObjectInspector f2ins = (WritableStringObjectInspector) 
+    String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, 
+    int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+    String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, 
+    return new SampleRec(f0, f1, f2); // the three decoded fields, in schema order
+  }
+  @Test // Streams rows into two freshly created tables with 4 buckets each and checks how the rows of the first table are spread over its bucket files.
+  public void testBucketing() throws Exception { // NOTE(review): several statements below are cut off at the line end in this copy of the patch
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    dropDB(msClient, dbName3);
+    dropDB(msClient, dbName4);
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, 
+            , null, dbLocation, bucketCount);
+    String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + 
+    dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames2 = "key3,key4,data2".split(",");
+    String[] colTypes2 = "string,int,string".split(",");
+    String[] bucketNames2 = "key3,key4".split(",");
+    createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, 
+            , null, dbLocation2, bucketCount);
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, 
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, 
+    StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", 
endPt2, connection); // NOTE(review): passes connection, not connection2 - confirm this is intended
+    TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, 
+    txnBatch2.beginNextTransaction();
+    txnBatch2.write("name5,2,fact3".getBytes());  // bucket 0
+    txnBatch2.write("name8,2,fact3".getBytes());  // bucket 1
+    txnBatch2.write("name0,1,fact1".getBytes());  // bucket 2
+    txnBatch2.commit();
+    // 3 Check data distribution in  buckets
+    HashMap<Integer, ArrayList<SampleRec>> actual1 = 
dumpAllBuckets(dbLocation, tblName3);
+    HashMap<Integer, ArrayList<SampleRec>> actual2 = 
dumpAllBuckets(dbLocation2, tblName4);
+    System.err.println("\n  Table 1");
+    System.err.println(actual1);
+    System.err.println("\n  Table 2");
+    System.err.println(actual2); // table 2 is only printed; every assertion below is on table 1
+    // assert bucket listing is as expected
+    Assert.assertEquals("number of buckets does not match expectation", 
actual1.values().size(), 3); // NOTE(review): here and below the actual value is passed before the expected one, the reverse of JUnit assertEquals(message, expected, actual); this only affects the failure message
+    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(0).size(), 2);
+    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(1).size(), 1);
+    Assert.assertTrue("bucket 2 shouldn't have been created", actual1.get(2) 
== null);
+    Assert.assertEquals("records in bucket does not match expectation", 
actual1.get(3).size(), 1);
+  }
+  /**
+   * Runs {@code cmd} through runDDL on the shared driver and fails the calling test
+   * when runDDL reports false.
+   *
+   * @param cmd statement to execute; also used as the prefix of the failure message
+   * @throws QueryFailedException if raised by runDDL
+   */
+  private void runCmdOnDriver(String cmd) throws QueryFailedException {
+    Assert.assertTrue(cmd + " failed", runDDL(driver, cmd));
+  }
+  @Test // Runs FileDump.main with stderr captured, after streaming into each of two bucketed tables, and checks that certain messages are absent from the output.
+  public void testFileDump() throws Exception { // NOTE(review): several statements below are cut off at the line end in this copy of the patch
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    dropDB(msClient, dbName3);
+    dropDB(msClient, dbName4);
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, 
+        , null, dbLocation, bucketCount);
+    String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + 
+    dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames2 = "key3,key4,data2".split(",");
+    String[] colTypes2 = "string,int,string".split(",");
+    String[] bucketNames2 = "key3,key4".split(",");
+    createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, 
+        , null, dbLocation2, bucketCount);
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, 
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+    PrintStream origErr = System.err;
+    ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr); // not in a finally block: if FileDump.main throws, stderr stays redirected
+    String errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    // since this test runs on local file system which does not have an API to 
tell if files or
+    // open or not, we are testing for negative case even though the bucket 
files are still open
+    // for writes (transaction batch not closed yet)
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, 
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", 
+    StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, 
+    txnBatch2.beginNextTransaction();
+    txnBatch2.write("name5,2,fact3".getBytes());  // bucket 0
+    txnBatch2.write("name8,2,fact3".getBytes());  // bucket 1
+    txnBatch2.write("name0,1,fact1".getBytes());  // bucket 2
+    // no data for bucket 3 -- expect 0 length bucket file
+    txnBatch2.commit();
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation}); // NOTE(review): dumps dbLocation again although the writes just above went to the table created under dbLocation2 - confirm
+    System.out.flush();
+    System.err.flush();
+    System.setErr(origErr);
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+  }
+  @Test /*
+   * Streams one committed transaction into a table created with bucketCount = 4,
+   * damages the bucket data files with corruptDataFile(), and checks what FileDump
+   * prints to stderr across three runs: a plain dump, a "--recover --skip-dump"
+   * run, and a plain dump after recovery. Finally asserts that no file path
+   * containing "_flush_length" is left under the database location.
+   * NOTE(review): several statements below are cut off mid-line in this copy
+   * (the createDbAndTable call, the endpoint/connection setup and some assertion
+   * strings); restore them from the upstream file before compiling.
+   */
+  public void testFileDumpCorruptDataFiles() throws Exception {
+    dropDB(msClient, dbName3);
+    // 1) Create the bucketed table (only one createDbAndTable call is made here)
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, 
+        , null, dbLocation, bucketCount);
+    // 2) Insert data into the table through a streaming connection
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, 
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", 
endPt, connection);
+    // we need side file for this test, so we create 2 txn batch and test with 
only one
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+    // intentionally corrupt some files
+    Path path = new Path(dbLocation);
+    Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+    int readableFooter = -1; // NOTE(review): never read anywhere in this method
+    for (String file : files) {
+      if (file.contains("bucket_00000")) {
+        // empty out the file (Integer.MIN_VALUE is corruptDataFile's zero-length sentinel)
+        corruptDataFile(file, conf, Integer.MIN_VALUE);
+      } else if (file.contains("bucket_00001")) {
+        corruptDataFile(file, conf, -1); // rewrite the file one byte shorter
+      } else if (file.contains("bucket_00002")) {
+        Assert.assertFalse("bucket 2 shouldn't have been created", true);
+      } else if (file.contains("bucket_00003")) {
+        corruptDataFile(file, conf, 100); // rewrite the file 100 bytes longer
+      }
+    }
+    PrintStream origErr = System.err;
+    ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+    String errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+    // replace stderr and run command, this time asking FileDump to recover the files
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
+    System.err.flush();
+    System.setErr(origErr);
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(true, errDump.contains("bucket_00000 recovered 
+    Assert.assertEquals(true, errDump.contains("No readable footers found. 
Creating empty orc file."));
+    Assert.assertEquals(true, errDump.contains("bucket_00001 recovered 
+    Assert.assertEquals(true, errDump.contains("bucket_00003 recovered 
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    // test after recovery
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    // after recovery there shouldn't be any *_flush_length files
+    files = FileDump.getAllFilesInPath(path, conf);
+    for (String file : files) {
+      Assert.assertEquals(false, file.contains("_flush_length"));
+    }
+    txnBatch.close();
+  }
+  private void corruptDataFile(final String file, final Configuration conf, 
final int addRemoveBytes)
+      throws Exception { /*
+     * Replaces `file` with a copy of a different length. The copy is written to a
+     * sibling "<name>.corrupt" file, the original is deleted, and the copy is
+     * renamed into its place.
+     *   addRemoveBytes == Integer.MIN_VALUE : sentinel, the copy is zero-length
+     *   otherwise                           : copy length = current length + addRemoveBytes
+     *                                         (negative shrinks, positive grows)
+     * The buffer starts as new byte[len]; presumably only the original bytes are
+     * read into it so any growth stays zero-filled. TODO confirm, the readFully
+     * call is cut off in this copy.
+     */
+    Path bPath = new Path(file);
+    Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt");
+    FileSystem fs = bPath.getFileSystem(conf);
+    FileStatus fileStatus = fs.getFileStatus(bPath);
+    int len = addRemoveBytes == Integer.MIN_VALUE ? 0 : (int) 
fileStatus.getLen() + addRemoveBytes;
+    byte[] buffer = new byte[len];
+    FSDataInputStream fdis =; // NOTE(review): initializer missing in this copy; presumably opens bPath, confirm upstream
+    fdis.readFully(0, buffer, 0, (int) Math.min(fileStatus.getLen(), 
+    fdis.close();
+    FSDataOutputStream fdos = fs.create(cPath, true);
+    fdos.write(buffer, 0, buffer.length); // the whole buffer is written, so the copy is exactly len bytes
+    fdos.close();
+    fs.delete(bPath, false);
+    fs.rename(cPath, bPath); // swap the damaged copy in under the original name
+  }
+  @Test /*
+   * Commits two transactions from the same batch, recording the file lengths per
+   * bucket after each commit via recordOffsets(), then rewrites each bucket's side
+   * file with corruptSideFile() in a different way (truncated last entry, empty,
+   * padded with fake entries). Checks what FileDump prints to stderr across three
+   * runs: a plain dump, a "--recover --skip-dump" run, and a plain dump after
+   * recovery. Finally asserts that no file path containing "_flush_length" is
+   * left under the database location.
+   * NOTE(review): several statements below are cut off mid-line in this copy
+   * (the createDbAndTable call, the endpoint/connection setup and many assertion
+   * strings); restore them from the upstream file before compiling.
+   */
+  public void testFileDumpCorruptSideFiles() throws Exception {
+    dropDB(msClient, dbName3);
+    // 1) Create the bucketed table (only one createDbAndTable call is made here)
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, 
+        , null, dbLocation, bucketCount);
+    // 2) Insert data into the table through a streaming connection
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, 
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + 
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", 
endPt, connection);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.write("name6,3,aHello streaming".getBytes());
+    txnBatch.commit();
+    Map<String,List<Long>> offsetMap = new HashMap<String,List<Long>>(); // bucket name -> file lengths recorded after each commit
+    recordOffsets(conf, dbLocation, offsetMap);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name01,11,-Hello streaming".getBytes());
+    txnBatch.write("name21,21,-Welcome to streaming".getBytes());
+    txnBatch.write("name41,21,-more Streaming unlimited".getBytes());
+    txnBatch.write("name51,21,-even more Streaming unlimited".getBytes());
+    txnBatch.write("name02,12,--Hello streaming".getBytes());
+    txnBatch.write("name22,22,--Welcome to streaming".getBytes());
+    txnBatch.write("name42,22,--more Streaming unlimited".getBytes());
+    txnBatch.write("name52,22,--even more Streaming unlimited".getBytes());
+    txnBatch.write("name7,4,aWelcome to streaming".getBytes());
+    txnBatch.write("name8,5,amore Streaming unlimited".getBytes());
+    txnBatch.write("name9,6,aeven more Streaming unlimited".getBytes());
+    txnBatch.write("name10,7,bHello streaming".getBytes());
+    txnBatch.write("name11,8,bWelcome to streaming".getBytes());
+    txnBatch.write("name12,9,bmore Streaming unlimited".getBytes());
+    txnBatch.write("name13,10,beven more Streaming unlimited".getBytes());
+    txnBatch.commit();
+    recordOffsets(conf, dbLocation, offsetMap); // second snapshot, after the second commit
+    // intentionally corrupt some files
+    Path path = new Path(dbLocation);
+    Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+    for (String file : files) {
+      if (file.contains("bucket_00000")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00000", -1); // corrupt 
last entry
+      } else if (file.contains("bucket_00001")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00001", 0); // empty 
out side file
+      } else if (file.contains("bucket_00002")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00002", 3); // total 3 
entries (2 valid + 1 fake)
+      } else if (file.contains("bucket_00003")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00003", 10); // total 
10 entries (2 valid + 8 fake)
+      }
+    }
+    PrintStream origErr = System.err;
+    ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+    String errDump = new String(myErr.toByteArray()); // expected side-file sizes below: 8 + 3 bytes, 0, 3 * 8 and 10 * 8 (see corruptSideFile)
+    Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length 
[length: 11"));
+    Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length 
[length: 0"));
+    Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length 
[length: 24"));
+    Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length 
[length: 80"));
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+    // replace stderr and run command, this time asking FileDump to recover the files
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
+    System.err.flush();
+    System.setErr(origErr);
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(true, errDump.contains("bucket_00000 recovered 
+    Assert.assertEquals(true, errDump.contains("bucket_00001 recovered 
+    Assert.assertEquals(true, errDump.contains("bucket_00002 recovered 
+    Assert.assertEquals(true, errDump.contains("bucket_00003 recovered 
+    List<Long> offsets = offsetMap.get("bucket_00000");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
+    offsets = offsetMap.get("bucket_00001");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
+    offsets = offsetMap.get("bucket_00002");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
+    offsets = offsetMap.get("bucket_00003");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + 
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    // test after recovery
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+    // after recovery there shouldn't be any *_flush_length files
+    files = FileDump.getAllFilesInPath(path, conf);
+    for (String file : files) {
+      Assert.assertEquals(false, file.contains("_flush_length"));
+    }
+    txnBatch.close();
+  }
+  private void corruptSideFile(final String file, final HiveConf conf,
+      final Map<String, List<Long>> offsetMap, final String key, final int 
+      throws IOException { /*
+     * Replaces the side file of the data file `file` (located through
+     * OrcAcidUtils.getSideFile) with a rewritten copy built from the offsets
+     * recorded under `key` in offsetMap. The copy is written to a temporary
+     * sibling path, the original side file is deleted, and the copy is renamed
+     * into its place. The last int parameter selects the kind of damage; its
+     * name is cut off in this copy of the signature, and the body refers to it
+     * as numEntries:
+     *   numEntries < 0  : every recorded offset except the last is written as an
+     *                     8-byte long, followed by only the first 3 bytes of the
+     *                     last offset (a partial trailing entry)
+     *   numEntries == 0 : nothing is written, leaving an empty side file
+     *   numEntries > 0  : min(offsets.size(), numEntries) recorded offsets are
+     *                     written, then fake ones (lastOffset + 100, + 200, ...)
+     *                     until numEntries entries exist in total
+     * NOTE(review): the construction of cPath is also cut off here; presumably
+     * it appends a suffix to the side file name. Confirm against upstream.
+     */
+    Path dataPath = new Path(file);
+    Path sideFilePath = OrcAcidUtils.getSideFile(dataPath);
+    Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + 
+    FileSystem fs = sideFilePath.getFileSystem(conf);
+    List<Long> offsets = offsetMap.get(key);
+    long lastOffset = offsets.get(offsets.size() - 1);
+    FSDataOutputStream fdos = fs.create(cPath, true);
+    // corrupt last entry
+    if (numEntries < 0) {
+      byte[] lastOffsetBytes = longToBytes(lastOffset);
+      for (int i = 0; i < offsets.size() - 1; i++) {
+        fdos.writeLong(offsets.get(i));
+      }
+      fdos.write(lastOffsetBytes, 0, 3); // only 3 of the 8 bytes: a torn final entry
+    } else if (numEntries > 0) {
+      int firstRun = Math.min(offsets.size(), numEntries);
+      // add original entries
+      for (int i=0; i < firstRun; i++) {
+        fdos.writeLong(offsets.get(i));
+      }
+      // add fake entries
+      int remaining = numEntries - firstRun;
+      for (int i = 0; i < remaining; i++) {
+        fdos.writeLong(lastOffset + ((i + 1) * 100));
+      }
+    }
+    fdos.close();
+    fs.delete(sideFilePath, false);
+    fs.rename(cPath, sideFilePath); // swap the rewritten copy in under the side file's name
+  }
+  /**
+   * Encodes {@code x} as eight bytes using a freshly allocated ByteBuffer
+   * (ByteBuffer's default byte order, i.e. most significant byte first).
+   *
+   * @param x the value to encode
+   * @return the 8-byte backing array holding {@code x}
+   */
+  private byte[] longToBytes(long x) {
+    return ByteBuffer.allocate(8).putLong(x).array();
+  }
+  private void recordOffsets(final HiveConf conf, final String dbLocation,
+      final Map<String, List<Long>> offsetMap) throws IOException {
+    Path path = new Path(dbLocation);
+    Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+    for (String file: files) {
+      Path bPath = new Path(file);
+      FileSystem fs = bPath.getFileSystem(conf);
+      FileStatus fileStatus = fs.getFileStatus(bPath);
+      long len = fileStatus.getLen();
+      if (file.contains("bucket_00000")) {
+        if (offsetMap.containsKey("bucket_00000")) {
+          List<Long> offsets = offsetMap.get("bucket_00000");
+          offsets.add(len);
+          offsetMap.put("bucket_00000", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00000", offsets);
+        }
+      } else if (file.contains("bucket_00001")) {
+        if (offsetMap.containsKey("bucket_00001")) {
+          List<Long> offsets = offsetMap.get("bucket_00001");
+          offsets.add(len);
+          offsetMap.put("bucket_00001", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00001", offsets);
+        }
+      } else if (file.contains("bucket_00002")) {
+        if (offsetMap.containsKey("bucket_00002")) {
+          List<Long> offsets = offsetMap.get("bucket_00002");
+          offsets.add(len);
+          offsetMap.put("bucket_00002", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00002", offsets);
+        }
+      } else if (file.contains("bucket_00003")) {
+        if (offsetMap.containsKey("bucket_00003")) {
+          List<Long> offsets = offsetMap.get("bucket_00003");
+          offsets.add(len);
+          offsetMap.put("bucket_00003", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00003", offsets);
+        }
+      }
+    }
+  }
+  @Test
+  public void testErrorHandling() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    runCmdOnDriver("create database testErrors");
+    runCmdOnDriver("use testErrors");
+    runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 
buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", 
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter innerWriter = new 
DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+    FaultyWriter writer = new FaultyWriter(innerWriter);
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.close();
+    txnBatch.heartbeat();//this is no-op on closed batch
+    txnBatch.abort();//ditto
+    GetOpenTxnsInfoResponse r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
+    List<TxnInfo> ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, 
+    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, 
+    Exception expectedEx = null;
+    try {
+      txnBatch.beginNextTransaction();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("beginNextTransaction() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been 
+    expectedEx = null;
+    try {
+      txnBatch.write("name0,1,Hello streaming".getBytes());
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("write()  should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been 
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("commit() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been 
+    txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+    //test toString()
+    String s = txnBatch.toString();
+    Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + 
+    Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]"));
+    expectedEx = null;
+    txnBatch.beginNextTransaction();
+    writer.enableErrors();
+    try {
+      txnBatch.write("name6,2,Doh!".getBytes());
+    }
+    catch(StreamingIOFailure ex) {
+      expectedEx = ex;
+      txnBatch.getCurrentTransactionState();
+      txnBatch.getCurrentTxnId();//test it doesn't throw 
+    }
+    Assert.assertTrue("Wrong exception: " + (expectedEx != null ? 
expectedEx.getMessage() : "?"),
+      expectedEx != null && expectedEx.getMessage().contains("Simulated fault 
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("commit() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been 
+    //test toString()
+    s = txnBatch.toString();
+    Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + 
+    Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
+    r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+    ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, 
+    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, 
+    //txnid 3 was committed and thus not open
+    Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, 
+    writer.disableErrors();
+    txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    writer.enableErrors();
+    expectedEx = null;
+    try {


