This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-525-fix-lost-state-race in repository https://gitbox.apache.org/repos/asf/curator.git
commit ba83de17ea9c99ff5fb73a8efa8d939ceda0d1f9 Author: randgalt <[email protected]> AuthorDate: Tue Mar 31 20:22:43 2020 -0500 CURATOR-525 There is a race whereby the ZooKeeper connection can be healed before Curator is finished processing the new connection state. When this happens, the Curator instance becomes a zombie stuck in the LOST state. This fix is a "hack". ConnectionStateManager will notice that the connection state is LOST but that the Curator instance reports that it is connected. When this happens, it is logged and the connection is reset. --- .../framework/imps/CuratorFrameworkImpl.java | 27 ++++++++++++++++ .../framework/state/ConnectionStateManager.java | 18 +++++++++++ .../curator/framework/imps/TestFrameworkEdges.java | 36 ++++++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index bfe61bf..1abfc28 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -816,6 +816,13 @@ public class CuratorFrameworkImpl implements CuratorFramework return ensembleTracker; } + @VisibleForTesting + volatile CountDownLatch debugCheckBackgroundRetryLatch; + @VisibleForTesting + volatile CountDownLatch debugCheckBackgroundRetryReadyLatch; + @VisibleForTesting + volatile KeeperException.Code injectedCode; + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"}) private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event) { @@ -851,6 +858,26 @@ public class CuratorFrameworkImpl implements CuratorFramework e = new Exception("Unknown result codegetResultCode()"); } + if ( debugCheckBackgroundRetryLatch != null ) // scaffolding to test CURATOR-525 + { + if ( 
debugCheckBackgroundRetryReadyLatch != null ) + { + debugCheckBackgroundRetryReadyLatch.countDown(); + } + try + { + debugCheckBackgroundRetryLatch.await(); + if (injectedCode != null) + { + code = injectedCode; + } + } + catch ( InterruptedException ex ) + { + Thread.currentThread().interrupt(); + } + } + validateConnection(codeToState(code)); logError("Background operation retry gave up", e); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 7285431..32ddb78 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -285,6 +285,24 @@ public class ConnectionStateManager implements Closeable checkSessionExpiration(); } } + + synchronized(this) + { + if ( (currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected() ) + { + // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired + // this "hack" fixes it by resetting the connection + log.warn("ConnectionState is LOST but isConnected() is true. 
Resetting connection."); + try + { + client.getZookeeperClient().reset(); + } + catch ( Exception e ) + { + log.error("Could not reset connection after LOST/isConnected mismatch"); + } + } + } } catch ( InterruptedException e ) { diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index 5a7c415..6fcd553 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -75,6 +75,42 @@ public class TestFrameworkEdges extends BaseClassForTests System.setProperty("zookeeper.extendedTypesEnabled", "true"); } + @Test(description = "test case for CURATOR-525") + public void testValidateConnectionEventRaces() throws Exception + { + // test for CURATOR-525 - there is a race whereby Curator can go to LOST + // after the connection has been repaired. 
Prior to the fix, the Curator + // instance would become a zombie, never leaving the LOST state + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 2000, 1000, new RetryOneTime(1))) + { + CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl)client; + + client.start(); + client.getChildren().forPath("/"); + client.create().forPath("/foo"); + + BlockingQueue<ConnectionState> stateQueue = new LinkedBlockingQueue<>(); + client.getConnectionStateListenable().addListener((__, newState) -> stateQueue.add(newState)); + + server.stop(); + Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.SUSPENDED); + Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST); + + clientImpl.debugCheckBackgroundRetryReadyLatch = new CountDownLatch(1); + clientImpl.debugCheckBackgroundRetryLatch = new CountDownLatch(1); + + client.delete().guaranteed().inBackground().forPath("/foo"); + timing.awaitLatch(clientImpl.debugCheckBackgroundRetryReadyLatch); + server.restart(); + Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED); + clientImpl.injectedCode = KeeperException.Code.SESSIONEXPIRED; // simulate an expiration being handled after the connection is repaired + clientImpl.debugCheckBackgroundRetryLatch.countDown(); + Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST); + + Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED); + } + } + @Test public void testInjectSessionExpiration() throws Exception {
