Repository: incubator-distributedlog Updated Branches: refs/heads/master dc1bb30ba -> 40df29d9b
DL-86: Improve handling of lock conflicts in zk session lock merge twitter's change from Sijie Guo. Author: Jordan Bull <jb...@twitter.com> Author: Sijie Guo <sij...@twitter.com> Reviewers: Leigh Stewart <lstew...@apache.org> Closes #58 from sijie/merge/DL-86 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/40df29d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/40df29d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/40df29d9 Branch: refs/heads/master Commit: 40df29d9b892c660f6626c92472e6c6b4b52007b Parents: dc1bb30 Author: Jordan Bull <jb...@twitter.com> Authored: Fri Dec 16 21:51:03 2016 -0800 Committer: Sijie Guo <si...@apache.org> Committed: Fri Dec 16 21:51:03 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/lock/ZKSessionLock.java | 35 ++++++++++++-- .../distributedlog/lock/TestZKSessionLock.java | 49 ++++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/40df29d9/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java index 87894dc..dc57d55 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java @@ -480,6 +480,31 @@ class ZKSessionLock implements SessionLock { return id; } + static boolean areLockWaitersInSameSession(String node1, String node2) { + String[] parts1 = node1.split("_"); + String[] parts2 = node2.split("_"); + if (parts1.length != 4 || parts2.length != 4) { + return node1.equals(node2); + } + if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) { + return node1.equals(node2); + } + long sessionOwner1 = Long.parseLong(parts1[2].substring(1)); + long sessionOwner2 = Long.parseLong(parts2[2].substring(1)); + if (sessionOwner1 != sessionOwner2) { + return false; + } + String clientId1, clientId2; + try { + clientId1 = URLDecoder.decode(parts1[1], UTF_8.name()); + clientId2 = URLDecoder.decode(parts2[1], UTF_8.name()); + return clientId1.equals(clientId2); + } catch (UnsupportedEncodingException e) { + // if failed to parse client id, we have to get client id by zookeeper#getData. + return node1.equals(node2); + } + } + /** * Get client id and its ephemeral owner. * @@ -1209,17 +1234,19 @@ class ZKSessionLock implements SessionLock { @Override public void execute() { boolean shouldWatch; + final boolean shouldClaimOwnership; if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) { // if the current owner is the znode left from previous session // we should watch it and claim ownership shouldWatch = true; + shouldClaimOwnership = true; LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.", new Object[] { myNode, lockPath, currentOwner }); - } else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode)) { + } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) { // I found that my sibling is the current owner with same lock id (client id & session id) // It must be left by any race condition from same zookeeper client - // I would watch owner instead of sibling shouldWatch = true; + shouldClaimOwnership = true; LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.", new Object[]{myNode, lockPath, lockId, siblingNode}); } else { @@ -1230,6 +1257,7 @@ class ZKSessionLock implements SessionLock { new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()}); } } + shouldClaimOwnership = false; } // watch sibling for lock ownership @@ -1247,8 +1275,7 @@ class ZKSessionLock implements SessionLock { } if (KeeperException.Code.OK.intValue() == rc) { - if (siblingNode.equals(ownerNode) && - (lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) { + if (shouldClaimOwnership) { // watch owner successfully LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.", new Object[]{ myNode, lockPath, ownerNode }); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/40df29d9/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java index 629538e..054d714 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java @@ -180,6 +180,28 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { } @Test(timeout = 60000) + public void testAreLockWaitersInSameSession() throws Exception { + ZooKeeper zk = zkc.get(); + + String lockPath = "/test-are-lock-waiters-in-same-session"; + String clientId1 = "test-are-lock-waiters-in-same-session-1"; + String clientId2 = "test-are-lock-waiters-in-same-session-2"; + + createLockPath(zk, lockPath); + + String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1)); + String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2)); + String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1)); + + assertEquals(node1 + " and " + node3 + " should be in same session.", + true, areLockWaitersInSameSession(node1, node3)); + assertEquals(node1 + " and " + node2 + " should be not in same session.", + false, areLockWaitersInSameSession(node1, node2)); + assertEquals(node3 + " and " + node2 + " should be not in same session.", + false, areLockWaitersInSameSession(node3, node2)); + } + + @Test(timeout = 60000) public void testExecuteLockAction() throws Exception { String lockPath = "/test-execute-lock-action"; String clientId = "test-execute-lock-action-" + System.currentTimeMillis(); @@ -921,6 +943,33 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase { lock1_1.unlock(); } + @Test(timeout = 60000) + public void testLockWithMultipleSiblingWaiters() throws Exception { + String lockPath = "/test-lock-with-multiple-sibling-waiters"; + String clientId = "client-id"; + + createLockPath(zkc.get(), lockPath); + + final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); + final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); + final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor); + + lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + + List<String> children = awaitWaiters(3, zkc, lockPath); + + assertEquals(3, children.size()); + assertEquals(State.CLAIMED, lock0.getLockState()); + assertEquals(State.CLAIMED, lock1.getLockState()); + assertEquals(State.CLAIMED, lock2.getLockState()); + + lock0.unlock(); + lock1.unlock(); + lock2.unlock(); + } + /** * Immediate lock and unlock first lock * @throws Exception