Repository: hive
Updated Branches:
  refs/heads/branch-1 6c6583274 -> c0b532fce


http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 0770298..584cd45 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.TestTxnCommands2;
@@ -854,8 +855,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", 
null, locks.get(1));
     //update stmt has p=blah, thus nothing is actually update and we generate 
empty dyn part list
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from 
WRITE_SET"));
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
-      "default", "tab2", Collections.EMPTY_LIST));
+    AddDynamicPartitions adp = new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+      "default", "tab2", Collections.EMPTY_LIST);
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();
     //Short Running updated nothing, so we expect 0 rows in WRITE_SET
     Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from 
WRITE_SET"));
@@ -869,8 +872,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", 
null, locks.get(1));//since TAB2 is empty
     //update stmt has p=blah, thus nothing is actually update and we generate 
empty dyn part list
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from 
WRITE_SET"));
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
-      "default", "tab2", Collections.singletonList("p=two")));//simulate 
partition update
+    adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+      "default", "tab2", Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);//simulate partition update
     txnMgr2.commitTxn();
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
       1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -882,8 +887,10 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a 
= 1"));//no rows match
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
     //so generate empty Dyn Part call
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(),
-      "default", "tab2", Collections.EMPTY_LIST));     
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
+      "default", "tab2", Collections.EMPTY_LIST);
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);     
     txnMgr.commitTxn();
 
     locks = getLocks(txnMgr);
@@ -984,16 +991,20 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", 
"p=one", locks.get(1));
     
     //this simulates the completion of txnid:2
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
-      Collections.singletonList("p=two")));
+    AddDynamicPartitions adp = new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();//txnid:2
     
     locks = getLocks(txnMgr2);
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", 
"p=one", locks.get(0));
     //completion of txnid:3
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
-      Collections.singletonList("p=one")));
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
+      Collections.singletonList("p=one"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr.commitTxn();//txnid:3
     //now both txns concurrently updated TAB2 but different partitions.
     
@@ -1031,8 +1042,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", 
"p=one", locks.get(3));
 
     //this simulates the completion of txnid:5
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=one")));
+    adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", 
"tab1",
+      Collections.singletonList("p=one"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();//txnid:5
 
     
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest
 WAITING locks (both have same ext id)
@@ -1041,8 +1054,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(0));
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=one", locks.get(1));
     //completion of txnid:6
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr.commitTxn();//txnid:6
 
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1082,8 +1097,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", 
"p=two", locks.get(2));
 
     //this simulates the completion of txnid:2
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=one")));
+    AddDynamicPartitions adp = new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();//txnid:2
 
     
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest
 WAITING locks (both have same ext id)
@@ -1091,8 +1108,10 @@ public class TestDbTxnManager2 {
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(0));
     //completion of txnid:3
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr.commitTxn();//txnid:3
 
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1131,8 +1150,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", 
"p=two", locks.get(2));
 
     //this simulates the completion of txnid:2
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=one")));
+    AddDynamicPartitions adp = new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();//txnid:2
 
     
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest
 WAITING locks (both have same ext id)
@@ -1140,14 +1161,22 @@ public class TestDbTxnManager2 {
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(0));
     //completion of txnid:3
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.DELETE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr.commitTxn();//txnid:3
 
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + 
TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      2, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=1  and ctc_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + 
TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=2  and ctc_table='tab1' and 
ctc_partition='p=one'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + 
TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=3  and ctc_table='tab1' and 
ctc_partition='p=two'"));
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
       1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
     Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + 
TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
       4, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not 
null"));
   }
@@ -1180,8 +1209,10 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", 
"p=two", locks.get(2));
 
     //this simulates the completion of txnid:2
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
+    AddDynamicPartitions adp = new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();//txnid:2
 
     
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest
 WAITING locks (both have same ext id)
@@ -1189,8 +1220,10 @@ public class TestDbTxnManager2 {
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(0));
     //completion of txnid:3
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.DELETE);
+    txnHandler.addDynamicPartitions(adp);
     LockException exception = null;
     try {
       txnMgr.commitTxn();//txnid:3
@@ -1210,11 +1243,7 @@ public class TestDbTxnManager2 {
   }
   /**
    * Concurrent delte/detele of same partition - should pass
-   * This test doesn't work yet, because we don't yet pass in operation type
-   * 
-   * todo: Concurrent insert/update of same partition - should pass
    */
-  @Ignore("HIVE-13622")
   @Test
   public void testWriteSetTracking11() throws Exception {
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 
(a int, b int) partitioned by (p string) " +
@@ -1232,46 +1261,86 @@ public class TestDbTxnManager2 {
 
     //now start concurrent txn
     txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 
and p='one'"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' 
and b=2"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     locks = getLocks(txnMgr);
-    Assert.assertEquals("Unexpected lock count", 3, locks.size());
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(0));
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=one", locks.get(1));
-    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", 
"p=two", locks.get(2));
+    Assert.assertEquals("Unexpected lock count", 5, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", 
null, locks.get(0));
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", 
"p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(2));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=one", locks.get(3));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", 
"p=two", locks.get(4));
 
     //this simulates the completion of txnid:2
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
+    AddDynamicPartitions adp = new 
AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.DELETE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr2.commitTxn();//txnid:2
 
