Repository: curator
Updated Branches:
  refs/heads/CURATOR-498 [created] 31151b67b


CURATOR-498

"Protection" has a potential bug. If the connection is lost for long enough,
Curator will want to kill the session. Session deletions must be handled by the
Leader ZK instance. At the same time that the session kill is being processed,
Curator's protection mode handling could be calling the follower that it's
connected to in order to get the current list of children - this can be handled
directly by the follower instance without needing to call the leader. So, in
this scenario, the client will get a list of children that includes the ZNode
that will get deleted as part of killing the session.

This bug has been in Curator since we added the protection feature to it more
than 6 years ago. The only feasible fix is to set a watcher on the found ZNode
so that clients can be notified if the ZNode is deleted (or no longer exists).
Unfortunately, this requires changes to any client. All Curator recipes will be
updated, but existing Curator client code will need to be updated
manually.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/31151b67
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/31151b67
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/31151b67

Branch: refs/heads/CURATOR-498
Commit: 31151b67b2b704d7a31579284168d3f034694fed
Parents: 219c881
Author: randgalt <[email protected]>
Authored: Wed Jan 2 17:53:11 2019 -0500
Committer: randgalt <[email protected]>
Committed: Wed Jan 2 17:53:11 2019 -0500

----------------------------------------------------------------------
 .../framework/api/CreateBuilderMain.java        |  2 +
 ...ateProtectACLCreateModePathAndBytesable.java |  2 +
 .../ProtectACLCreateModePathAndBytesable.java   |  2 +
 .../framework/imps/CreateBuilderImpl.java       | 72 +++++++++++++++--
 .../framework/state/ConnectionStateManager.java |  6 +-
 .../framework/imps/TestFrameworkEdges.java      | 84 +++++++++++++++++++-
 .../framework/recipes/leader/LeaderLatch.java   | 19 ++++-
 7 files changed, 177 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java
index 3d076b2..c6d480a 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilderMain.java
@@ -83,4 +83,6 @@ public interface CreateBuilderMain extends
      * @return this
      */
     public ACLCreateModeStatBackgroundPathAndBytesable<String>    
withProtection();
+
+    public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>>    
withWatchedProtection();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java
index 9e0c840..0d6f35b 100755
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateProtectACLCreateModePathAndBytesable.java
@@ -69,4 +69,6 @@ public interface 
CreateProtectACLCreateModePathAndBytesable<T> extends
      * @return this
      */
     public ACLCreateModeBackgroundPathAndBytesable<String>    withProtection();
+
+    public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>>    
withWatchedProtection();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java
index 1d1df10..57f4beb 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/ProtectACLCreateModePathAndBytesable.java
@@ -51,4 +51,6 @@ public interface ProtectACLCreateModePathAndBytesable<T> 
extends
      * @return this
      */
     public ACLCreateModeBackgroundPathAndBytesable<String>    withProtection();
+
+    public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>>    
withWatchedProtection();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ce82542..e446024 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -36,6 +36,8 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
@@ -53,14 +55,15 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
     private Backgrounding backgrounding;
     private boolean createParentsIfNeeded;
     private boolean createParentsAsContainers;
-    private boolean doProtected;
     private boolean compress;
     private boolean setDataIfExists;
     private int setDataIfExistsVersion = -1;
-    private String protectedId;
     private ACLing acling;
     private Stat storingStat;
     private long ttl;
+    private boolean doProtected;
+    private String protectedId;
+    private Watching protectedWatching;
 
     @VisibleForTesting
     boolean failNextCreateForTesting = false;
@@ -77,11 +80,13 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
         createParentsIfNeeded = false;
         createParentsAsContainers = false;
         compress = false;
-        doProtected = false;
         setDataIfExists = false;
-        protectedId = null;
         storingStat = null;
         ttl = -1;
+
+        doProtected = false;
+        protectedId = null;
+        protectedWatching = new Watching(client);
     }
 
     public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode 
createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean 
createParentsAsContainers, boolean doProtected, boolean compress, boolean 
setDataIfExists, List<ACL> aclList, Stat storingStat, long ttl)
@@ -398,6 +403,12 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
             }
 
             @Override
+            public 
Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> 
withWatchedProtection()
+            {
+                return CreateBuilderImpl.this.withWatchedProtection();
+            }
+
+            @Override
             public BackgroundPathAndBytesable<String> withACL(List<ACL> 
aclList)
             {
                 return withACL(aclList, false);
@@ -480,6 +491,35 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
     }
 
     @Override
