Improve handling of lock conflicts in zk session lock

- Lock reacquisition could happen in both foreground and background threads, so
use a semaphore to make sure there is only one outstanding acquire operation,
and check whether the lock is already held before reacquiring.
- Fix the logic for handling the zk sibling znode. As the znode is a sequential
znode, its name is different each time, so only compare the client id and
session id of the znodes.

RB_ID=833945


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7b46a9ac
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7b46a9ac
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7b46a9ac

Branch: refs/heads/merge/DL-98
Commit: 7b46a9ac6bb5d520366069c244332347ef019e8e
Parents: 0091960
Author: Sijie Guo <sij...@twitter.com>
Authored: Mon May 23 16:49:19 2016 -0700
Committer: Sijie Guo <sij...@twitter.com>
Committed: Mon Dec 12 16:28:58 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/lock/ZKSessionLock.java      | 35 ++++++++++++--
 .../distributedlog/lock/TestZKSessionLock.java  | 49 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
index 87894dc..dc57d55 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
@@ -480,6 +480,31 @@ class ZKSessionLock implements SessionLock {
         return id;
     }
 
+    static boolean areLockWaitersInSameSession(String node1, String node2) {
+        String[] parts1 = node1.split("_");
+        String[] parts2 = node2.split("_");
+        if (parts1.length != 4 || parts2.length != 4) {
+            return node1.equals(node2);
+        }
+        if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) {
+            return node1.equals(node2);
+        }
+        long sessionOwner1 = Long.parseLong(parts1[2].substring(1));
+        long sessionOwner2 = Long.parseLong(parts2[2].substring(1));
+        if (sessionOwner1 != sessionOwner2) {
+            return false;
+        }
+        String clientId1, clientId2;
+        try {
+            clientId1 = URLDecoder.decode(parts1[1], UTF_8.name());
+            clientId2 = URLDecoder.decode(parts2[1], UTF_8.name());
+            return clientId1.equals(clientId2);
+        } catch (UnsupportedEncodingException e) {
+            // if failed to parse client id, we have to get client id by 
zookeeper#getData.
+            return node1.equals(node2);
+        }
+    }
+
     /**
      * Get client id and its ephemeral owner.
      *
@@ -1209,17 +1234,19 @@ class ZKSessionLock implements SessionLock {
             @Override
             public void execute() {
                 boolean shouldWatch;
+                final boolean shouldClaimOwnership;
                 if (lockContext.hasLockId(currentOwner) && 
siblingNode.equals(ownerNode)) {
                     // if the current owner is the znode left from previous 
session
                     // we should watch it and claim ownership
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found its previous session 
{} held lock, watch it to claim ownership.",
                             new Object[] { myNode, lockPath, currentOwner });
-                } else if (lockId.compareTo(currentOwner) == 0 && 
siblingNode.equals(ownerNode)) {
+                } else if (lockId.compareTo(currentOwner) == 0 && 
areLockWaitersInSameSession(siblingNode, ownerNode)) {
                     // I found that my sibling is the current owner with same 
lock id (client id & session id)
                     // It must be left by any race condition from same 
zookeeper client
-                    // I would watch owner instead of sibling
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found itself {} already 
held lock at sibling node {}, watch it to claim ownership.",
                             new Object[]{myNode, lockPath, lockId, 
siblingNode});
                 } else {
@@ -1230,6 +1257,7 @@ class ZKSessionLock implements SessionLock {
                                     new Object[]{lockPath, myNode, 
siblingNode, System.currentTimeMillis()});
                         }
                     }
+                    shouldClaimOwnership = false;
                 }
 
                 // watch sibling for lock ownership
@@ -1247,8 +1275,7 @@ class ZKSessionLock implements SessionLock {
                                     }
 
                                     if (KeeperException.Code.OK.intValue() == 
rc) {
-                                        if (siblingNode.equals(ownerNode) &&
-                                                
(lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) {
+                                        if (shouldClaimOwnership) {
                                             // watch owner successfully
                                             LOG.info("LockWatcher {} claimed 
ownership for {} after set watcher on {}.",
                                                     new Object[]{ myNode, 
lockPath, ownerNode });

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
index 629538e..054d714 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
@@ -180,6 +180,28 @@ public class TestZKSessionLock extends 
ZooKeeperClusterTestCase {
     }
 
     @Test(timeout = 60000)
+    public void testAreLockWaitersInSameSession() throws Exception {
+        ZooKeeper zk = zkc.get();
+
+        String lockPath = "/test-are-lock-waiters-in-same-session";
+        String clientId1 = "test-are-lock-waiters-in-same-session-1";
+        String clientId2 = "test-are-lock-waiters-in-same-session-2";
+
+        createLockPath(zk, lockPath);
+
+        String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, 
clientId1));
+        String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, 
clientId2));
+        String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, 
clientId1));
+
+        assertEquals(node1 + " and " + node3 + " should be in same session.",
+                true, areLockWaitersInSameSession(node1, node3));
+        assertEquals(node1 + " and " + node2 + " should be not in same 
session.",
+                false, areLockWaitersInSameSession(node1, node2));
+        assertEquals(node3 + " and " + node2 + " should be not in same 
session.",
+                false, areLockWaitersInSameSession(node3, node2));
+    }
+
+    @Test(timeout = 60000)
     public void testExecuteLockAction() throws Exception {
         String lockPath = "/test-execute-lock-action";
         String clientId = "test-execute-lock-action-" + 
System.currentTimeMillis();
@@ -921,6 +943,33 @@ public class TestZKSessionLock extends 
ZooKeeperClusterTestCase {
         lock1_1.unlock();
     }
 
+    @Test(timeout = 60000)
+    public void testLockWithMultipleSiblingWaiters() throws Exception {
+        String lockPath = "/test-lock-with-multiple-sibling-waiters";
+        String clientId = "client-id";
+
+        createLockPath(zkc.get(), lockPath);
+
+        final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, 
lockStateExecutor);
+        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, 
lockStateExecutor);
+        final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, 
lockStateExecutor);
+
+        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+
+        List<String> children = awaitWaiters(3, zkc, lockPath);
+
+        assertEquals(3, children.size());
+        assertEquals(State.CLAIMED, lock0.getLockState());
+        assertEquals(State.CLAIMED, lock1.getLockState());
+        assertEquals(State.CLAIMED, lock2.getLockState());
+
+        lock0.unlock();
+        lock1.unlock();
+        lock2.unlock();
+    }
+
     /**
      * Immediate lock and unlock first lock
      * @throws Exception

Reply via email to