Repository: hive
Updated Branches:
  refs/heads/master ed487ac40 -> 53a590b53


HIVE-15077 - Acid LockManager is unfair (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53a590b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53a590b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53a590b5

Branch: refs/heads/master
Commit: 53a590b5372c30369aae8c7b32895edc6026112e
Parents: ed487ac
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Fri Feb 23 13:35:06 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Fri Feb 23 13:35:06 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |   2 +-
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 109 ++++++++++++++++++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  51 ++++-----
 3 files changed, 130 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/53a590b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 31f50fa..af0884c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -246,7 +246,7 @@ public class Cleaner extends CompactorThread {
        * are resolved (i.e. not opened).  This is what "highestWriteId" 
tracks.  This is only tracked
        * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorWriteIdList 
and uses for more info.
        * 
-       * We only want to clean up to the highestWriteId - otherwise we risk 
deleteing deltas from
+       * We only want to clean up to the highestWriteId - otherwise we risk 
deleting deltas from
        * under an active reader.
        * 
        * Suppose we have deltas D2 D3 for table T, i.e. the last compaction 
created D3 so now there is a 

http://git-wip-us.apache.org/repos/asf/hive/blob/53a590b5/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index d411a8b..d9a5feb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -72,7 +72,7 @@ import java.util.Map;
  * each statement and can also simulate concurrent (but very controlled) work 
but w/o forking any
  * threads.  The limitation here is that not all statements are allowed in an 
explicit transaction.
  * For example, "drop table foo".  This approach will also cause the query to 
execute which will
- * make tests slower but will exericise the code path that is much closer to 
the actual user calls.
+ * make tests slower but will exercise the code path that is much closer to 
the actual user calls.
  *
  * In either approach, each logical "session" should use it's own Transaction 
Manager.  This requires
  * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the 
TM is associated with
@@ -1467,7 +1467,7 @@ public class TestDbTxnManager2 {
       3, TxnDbUtil.countQueryAgent(conf, "select count(*) from 
COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not 
null"));
   }
   /**
-   * Concurrent delte/detele of same partition - should pass
+   * Concurrent delete/delete of same partition - should pass
    */
   @Test
   public void testWriteSetTracking11() throws Exception {
@@ -2230,4 +2230,109 @@ public class TestDbTxnManager2 {
     Assert.assertEquals("Lock remained", 0, getLocks().size());
     Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size());
   }
+  @Test
+  public void testFairness() throws Exception {
+    dropTable(new String[] {"T6"});
+    CommandProcessorResponse cpr = driver.run("create table if not exists T6(a 
int)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("select a from T6");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6
+    HiveTxnManager txnMgr2 = 
TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    cpr = driver.compileAndRespond("drop table if exists T6");
+    checkCmdOnDriver(cpr);
+    //tries to get X lock on T6 and gets Waiting state
+    LockState lockState = ((DbTxnManager) 
txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, 
locks);
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, 
locks);
+
+    HiveTxnManager txnMgr3 = 
TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr3);
+    //this should block behind the X lock on  T6
+    //this is a contrived example, in practice this query would of course fail 
after drop table
+    cpr = driver.compileAndRespond("select a from T6");
+    checkCmdOnDriver(cpr);
+    ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", 
false);//gets S lock on T6
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, 
locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, 
locks);
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, 
locks);
+  }
+
+  /**
+   * T7 is a table with 2 partitions
+   * 1. run select from T7
+   * 2. run drop partition from T7
+   * concurrently with 1 starting first so that 2 blocks
+   * 3. start another concurrent select on T7 - it should block behind waiting 
X (from drop) - LM should be fair
+   * 4. finish #1 so that drop unblocks
+   * 5. rollback the drop to release its X lock
+   * 6. #3 should unblock
+   */
+  @Test
+  public void testFairness2() throws Exception {
+    dropTable(new String[]{"T7"});
+    CommandProcessorResponse cpr = driver.run("create table if not exists T7 
(a int) "
+        + "partitioned by (p int) stored as orc TBLPROPERTIES 
('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run(
+        "insert into T7 partition(p) values(1,1),(1,2)"));//create 2 partitions
+    cpr = driver.compileAndRespond("select a from T7 ");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T7
+    HiveTxnManager txnMgr2 = 
TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    cpr = driver.compileAndRespond("alter table T7 drop partition (p=1)");
+    checkCmdOnDriver(cpr);
+    //tries to get X lock on T7.p=1 and gets Waiting state
+    LockState lockState = ((DbTxnManager) 
txnMgr2).acquireLocks(driver.getPlan(), ctx,
+        "Fiddler", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 4, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, 
locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", 
"p=1", locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", 
"p=2", locks);
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", 
locks);
+
+    HiveTxnManager txnMgr3 = 
TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr3);
+    //this should block behind the X lock on  T7.p=1
+    cpr = driver.compileAndRespond("select a from T7");
+    checkCmdOnDriver(cpr);
+    //tries to get S lock on T7, S on T7.p=1 and S on T7.p=2
+    ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", 
false);
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 7, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, 
locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", 
"p=1", locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", 
"p=2", locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, 
locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", 
locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", 
locks);
+    checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", 
locks);
+
+    txnMgr.commitTxn();//release locks from "select a from T7" - to unblock 
the drop partition
+    //retest the "drop partition" X lock
+    lockState = 
((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid());
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 4, locks.size());
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T7", "p=1", 
locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, 
locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", 
locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", 
locks);
+
+    txnMgr2.rollbackTxn();//release the X lock on T7.p=1
+    //re-test the locks
+    lockState = 
((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid());//S
 lock on T7
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, 
locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", 
"p=1", locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", 
"p=2", locks);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/53a590b5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index ac61715..6a74594 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -2690,7 +2690,14 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   }
 
   /**
-   * Sort more restrictive locks after less restrictive ones
+   * Sort more restrictive locks after less restrictive ones.  Why?
+   * Consider insert overwrite table DB.T1 select ... from T2:
+   * this takes X lock on DB.T1 and S lock on T2
+   * Also, create table DB.T3: takes S lock on DB.
+   * so the state of the lock manager is {X(T1), S(T2) S(DB)} all in acquired 
state.
+   * This is made possible by HIVE-10242.
+   * Now a select * from T1 will try to get S(T1) which according to the 
'jumpTable' will
+   * be acquired once it sees S(DB).  So need to check stricter locks first.
    */
   private final static class LockTypeComparator implements 
