Repository: curator
Updated Branches:
  refs/heads/CURATOR-498 [created] ea505f542


CURATOR-498

Kudos to user Shay Shimony for his tireless and excellent work tracking this 
down. There are two problems addressed here: 1) Protected create mode can 
potentially find a ZNode that is about to be deleted due to an expired session. 
CreateBuilderImpl now records the session ID when the create is initiated. If, 
after a connection loss, the session ID has changed and the create mode is 
ephemeral, any protected node that is found belongs to the old session: it is 
scheduled for a guaranteed delete and otherwise ignored. 2) For ZooKeeper 3.4.x 
the simulated (via reflection) InjectSessionExpiration was incorrectly setting 
the connection state to CLOSED, which caused the session expiration to be 
ignored.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ea505f54
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ea505f54
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ea505f54

Branch: refs/heads/CURATOR-498
Commit: ea505f54291dc548aca947503630960cd10225d0
Parents: 4251fe3
Author: randgalt <[email protected]>
Authored: Mon Jan 28 14:23:15 2019 -0500
Committer: randgalt <[email protected]>
Committed: Mon Jan 28 14:23:15 2019 -0500

----------------------------------------------------------------------
 .../curator/utils/InjectSessionExpiration.java  |  7 +-
 .../framework/imps/CreateBuilderImpl.java       | 70 ++++++++++----
 .../framework/imps/TestFrameworkEdges.java      | 98 +++++++++++++++++++-
 3 files changed, 148 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ea505f54/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
 
b/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
index 996e9a2..caf9cbf 100644
--- 
a/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
+++ 
b/curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
@@ -30,7 +30,6 @@ import java.lang.reflect.Method;
 public class InjectSessionExpiration
 {
     private static final Field cnxnField;
-    private static final Field stateField;
     private static final Field eventThreadField;
     private static final Field sendThreadField;
     private static final Method queueEventMethod;
@@ -40,7 +39,6 @@ public class InjectSessionExpiration
     static
     {
         Field localCnxnField;
-        Field localStateField;
         Field localEventThreadField;
         Field localSendThreadField;
         Method localQueueEventMethod;
@@ -55,8 +53,6 @@ public class InjectSessionExpiration
 
             localCnxnField = ZooKeeper.class.getDeclaredField("cnxn");
             localCnxnField.setAccessible(true);
-            localStateField = ClientCnxn.class.getDeclaredField("state");
-            localStateField.setAccessible(true);
             localEventThreadField = 
ClientCnxn.class.getDeclaredField("eventThread");
             localEventThreadField.setAccessible(true);
             localSendThreadField = 
ClientCnxn.class.getDeclaredField("sendThread");
@@ -75,7 +71,6 @@ public class InjectSessionExpiration
             throw new RuntimeException("Could not access internal ZooKeeper 
fields", e);
         }
         cnxnField = localCnxnField;
-        stateField = localStateField;
         eventThreadField = localEventThreadField;
         sendThreadField = localSendThreadField;
         queueEventMethod = localQueueEventMethod;
@@ -94,7 +89,7 @@ public class InjectSessionExpiration
             Object eventThread = eventThreadField.get(clientCnxn);
             queueEventMethod.invoke(eventThread, event);
             queueEventOfDeathMethod.invoke(eventThread);
-            stateField.set(clientCnxn, ZooKeeper.States.CLOSED);
+            // we used to set the state field to CLOSED here but this resulted 
in CURATOR-498
             Object sendThread = sendThreadField.get(clientCnxn);
             Object clientCnxnSocket = 
getClientCnxnSocketMethod.invoke(sendThread);
             wakeupCnxnMethod.invoke(clientCnxnSocket);

http://git-wip-us.apache.org/repos/asf/curator/blob/ea505f54/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index ce82542..d1ff2bb 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -36,10 +36,12 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -48,19 +50,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, 
BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
 {
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFrameworkImpl client;
     private CreateMode createMode;
     private Backgrounding backgrounding;
     private boolean createParentsIfNeeded;
     private boolean createParentsAsContainers;
-    private boolean doProtected;
     private boolean compress;
     private boolean setDataIfExists;
     private int setDataIfExistsVersion = -1;
-    private String protectedId;
     private ACLing acling;
     private Stat storingStat;
     private long ttl;
+    private boolean doProtected;
+    private String protectedId;
+    private long initialSessionId;
 
     @VisibleForTesting
     boolean failNextCreateForTesting = false;
@@ -77,11 +81,13 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
         createParentsIfNeeded = false;
         createParentsAsContainers = false;
         compress = false;
-        doProtected = false;
         setDataIfExists = false;
-        protectedId = null;
         storingStat = null;
         ttl = -1;
+        initialSessionId = 0;
+
+        doProtected = false;
+        protectedId = null;
     }
 
     public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode 
createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean 
createParentsAsContainers, boolean doProtected, boolean compress, boolean 
setDataIfExists, List<ACL> aclList, Stat storingStat, long ttl)
@@ -618,9 +624,9 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
                  */
                 new FindAndDeleteProtectedNodeInBackground(client, 
ZKPaths.getPathAndNode(adjustedPath).getPath(), protectedId).execute();
                 /*
-                * The current UUID is scheduled to be deleted, it is not safe 
to use it again.
-                * If this builder is used again later create a new UUID
-                */
+                 * The current UUID is scheduled to be deleted, it is not safe 
to use it again.
+                 * If this builder is used again later create a new UUID
+                 */
                 protectedId = UUID.randomUUID().toString();
             }
 