+    public Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> 
withWatchedProtection()
+    {
+        setProtected();
+        return new 
Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>>()
+        {
+            @Override
+            public ACLCreateModeStatBackgroundPathAndBytesable<String> 
watched()
+            {
+                protectedWatching = new Watching(client, true);
+                return asACLCreateModeStatBackgroundPathAndBytesable();
+            }
+
+            @Override
+            public ACLCreateModeStatBackgroundPathAndBytesable<String> 
usingWatcher(Watcher watcher)
+            {
+                protectedWatching = new Watching(client, watcher);
+                return asACLCreateModeStatBackgroundPathAndBytesable();
+            }
+
+            @Override
+            public ACLCreateModeStatBackgroundPathAndBytesable<String> 
usingWatcher(CuratorWatcher watcher)
+            {
+                protectedWatching = new Watching(client, watcher);
+                return asACLCreateModeStatBackgroundPathAndBytesable();
+            }
+        };
+    }
+
+    @Override
     public ACLPathAndBytesable<String> withProtectedEphemeralSequential()
     {
         setProtected();
@@ -773,6 +813,12 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
             }
 
             @Override
+            public 
Watchable<ACLCreateModeStatBackgroundPathAndBytesable<String>> 
withWatchedProtection()
+            {
+                return CreateBuilderImpl.this.withWatchedProtection();
+            }
+
+            @Override
             public ProtectACLCreateModePathAndBytesable<String> 
creatingParentsIfNeeded() {
                 return CreateBuilderImpl.this.creatingParentsIfNeeded();
             }
@@ -1134,8 +1180,8 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
 
                 if ( failNextCreateForTesting )
                 {
-                    pathInForeground(path, data, acling.getAclList(path));   
// simulate success on server without notification to client
                     failNextCreateForTesting = false;
+                    pathInForeground(path, data, acling.getAclList(path));   
// simulate success on server without notification to client
                     throw new KeeperException.ConnectionLossException();
                 }
 
@@ -1253,6 +1299,22 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
                             List<String> children = 
