This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c903df95 ZOOKEEPER-4923: Add timeout to control brand-new session 
establishment
1c903df95 is described below

commit 1c903df950553610b3c67f2edda61f40205c5cc7
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Dec 16 06:39:07 2025 +0800

    ZOOKEEPER-4923: Add timeout to control brand-new session establishment
    
    Reviewers: anmolnar, cnauroth, anmolnar
    Author: kezhuw
    Closes #2253 from kezhuw/ZOOKEEPER-4923-brand-new-session-timeout-control
---
 .../main/java/org/apache/zookeeper/ClientCnxn.java | 40 ++++++++++++++++++-
 .../main/java/org/apache/zookeeper/ZooKeeper.java  |  3 ++
 .../apache/zookeeper/client/ZooKeeperBuilder.java  | 17 ++++++++
 .../apache/zookeeper/client/ZooKeeperOptions.java  | 11 ++++++
 .../zookeeper/ClientCnxnSocketFragilityTest.java   | 10 +++--
 .../apache/zookeeper/ClientRequestTimeoutTest.java |  4 ++
 .../apache/zookeeper/test/SessionTimeoutTest.java  | 46 ++++++++++++++++++++++
 7 files changed, 127 insertions(+), 4 deletions(-)

diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index 7663e27f6..020f9408a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -168,6 +168,8 @@ static class AuthData {
 
     private final int sessionTimeout;
 
+    private final long newSessionTimeout;
+
     private final ZKWatchManager watchManager;
 
     private long sessionId;
@@ -398,6 +400,36 @@ public ClientCnxn(
         long sessionId,
         byte[] sessionPasswd,
         boolean canBeReadOnly
+    ) throws IOException {
+        this(hostProvider, sessionTimeout, Long.MAX_VALUE, clientConfig, 
defaultWatcher, clientCnxnSocket, sessionId, sessionPasswd, canBeReadOnly);
+    }
+
+    /**
+     * Creates a connection object. The actual network connect doesn't get
+     * established until needed. The start() instance method must be called
+     * after construction.
+     *
+     * @param hostProvider the list of ZooKeeper servers to connect to
+     * @param sessionTimeout the timeout for connections.
+     * @param newSessionTimeout the timeout before giving up brand-new session 
establishment.
+     * @param clientConfig the client configuration.
+     * @param defaultWatcher default watcher for this connection
+     * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
+     * @param sessionId session id if re-establishing session
+     * @param sessionPasswd session passwd if re-establishing session
+     * @param canBeReadOnly whether the connection is allowed to go to 
read-only mode in case of partitioning
+     * @throws IOException in cases of broken network
+     */
+    public ClientCnxn(
+            HostProvider hostProvider,
+            int sessionTimeout,
+            long newSessionTimeout,
+            ZKClientConfig clientConfig,
+            Watcher defaultWatcher,
+            ClientCnxnSocket clientCnxnSocket,
+            long sessionId,
+            byte[] sessionPasswd,
+            boolean canBeReadOnly
     ) throws IOException {
         this.hostProvider = hostProvider;
         this.sessionTimeout = sessionTimeout;
@@ -413,6 +445,7 @@ public ClientCnxn(
         this.connectTimeout = sessionTimeout / hostProvider.size();
         this.readTimeout = sessionTimeout * 2 / 3;
         this.expirationTimeout = sessionTimeout * 4 / 3;
+        this.newSessionTimeout = newSessionTimeout == 0 ? expirationTimeout : 
newSessionTimeout;
 
         this.sendThread = new SendThread(clientCnxnSocket);
         this.eventThread = new EventThread();
@@ -1192,7 +1225,12 @@ public void run() {
                         to = connectTimeout - clientCnxnSocket.getIdleSend();
                     }
 
-                    int expiration = sessionId == 0 ? Integer.MAX_VALUE : 
expirationTimeout - clientCnxnSocket.getIdleRecv();
+                    long expiration;
+                    if (sessionId == 0) {
+                        expiration = newSessionTimeout - 
clientCnxnSocket.getIdleRecv();
+                    } else {
+                        expiration = expirationTimeout - 
clientCnxnSocket.getIdleRecv();
+                    }
                     if (expiration <= 0) {
                         String warnInfo = String.format(
                             "Client session timed out, have not heard from 
server in %dms for session id 0x%s",
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index b3b5af454..7533e01a9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -697,6 +697,7 @@ public ZooKeeper(
     ClientCnxn createConnection(
         HostProvider hostProvider,
         int sessionTimeout,
+        long newSessionTimeout,
         ZKClientConfig clientConfig,
         Watcher defaultWatcher,
         ClientCnxnSocket clientCnxnSocket,
@@ -707,6 +708,7 @@ ClientCnxn createConnection(
         return new ClientCnxn(
             hostProvider,
             sessionTimeout,
+            newSessionTimeout,
             clientConfig,
             defaultWatcher,
             clientCnxnSocket,
@@ -1148,6 +1150,7 @@ public ZooKeeper(ZooKeeperOptions options) throws 
IOException {
         cnxn = createConnection(
             hostProvider,
             sessionTimeout,
+            options.getNewSessionTimeoutMs(),
             this.clientConfig,
             watcher,
             getClientCnxnSocket(),
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
index f484dcfee..03b5f4c4e 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
@@ -39,6 +39,7 @@
 public class ZooKeeperBuilder {
     private final String connectString;
     private final Duration sessionTimeout;
+    private Duration newSessionTimeout = Duration.ofSeconds(Long.MAX_VALUE, 
999_999_999L);
     private Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
     private Watcher defaultWatcher;
     private boolean canBeReadOnly = false;
@@ -117,6 +118,21 @@ public ZooKeeperBuilder withSession(long sessionId, byte[] 
sessionPasswd) {
         return this;
     }
 
+    /**
+     * Specifies timeout to establish a brand-new session.
+     *
+     * @param timeout timeout to get {@link 
org.apache.zookeeper.Watcher.Event.KeeperState#Expired} in establishing a
+     *                brand-new session. {@code 
Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L)}, which is the default,
+     *                means endless retry until connected. {@code 
Duration.ZERO} means a sensible value deduced from
+     *                specified session timeout, currently, it is approximate 
{@code sessionTimeout * 4 / 3}.
+     * @return this
+     * @since 3.10.0
+     */
+    public ZooKeeperBuilder withNewSessionTimeout(Duration timeout) {
+        this.newSessionTimeout = timeout;
+        return this;
+    }
+
     /**
      * Specifies the client config used to construct ZooKeeper instances.
      *
@@ -143,6 +159,7 @@ public ZooKeeperOptions toOptions() {
         return new ZooKeeperOptions(
             connectString,
             sessionTimeout,
+            newSessionTimeout,
             defaultWatcher,
             hostProvider,
             canBeReadOnly,
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
index 52a173ebf..2605cb0f8 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
@@ -33,6 +33,7 @@
 public class ZooKeeperOptions {
     private final String connectString;
     private final Duration sessionTimeout;
+    private final Duration newSessionTimeout;
     private final Watcher defaultWatcher;
     private final Function<Collection<InetSocketAddress>, HostProvider> 
hostProvider;
     private final boolean canBeReadOnly;
@@ -42,6 +43,7 @@ public class ZooKeeperOptions {
 
     ZooKeeperOptions(String connectString,
                      Duration sessionTimeout,
+                     Duration newSessionTimeout,
                      Watcher defaultWatcher,
                      Function<Collection<InetSocketAddress>, HostProvider> 
hostProvider,
                      boolean canBeReadOnly,
@@ -50,6 +52,7 @@ public class ZooKeeperOptions {
                      ZKClientConfig clientConfig) {
         this.connectString = connectString;
         this.sessionTimeout = sessionTimeout;
+        this.newSessionTimeout = newSessionTimeout;
         this.hostProvider = hostProvider;
         this.defaultWatcher = defaultWatcher;
         this.canBeReadOnly = canBeReadOnly;
@@ -66,6 +69,14 @@ public int getSessionTimeoutMs() {
         return (int) Long.min(Integer.MAX_VALUE, sessionTimeout.toMillis());
     }
 
+    public long getNewSessionTimeoutMs() {
+        try {
+            return newSessionTimeout.toMillis();
+        } catch (ArithmeticException ignored) {
+            return Long.MAX_VALUE;
+        }
+    }
+
     public Watcher getDefaultWatcher() {
         return defaultWatcher;
     }
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
index 2b70a599d..27c07fe90 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -96,7 +96,7 @@ public void testSocketClosedAfterFailure() throws Exception {
                 BusyServer server = new BusyServer();
                 ZooKeeper zk = new ZooKeeper(server.getHostPort(), (int) 
sessionTimeout.toMillis(), null) {
                 @Override
-                ClientCnxn createConnection(HostProvider hostProvider, int 
sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, 
ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, 
boolean canBeReadOnly) throws IOException {
+                ClientCnxn createConnection(HostProvider hostProvider, int 
sessionTimeout, long newSessionTimeout, ZKClientConfig clientConfig, Watcher 
defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] 
sessionPasswd, boolean canBeReadOnly) throws IOException {
                     ClientCnxnSocketNIO socket = spy((ClientCnxnSocketNIO) 
clientCnxnSocket);
 
                     doAnswer(mock -> {
@@ -110,7 +110,7 @@ ClientCnxn createConnection(HostProvider hostProvider, int 
sessionTimeout, ZKCli
                     }).when(socket).createSock();
 
                     nioSelector.set(socket.getSelector());
-                    return super.createConnection(hostProvider, 
sessionTimeout, clientConfig, defaultWatcher, socket, sessionId, sessionPasswd, 
canBeReadOnly);
+                    return super.createConnection(hostProvider, 
sessionTimeout, newSessionTimeout, clientConfig, defaultWatcher, socket, 
sessionId, sessionPasswd, canBeReadOnly);
                 }
             }) {
 
@@ -328,6 +328,7 @@ class CustomClientCnxn extends ClientCnxn {
         public CustomClientCnxn(
             HostProvider hostProvider,
             int sessionTimeout,
+            long newSessionTimeout,
             ZKClientConfig zkClientConfig,
             Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
@@ -338,6 +339,7 @@ public CustomClientCnxn(
             super(
                 hostProvider,
                 sessionTimeout,
+                newSessionTimeout,
                 zkClientConfig,
                 defaultWatcher,
                 clientCnxnSocket,
@@ -403,6 +405,7 @@ public boolean isAlive() {
         ClientCnxn createConnection(
             HostProvider hostProvider,
             int sessionTimeout,
+            long newSessionTimeout,
             ZKClientConfig clientConfig,
             Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
@@ -415,6 +418,7 @@ ClientCnxn createConnection(
             ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
                 hostProvider,
                 sessionTimeout,
+                newSessionTimeout,
                 clientConfig,
                 defaultWatcher,
                 clientCnxnSocket,
@@ -424,4 +428,4 @@ ClientCnxn createConnection(
             return ClientCnxnSocketFragilityTest.this.cnxn;
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
index 93f801cab..a05825ce2 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -225,6 +225,7 @@ class CustomClientCnxn extends ClientCnxn {
         CustomClientCnxn(
             HostProvider hostProvider,
             int sessionTimeout,
+            long newSessionTimeout,
             ZKClientConfig clientConfig,
             Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
@@ -235,6 +236,7 @@ class CustomClientCnxn extends ClientCnxn {
             super(
                 hostProvider,
                 sessionTimeout,
+                newSessionTimeout,
                 clientConfig,
                 defaultWatcher,
                 clientCnxnSocket,
@@ -286,6 +288,7 @@ public CustomZooKeeper(String connectString, int 
sessionTimeout, Watcher watcher
         ClientCnxn createConnection(
             HostProvider hostProvider,
             int sessionTimeout,
+            long newSessionTimeout,
             ZKClientConfig clientConfig,
             Watcher defaultWatcher,
             ClientCnxnSocket clientCnxnSocket,
@@ -296,6 +299,7 @@ ClientCnxn createConnection(
             return new CustomClientCnxn(
                 hostProvider,
                 sessionTimeout,
+                newSessionTimeout,
                 clientConfig,
                 defaultWatcher,
                 clientCnxnSocket,
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
index 86659ba70..5688d9f55 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.test;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -27,6 +28,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -40,6 +42,7 @@
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZooKeeperBuilder;
 import org.apache.zookeeper.common.BusyServer;
 import org.apache.zookeeper.common.Time;
 import org.junit.jupiter.api.BeforeEach;
@@ -176,6 +179,49 @@ public void testSessionExpirationWhenNoServerUp() throws 
Exception {
             assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * 
sessionTimeout, TimeUnit.MILLISECONDS));
             assertThrows(KeeperException.ConnectionLossException.class, () -> 
zk.exists("/", null));
         }
+
+        // when: try to establish a brand-new session using builder with 
default newSessionTimeout
+        watcher.reset();
+        try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, 
Duration.ofMillis(sessionTimeout))
+                .withDefaultWatcher(watcher)
+                .build()) {
+            // then: never Expired
+            assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * 
sessionTimeout, TimeUnit.MILLISECONDS));
+            assertThrows(KeeperException.ConnectionLossException.class, () -> 
zk.exists("/", null));
+        }
+
+        // when: try to establish a brand-new session using builder with 
Duration.ZERO newSessionTimeout
+        watcher.reset();
+        long start = Time.currentElapsedTime();
+        try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, 
Duration.ofMillis(sessionTimeout))
+                .withDefaultWatcher(watcher)
+                .withNewSessionTimeout(Duration.ZERO)
+                .build()) {
+            // then: get Expired after some delay
+            watcher.expired.join();
+            long elapsed = Time.currentElapsedTime() - start;
+            assertThat(elapsed, greaterThan((long) sessionTimeout));
+            assertThat(elapsed, lessThan(sessionTimeout * 10L));
+            // then: future request will get SessionExpiredException
+            assertThrows(KeeperException.SessionExpiredException.class, () -> 
zk.exists("/", null));
+        }
+
+        // when: try to establish a brand-new session using builder with 
custom newSessionTimeout
+        watcher.reset();
+        start = Time.currentElapsedTime();
+        Duration newSessionTimeout = Duration.ofMillis(300);
+        try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, 
Duration.ofMillis(30000))
+                .withDefaultWatcher(watcher)
+                .withNewSessionTimeout(newSessionTimeout)
+                .build()) {
+            // then: get Expired after newSessionTimeout
+            watcher.expired.join();
+            long elapsed = Time.currentElapsedTime() - start;
+            assertThat(elapsed, 
greaterThanOrEqualTo(newSessionTimeout.toMillis()));
+            assertThat(elapsed, lessThan(newSessionTimeout.toMillis() * 10));
+            // then: future request will get SessionExpiredException
+            assertThrows(KeeperException.SessionExpiredException.class, () -> 
zk.exists("/", null));
+        }
     }
 
     @Test

Reply via email to