Comparator<LockType> {
     public boolean equals(Object other) {
@@ -2842,7 +2849,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * Lock acquisition is meant to be fair, so every lock can only block on 
some lock with smaller
    * hl_lock_ext_id by only checking earlier locks.
    *
-   * For any given SQL statment all locks required by it are grouped under 
single extLockId and are
+   * For any given SQL statement all locks required by it are grouped under 
single extLockId and are
    * granted all at once or all locks wait.
    *
    * This is expected to run at READ_COMMITTED.
@@ -2871,7 +2878,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     boolean isPartOfDynamicPartitionInsert = true;
     try {
       /**
-       * checkLock() must be mutexed against any other checkLock to make sure 
2 conflicting locks
+       * checkLock() must be mutex'd against any other checkLock to make sure 
2 conflicting locks
        * are not granted by parallel checkLock() calls.
        */
       handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name());
@@ -3007,7 +3014,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           query.append("))");
         }
       }
-      query.append(" and hl_lock_ext_id <= ").append(extLockId);
+      query.append(" and hl_lock_ext_id < ").append(extLockId);
 
       LOG.debug("Going to execute query <" + query.toString() + ">");
       stmt = dbConn.createStatement();
@@ -3027,57 +3034,43 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       }
 
       for (LockInfo info : locksBeingChecked) {
-        // Find the lock record we're checking
-        int index = -1;
-        for (int i = 0; i < locks.length; i++) {
-          if (locks[i].equals(info)) {
-            index = i;
-            break;
-          }
-        }
-
-        // If we didn't find the lock, then it must not be in the table
-        if (index == -1) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("How did we get here, we heartbeated our 
lock before we started! ( " + info + ")");
-        }
-
-
         // If we've found it and it's already been marked acquired,
         // then just look at the other locks.
-        if (locks[index].state == LockState.ACQUIRED) {
+        if (info.state == LockState.ACQUIRED) {
           /**this is what makes this method @SafeToRetry*/
           continue;
         }
 
         // Look at everything in front of this lock to see if it should block
         // it or not.
-        for (int i = index - 1; i >= 0; i--) {
+        for (int i = locks.length - 1; i >= 0; i--) {
           // Check if we're operating on the same database, if not, move on
-          if (!locks[index].db.equals(locks[i].db)) {
+          if (!info.db.equals(locks[i].db)) {
             continue;
           }
 
           // If table is null on either of these, then they are claiming to
           // lock the whole database and we need to check it.  Otherwise,
           // check if they are operating on the same table, if not, move on.
-          if (locks[index].table != null && locks[i].table != null
-            && !locks[index].table.equals(locks[i].table)) {
+          if (info.table != null && locks[i].table != null
+            && !info.table.equals(locks[i].table)) {
             continue;
           }
+          // if here, we may be checking a DB level lock against a Table level 
lock.  Alternatively,
+          // we could have used Intention locks (for example a request for S 
lock on table would
+          // cause an IS lock DB that contains the table).  Similarly, at 
partition level.
 
           // If partition is null on either of these, then they are claiming to
           // lock the whole table and we need to check it.  Otherwise,
           // check if they are operating on the same partition, if not, move 
on.
-          if (locks[index].partition != null && locks[i].partition != null
-            && !locks[index].partition.equals(locks[i].partition)) {
+          if (info.partition != null && locks[i].partition != null
+            && !info.partition.equals(locks[i].partition)) {
             continue;
           }
 
           // We've found something that matches what we're trying to lock,
           // so figure out if we can lock it too.
-          LockAction lockAction = 
jumpTable.get(locks[index].type).get(locks[i].type).get(locks[i].state);
+          LockAction lockAction = 
jumpTable.get(info.type).get(locks[i].type).get(locks[i].state);
           LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " 
action: " + lockAction);
           switch (lockAction) {
             case WAIT:

Reply via email to