This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch branch-3.9.4 in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9.4 by this push: new 311239892 ZOOKEEPER-4921: Retry endlessly to establish a brand-new session (#2265) 311239892 is described below commit 311239892255df52929cca1a336968026c586541 Author: Kezhu Wang <kez...@apache.org> AuthorDate: Tue Jun 17 11:47:09 2025 +0800 ZOOKEEPER-4921: Retry endlessly to establish a brand-new session (#2265) This partially rollback ZOOKEEPER-4508 to keep consistent with versions prior to 3.9.3 (excluded), so to maintain compatibility with third party libraries. Refs: ZOOKEEPER-4508, ZOOKEEPER-4921, ZOOKEEPER-4923 and https://lists.apache.org/thread/nfb9z7rhgglbjzfxvg4z2m3pks53b3c1 --- .../main/java/org/apache/zookeeper/ClientCnxn.java | 2 +- .../apache/zookeeper/test/SessionTimeoutTest.java | 65 +++++++++++++++------- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 2d50acb01..d51313cd3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -1241,7 +1241,7 @@ public void run() { to = connectTimeout - clientCnxnSocket.getIdleSend(); } - int expiration = expirationTimeout - clientCnxnSocket.getIdleRecv(); + int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv(); if (expiration <= 0) { String warnInfo = String.format( "Client session timed out, have not heard from server in %dms for session id 0x%s", diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java index 7a59f5eb9..9f5943f68 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java @@ -18,6 +18,9 @@ package org.apache.zookeeper.test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -31,12 +34,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.TestableZooKeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.Time; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -54,6 +60,21 @@ public void setUp() throws Exception { zk = createClient(); } + private static class ExpiredWatcher implements Watcher { + public volatile CompletableFuture<Void> expired = new CompletableFuture<>(); + + synchronized void reset() { + expired = new CompletableFuture<>(); + } + + @Override + public synchronized void process(WatchedEvent event) { + if (event.getState() == Event.KeeperState.Expired) { + expired.complete(null); + } + } + } + private static class BusyServer implements AutoCloseable { private final ServerSocket server; private final Socket client; @@ -143,17 +164,24 @@ public void testSessionExpirationAfterAllServerDown() throws Exception { // stop client also to gain less distraction zk.close(); - // small connection timeout to gain quick ci feedback - int sessionTimeout = 3000; - CompletableFuture<Void> expired = new CompletableFuture<>(); + // given: established session + int sessionTimeout = 3000; // small connection timeout to gain quick ci feedback + ExpiredWatcher watcher = new ExpiredWatcher(); zk = createClient(new CountdownWatcher(), hostPort, sessionTimeout); - zk.register(event -> { - if (event.getState() == Watcher.Event.KeeperState.Expired) { - expired.complete(null); - } - }); + zk.register(watcher); + + // when: all server down + long start = Time.currentElapsedTime(); + zk.sync("/"); // touch timeout counts stopServer(); - expired.join(); + + // then: get Expired after session timeout + watcher.expired.join(); + long elapsed = Time.currentElapsedTime() - start; + assertThat(elapsed, greaterThanOrEqualTo((long) zk.getSessionTimeout())); + assertThat(elapsed, lessThan(zk.getSessionTimeout() * 10L)); + + // then: future request will get SessionExpiredException assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null)); } @@ -162,18 +190,17 @@ public void testSessionExpirationWhenNoServerUp() throws Exception { // stop client also to gain less distraction zk.close(); + // given: unavailable cluster stopServer(); - // small connection timeout to gain quick ci feedback - int sessionTimeout = 3000; - CompletableFuture<Void> expired = new CompletableFuture<>(); - new TestableZooKeeper(hostPort, sessionTimeout, event -> { - if (event.getState() == Watcher.Event.KeeperState.Expired) { - expired.complete(null); - } - }); - expired.join(); - assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null)); + // when: try to establish a brand-new session + int sessionTimeout = 300; // small connection timeout to gain quick ci feedback + ExpiredWatcher watcher = new ExpiredWatcher(); + try (ZooKeeper zk = new ZooKeeper(hostPort, sessionTimeout, watcher)) { + // then: never Expired + assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS)); + assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null)); + } } @Test