CURATOR-498 "Protection" has a potential bug. If the connection is lost for long enough, Curator will want to kill the session. Session deletions must be handled by the Leader ZK instance. At the same time that the session kill is being processed, Curator's protection mode handling could be calling the follower that it's connected to in order to get the current list of children - this can be handled directly by the follower instance without needing to call the leader. So, in this scenario, the client will get a list of children that includes the ZNode that is about to be deleted as part of killing the session.
This bug has been in Curator since we added the protection feature to it more than 6 years ago. The fix is to include the session ID in the protection ID that is generated for the node name when the create mode is an ephemeral type. Then, if findProtectedNodeInForeground() finds the node in the use-case we've been discussing, it can compare the session ID to the current ZooKeeper handle's session ID and disregard the found node if they don't match. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/dafd0914 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/dafd0914 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/dafd0914 Branch: refs/heads/CURATOR-498 Commit: dafd091412a834a128c9882d2b9534d1a0ff7735 Parents: 4b0bc85 Author: randgalt <[email protected]> Authored: Tue Jan 1 22:34:41 2019 -0500 Committer: randgalt <[email protected]> Committed: Tue Jan 1 22:34:41 2019 -0500 ---------------------------------------------------------------------- .../framework/imps/CreateBuilderImpl.java | 55 +++++++++---- .../FindAndDeleteProtectedNodeInBackground.java | 2 +- .../framework/imps/TestFrameworkEdges.java | 82 +++++++++++++++++++- 3 files changed, 121 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/dafd0914/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ce82542..0108063 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -674,7 
+674,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro }; client.getZooKeeper().create ( - operationAndData.getData().getPath(), + insertSessionIdIfNeeded(operationAndData.getData().getPath()), data, acling.getAclList(operationAndData.getData().getPath()), createMode, @@ -687,7 +687,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro CreateZK35.create ( client.getZooKeeper(), - operationAndData.getData().getPath(), + insertSessionIdIfNeeded(operationAndData.getData().getPath()), data, acling.getAclList(operationAndData.getData().getPath()), createMode, @@ -784,9 +784,13 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro }; } - private static String getProtectedPrefix(String protectedId) + private static String getProtectedPrefix(String protectedId, long sessionId) { - return PROTECTED_PREFIX + protectedId + "-"; + if ( sessionId != 0 ) + { + return PROTECTED_PREFIX + protectedId + '-' + sessionId + '-'; + } + return PROTECTED_PREFIX + protectedId + '-'; } static <T> void backgroundCreateParentsThenNode(final CuratorFrameworkImpl client, final OperationAndData<T> mainOperationAndData, final String path, Backgrounding backgrounding, final InternalACLProvider aclProvider, final boolean createParentsAsContainers) @@ -1134,8 +1138,8 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro if ( failNextCreateForTesting ) { - pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client failNextCreateForTesting = false; + pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client throw new KeeperException.ConnectionLossException(); } @@ -1172,29 +1176,30 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro if ( createdPath == null ) { + String sessionAdjustedPath = insertSessionIdIfNeeded(path); try { if 
( client.isZk34CompatibilityMode() ) { - createdPath = client.getZooKeeper().create(path, data, aclList, createMode); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, aclList, createMode); } else { - createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, aclList, createMode, storingStat, ttl); } } catch ( KeeperException.NoNodeException e ) { if ( createParentsIfNeeded ) { - ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers); + ZKPaths.mkdirs(client.getZooKeeper(), sessionAdjustedPath, false, acling.getACLProviderForParents(), createParentsAsContainers); if ( client.isZk34CompatibilityMode() ) { - createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, acling.getAclList(sessionAdjustedPath), createMode); } else { - createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl); + createdPath = client.getZooKeeper().create(sessionAdjustedPath, data, acling.getAclList(sessionAdjustedPath), createMode, storingStat, ttl); } } else @@ -1206,12 +1211,12 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro { if ( setDataIfExists ) { - Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion); + Stat setStat = client.getZooKeeper().setData(sessionAdjustedPath, data, setDataIfExistsVersion); if(storingStat != null) { DataTree.copyStat(setStat, storingStat); } - createdPath = path; + createdPath = sessionAdjustedPath; } else { @@ -1252,7 +1257,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), 
false); - foundNode = findNode(children, pathAndNode.getPath(), protectedId); + foundNode = findNode(children, pathAndNode.getPath(), protectedId, createMode.isEphemeral() ? client.getZooKeeper().getSessionId() : 0); } catch ( KeeperException.NoNodeException ignore ) { @@ -1273,7 +1278,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro if ( doProtected ) { ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); - String name = getProtectedPrefix(protectedId) + pathAndNode.getNode(); + String name = getProtectedPrefix(protectedId, 0) + pathAndNode.getNode(); path = ZKPaths.makePath(pathAndNode.getPath(), name); } return path; @@ -1285,11 +1290,12 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro * @param children a list of candidates znodes * @param path the path * @param protectedId the protected id + * @param sessionId if a session id should be included, 0 if not * @return the absolute path of the znode or <code>null</code> if it is not found */ - static String findNode(final List<String> children, final String path, final String protectedId) + static String findNode(final List<String> children, final String path, final String protectedId, final long sessionId) { - final String protectedPrefix = getProtectedPrefix(protectedId); + final String protectedPrefix = getProtectedPrefix(protectedId, sessionId); String foundNode = Iterables.find ( children, @@ -1309,4 +1315,21 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro } return foundNode; } + + private String insertSessionIdIfNeeded(String path) throws Exception + { + if ( doProtected && createMode.isEphemeral() ) + { + // per CURATOR-498 - it's been discovered that the protected mode search can discover + // a created ephemeral node that will get deleted due to session timeouts. 
To work around + // this include the session ID in the ZNode name and search for the node with the current + // session ID thus ignoring created ZNodes with stale session IDs. A UUID is still included + // in the node name so that the protection mode contract is maintained + ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + String unadjustedNode = pathAndNode.getNode().substring(PROTECTED_PREFIX.length() + protectedId.length() + 1); + String newNode = getProtectedPrefix(protectedId, client.getZooKeeper().getSessionId()) + unadjustedNode; + return ZKPaths.makePath(pathAndNode.getPath(), newNode); + } + return path; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/dafd0914/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java index de91552..928a550 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FindAndDeleteProtectedNodeInBackground.java @@ -81,7 +81,7 @@ class FindAndDeleteProtectedNodeInBackground implements BackgroundOperation<Void if ( rc == KeeperException.Code.OK.intValue() ) { - final String node = CreateBuilderImpl.findNode(strings, "/", protectedId); // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path + final String node = CreateBuilderImpl.findNode(strings, "/", protectedId, 0); // due to namespacing, don't let CreateBuilderImpl.findNode adjust the path if ( node != null ) { try 
http://git-wip-us.apache.org/repos/asf/curator/blob/dafd0914/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index a28d6c5..1cd9670 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -30,11 +30,15 @@ import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingServer; import org.apache.curator.test.compatibility.KillSession2; import org.apache.curator.test.compatibility.Timing2; @@ -50,11 +54,15 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +70,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestFrameworkEdges extends BaseClassForTests { - private final Logger log = LoggerFactory.getLogger(getClass()); private final Timing2 timing = new Timing2(); @@ -73,6 +80,79 @@ public class TestFrameworkEdges extends BaseClassForTests } @Test + public void testProtectionWithKilledSession() throws Exception + { + server.stop(); // not needed + + // see CURATOR-498 + // attempt to re-create the state described in the bug report: create a 3 Instance ensemble; + // have Curator connect to only 1 one of those instances; set failNextCreateForTesting to + // simulate protection mode searching; kill the connected server when this happens; + // wait for session timeout to elapse and then restart the instance. In most cases + // this will cause the scenario as Curator will send the session cancel and do protection mode + // search around the same time. 
The protection mode search should return first as it can be resolved + // by the Instance Curator is connected to but the session kill needs a quorum vote (it's a + // transaction) + + try ( TestingCluster cluster = new TestingCluster(3) ) + { + cluster.start(); + InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec(); + + CountDownLatch serverStoppedLatch = new CountDownLatch(1); + RetryPolicy retryPolicy = new RetryForever(100) + { + @Override + public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) + { + if ( serverStoppedLatch.getCount() > 0 ) + { + try + { + cluster.killServer(instanceSpec0); + } + catch ( Exception e ) + { + // ignore + } + serverStoppedLatch.countDown(); + } + return super.allowRetry(retryCount, elapsedTimeMs, sleeper); + } + }; + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy)) + { + BlockingQueue<String> createdNode = new LinkedBlockingQueue<>(); + BackgroundCallback callback = (__, event) -> { + if ( event.getType() == CuratorEventType.CREATE ) + { + createdNode.offer(event.getPath()); + } + }; + + client.start(); + client.create().forPath("/test"); + + ErrorListenerPathAndBytesable<String> builder = client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback); + ((CreateBuilderImpl)builder).failNextCreateForTesting = true; + + builder.forPath("/test/hey"); + + Assert.assertTrue(timing.awaitLatch(serverStoppedLatch)); + timing.forSessionSleep().sleep(); // wait for session to expire + cluster.restartServer(instanceSpec0); + + String path = timing.takeFromQueue(createdNode); + timing.sleepABit(); + List<String> children = client.getChildren().forPath("/test"); + Assert.assertEquals(children, Collections.singletonList(ZKPaths.getNodeFromPath(path)), path + " is not equal to getChildren: " + children); + 
Assert.assertTrue(path.contains(Long.toString(client.getZookeeperClient().getZooKeeper().getSessionId()))); + } + } + } + + @Test public void testBackgroundLatencyUnSleep() throws Exception { server.stop();
