Repository: hive
Updated Branches:
  refs/heads/master 7c4d48ec2 -> bdbd3bcff


http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 8b5e508..1c9e43f 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,13 +61,13 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -150,7 +152,8 @@ public class TestStreaming {
       if (file.canExecute()) {
         mod |= 0111;
       }
-      return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+      return new FileStatus(file.length(), file.isDirectory(),
+          1, 1024,
         file.lastModified(), file.lastModified(),
         FsPermission.createImmutable(mod), "owen", "users", path);
     }
@@ -419,6 +422,123 @@ public class TestStreaming {
   }
 
   @Test
+  public void testGetDeltaPath() throws Exception {
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
+    Path path = connection.getDeltaFileLocation(partitionVals, 0,
+        5L, 5L, 9);
+    Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent"
+        + "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
+  }
+
+  @Test
+  public void testConnectionWithWriteId() throws Exception {
+    queryTable(driver, "drop table if exists default.writeidconnection");
+    queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
+        "TBLPROPERTIES('transactional'='true')");
+    queryTable(driver, "insert into default.writeidconnection values('a0','bar')");
+
+    List<String> rs = queryTable(driver, "select * from default.writeidconnection");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("a0\tbar", rs.get(0));
+
+    StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("writeidconnection")
+        .withRecordWriter(writerT)
+        .withHiveConf(conf)
+        .connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("writeidconnection")
+        .withRecordWriter(writerOne)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(1)
+        .withTableObject(tObject)
+        .connect();
+
+    StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withRecordWriter(writerTwo)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(2)
+        .withTableObject(tObject)
+        .connect();
+
+    Assert.assertNotNull(connectionOne);
+    Assert.assertNotNull(connectionTwo);
+
+    connectionOne.beginTransaction();
+    connectionTwo.beginTransaction();
+    connectionOne.write("a1,b2".getBytes());
+    connectionTwo.write("a5,b6".getBytes());
+    connectionOne.write("a3,b4".getBytes());
+    connectionOne.commitTransaction();
+    connectionTwo.commitTransaction();
+
+    Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+        connectionOne.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+        connectionTwo.getCurrentTransactionState());
+
+    try {
+      connectionOne.beginTransaction();
+      Assert.fail("second beginTransaction should have thrown a "
+          + "StreamingException");
+    } catch (StreamingException e) {
+
+    }
+
+    connectionOne.close();
+    connectionTwo.close();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, "
+        + "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID");
+    // Nothing here since it hasn't been committed
+    Assert.assertEquals(1, rs.size());
+
+    transactionConnection.commitTransaction();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, "
+        + "INPUT__FILE__NAME from default.writeidconnection order by a");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000"));
+  }
+
+  @Test
   public void testAllTypesDelimitedWriter() throws Exception {
     queryTable(driver, "drop table if exists default.alltypes");
     queryTable(driver,
@@ -619,7 +739,7 @@ public class TestStreaming {
   }
 
   /**
-   * this is a clone from TestTxnStatement2....
+   * this is a clone from TestHiveStreamingConnection.TxnStatement2....
    */
   public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
@@ -956,12 +1076,159 @@ public class TestStreaming {
     // Create partition
     Assert.assertNotNull(connection);
 
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
+
+    // Ensure partition is present
+    Partition p = msClient.getPartition(dbName, tblName, newPartVals);
+    Assert.assertNotNull("Did not find added partition", p);
+  }
+
+  @Test
+  public void testAddPartitionWithWriteId() throws Exception {
+    List<String> newPartVals = new ArrayList<String>(2);
+    newPartVals.add("WriteId_continent");
+    newPartVals.add("WriteId_country");
+
+    StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(newPartVals)
+        .withRecordWriter(writerT)
+        .withHiveConf(conf)
+        .connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(newPartVals)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(1)
+        .withTableObject(tObject)
+        .connect();
+
+    Assert.assertNotNull(connection);
+
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
+
+    Set<String> partitions = new HashSet<>(connection.getPartitions());
+
+    connection.close();
+
+    // Ensure partition is not present
+    try {
+      msClient.getPartition(dbName, tblName, newPartVals);
+      Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {}
+
+    transactionConnection.commitTransactionWithPartition(partitions);
+
     // Ensure partition is present
-    Partition p = msClient.getPartition(dbName, tblName, partitionVals);
+    Partition p = msClient.getPartition(dbName, tblName, newPartVals);
     Assert.assertNotNull("Did not find added partition", p);
   }
 
   @Test
+  public void testAddDynamicPartitionWithWriteId() throws Exception {
+    queryTable(driver, "drop table if exists default.writeiddynamic");
+    queryTable(driver, "create table default.writeiddynamic (a"
+        + " string, b string) partitioned by (c string, d string)"
+        + " stored as orc TBLPROPERTIES('transactional'='true')");
+
+    StrictDelimitedInputWriter writerT =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection transactionConnection =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic").withRecordWriter(writerT)
+            .withHiveConf(conf).connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writerOne =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection connectionOne =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic").withRecordWriter(writerOne)
+            .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+            .withTableObject(tObject).connect();
+
+    StrictDelimitedInputWriter writerTwo =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection connectionTwo =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic")
+            .withRecordWriter(writerTwo)
+            .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+            .withTableObject(tObject)
+            .connect();
+
+    Assert.assertNotNull(connectionOne);
+
+    connectionTwo.beginTransaction();
+    connectionOne.beginTransaction();
+    connectionOne.write("1,2,3,4".getBytes());
+    connectionOne.write("1,2,5,6".getBytes());
+    connectionTwo.write("1,2,30,40".getBytes());
+    connectionOne.write("1,2,7,8".getBytes());
+    connectionTwo.write("1,2,50,60".getBytes());
+    connectionOne.write("1,2,9,10".getBytes());
+    connectionOne.commitTransaction();
+    connectionTwo.commitTransaction();
+
+    Set<String> partitionsOne = new HashSet<>(connectionOne.getPartitions());
+    Assert.assertEquals(4, partitionsOne.size());
+
+    Set<String> partitionsTwo = new HashSet<>(connectionTwo.getPartitions());
+    Assert.assertEquals(2, partitionsTwo.size());
+
+    connectionOne.close();
+    connectionTwo.close();
+
+    try {
+      String partitionName = partitionsOne.iterator().next();
+      msClient.getPartition("default", "writeiddynamic", partitionName);
+      Assert.fail(
+          "Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {
+    }
+
+    partitionsOne.addAll(partitionsTwo);
+    Set<String> allPartitions = partitionsOne;
+    transactionConnection.commitTransactionWithPartition(allPartitions);
+
+    // Ensure partition is present
+    for (String partition : allPartitions) {
+      Partition p =
+          msClient.getPartition("default", "writeiddynamic",
+              partition);
+      Assert.assertNotNull("Did not find added partition", p);
+    }
+  }
+
+  @Test
   public void testTransactionBatchEmptyCommit() throws Exception {
     // 1)  to partitioned table
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
@@ -977,8 +1244,8 @@ public class TestStreaming {
       .connect();
     connection.beginTransaction();
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
 
     // 2) To unpartitioned table
@@ -995,8 +1262,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1115,8 +1382,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.abortTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
     connection.close();
 
     // 2) to unpartitioned table
@@ -1133,8 +1400,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.abortTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1156,20 +1423,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     // 2nd Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -1182,8 +1449,8 @@ public class TestStreaming {
 
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
 
     // To Unpartitioned table
@@ -1199,13 +1466,13 @@ public class TestStreaming {
       .connect();
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1227,20 +1494,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     // 2nd Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -1252,8 +1519,8 @@ public class TestStreaming {
       "{2, Welcome to streaming}");
 
     connection.close();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     // To Unpartitioned table
     regex = "([^:]*):(.*)";
@@ -1271,13 +1538,13 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1:Hello streaming".getBytes());
     connection.commitTransaction();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1328,20 +1595,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
     connection.write(rec1.getBytes());
     connection.commitTransaction();
 
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
     Assert.assertEquals(1, rs.size());
@@ -1399,20 +1666,20 @@ public class TestStreaming {
       connection.beginTransaction();
       Assert.assertEquals(--initialCount, connection.remainingTransactions());
       for (int rec = 0; rec < 2; ++rec) {
-        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-          , connection.getCurrentTransactionState());
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+            connection.getCurrentTransactionState());
         connection.write((batch * rec + ",Hello streaming").getBytes());
       }
       connection.commitTransaction();
-      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-        , connection.getCurrentTransactionState());
+      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+          connection.getCurrentTransactionState());
       ++batch;
     }
     Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     connection = HiveStreamingConnection.newBuilder()
       .withDatabase(dbName)
@@ -1430,20 +1697,20 @@ public class TestStreaming {
       connection.beginTransaction();
       Assert.assertEquals(--initialCount, connection.remainingTransactions());
       for (int rec = 0; rec < 2; ++rec) {
-        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-          , connection.getCurrentTransactionState());
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+            connection.getCurrentTransactionState());
         connection.write((batch * rec + ",Hello streaming").getBytes());
       }
       connection.abortTransaction();
-      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-        , connection.getCurrentTransactionState());
+      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+          connection.getCurrentTransactionState());
       ++batch;
     }
     Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
   }
 
   @Test
