Repository: hive Updated Branches: refs/heads/master ffa69a22d -> f25b86520
http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 80e3cd6..f513d0f 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; @@ -286,12 +287,14 @@ public class TestCompactionTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("yourtable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); req.setTxnid(txnid); @@ -322,6 +325,7 @@ public class TestCompactionTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -333,6 +337,7 @@ public class TestCompactionTxnHandler { txnid = openTxn(); comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("yourtable"); + comp.setOperationType(DataOperationType.DELETE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -345,6 +350,7 @@ public class TestCompactionTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("foo"); comp.setPartitionname("bar"); + comp.setOperationType(DataOperationType.UPDATE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -355,6 +361,7 @@ public class TestCompactionTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("foo"); comp.setPartitionname("baz"); + comp.setOperationType(DataOperationType.UPDATE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -411,13 +418,17 @@ public class TestCompactionTxnHandler { // lock a table, as in dynamic partitions LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); lc.setTablename(tableName); + DataOperationType dop = DataOperationType.UPDATE; + lc.setOperationType(dop); LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); lr.setTxnid(txnId); LockResponse lock = txnHandler.lock(lr); assertEquals(LockState.ACQUIRED, lock.getState()); - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName, - Arrays.asList("ds=yesterday", "ds=today"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today")); + adp.setOperationType(dop); + txnHandler.addDynamicPartitions(adp); txnHandler.commitTxn(new CommitTxnRequest(txnId)); Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000); http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 1a118a9..2804e21 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; @@ -216,6 +217,7 @@ public class TestTxnHandler { public void testLockDifferentDBs() throws Exception { // Test that two different databases don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -223,6 +225,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -234,6 +237,7 @@ public class TestTxnHandler { public void testLockSameDB() throws Exception { // Test that two different databases don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -241,6 +245,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -252,6 +257,7 @@ public class TestTxnHandler { public void testLockDbLocksTable() throws Exception { // Test that locking a database prevents locking of tables in the database LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -259,6 +265,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("mytable"); components.clear(); components.add(comp); @@ -271,6 +278,7 @@ public class TestTxnHandler { public void testLockDbDoesNotLockTableInDifferentDB() throws Exception { // Test that locking a database prevents locking of tables in the database LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -278,6 +286,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("mytable"); components.clear(); components.add(comp); @@ -290,6 +299,7 @@ public class TestTxnHandler { public void testLockDifferentTables() throws Exception { // Test that two different tables don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("mytable"); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); @@ -298,6 +308,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("yourtable"); components.clear(); components.add(comp); @@ -311,6 +322,7 @@ public class TestTxnHandler { // Test that two different tables don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -319,6 +331,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -331,6 +344,7 @@ public class TestTxnHandler { // Test that locking a table prevents locking of partitions of the table LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -340,6 +354,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -352,6 +367,7 @@ public class TestTxnHandler { // Test that locking a table prevents locking of partitions of the table LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -361,6 +377,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("yourtable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -374,6 +391,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -383,6 +401,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("yourpartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -396,6 +415,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -405,6 +425,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -418,6 +439,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -427,6 +449,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -440,6 +463,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -449,6 +473,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -458,6 +483,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -471,6 +497,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -480,6 +507,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -494,6 +522,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -503,6 +532,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -512,6 +542,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -526,6 +557,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -535,6 +567,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -548,6 +581,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -557,6 +591,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -566,6 +601,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -579,6 +615,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -589,6 +626,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -602,6 +640,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -612,6 +651,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -622,6 +662,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -630,11 +671,31 @@ public class TestTxnHandler { } @Test + public void testWrongLockForOperation() throws Exception { + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); + List<LockComponent> components = new ArrayList<LockComponent>(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + Exception expectedError = null; + try { + LockResponse res = txnHandler.lock(req); + } + catch(Exception e) { + expectedError = e; + } + Assert.assertTrue(expectedError != null && expectedError.getMessage().contains("Unexpected DataOperationType")); + } + @Test public void testLockSWSWSW() throws Exception { // Test that write blocks two writes LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -645,6 +706,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -655,6 +717,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -669,6 +732,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -678,6 +742,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -687,6 +752,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -701,6 +767,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -710,6 +777,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -719,6 +787,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -731,6 +800,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -743,6 +813,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -772,6 +843,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -794,12 +866,14 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(2); components.add(comp); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("anotherpartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); @@ -817,12 +891,14 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(2); components.add(comp); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("anotherpartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); @@ -833,6 +909,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -852,6 +929,7 @@ public class TestTxnHandler { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -867,6 +945,7 @@ public class TestTxnHandler { // Test that committing unlocks long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -886,6 +965,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -932,6 +1012,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -996,6 +1077,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -1083,6 +1165,7 @@ public class TestTxnHandler { public void showLocks() throws Exception { long begining = System.currentTimeMillis(); LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -1092,6 +1175,7 @@ public class TestTxnHandler { long txnid = openTxn(); comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.SELECT); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -1103,6 +1187,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb"); comp.setTablename("yourtable"); comp.setPartitionname("yourpartition"); + comp.setOperationType(DataOperationType.INSERT); components.add(comp); req = new LockRequest(components, "you", "remotehost"); res = txnHandler.lock(req); http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 21aa315..aeaae6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -429,7 +429,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable { dpCtx.getNumDPCols(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask()); + SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), + work.getLoadTableWork().getWriteType()); console.printInfo("\t Time taken to load dynamic partitions: " + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 9446876..bac38ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -221,7 +222,20 @@ public class AcidUtils { return result; } - public enum Operation { NOT_ACID, INSERT, UPDATE, DELETE } + public enum Operation { + NOT_ACID(DataOperationType.UNSET), + INSERT(DataOperationType.INSERT), + UPDATE(DataOperationType.UPDATE), + DELETE(DataOperationType.DELETE); + + private final DataOperationType dop; + private Operation(DataOperationType dop) { + this.dop = dop; + } + public DataOperationType toDataOperationType() { + return dop; + } + } public static interface Directory { http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 9ab6169..9988eec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -162,7 +162,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { boolean atLeastOneLock = false; - LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + LockRequestBuilder rqstBuilder = new LockRequestBuilder(plan.getQueryId()); //link queryId to txnId LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(txnId) + " for queryId=" + plan.getQueryId()); rqstBuilder.setTransactionId(txnId) @@ -178,6 +178,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { } LockComponentBuilder compBuilder = new LockComponentBuilder(); compBuilder.setShared(); + compBuilder.setOperationType(DataOperationType.SELECT); Table t = null; switch (input.getType()) { @@ -203,6 +204,9 @@ public class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } + if(t != null && AcidUtils.isAcidTable(t)) { + compBuilder.setIsAcid(true); + } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); rqstBuilder.addLockComponent(comp); @@ -226,27 +230,35 @@ public class DbTxnManager extends HiveTxnManagerImpl { case DDL_EXCLUSIVE: case INSERT_OVERWRITE: compBuilder.setExclusive(); + compBuilder.setOperationType(DataOperationType.NO_TXN); break; case INSERT: - t = output.getTable(); - if(t == null) { - throw new IllegalStateException("No table info for " + output); - } + t = getTable(output); if(AcidUtils.isAcidTable(t)) { compBuilder.setShared(); + compBuilder.setIsAcid(true); } else { compBuilder.setExclusive(); + compBuilder.setIsAcid(false); } + compBuilder.setOperationType(DataOperationType.INSERT); break; case DDL_SHARED: compBuilder.setShared(); + compBuilder.setOperationType(DataOperationType.NO_TXN); break; case UPDATE: + compBuilder.setSemiShared(); + compBuilder.setOperationType(DataOperationType.UPDATE); + t = getTable(output); + break; case DELETE: compBuilder.setSemiShared(); + compBuilder.setOperationType(DataOperationType.DELETE); + t = getTable(output); break; case DDL_NO_LOCK: @@ -280,12 +292,15 @@ public class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } + if(t != null && AcidUtils.isAcidTable(t)) { + compBuilder.setIsAcid(true); + } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); rqstBuilder.addLockComponent(comp); atLeastOneLock = true; } - + //plan // Make sure we need locks. It's possible there's nothing to lock in // this operation. if (!atLeastOneLock) { @@ -301,6 +316,13 @@ public class DbTxnManager extends HiveTxnManagerImpl { ctx.setHiveLocks(locks); return lockState; } + private static Table getTable(WriteEntity we) { + Table t = we.getTable(); + if(t == null) { + throw new IllegalStateException("No table info for " + we); + } + return t; + } /** * This is for testing only. * @param delay time to delay for first heartbeat http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index dcfc2b5..3fa1233 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1695,7 +1695,8 @@ private void constructOneLBLocationMap(FileStatus fSta, */ public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath, String tableName, Map<String, String> partSpec, boolean replace, - int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask) + int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask, + AcidUtils.Operation operation) throws HiveException { Set<Path> validPartitions = new HashSet<Path>(); @@ -1758,7 +1759,8 @@ private void constructOneLBLocationMap(FileStatus fSta, for (Partition p : partitionsMap.values()) { partNames.add(p.getName()); } - metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(), partNames); + metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(), + partNames, operation.toDataOperationType()); } return partitionsMap; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 8840fd9..4782213 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.TestTxnCommands2; @@ -866,8 +867,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1)); //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), - "default", "tab2", Collections.EMPTY_LIST)); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), + "default", "tab2", Collections.EMPTY_LIST); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn(); //Short Running updated nothing, so we expect 0 rows in WRITE_SET Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -881,8 +884,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), - "default", "tab2", Collections.singletonList("p=two")));//simulate partition update + adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), + "default", "tab2", Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp);//simulate partition update txnMgr2.commitTxn(); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); @@ -894,8 +899,10 @@ public class TestDbTxnManager2 { checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); //so generate empty Dyn Part call - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), - "default", "tab2", Collections.EMPTY_LIST)); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), + "default", "tab2", Collections.EMPTY_LIST); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn(); locks = getLocks(txnMgr); @@ -996,16 +1003,20 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", - Collections.singletonList("p=two"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 locks = getLocks(txnMgr2); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", - Collections.singletonList("p=one"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:3 //now both txns concurrently updated TAB2 but different partitions. @@ -1043,8 +1054,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3)); //this simulates the completion of txnid:5 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=one"))); + adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:5 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1053,8 +1066,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); //completion of txnid:6 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:6 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1094,8 +1109,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=one"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1103,8 +1120,10 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:3 Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), @@ -1143,8 +1162,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=one"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=one")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1152,14 +1173,22 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); txnMgr.commitTxn();//txnid:3 + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1' and ctc_partition='p=one'")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=3 and ctc_table='tab1' and ctc_partition='p=two'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } @@ -1192,8 +1221,10 @@ public class TestDbTxnManager2 { checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.UPDATE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) @@ -1201,8 +1232,10 @@ public class TestDbTxnManager2 { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); LockException exception = null; try { txnMgr.commitTxn();//txnid:3 @@ -1222,11 +1255,7 @@ public class TestDbTxnManager2 { } /** * Concurrent delte/detele of same partition - should pass - * This test doesn't work yet, because we don't yet pass in operation type - * - * todo: Concurrent insert/update of same partition - should pass */ - @Ignore("HIVE-13622") @Test public void testWriteSetTracking11() throws Exception { CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + @@ -1244,46 +1273,86 @@ public class TestDbTxnManager2 { //now start concurrent txn txnMgr.openTxn("T3"); + checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'")); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2")); ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); locks = getLocks(txnMgr); - Assert.assertEquals("Unexpected lock count", 3, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); - checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2)); + Assert.assertEquals("Unexpected lock count", 5, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(2)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(3)); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(4)); //this simulates the completion of txnid:2 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); txnMgr2.commitTxn();//txnid:2 - ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id) + ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest WAITING locks (both have same ext id) locks = getLocks(txnMgr); - Assert.assertEquals("Unexpected lock count", 1, locks.size()); - checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0)); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(2)); //completion of txnid:3 - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", - Collections.singletonList("p=two"))); - LockException exception = null; - try { - txnMgr.commitTxn();//txnid:3 - } - catch(LockException e) { - exception = e; - } - Assert.assertNotEquals("Expected exception", null, exception); - Assert.assertEquals("Exception msg doesn't match", - "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]", - exception.getCause().getMessage()); + adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1", + Collections.singletonList("p=two")); + adp.setOperationType(DataOperationType.DELETE); + txnHandler.addDynamicPartitions(adp); + txnMgr.commitTxn();//txnid:3 - //todo: this currently fails since we don't yet set operation type properly Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), - 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'")); + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2")); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3")); Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null")); } + @Test + public void testCompletedTxnComponents() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists tab_not_acid2 (a int, b int)"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into tab_not_acid2 values(1,1),(2,2)")); + //writing both acid and non-acid resources in the same txn + checkCmdOnDriver(driver.run("from tab_not_acid2 insert into tab1 partition(p='two')(a,b) select a,b insert into tab_not_acid2(a,b) select a,b "));//txnid:1 + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); + //only expect transactional components to be in COMPLETED_TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'")); + } + @Test + public void testMultiInsert() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p string)"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into tab_not_acid values(1,1,'one'),(2,2,'two')")); + checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:1 + //writing both acid and non-acid resources in the same txn + //tab1 write is a dynamic partition insert + checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:2 + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS")); + //only expect transactional components to be in COMPLETED_TXN_COMPONENTS + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2")); + Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'")); + } + //todo: Concurrent insert/update of same partition - should pass private List<ShowLocksResponseElement> getLocksWithFilterOptions(HiveTxnManager txnMgr, String dbName, String tblName, Map<String, String> partSpec) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 1578bfb..44dd99b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; import org.apache.hadoop.hive.metastore.api.LockRequest; @@ -222,6 +223,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); comp.setTablename("bblt"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -260,6 +262,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("bblp"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -302,6 +305,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); comp.setTablename("bblt"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -325,6 +329,7 @@ public class TestCleaner extends CompactorTest { // clean request LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "default"); comp2.setTablename("bblt"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components2 = new ArrayList<LockComponent>(1); components2.add(comp2); LockRequest req2 = new LockRequest(components, "me", "localhost"); @@ -374,6 +379,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); comp.setTablename("bblt"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -399,6 +405,7 @@ public class TestCleaner extends CompactorTest { LockComponent comp2 = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "default"); comp2.setTablename("bblt"); comp2.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components2 = new ArrayList<LockComponent>(1); components2.add(comp2); LockRequest req2 = new LockRequest(components, "me", "localhost"); http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index a31e2d1..a11fe86 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; @@ -123,6 +124,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("mcottma"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -153,6 +155,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("mcoptma"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -186,6 +189,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ncomdpa"); comp.setPartitionname("ds=day-" + i); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -210,6 +214,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ceat"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -242,6 +247,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ncwncs"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -267,6 +273,7 @@ public class TestInitiator extends CompactorTest { for (int i = 0; i < 11; i++) { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); + comp.setOperationType(DataOperationType.DELETE); comp.setTablename("ncwncs"); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); @@ -292,6 +299,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("ncwcas"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -332,6 +340,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("cthdp"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -364,6 +373,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("cphdp"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -395,6 +405,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("nctdpnhe"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -430,6 +441,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("cttmd"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -471,6 +483,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("cptmd"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -502,6 +515,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("nctned"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -537,6 +551,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("cmomwbv"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -577,6 +592,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("ednb"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -610,6 +626,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("ttospgocr"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -621,6 +638,7 @@ public class TestInitiator extends CompactorTest { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("ttospgocr"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -653,6 +671,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default"); comp.setTablename("nctdp"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -680,6 +699,7 @@ public class TestInitiator extends CompactorTest { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("dt"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -711,6 +731,7 @@ public class TestInitiator extends CompactorTest { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); comp.setTablename("dp"); comp.setPartitionname("ds=today"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost");