-    
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest
 WAITING locks (both have same ext id)
+    
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest
 WAITING locks (both have same ext id)
     locks = getLocks(txnMgr);
-    Assert.assertEquals("Unexpected lock count", 1, locks.size());
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(0));
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", 
null, locks.get(0));
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", 
"p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", 
"p=two", locks.get(2));
     //completion of txnid:3
-    txnHandler.addDynamicPartitions(new 
AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
-      Collections.singletonList("p=two")));
-    LockException exception = null;
-    try {
-      txnMgr.commitTxn();//txnid:3
-    }
-    catch(LockException e) {
-      exception = e;
-    }
-    Assert.assertNotEquals("Expected exception", null, exception);
-    Assert.assertEquals("Exception msg doesn't match",
-      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two 
committed by [txnid:2,3]",
-      exception.getCause().getMessage());
+    adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two"));
+    adp.setOperationType(DataOperationType.DELETE);
+    txnHandler.addDynamicPartitions(adp);
+    txnMgr.commitTxn();//txnid:3
 
-    //todo: this currently fails since we don't yet set operation type properly
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and 
ws_txnid=2"));
     Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
-      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and 
ws_txnid=3"));
+    Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and 
ws_txnid=2"));
+    Assert.assertEquals("WRITE_SET mismatch: " + 
TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where 
ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and 
ws_txnid=3"));
     Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + 
TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
       4, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not 
null"));
   }