@@ -685,16 +691,16 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
             else
             {
                 CreateZK35.create
-                (
-                    client.getZooKeeper(),
-                    operationAndData.getData().getPath(),
-                    data,
-                    acling.getAclList(operationAndData.getData().getPath()),
-                    createMode,
-                    mainCallback,
-                    backgrounding.getContext(),
-                    ttl
-                );
+                    (
+                        client.getZooKeeper(),
+                        operationAndData.getData().getPath(),
+                        data,
+                        
acling.getAclList(operationAndData.getData().getPath()),
+                        createMode,
+                        mainCallback,
+                        backgrounding.getContext(),
+                        ttl
+                    );
             }
         }
         catch ( Throwable e )
@@ -748,7 +754,7 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
 
             @Override
             public ErrorListenerPathAndBytesable<String> 
inBackground(BackgroundCallback callback, Object context,
-                    Executor executor) {
+                                                                      Executor 
executor) {
                 return CreateBuilderImpl.this.inBackground(callback, context, 
executor);
             }
 
@@ -1104,6 +1110,10 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
             {
                 boolean callSuper = true;
                 boolean localFirstTime = firstTime.getAndSet(false) && 
!debugForceFindProtectedNode;
+                if ( initialSessionId == 0 )
+                {
+                    initialSessionId = client.getZooKeeper().getSessionId();
+                }
                 if ( !localFirstTime && doProtected )
                 {
                     debugForceFindProtectedNode = false;
@@ -1134,8 +1144,8 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
 
                 if ( failNextCreateForTesting )
                 {
-                    pathInForeground(path, data, acling.getAclList(path));   
// simulate success on server without notification to client
                     failNextCreateForTesting = false;
+                    pathInForeground(path, data, acling.getAclList(path));   
// simulate success on server without notification to client
                     throw new KeeperException.ConnectionLossException();
                 }
 
@@ -1162,6 +1172,10 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
                     public String call() throws Exception
                     {
                         boolean localFirstTime = firstTime.getAndSet(false) && 
!debugForceFindProtectedNode;
+                        if ( initialSessionId == 0 )
+                        {
+                            initialSessionId = 
client.getZooKeeper().getSessionId();
+                        }
 
                         String createdPath = null;
                         if ( !localFirstTime && doProtected )
@@ -1253,6 +1267,22 @@ public class CreateBuilderImpl implements CreateBuilder, 
CreateBuilder2, Backgro
                             List<String> children = 
client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
 
                             foundNode = findNode(children, 
pathAndNode.getPath(), protectedId);
+                            log.debug("Protected mode findNode result: {}", 
foundNode);
+
+                            if ( doProtected && createMode.isEphemeral() )
+                            {
+                                if ( initialSessionId != 
client.getZooKeeper().getSessionId() )
+                                {
+                                    log.info("Session has changed during 
protected mode with ephemeral. old: {} new: {}", initialSessionId, 
client.getZooKeeper().getSessionId());
+                                    if ( foundNode != null )
+                                    {
+                                        log.info("Deleted old session's found 
node: {}", foundNode);
+                                        
client.getFailedDeleteManager().executeGuaranteedOperationInBackground(foundNode);
+                                        foundNode = null;
+                                    }
+                                    initialSessionId = 
client.getZooKeeper().getSessionId();
+                                }
+                            }
                         }
                         catch ( KeeperException.NoNodeException ignore )
                         {

http://git-wip-us.apache.org/repos/asf/curator/blob/ea505f54/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index a28d6c5..066cee2 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -30,11 +30,15 @@ import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
@@ -50,11 +54,13 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +68,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestFrameworkEdges extends BaseClassForTests
 {
-
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Timing2 timing = new Timing2();
 
@@ -73,6 +78,97 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
+    public void testInjectSessionExpiration() throws Exception
+    {
+        try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1)))
+        {
+            client.start();
+
+            CountDownLatch expiredLatch = new CountDownLatch(1);
+            Watcher watcher = event -> {
+                if ( event.getState() == Watcher.Event.KeeperState.Expired )
+                {
+                    expiredLatch.countDown();
+                }
+            };
+            client.checkExists().usingWatcher(watcher).forPath("/foobar");
+            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(expiredLatch));
+        }
+    }
+
+    @Test
+    public void testProtectionWithKilledSession() throws Exception
+    {
+        server.stop();  // not needed
+
+        // see CURATOR-498
+        // attempt to re-create the state described in the bug report: create 
a 3 Instance ensemble;
+        // have Curator connect to only one of those instances; set 
failNextCreateForTesting to
+        // simulate protection mode searching; kill the connected server when 
this happens;
+        // wait for session timeout to elapse and then restart the instance. 
In most cases
+        // this will cause the scenario as Curator will send the session 
cancel and do protection mode
+        // search around the same time. The protection mode search should 
return first as it can be resolved
+        // by the Instance Curator is connected to but the session kill needs 
a quorum vote (it's a
+        // transaction)
+
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+            InstanceSpec instanceSpec0 = 
cluster.getServers().get(0).getInstanceSpec();
+
+            CountDownLatch serverStoppedLatch = new CountDownLatch(1);
+            RetryPolicy retryPolicy = new RetryForever(100)
+            {
+                @Override
+                public boolean allowRetry(int retryCount, long elapsedTimeMs, 
RetrySleeper sleeper)
+                {
+                    if ( serverStoppedLatch.getCount() > 0 )
+                    {
+                        try
+                        {
+                            cluster.killServer(instanceSpec0);
+                        }
+                        catch ( Exception e )
+                        {
+                            // ignore
+                        }
+                        serverStoppedLatch.countDown();
+                    }
+                    return super.allowRetry(retryCount, elapsedTimeMs, 
sleeper);
+                }
+            };
+
+            try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), 
timing.session(), timing.connection(), retryPolicy))
+            {
+                BlockingQueue<String> createdNode = new 
LinkedBlockingQueue<>();
+                BackgroundCallback callback = (__, event) -> {
+                    if ( event.getType() == CuratorEventType.CREATE )
+                    {
+                        createdNode.offer(event.getPath());
+                    }
+                };
+
+                client.start();
+                client.create().forPath("/test");
+
+                ErrorListenerPathAndBytesable<String> builder = 
client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback);
+                ((CreateBuilderImpl)builder).failNextCreateForTesting = true;
+
+                builder.forPath("/test/hey");
+
+                Assert.assertTrue(timing.awaitLatch(serverStoppedLatch));
+                timing.forSessionSleep().sleep();   // wait for session to 
expire
+                cluster.restartServer(instanceSpec0);
+
+                String path = timing.takeFromQueue(createdNode);
+                List<String> children = client.getChildren().forPath("/test");
+                
Assert.assertEquals(Collections.singletonList(ZKPaths.getNodeFromPath(path)), 
children);
+            }
+        }
+    }
+
+    @Test
     public void testBackgroundLatencyUnSleep() throws Exception
     {
         server.stop();

Reply via email to