Repository: hive
Updated Branches:
  refs/heads/master 7c4d48ec2 -> bdbd3bcff
http://git-wip-us.apache.org/repos/asf/hive/blob/bdbd3bcf/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 8b5e508..1c9e43f 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,13 +61,13 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -150,7 +152,8 @@ public class TestStreaming {
     if (file.canExecute()) {
       mod |= 0111;
     }
-    return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+    return new FileStatus(file.length(), file.isDirectory(),
+        1, 1024,
       file.lastModified(), file.lastModified(),
       FsPermission.createImmutable(mod), "owen", "users", path);
   }
@@ -419,6 +422,123 @@ public class TestStreaming {
   }
 
   @Test
+  public void testGetDeltaPath() throws Exception {
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .connect();
+    Path path = connection.getDeltaFileLocation(partitionVals, 0,
+        5L, 5L, 9);
+    Assert.assertTrue(path.toString().endsWith("testing.db/alerts/continent" +
+        "=Asia/country=India/delta_0000005_0000005_0009/bucket_00000"));
+  }
+
+  @Test
+  public void testConnectionWithWriteId() throws Exception {
+    queryTable(driver, "drop table if exists default.writeidconnection");
+    queryTable(driver, "create table default.writeidconnection (a string, b string) stored as orc " +
+        "TBLPROPERTIES('transactional'='true')");
+    queryTable(driver, "insert into default.writeidconnection values('a0','bar')");
+
+    List<String> rs = queryTable(driver, "select * from default.writeidconnection");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("a0\tbar", rs.get(0));
+
+    StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("writeidconnection")
+        .withRecordWriter(writerT)
+        .withHiveConf(conf)
+        .connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writerOne = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connectionOne = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withTable("writeidconnection")
+        .withRecordWriter(writerOne)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(1)
+        .withTableObject(tObject)
+        .connect();
+
+    StrictDelimitedInputWriter writerTwo = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connectionTwo = HiveStreamingConnection.newBuilder()
+        .withDatabase("Default")
+        .withRecordWriter(writerTwo)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(2)
+        .withTableObject(tObject)
+        .connect();
+
+    Assert.assertNotNull(connectionOne);
+    Assert.assertNotNull(connectionTwo);
+
+    connectionOne.beginTransaction();
+    connectionTwo.beginTransaction();
+    connectionOne.write("a1,b2".getBytes());
+    connectionTwo.write("a5,b6".getBytes());
+    connectionOne.write("a3,b4".getBytes());
+    connectionOne.commitTransaction();
+    connectionTwo.commitTransaction();
+
+    Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+        connectionOne.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.PREPARED_FOR_COMMIT,
+        connectionTwo.getCurrentTransactionState());
+
+    try {
+      connectionOne.beginTransaction();
+      Assert.fail("second beginTransaction should have thrown a " +
+          "StreamingException");
+    } catch (StreamingException e) {
+      // expected: a connection bound to an external write id cannot open another transaction
+    }
+
+    connectionOne.close();
+    connectionTwo.close();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, " +
+        "INPUT__FILE__NAME from default.writeidconnection order by ROW__ID");
+    // Only the original row is visible; the shared transaction is not committed yet
+    Assert.assertEquals(1, rs.size());
+
+    transactionConnection.commitTransaction();
+
+    rs = queryTable(driver, "select ROW__ID, a, b, " +
+        "INPUT__FILE__NAME from default.writeidconnection order by a");
+    Assert.assertEquals(4, rs.size());
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\ta0\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).contains("\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).contains("\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).contains("\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("bucket_00000"));
+  }
+
+  @Test
   public void testAllTypesDelimitedWriter() throws Exception {
     queryTable(driver, "drop table if exists default.alltypes");
     queryTable(driver,
@@ -619,7 +739,7 @@ public class TestStreaming {
   }
 
   /**
-   * this is a clone from TestTxnStatement2....
+   * this is a clone from TestHiveStreamingConnection.TxnStatement2....
    */
   public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
@@ -956,12 +1076,159 @@ public class TestStreaming {
     // Create partition
     Assert.assertNotNull(connection);
 
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
+
+    // Ensure partition is present
+    Partition p = msClient.getPartition(dbName, tblName, newPartVals);
+    Assert.assertNotNull("Did not find added partition", p);
+  }
+
+  @Test
+  public void testAddPartitionWithWriteId() throws Exception {
+    List<String> newPartVals = new ArrayList<String>(2);
+    newPartVals.add("WriteId_continent");
+    newPartVals.add("WriteId_country");
+
+    StrictDelimitedInputWriter writerT = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection transactionConnection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(newPartVals)
+        .withRecordWriter(writerT)
+        .withHiveConf(conf)
+        .connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(newPartVals)
+        .withRecordWriter(writer)
+        .withHiveConf(conf)
+        .withWriteId(writeId)
+        .withStatementId(1)
+        .withTableObject(tObject)
+        .connect();
+
+    Assert.assertNotNull(connection);
+
+    connection.beginTransaction();
+    connection.write("3,Hello streaming - once again".getBytes());
+    connection.commitTransaction();
+
+    Set<String> partitions = new HashSet<>(connection.getPartitions());
+
+    connection.close();
+
+    // Ensure partition is not present yet
+    try {
+      msClient.getPartition(dbName, tblName, newPartVals);
+      Assert.fail("Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {
+      // expected: the partition is registered only when the owning transaction commits
+    }
+
+    transactionConnection.commitTransactionWithPartition(partitions);
+
     // Ensure partition is present
-    Partition p = msClient.getPartition(dbName, tblName, partitionVals);
+    Partition p = msClient.getPartition(dbName, tblName, newPartVals);
     Assert.assertNotNull("Did not find added partition", p);
   }
 
   @Test
+  public void testAddDynamicPartitionWithWriteId() throws Exception {
+    queryTable(driver, "drop table if exists default.writeiddynamic");
+    queryTable(driver, "create table default.writeiddynamic (a" +
+        " string, b string) partitioned by (c string, d string)" +
+        " stored as orc TBLPROPERTIES('transactional'='true')");
+
+    StrictDelimitedInputWriter writerT =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection transactionConnection =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic").withRecordWriter(writerT)
+            .withHiveConf(conf).connect();
+    transactionConnection.beginTransaction();
+
+    Table tObject = transactionConnection.getTable();
+    Long writeId = transactionConnection.getCurrentWriteId();
+
+    Assert.assertNotNull(tObject);
+    Assert.assertNotNull(writeId);
+
+    StrictDelimitedInputWriter writerOne =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection connectionOne =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic").withRecordWriter(writerOne)
+            .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+            .withTableObject(tObject).connect();
+
+    StrictDelimitedInputWriter writerTwo =
+        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
+    HiveStreamingConnection connectionTwo =
+        HiveStreamingConnection.newBuilder().withDatabase("default")
+            .withTable("writeiddynamic")
+            .withRecordWriter(writerTwo)
+            .withHiveConf(conf).withWriteId(writeId).withStatementId(1)
+            .withTableObject(tObject)
+            .connect();
+
+    Assert.assertNotNull(connectionOne);
+
+    connectionTwo.beginTransaction();
+    connectionOne.beginTransaction();
+    connectionOne.write("1,2,3,4".getBytes());
+    connectionOne.write("1,2,5,6".getBytes());
+    connectionTwo.write("1,2,30,40".getBytes());
+    connectionOne.write("1,2,7,8".getBytes());
+    connectionTwo.write("1,2,50,60".getBytes());
+    connectionOne.write("1,2,9,10".getBytes());
+    connectionOne.commitTransaction();
+    connectionTwo.commitTransaction();
+
+    Set<String> partitionsOne = new HashSet<>(connectionOne.getPartitions());
+    Assert.assertEquals(4, partitionsOne.size());
+
+    Set<String> partitionsTwo = new HashSet<>(connectionTwo.getPartitions());
+    Assert.assertEquals(2, partitionsTwo.size());
+
+    connectionOne.close();
+    connectionTwo.close();
+
+    try {
+      String partitionName = partitionsOne.iterator().next();
+      msClient.getPartition("default", "writeiddynamic", partitionName);
+      Assert.fail(
+          "Partition shouldn't exist so a NoSuchObjectException should have been raised");
+    } catch (NoSuchObjectException e) {
+      // expected: dynamic partitions are registered only on the owning commit
+    }
+
+    partitionsOne.addAll(partitionsTwo);
+    Set<String> allPartitions = partitionsOne;
+    transactionConnection.commitTransactionWithPartition(allPartitions);
+
+    // Ensure every partition is present
+    for (String partition : allPartitions) {
+      Partition p =
+          msClient.getPartition("default", "writeiddynamic",
+              partition);
+      Assert.assertNotNull("Did not find added partition", p);
+    }
+  }
+
+  @Test
   public void testTransactionBatchEmptyCommit() throws Exception {
     // 1) to partitioned table
     StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
@@ -977,8 +1244,8 @@ public class TestStreaming {
       .connect();
     connection.beginTransaction();
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
 
     // 2) To unpartitioned table
@@ -995,8 +1262,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1115,8 +1382,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.abortTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
     connection.close();
 
     // 2) to unpartitioned table
@@ -1133,8 +1400,8 @@ public class TestStreaming {
 
     connection.beginTransaction();
     connection.abortTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1156,20 +1423,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     // 2nd Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -1182,8 +1449,8 @@ public class TestStreaming {
 
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     // To Unpartitioned table
 
@@ -1199,13 +1466,13 @@ public class TestStreaming {
       .connect();
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1227,20 +1494,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1,Hello streaming".getBytes());
     connection.commitTransaction();
    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     // 2nd Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
@@ -1252,8 +1519,8 @@ public class TestStreaming {
       "{2, Welcome to streaming}");
 
     connection.close();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     // To Unpartitioned table
     regex = "([^:]*):(.*)";
@@ -1271,13 +1538,13 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     connection.write("1:Hello streaming".getBytes());
     connection.commitTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
   }
 
@@ -1328,20 +1595,20 @@ public class TestStreaming {
 
     // 1st Txn
     connection.beginTransaction();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+        connection.getCurrentTransactionState());
     String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
     connection.write(rec1.getBytes());
     connection.commitTransaction();
     checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
     connection.close();
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
     Assert.assertEquals(1, rs.size());
@@ -1399,20 +1666,20 @@ public class TestStreaming {
       connection.beginTransaction();
       Assert.assertEquals(--initialCount, connection.remainingTransactions());
       for (int rec = 0; rec < 2; ++rec) {
-        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-          , connection.getCurrentTransactionState());
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+            connection.getCurrentTransactionState());
         connection.write((batch * rec + ",Hello streaming").getBytes());
       }
       connection.commitTransaction();
-      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-        , connection.getCurrentTransactionState());
+      Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+          connection.getCurrentTransactionState());
       ++batch;
     }
     Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
 
     connection = HiveStreamingConnection.newBuilder()
       .withDatabase(dbName)
@@ -1430,20 +1697,20 @@ public class TestStreaming {
       connection.beginTransaction();
       Assert.assertEquals(--initialCount, connection.remainingTransactions());
       for (int rec = 0; rec < 2; ++rec) {
-        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN
-          , connection.getCurrentTransactionState());
+        Assert.assertEquals(HiveStreamingConnection.TxnState.OPEN,
+            connection.getCurrentTransactionState());
         connection.write((batch * rec + ",Hello streaming").getBytes());
       }
       connection.abortTransaction();
-      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-        , connection.getCurrentTransactionState());
+      Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+          connection.getCurrentTransactionState());
       ++batch;
     }
     Assert.assertEquals(0, connection.remainingTransactions());
     connection.close();
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.INACTIVE,
+        connection.getCurrentTransactionState());
   }
 
@@ -1468,8 +1735,8 @@ public class TestStreaming {
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
 
@@ -1507,8 +1774,8 @@ public class TestStreaming {
 
     checkNothingWritten(partLoc);
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.ABORTED,
+        connection.getCurrentTransactionState());
 
     connection.beginTransaction();
     connection.write("1,Hello streaming".getBytes());
@@ -1577,8 +1844,8 @@ public class TestStreaming {
       "2\tWelcome to streaming",
       "3\tHello streaming - once again",
       "4\tWelcome to streaming - once again");
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
 
     connection.close();
   }
@@ -1690,10 +1957,10 @@ public class TestStreaming {
       "3\tHello streaming - once again",
       "4\tWelcome to streaming - once again");
 
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection.getCurrentTransactionState());
-    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED
-      , connection2.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection.getCurrentTransactionState());
+    Assert.assertEquals(HiveStreamingConnection.TxnState.COMMITTED,
+        connection2.getCurrentTransactionState());
 
     connection.close();
     connection2.close();
@@ -2511,7 +2778,8 @@ public class TestStreaming {
   }
 
   @Test
-  public void testErrorHandling() throws Exception {
+  public void testErrorHandling()
+      throws Exception {
     String agentInfo = "UT_" + Thread.currentThread().getName();
     runCmdOnDriver("create database testErrors");
     runCmdOnDriver("use testErrors");
@@ -2538,8 +2806,12 @@ public class TestStreaming {
     GetOpenTxnsInfoResponse r = msClient.showTxns();
     Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
     List<TxnInfo> ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
-    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    Assert.assertEquals("wrong status ti(0)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(1).getState());
 
     try {
@@ -2621,10 +2893,16 @@ public class TestStreaming {
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
-    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    Assert.assertEquals("wrong status ti(0)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(1).getState());
     //txnid 3 was committed and thus not open
-    Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+    Assert.assertEquals("wrong status ti(2)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(2).getState());
 
     connection.close();
 
     writer.disableErrors();
@@ -2651,8 +2929,12 @@ public class TestStreaming {
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();
-    Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
-    Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
+    Assert.assertEquals("wrong status ti(3)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(3).getState());
+    Assert.assertEquals("wrong status ti(4)",
+        org.apache.hadoop.hive.metastore.api.TxnState.ABORTED,
+        ti.get(4).getState());
   }
 
   // assumes unpartitioned table
@@ -2953,5 +3235,12 @@ public class TestStreaming {
     void disableErrors() {
       shouldThrow = false;
     }
+
+    @Override
+    public Path getDeltaFileLocation(List<String> partitionValues,
+        Integer bucketId, Long minWriteId, Long maxWriteId, Integer statementId,
+        Table table) throws StreamingException {
+      return null;
+    }
   }
 }

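The other new entry point, getDeltaFileLocation, exposes the delta-file layout computation without writing anything. A fragment reusing the connection and partitionVals fixtures from testGetDeltaPath above:

    // (partition values, bucketId 0, min/max writeId 5, statementId 9) -> physical target file
    Path path = connection.getDeltaFileLocation(partitionVals, 0, 5L, 5L, 9);
    // ends with .../continent=Asia/country=India/delta_0000005_0000005_0009/bucket_00000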