Improve handling of lock conflicts in zk session lock - Lock reacquisition can happen in both the foreground thread and a background thread, so use a semaphore to ensure there is only one outstanding acquire operation, and check whether the lock is already held before reacquiring it. - Fix the zk sibling znode handling logic: because the lock znodes are sequential znodes, their names differ on every creation, so compare only the client id and session id of the znodes.
RB_ID=833945 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7b46a9ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7b46a9ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7b46a9ac Branch: refs/heads/merge/DL-98 Commit: 7b46a9ac6bb5d520366069c244332347ef019e8e Parents: 0091960 Author: Sijie Guo <sij...@twitter.com> Authored: Mon May 23 16:49:19 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:28:58 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/lock/ZKSessionLock.java | 35 ++++++++++++-- .../distributedlog/lock/TestZKSessionLock.java | 49 ++++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java index 87894dc..dc57d55 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java @@ -480,6 +480,31 @@ class ZKSessionLock implements SessionLock { return id; } + static boolean areLockWaitersInSameSession(String node1, String node2) { + String[] parts1 = node1.split("_"); + String[] parts2 = node2.split("_"); + if (parts1.length != 4 || parts2.length != 4) { + return node1.equals(node2); + } + if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) { + return node1.equals(node2); + } + long 
sessionOwner1 = Long.parseLong(parts1[2].substring(1)); + long sessionOwner2 = Long.parseLong(parts2[2].substring(1)); + if (sessionOwner1 != sessionOwner2) { + return false; + } + String clientId1, clientId2; + try { + clientId1 = URLDecoder.decode(parts1[1], UTF_8.name()); + clientId2 = URLDecoder.decode(parts2[1], UTF_8.name()); + return clientId1.equals(clientId2); + } catch (UnsupportedEncodingException e) { + // if failed to parse client id, we have to get client id by zookeeper#getData. + return node1.equals(node2); + } + } + /** * Get client id and its ephemeral owner. * @@ -1209,17 +1234,19 @@ class ZKSessionLock implements SessionLock { @Override public void execute() { boolean shouldWatch; + final boolean shouldClaimOwnership; if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) { // if the current owner is the znode left from previous session // we should watch it and claim ownership shouldWatch = true; + shouldClaimOwnership = true; LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.", new Object[] { myNode, lockPath, currentOwner }); - } else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode)) { + } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) { // I found that my sibling is the current owner with same lock id (client id & session id) // It must be left by any race condition from same zookeeper client - // I would watch owner instead of sibling shouldWatch = true; + shouldClaimOwnership = true; LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.", new Object[]{myNode, lockPath, lockId, siblingNode}); } else { @@ -1230,6 +1257,7 @@ class ZKSessionLock implements SessionLock { new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()}); } } + shouldClaimOwnership = false; } // watch sibling for lock ownership @@ -1247,8 +1275,7 @@ 
class ZKSessionLock implements SessionLock { } if (KeeperException.Code.OK.intValue() == rc) { - if (siblingNode.equals(ownerNode) && - (lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) { + if (shouldClaimOwnership) { // watch owner successfully LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.", new Object[]{ myNode, lockPath, ownerNode }); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java index 629538e..054d714 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java @@ -180,6 +180,28 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { } @Test(timeout = 60000) + public void testAreLockWaitersInSameSession() throws Exception { + ZooKeeper zk = zkc.get(); + + String lockPath = "/test-are-lock-waiters-in-same-session"; + String clientId1 = "test-are-lock-waiters-in-same-session-1"; + String clientId2 = "test-are-lock-waiters-in-same-session-2"; + + createLockPath(zk, lockPath); + + String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1)); + String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2)); + String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1)); + + assertEquals(node1 + " and " + node3 + " should be in same session.", + true, areLockWaitersInSameSession(node1, node3)); + assertEquals(node1 + " and " + node2 + " should be not in same session.", + false, areLockWaitersInSameSession(node1, node2)); + assertEquals(node3 + " 
and " + node2 + " should be not in same session.", + false, areLockWaitersInSameSession(node3, node2)); + } + + @Test(timeout = 60000) public void testExecuteLockAction() throws Exception { String lockPath = "/test-execute-lock-action"; String clientId = "test-execute-lock-action-" + System.currentTimeMillis(); @@ -921,6 +943,33 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { lock1_1.unlock(); } + @Test(timeout = 60000) + public void testLockWithMultipleSiblingWaiters() throws Exception { + String lockPath = "/test-lock-with-multiple-sibling-waiters"; + String clientId = "client-id"; + + createLockPath(zkc.get(), lockPath); + + final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); + final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); + final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); + + lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + + List<String> children = awaitWaiters(3, zkc, lockPath); + + assertEquals(3, children.size()); + assertEquals(State.CLAIMED, lock0.getLockState()); + assertEquals(State.CLAIMED, lock1.getLockState()); + assertEquals(State.CLAIMED, lock2.getLockState()); + + lock0.unlock(); + lock1.unlock(); + lock2.unlock(); + } + /** * Immediate lock and unlock first lock * @throws Exception