Repository: hive Updated Branches: refs/heads/branch-1 6c6583274 -> c0b532fce
http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 0770298..584cd45 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.TestTxnCommands2; @@ -854,8 +855,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1)); //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), - "default", "tab2", Collections.EMPTY_LIST)); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), + "default", "tab2", Collections.EMPTY_LIST); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn(); //Short Running updated nothing, so we expect 0 rows in WRITE_SET Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -869,8 +872,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), - "default", "tab2", Collections.singletonList("p=two")));//simulate partition update + adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), + "default", "tab2", Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp);//simulate partition update txnMgr2.commitTxn(); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -882,8 +887,10 @@ public class TestDbTxnManager2 { checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); //so generate empty Dyn Part call - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), - "default", "tab2", Collections.EMPTY_LIST)); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), + "default", "tab2", Collections.EMPTY_LIST); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); locks = getLocks(txnMgr); @@ -984,16 +991,20 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", - Collections.singletonList("p=two"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", - Collections.singletonList("p=one"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:3 //now both txns concurrently updated TAB2 but different partitions. @@ -1031,8 +1042,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3)); //this simulates the completion of txnid:5 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=one"))); + adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:5 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1041,8 +1054,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); //completion of txnid:6 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:6 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1082,8 +1097,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=one"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1091,8 +1108,10 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:3 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1131,8 +1150,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=one"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1140,14 +1161,22 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:3 + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1' and ctc_partition='p=one'")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=3 and ctc_table='tab1' and ctc_partition='p=two'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } @@ -1180,8 +1209,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1189,8 +1220,10 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); LockException exception = null; try { txnMgr.commitTxn();//txnid:3 @@ -1210,11 +1243,7 @@ public class TestDbTxnManager2 { } /** * Concurrent delte/detele of same partition - should pass - * This test doesn't work yet, because we don't yet pass in operation type - * - * todo: Concurrent insert/update of same partition - should pass */ - @Ignore("HIVE-13622") @Test public void testWriteSetTracking11() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + @@ -1232,46 +1261,86 @@ public class TestDbTxnManager2 { //now start concurrent txn txnMgr.openTxn("T3"); + checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); - Assert.assertEquals("Unexpected lock count", 3, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); + Assert.assertEquals("Unexpected lock count", 5, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(2)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(3)); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(4)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 - ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) + ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); - Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(2)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); - LockException exception = null; - try { - txnMgr.commitTxn();//txnid:3 - } - catch(LockException e) { - exception = e; - } - Assert.assertNotEquals("Expected exception", null, exception); - Assert.assertEquals("Exception msg doesn't match", - "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]", - exception.getCause().getMessage()); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); + txnMgr.commitTxn();//txnid:3 - //todo: this currently fails since we don't yet set operation type properly Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + @Test + public void testCompletedTxnComponents() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists tab_not_acid2 (a int, b int)"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into tab_not_acid2 values(1,1),(2,2)")); + //writing both acid and non-acid resources in the same txn + checkCmdOnDriver(driver.run("from tab_not_acid2 insert into tab1 partition(p='two')(a,b) select a,b insert into tab_not_acid2(a,b) select a,b "));//txnid:1 + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); + //only expect transactional components to be in COMPLETED_TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + } + @Test + public void testMultiInsert() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p string)"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into tab_not_acid values(1,1,'one'),(2,2,'two')")); + checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:1 + //writing both acid and non-acid resources in the same txn + //tab1 write is a dynamic partition insert + checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:2 + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); + //only expect transactional components to be in COMPLETED_TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2")); + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'")); + } + //todo: Concurrent insert/update of same partition - should pass private List<ShowLocksResponseElement> getLocksWithFilterOptions(HiveTxnManager txnMgr, String dbName, String tblName, Map<String, String> partSpec) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 8d75ab3..b797b55 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -208,6 +208,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); comp.setTablename("bblt"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -246,6 +247,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("bblp"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -288,6 +290,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); comp.setTablename("bblt"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -311,6 +314,7 @@ public class TestCleaner extends CompactorTest { // clean request LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); comp2.setTablename("bblt"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components2 = new ArrayList<LockComponent>(1); components2.add(comp2); LockRequest req2 = new LockRequest(components, "me", "localhost"); @@ -360,6 +364,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); comp.setTablename("bblt"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -385,6 +390,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); comp2.setTablename("bblt"); comp2.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components2 = new ArrayList<LockComponent>(1); components2.add(comp2); LockRequest req2 = new LockRequest(components, "me", "localhost"); http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index f84bd7e..bbd2bf8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -110,6 +110,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("mcottma"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -140,6 +141,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("mcoptma"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -173,6 +175,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ncomdpa"); comp.setPartitionname("ds=day-" + i); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -197,6 +200,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ceat"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -229,6 +233,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ncwncs"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -254,6 +259,7 @@ public class TestInitiator extends CompactorTest { for (int i = 0; i < 11; i++) { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setOperationType(DataOperationType.DELETE); comp.setTablename("ncwncs"); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); @@ -279,6 +285,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ncwcas"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -319,6 +326,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("cthdp"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -351,6 +359,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("cphdp"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -382,6 +391,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("nctdpnhe"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -417,6 +427,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("cttmd"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -458,6 +469,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("cptmd"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -489,6 +501,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("nctned"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -524,6 +537,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("cmomwbv"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -564,6 +578,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("ednb"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -597,6 +612,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("ttospgocr"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -608,6 +624,7 @@ public class TestInitiator extends CompactorTest { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("ttospgocr"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -640,6 +657,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("nctdp"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -667,6 +685,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("dt"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -698,6 +717,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("dp"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote ---------------------------------------------------------------------- diff --git a/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote b/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote index af2d93d..cbdbd56 100755 --- a/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote +++ b/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote @@ -145,6 +145,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' GetOpenTxnsInfoResponse get_open_txns_info()') print(' OpenTxnsResponse open_txns(OpenTxnRequest rqst)') print(' void abort_txn(AbortTxnRequest rqst)') + print(' void abort_txns(AbortTxnsRequest rqst)') print(' void commit_txn(CommitTxnRequest rqst)') print(' LockResponse lock(LockRequest rqst)') print(' LockResponse check_lock(CheckLockRequest rqst)') @@ -953,6 +954,12 @@ elif cmd == 'abort_txn': sys.exit(1) pp.pprint(client.abort_txn(eval(args[0]),)) +elif cmd == 'abort_txns': + if len(args) != 1: + print('abort_txns requires 1 args') + sys.exit(1) + pp.pprint(client.abort_txns(eval(args[0]),)) + elif cmd == 'commit_txn': if len(args) != 1: print('commit_txn requires 1 args')
