Repository: curator Updated Branches: refs/heads/CURATOR-498 [created] 31151b67b
CURATOR-498 "Protection" has a potential bug. If the connection is lost for long enough, Curator will want to kill the session. Session deletions must be handled by the Leader ZK instance. At the same time that the session kill is being processed, Curator's protection mode handling could be calling the follower that it's connected to in order to get the current list of children - this can be handled directly by the follower instance without needing to call the leader. So, in this scenario, the client will get a list of children that includes the ZNode that will get deleted as part of killing the session. This bug has been in Curator since we added the protection feature to it more than 6 years ago. The only feasible fix is to set a watcher on the found ZNode so that clients can be notified if the ZNode is deleted (or no longer exists). This requires changes to any client unfortunately. All Curator recipes will be updated, but existing Curator client code will need to be updated manually. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/31151b67 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/31151b67 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/31151b67 Branch: refs/heads/CURATOR-498 Commit: 31151b67b2b704d7a31579284168d3f034694fed Parents: 219c881 Author: randgalt <[email protected]> Authored: Wed Jan 2 17:53:11 2019 -0500 Committer: randgalt <[email protected]> Committed: Wed Jan 2 17:53:11 2019 -0500 ---------------------------------------------------------------------- .../framework/api/CreateBuilderMain.java | 2 + ...ateProtectACLCreateModePathAndBytesable.java | 2 + .../ProtectACLCreateModePathAndBytesable.java | 2 + .../framework/imps/CreateBuilderImpl.java | 72 +++++++++++++++-- .../framework/state/ConnectionStateManager.java | 6 +- .../framework/imps/TestFrameworkEdges.java | 84 +++++++++++++++++++- .../framework/recipes/leader/LeaderLatch.java | 19 ++++- 7 files changed, 
177 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java index 3d076b2..c6d480a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java @@ -83,4 +83,6 @@ public interface CreateBuilderMain extends * @return this */ public ACLCreateModeStatBackgroundPathAndBytesable<String> withProtection(); + + public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> withWatchedProtection(); } http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java index 9e0c840..0d6f35b 100755 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java @@ -69,4 +69,6 @@ public interface CreateProtectACLCreateModePathAndBytesable<T> extends * @return this */ public ACLCreateModeBackgroundPathAndBytesable<String> withProtection(); + + public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> 
withWatchedProtection(); } http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java index 1d1df10..57f4beb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java @@ -51,4 +51,6 @@ public interface ProtectACLCreateModePathAndBytesable<T> extends * @return this */ public ACLCreateModeBackgroundPathAndBytesable<String> withProtection(); + + public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> withWatchedProtection(); } http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index ce82542..e446024 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -36,6 +36,8 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; 
import org.apache.zookeeper.server.DataTree; @@ -53,14 +55,15 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro private Backgrounding backgrounding; private boolean createParentsIfNeeded; private boolean createParentsAsContainers; - private boolean doProtected; private boolean compress; private boolean setDataIfExists; private int setDataIfExistsVersion = -1; - private String protectedId; private ACLing acling; private Stat storingStat; private long ttl; + private boolean doProtected; + private String protectedId; + private Watching protectedWatching; @VisibleForTesting boolean failNextCreateForTesting = false; @@ -77,11 +80,13 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro createParentsIfNeeded = false; createParentsAsContainers = false; compress = false; - doProtected = false; setDataIfExists = false; - protectedId = null; storingStat = null; ttl = -1; + + doProtected = false; + protectedId = null; + protectedWatching = new Watching(client); } public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat, long ttl) @@ -398,6 +403,12 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro } @Override + public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> withWatchedProtection() + { + return CreateBuilderImpl.this.withWatchedProtection(); + } + + @Override public BackgroundPathAndBytesable<String> withACL(List<ACL> aclList) { return withACL(aclList, false); @@ -480,6 +491,35 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro } @Override + public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> withWatchedProtection() + { + setProtected(); + return new 
Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>>() + { + @Override + public ACLCreateModeStatBackgroundPathAndBytesable<String> watched() + { + protectedWatching = new Watching(client, true); + return asACLCreateModeStatBackgroundPathAndBytesable(); + } + + @Override + public ACLCreateModeStatBackgroundPathAndBytesable<String> usingWatcher(Watcher watcher) + { + protectedWatching = new Watching(client, watcher); + return asACLCreateModeStatBackgroundPathAndBytesable(); + } + + @Override + public ACLCreateModeStatBackgroundPathAndBytesable<String> usingWatcher(CuratorWatcher watcher) + { + protectedWatching = new Watching(client, watcher); + return asACLCreateModeStatBackgroundPathAndBytesable(); + } + }; + } + + @Override public ACLPathAndBytesable<String> withProtectedEphemeralSequential() { setProtected(); @@ -773,6 +813,12 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro } @Override + public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> withWatchedProtection() + { + return CreateBuilderImpl.this.withWatchedProtection(); + } + + @Override public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded() { return CreateBuilderImpl.this.creatingParentsIfNeeded(); } @@ -1134,8 +1180,8 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro if ( failNextCreateForTesting ) { - pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client failNextCreateForTesting = false; + pathInForeground(path, data, acling.getAclList(path)); // simulate success on server without notification to client throw new KeeperException.ConnectionLossException(); } @@ -1253,6 +1299,22 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false); foundNode = findNode(children, pathAndNode.getPath(), protectedId); + if ( 
(foundNode != null) && protectedWatching.hasWatcher() ) + { + String foundPath = ZKPaths.makePath(pathAndNode.getPath(), foundNode); + Watcher protectedWatcher = protectedWatching.getWatcher(foundPath); + try + { + client.getZooKeeper().getData(foundPath, protectedWatcher, null); + protectedWatching.commitWatcher(KeeperException.Code.OK.intValue(), false); + } + catch ( KeeperException.NoNodeException e ) + { + protectedWatching.commitWatcher(KeeperException.Code.CONNECTIONLOSS.intValue(), false); // CONNECTIONLOSS no need to register namespace watcher + WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.NodeDeleted, Watcher.Event.KeeperState.SyncConnected, e.getPath()); + protectedWatcher.process(event); + } + } } catch ( KeeperException.NoNodeException ignore ) { http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 5e28b3d..bbcb5c2 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.ThreadUtils; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; @@ -302,10 +303,11 @@ public class ConnectionStateManager implements Closeable if ( elapsedMs >= useSessionTimeoutMs ) { startOfSuspendedEpoch = System.currentTimeMillis(); // reset 
startOfSuspendedEpoch to avoid spinning on this session expiration injection CURATOR-405 - log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs)); try { - Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper()); + ZooKeeper zooKeeper = client.getZookeeperClient().getZooKeeper(); + log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session timeout ms: %d. SessionId: 0x%s", elapsedMs, useSessionTimeoutMs, Long.toHexString(zooKeeper.getSessionId()))); + Compatibility.injectSessionExpiration(zooKeeper); } catch ( Exception e ) { http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index a28d6c5..4fffc8b 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -30,11 +30,15 @@ import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryForever; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import 
org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingServer; import org.apache.curator.test.compatibility.KillSession2; import org.apache.curator.test.compatibility.Timing2; @@ -55,6 +59,7 @@ import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +67,6 @@ import java.util.concurrent.atomic.AtomicInteger; public class TestFrameworkEdges extends BaseClassForTests { - private final Logger log = LoggerFactory.getLogger(getClass()); private final Timing2 timing = new Timing2(); @@ -73,6 +77,84 @@ public class TestFrameworkEdges extends BaseClassForTests } @Test + public void testProtectionWithKilledSession() throws Exception + { + server.stop(); // not needed + + // see CURATOR-498 + // attempt to re-create the state described in the bug report: create a 3 Instance ensemble; + // have Curator connect to only 1 one of those instances; set failNextCreateForTesting to + // simulate protection mode searching; kill the connected server when this happens; + // wait for session timeout to elapse and then restart the instance. In most cases + // this will cause the scenario as Curator will send the session cancel and do protection mode + // search around the same time. 
The protection mode search should return first as it can be resolved + // by the Instance Curator is connected to but the session kill needs a quorum vote (it's a + // transaction) + + try ( TestingCluster cluster = new TestingCluster(3) ) + { + cluster.start(); + InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec(); + + CountDownLatch serverStoppedLatch = new CountDownLatch(1); + RetryPolicy retryPolicy = new RetryForever(100) + { + @Override + public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) + { + if ( serverStoppedLatch.getCount() > 0 ) + { + try + { + cluster.killServer(instanceSpec0); + } + catch ( Exception e ) + { + // ignore + } + serverStoppedLatch.countDown(); + } + return super.allowRetry(retryCount, elapsedTimeMs, sleeper); + } + }; + + try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy)) + { + BlockingQueue<String> createdNode = new LinkedBlockingQueue<>(); + BackgroundCallback callback = (__, event) -> { + if ( event.getType() == CuratorEventType.CREATE ) + { + createdNode.offer(event.getPath()); + } + }; + + client.start(); + client.create().forPath("/test"); + + CountDownLatch protectedNodeLatch = new CountDownLatch(1); + Watcher protectedNodeWatcher = watchedEvent -> { + if ( watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted ) + { + protectedNodeLatch.countDown(); + } + }; + ErrorListenerPathAndBytesable<String> builder = client.create().withWatchedProtection().usingWatcher(protectedNodeWatcher).withMode(CreateMode.EPHEMERAL).inBackground(callback); + ((CreateBuilderImpl)builder).failNextCreateForTesting = true; + + builder.forPath("/test/hey"); + + Assert.assertTrue(timing.awaitLatch(serverStoppedLatch)); + timing.forSessionSleep().sleep(); // wait for session to expire + cluster.restartServer(instanceSpec0); + + String path = timing.takeFromQueue(createdNode); + 
Assert.assertNotNull(path); + Assert.assertTrue(timing.awaitLatch(protectedNodeLatch)); + } + } + } + + @Test public void testBackgroundLatencyUnSleep() throws Exception { server.stop(); http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index bb8aa73..c8e8e22 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -33,6 +33,7 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter; import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; @@ -52,7 +53,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.utils.PathUtils; /** * <p> @@ -84,6 +84,21 @@ public class LeaderLatch implements Closeable } }; + private final Watcher protectedModeWatcher = watchedEvent -> { + if ( watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted ) + { + try + { + log.warn("Protected mode node was deleted. 
Resetting."); + reset(); + } + catch ( Exception e ) + { + log.error("Could not reset from protectedModeWatcher", e); + } + } + }; + private static final String LOCK_NAME = "latch-"; private static final LockInternalsSorter sorter = new LockInternalsSorter() @@ -504,7 +519,7 @@ public class LeaderLatch implements Closeable } } }; - client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); + client.create().creatingParentContainersIfNeeded().withWatchedProtection().usingWatcher(protectedModeWatcher).withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); } private synchronized void internalStart()