client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
 
                             foundNode = findNode(children, 
pathAndNode.getPath(), protectedId);
+                            if ( (foundNode != null) && 
protectedWatching.hasWatcher() )
+                            {
+                                String foundPath = 
ZKPaths.makePath(pathAndNode.getPath(), foundNode);
+                                Watcher protectedWatcher = 
protectedWatching.getWatcher(foundPath);
+                                try
+                                {
+                                    client.getZooKeeper().getData(foundPath, 
protectedWatcher, null);
+                                    
protectedWatching.commitWatcher(KeeperException.Code.OK.intValue(), false);
+                                }
+                                catch ( KeeperException.NoNodeException e )
+                                {
+                                    
protectedWatching.commitWatcher(KeeperException.Code.CONNECTIONLOSS.intValue(), 
false); // CONNECTIONLOSS no need to register namespace watcher
+                                    WatchedEvent event = new 
WatchedEvent(Watcher.Event.EventType.NodeDeleted, 
Watcher.Event.KeeperState.SyncConnected, e.getPath());
+                                    protectedWatcher.process(event);
+                                }
+                            }
                         }
                         catch ( KeeperException.NoNodeException ignore )
                         {

http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 5e28b3d..bbcb5c2 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
@@ -302,10 +303,11 @@ public class ConnectionStateManager implements Closeable
             if ( elapsedMs >= useSessionTimeoutMs )
             {
                 startOfSuspendedEpoch = System.currentTimeMillis(); // reset 
startOfSuspendedEpoch to avoid spinning on this session expiration injection 
CURATOR-405
-                log.warn(String.format("Session timeout has elapsed while 
SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session 
timeout ms: %d", elapsedMs, useSessionTimeoutMs));
                 try
                 {
-                    
Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+                    ZooKeeper zooKeeper = 
client.getZookeeperClient().getZooKeeper();
+                    log.warn(String.format("Session timeout has elapsed while 
SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session 
timeout ms: %d. SessionId: 0x%s", elapsedMs, useSessionTimeoutMs, 
Long.toHexString(zooKeeper.getSessionId())));
+                    Compatibility.injectSessionExpiration(zooKeeper);
                 }
                 catch ( Exception e )
                 {

http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index a28d6c5..4fffc8b 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -30,11 +30,15 @@ import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
@@ -55,6 +59,7 @@ import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +67,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestFrameworkEdges extends BaseClassForTests
 {
-
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Timing2 timing = new Timing2();
 
@@ -73,6 +77,84 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
+    public void testProtectionWithKilledSession() throws Exception
+    {
+        server.stop();  // not needed
+
+        // see CURATOR-498
+        // attempt to re-create the state described in the bug report: create 
a 3 Instance ensemble;
+        // have Curator connect to only 1 one of those instances; set 
failNextCreateForTesting to
+        // simulate protection mode searching; kill the connected server when 
this happens;
+        // wait for session timeout to elapse and then restart the instance. 
In most cases
+        // this will cause the scenario as Curator will send the session 
cancel and do protection mode
+        // search around the same time. The protection mode search should 
return first as it can be resolved
+        // by the Instance Curator is connected to but the session kill needs 
a quorum vote (it's a
+        // transaction)
+
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+            InstanceSpec instanceSpec0 = 
cluster.getServers().get(0).getInstanceSpec();
+
+            CountDownLatch serverStoppedLatch = new CountDownLatch(1);
+            RetryPolicy retryPolicy = new RetryForever(100)
+            {
+                @Override
+                public boolean allowRetry(int retryCount, long elapsedTimeMs, 
RetrySleeper sleeper)
+                {
+                    if ( serverStoppedLatch.getCount() > 0 )
+                    {
+                        try
+                        {
+                            cluster.killServer(instanceSpec0);
+                        }
+                        catch ( Exception e )
+                        {
+                            // ignore
+                        }
+                        serverStoppedLatch.countDown();
+                    }
+                    return super.allowRetry(retryCount, elapsedTimeMs, 
sleeper);
+                }
+            };
+
+            try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), 
timing.session(), timing.connection(), retryPolicy))
+            {
+                BlockingQueue<String> createdNode = new 
LinkedBlockingQueue<>();
+                BackgroundCallback callback = (__, event) -> {
+                    if ( event.getType() == CuratorEventType.CREATE )
+                    {
+                        createdNode.offer(event.getPath());
+                    }
+                };
+
+                client.start();
+                client.create().forPath("/test");
+
+                CountDownLatch protectedNodeLatch = new CountDownLatch(1);
+                Watcher protectedNodeWatcher = watchedEvent -> {
+                    if ( watchedEvent.getType() == 
Watcher.Event.EventType.NodeDeleted )
+                    {
+                        protectedNodeLatch.countDown();
+                    }
+                };
+                ErrorListenerPathAndBytesable<String> builder = 
client.create().withWatchedProtection().usingWatcher(protectedNodeWatcher).withMode(CreateMode.EPHEMERAL).inBackground(callback);
+                ((CreateBuilderImpl)builder).failNextCreateForTesting = true;
+
+                builder.forPath("/test/hey");
+
+                Assert.assertTrue(timing.awaitLatch(serverStoppedLatch));
+                timing.forSessionSleep().sleep();   // wait for session to 
expire
+                cluster.restartServer(instanceSpec0);
+
+                String path = timing.takeFromQueue(createdNode);
+                Assert.assertNotNull(path);
+                Assert.assertTrue(timing.awaitLatch(protectedNodeLatch));
+            }
+        }
+    }
+
+    @Test
     public void testBackgroundLatencyUnSleep() throws Exception
     {
         server.stop();

http://git-wip-us.apache.org/repos/asf/curator/blob/31151b67/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index bb8aa73..c8e8e22 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -33,6 +33,7 @@ import 
org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
@@ -52,7 +53,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>
@@ -84,6 +84,21 @@ public class LeaderLatch implements Closeable
         }
     };
 
+    private final Watcher protectedModeWatcher = watchedEvent -> {
+        if ( watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted )
+        {
+            try
+            {
+                log.warn("Protected mode node was deleted. Resetting.");
+                reset();
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not reset from protectedModeWatcher", e);
+            }
+        }
+    };
+
     private static final String LOCK_NAME = "latch-";
 
     private static final LockInternalsSorter sorter = new LockInternalsSorter()
@@ -504,7 +519,7 @@ public class LeaderLatch implements Closeable
                 }
             }
         };
-        
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath,
 LOCK_NAME), LeaderSelector.getIdBytes(id));
+        
client.create().creatingParentContainersIfNeeded().withWatchedProtection().usingWatcher(protectedModeWatcher).withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath,
 LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
     private synchronized void internalStart()

Reply via email to