This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git

commit a3ca8b5d159262fcd3c2fc7cdfc5c26c6c5473cd
Author: randgalt <[email protected]>
AuthorDate: Tue Mar 31 20:22:43 2020 -0500

    CURATOR-525
    
    There is a race whereby the ZooKeeper connection can be healed before
    Curator is finished processing the new connection state. When this happens
    the Curator instance becomes a Zombie stuck in the LOST state. This fix is
    a "hack". ConnectionStateManager will notice that the connection state is
    LOST but that the Curator instance reports that it is connected. When this
    happens, it is logged and the connection is reset.
---
 .../framework/imps/CuratorFrameworkImpl.java       | 27 ++++++++++++++++
 .../framework/state/ConnectionStateManager.java    | 18 +++++++++++
 .../curator/framework/imps/TestFrameworkEdges.java | 36 ++++++++++++++++++++++
 3 files changed, 81 insertions(+)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index bfe61bf..1abfc28 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -816,6 +816,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return ensembleTracker;
     }
 
+    @VisibleForTesting
+    volatile CountDownLatch debugCheckBackgroundRetryLatch;
+    @VisibleForTesting
+    volatile CountDownLatch debugCheckBackgroundRetryReadyLatch;
+    @VisibleForTesting
+    volatile KeeperException.Code injectedCode;
+
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     private <DATA_TYPE> boolean 
checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent 
event)
     {
@@ -851,6 +858,26 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 e = new Exception("Unknown result codegetResultCode()");
             }
 
+            if ( debugCheckBackgroundRetryLatch != null )       // scaffolding 
to test CURATOR-525
+            {
+                if ( debugCheckBackgroundRetryReadyLatch != null )
+                {
+                    debugCheckBackgroundRetryReadyLatch.countDown();
+                }
+                try
+                {
+                    debugCheckBackgroundRetryLatch.await();
+                    if (injectedCode != null)
+                    {
+                        code = injectedCode;
+                    }
+                }
+                catch ( InterruptedException ex )
+                {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
             validateConnection(codeToState(code));
             logError("Background operation retry gave up", e);
         }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 7285431..32ddb78 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -285,6 +285,24 @@ public class ConnectionStateManager implements Closeable
                         checkSessionExpiration();
                     }
                 }
+
+                synchronized(this)
+                {
+                    if ( (currentConnectionState == ConnectionState.LOST) && 
client.getZookeeperClient().isConnected() )
+                    {
+                        // CURATOR-525 - there is a race whereby LOST is 
sometimes set after the connection has been repaired
+                        // this "hack" fixes it by resetting the connection
+                        log.warn("ConnectionState is LOST but isConnected() is 
true. Resetting connection.");
+                        try
+                        {
+                            client.getZookeeperClient().reset();
+                        }
+                        catch ( Exception e )
+                        {
+                            log.error("Could not reset connection after 
LOST/isConnected mismatch");
+                        }
+                    }
+                }
             }
             catch ( InterruptedException e )
             {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 5a7c415..6fcd553 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -75,6 +75,42 @@ public class TestFrameworkEdges extends BaseClassForTests
         System.setProperty("zookeeper.extendedTypesEnabled", "true");
     }
 
+    @Test(description = "test case for CURATOR-525")
+    public void testValidateConnectionEventRaces() throws Exception
+    {
+        // test for CURATOR-525 - there is a race whereby Curator can go to 
LOST
+        // after the connection has been repaired. Prior to the fix, the 
Curator
+        // instance would become a zombie, never leaving the LOST state
+        try (CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), 2000, 1000, new 
RetryOneTime(1)))
+        {
+            CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl)client;
+
+            client.start();
+            client.getChildren().forPath("/");
+            client.create().forPath("/foo");
+
+            BlockingQueue<ConnectionState> stateQueue = new 
LinkedBlockingQueue<>();
+            client.getConnectionStateListenable().addListener((__, newState) 
-> stateQueue.add(newState));
+
+            server.stop();
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), 
ConnectionState.SUSPENDED);
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), 
ConnectionState.LOST);
+
+            clientImpl.debugCheckBackgroundRetryReadyLatch = new 
CountDownLatch(1);
+            clientImpl.debugCheckBackgroundRetryLatch = new CountDownLatch(1);
+
+            client.delete().guaranteed().inBackground().forPath("/foo");
+            timing.awaitLatch(clientImpl.debugCheckBackgroundRetryReadyLatch);
+            server.restart();
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), 
ConnectionState.RECONNECTED);
+            clientImpl.injectedCode = KeeperException.Code.SESSIONEXPIRED;  // 
simulate an expiration being handled after the connection is repaired
+            clientImpl.debugCheckBackgroundRetryLatch.countDown();
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), 
ConnectionState.LOST);
+
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), 
ConnectionState.RECONNECTED);
+        }
+    }
+
     @Test
     public void testInjectSessionExpiration() throws Exception
     {

Reply via email to