@@ -1468,8 +1735,8 @@ public class TestStreaming {
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
 
@@ -1507,8 +1774,8 @@ public class TestStreaming {
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
 
     connection.beginTransaction();
     connection.write("1,Hello streaming".getBytes());
@@ -1577,8 +1844,8 @@ public class TestStreaming {
       "2\tWelcome to streaming", "3\tHello streaming - once again",
       "4\tWelcome to streaming - once again");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
   }
@@ -1690,10 +1957,10 @@ public class TestStreaming {
       "3\tHello streaming - once again",
       "4\tWelcome to streaming - once again");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection2.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection2.getCurrentTransactionState());
 
     connection.close();
     connection2.close();
@@ -2511,7 +2778,8 @@ public class TestStreaming {
   }
 
   @Test
-  public void testErrorHandling() throws Exception {
+  public void testErrorHandling()
+      throws Exception {
     String agentInfo = "UT_" + Thread.currentThread().getName();
     runCmdOnDriver("create database testErrors");
     runCmdOnDriver("use testErrors");
@@ -2538,8 +2806,12 @@ public class TestStreaming {
     GetOpenTxnsInfoResponse r = msClient.showTxns();
     Assert.assertEquals("HWM didn'table match", 17, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
-    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    Assert.assertEquals("wrong status ti(0)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(1).getState());
 
 
     try {
@@ -2621,10 +2893,16 @@ public class TestStreaming {
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
-    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    Assert.assertEquals("wrong status ti(0)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(1).getState());
     //txnid 3 was committed and thus not open
-    Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+    Assert.assertEquals("wrong status ti(2)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(2).getState());
     connection.close();
 
     writer.disableErrors();
@@ -2651,8 +2929,12 @@ public class TestStreaming {
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn'table match", 21, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
-    Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
+    Assert.assertEquals("wrong status ti(3)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(3).getState());
+    Assert.assertEquals("wrong status ti(4)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(4).getState());
   }
 
   // assumes un partitioned table
@@ -2953,5 +3235,12 @@ public class TestStreaming {
     void disableErrors() {
       shouldThrow = false;
     }
+
+    @Override
+    public Path getDeltaFileLocation(List<String> partitionValues,
+        Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+        Table table) throws StreamingException {
+      return null;
+    }
   }
 }

Reply via email to