+  @Test
+  public void testCompletedTxnComponents() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 
(a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists tab_not_acid2 (a int, b 
int)");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab_not_acid2 
values(1,1),(2,2)"));
+    //writing both acid and non-acid resources in the same txn
+    checkCmdOnDriver(driver.run("from tab_not_acid2 insert into tab1 
partition(p='two')(a,b) select a,b insert into tab_not_acid2(a,b) select a,b 
"));//txnid:1
+    Assert.assertEquals(TxnDbUtil.queryToString("select * from 
COMPLETED_TXN_COMPONENTS"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS"));
+    //only expect transactional components to be in COMPLETED_TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString("select * from 
COMPLETED_TXN_COMPONENTS"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'"));
+  }
+  @Test
+  public void testMultiInsert() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 
(a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p 
string)");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab_not_acid 
values(1,1,'one'),(2,2,'two')"));
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p) 
values(3,3,'one'),(4,4,'two')"));//txinid:1
+    //writing both acid and non-acid resources in the same txn
+    //tab1 write is a dynamic partition insert
+    checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 
partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where 
p='two'"));//txnid:2
+    Assert.assertEquals(TxnDbUtil.queryToString("select * from 
COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS"));
+    //only expect transactional components to be in COMPLETED_TXN_COMPONENTS
+    Assert.assertEquals(TxnDbUtil.queryToString("select * from 
COMPLETED_TXN_COMPONENTS"),
+      2, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=2"));
+    Assert.assertEquals(TxnDbUtil.queryToString("select * from 
COMPLETED_TXN_COMPONENTS"),
+      2, TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'"));
+  }
+  //todo: Concurrent insert/update of same partition - should pass
 
   private List<ShowLocksResponseElement> 
getLocksWithFilterOptions(HiveTxnManager txnMgr,
       String dbName, String tblName, Map<String, String> partSpec) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 8d75ab3..b797b55 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -208,6 +208,7 @@ public class TestCleaner extends CompactorTest {
 
     LockComponent comp = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "default");
     comp.setTablename("bblt");
+    comp.setOperationType(DataOperationType.SELECT);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -246,6 +247,7 @@ public class TestCleaner extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("bblp");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.DELETE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -288,6 +290,7 @@ public class TestCleaner extends CompactorTest {
 
     LockComponent comp = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "default");
     comp.setTablename("bblt");
+    comp.setOperationType(DataOperationType.INSERT);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -311,6 +314,7 @@ public class TestCleaner extends CompactorTest {
     // clean request
     LockComponent comp2 = new LockComponent(LockType.SHARED_READ, 
LockLevel.TABLE, "default");
     comp2.setTablename("bblt");
+    comp2.setOperationType(DataOperationType.SELECT);//tag comp2, the component built for this request
     List<LockComponent> components2 = new ArrayList<LockComponent>(1);
     components2.add(comp2);
     LockRequest req2 = new LockRequest(components, "me", "localhost");
@@ -360,6 +364,7 @@ public class TestCleaner extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_READ, 
LockLevel.PARTITION, "default");
     comp.setTablename("bblt");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.INSERT);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -385,6 +390,7 @@ public class TestCleaner extends CompactorTest {
     LockComponent comp2 = new LockComponent(LockType.SHARED_READ, 
LockLevel.PARTITION, "default");
     comp2.setTablename("bblt");
     comp2.setPartitionname("ds=today");
+    comp2.setOperationType(DataOperationType.SELECT);//tag comp2, the component built for this request
     List<LockComponent> components2 = new ArrayList<LockComponent>(1);
     components2.add(comp2);
     LockRequest req2 = new LockRequest(components, "me", "localhost");

http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index f84bd7e..bbd2bf8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -110,6 +110,7 @@ public class TestInitiator extends CompactorTest {
       long txnid = openTxn();
       LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE,  "default");
       comp.setTablename("mcottma");
+      comp.setOperationType(DataOperationType.UPDATE);
       List<LockComponent> components = new ArrayList<LockComponent>(1);
       components.add(comp);
       LockRequest req = new LockRequest(components, "me", "localhost");
@@ -140,6 +141,7 @@ public class TestInitiator extends CompactorTest {
       LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
       comp.setTablename("mcoptma");
       comp.setPartitionname("ds=today");
+      comp.setOperationType(DataOperationType.DELETE);
       List<LockComponent> components = new ArrayList<LockComponent>(1);
       components.add(comp);
       LockRequest req = new LockRequest(components, "me", "localhost");
@@ -173,6 +175,7 @@ public class TestInitiator extends CompactorTest {
       LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE,  "default");
       comp.setTablename("ncomdpa");
       comp.setPartitionname("ds=day-" + i);
+      comp.setOperationType(DataOperationType.UPDATE);
       List<LockComponent> components = new ArrayList<LockComponent>(1);
       components.add(comp);
       LockRequest req = new LockRequest(components, "me", "localhost");
@@ -197,6 +200,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("ceat");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -229,6 +233,7 @@ public class TestInitiator extends CompactorTest {
       long txnid = openTxn();
       LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
       comp.setTablename("ncwncs");
+      comp.setOperationType(DataOperationType.UPDATE);
       List<LockComponent> components = new ArrayList<LockComponent>(1);
       components.add(comp);
       LockRequest req = new LockRequest(components, "me", "localhost");
@@ -254,6 +259,7 @@ public class TestInitiator extends CompactorTest {
     for (int i = 0; i < 11; i++) {
       long txnid = openTxn();
       LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
+      comp.setOperationType(DataOperationType.DELETE);
       comp.setTablename("ncwncs");
       List<LockComponent> components = new ArrayList<LockComponent>(1);
       components.add(comp);
@@ -279,6 +285,7 @@ public class TestInitiator extends CompactorTest {
       long txnid = openTxn();
       LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE,  "default");
       comp.setTablename("ncwcas");
+      comp.setOperationType(DataOperationType.UPDATE);
       List<LockComponent> components = new ArrayList<LockComponent>(1);
       components.add(comp);
       LockRequest req = new LockRequest(components, "me", "localhost");
@@ -319,6 +326,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("cthdp");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -351,6 +359,7 @@ public class TestInitiator extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("cphdp");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -382,6 +391,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("nctdpnhe");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -417,6 +427,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("cttmd");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -458,6 +469,7 @@ public class TestInitiator extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("cptmd");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -489,6 +501,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("nctned");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -524,6 +537,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("cmomwbv");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -564,6 +578,7 @@ public class TestInitiator extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("ednb");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.DELETE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -597,6 +612,7 @@ public class TestInitiator extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("ttospgocr");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -608,6 +624,7 @@ public class TestInitiator extends CompactorTest {
     comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, 
"default");
     comp.setTablename("ttospgocr");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.UPDATE);
     components = new ArrayList<LockComponent>(1);
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
@@ -640,6 +657,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.TABLE, "default");
     comp.setTablename("nctdp");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -667,6 +685,7 @@ public class TestInitiator extends CompactorTest {
     long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("dt");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
@@ -698,6 +717,7 @@ public class TestInitiator extends CompactorTest {
     LockComponent comp = new LockComponent(LockType.SHARED_WRITE, 
LockLevel.PARTITION, "default");
     comp.setTablename("dp");
     comp.setPartitionname("ds=today");
+    comp.setOperationType(DataOperationType.UPDATE);
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");

http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
----------------------------------------------------------------------
diff --git a/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote 
b/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
index af2d93d..cbdbd56 100755
--- a/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
+++ b/service/src/gen/thrift/gen-py/hive_service/ThriftHive-remote
@@ -145,6 +145,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  GetOpenTxnsInfoResponse get_open_txns_info()')
   print('  OpenTxnsResponse open_txns(OpenTxnRequest rqst)')
   print('  void abort_txn(AbortTxnRequest rqst)')
+  print('  void abort_txns(AbortTxnsRequest rqst)')
   print('  void commit_txn(CommitTxnRequest rqst)')
   print('  LockResponse lock(LockRequest rqst)')
   print('  LockResponse check_lock(CheckLockRequest rqst)')
@@ -953,6 +954,12 @@ elif cmd == 'abort_txn':
     sys.exit(1)
   pp.pprint(client.abort_txn(eval(args[0]),))
 
+elif cmd == 'abort_txns':
+  if len(args) != 1:
+    print('abort_txns requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.abort_txns(eval(args[0]),))
+
 elif cmd == 'commit_txn':
   if len(args) != 1:
     print('commit_txn requires 1 args')

Reply via email to