This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-2.18 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit a535985c2fa1cc4c958f1393f79984d143be4e31 Author: Maksim Timonin <[email protected]> AuthorDate: Wed Feb 11 12:48:36 2026 +0300 IGNITE-20973 Add handshake timeout configuration for java thin client (#12710) --- .../ignite/configuration/ClientConfiguration.java | 61 +++++++++++- .../client/thin/ClientChannelConfiguration.java | 23 +++-- .../internal/client/thin/TcpClientChannel.java | 20 ++-- .../GridNioClientConnectionMultiplexer.java | 2 +- .../ignite/client/ClientConfigurationTest.java | 3 +- .../java/org/apache/ignite/client/Comparers.java | 3 +- .../thin/ThinClientEnpointsDiscoveryTest.java | 3 +- .../ignite/internal/client/thin/TimeoutTest.java | 102 ++++++++++++++++++++- .../IgniteClientConnectionEventListenerTest.java | 3 +- .../odbc/ClientSessionOutboundQueueLimitTest.java | 3 +- 10 files changed, 196 insertions(+), 27 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java index ed1f0319fb0..e8aca91c95c 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java @@ -38,6 +38,7 @@ import org.apache.ignite.client.ClientTransactions; import org.apache.ignite.client.SslMode; import org.apache.ignite.client.SslProtocol; import org.apache.ignite.internal.client.thin.TcpIgniteClient; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -57,8 +58,11 @@ public final class ClientConfiguration implements Serializable { /** @serial Tcp no delay. */ private boolean tcpNoDelay = true; - /** @serial Timeout. 0 means infinite. */ - private int timeout; + /** @serial Handshake timeout in milliseconds. 0 means infinite. */ + private int handshakeTimeout; + + /** @serial Request timeout in milliseconds. 0 means infinite. */ + private int reqTimeout; /** @serial Send buffer size. 0 means system default. */ private int sndBufSize = 32 * 1024; @@ -227,18 +231,65 @@ public final class ClientConfiguration implements Serializable { } /** - * @return Send/receive timeout in milliseconds. + * @deprecated Use {@link #getHandshakeTimeout()} and {@link #getRequestTimeout()} instead. + * @return Request timeout in milliseconds. */ + @Deprecated public int getTimeout() { - return timeout; + if (reqTimeout != handshakeTimeout) { + LT.warn(logger, String.format( + "Deprecated getTimeout() API is used while request timeout (%d) differs from handshake timeout (%d). " + + "Returning request timeout. Please use getRequestTimeout() and getHandshakeTimeout() instead.", + reqTimeout, handshakeTimeout + )); + } + + return reqTimeout; } /** + * @deprecated Use {@link #setHandshakeTimeout(int)} and {@link #setRequestTimeout(int)} instead. * @param timeout Send/receive timeout in milliseconds. * @return {@code this} for chaining. */ + @Deprecated public ClientConfiguration setTimeout(int timeout) { - this.timeout = timeout; + handshakeTimeout = timeout; + reqTimeout = timeout; + + return this; + } + + /** + * @return Handshake timeout in milliseconds. 0 means infinite. + */ + public int getHandshakeTimeout() { + return handshakeTimeout; + } + + /** + * @param handshakeTimeout Handshake timeout in milliseconds. 0 means infinite. + * @return {@code this} for chaining. + */ + public ClientConfiguration setHandshakeTimeout(int handshakeTimeout) { + this.handshakeTimeout = handshakeTimeout; + + return this; + } + + /** + * @return Request timeout in milliseconds. 0 means infinite. + */ + public int getRequestTimeout() { + return reqTimeout; + } + + /** + * @param reqTimeout Request timeout in milliseconds. 0 means infinite. + * @return {@code this} for chaining. + */ + public ClientConfiguration setRequestTimeout(int reqTimeout) { + this.reqTimeout = reqTimeout; return this; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java index 97cdc822159..b0e9910178e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java @@ -44,8 +44,11 @@ final class ClientChannelConfiguration { /** Tcp no delay. */ private final boolean tcpNoDelay; - /** Timeout. */ - private final int timeout; + /** Handshake timeout. */ + private final int handshakeTimeout; + + /** Request timeout. */ + private final int reqTimeout; /** Send buffer size. */ private final int sndBufSize; @@ -123,7 +126,8 @@ final class ClientChannelConfiguration { ClientChannelConfiguration(ClientConfiguration cfg, List<InetSocketAddress> addrs) { this.sslMode = cfg.getSslMode(); this.tcpNoDelay = cfg.isTcpNoDelay(); - this.timeout = cfg.getTimeout(); + this.handshakeTimeout = cfg.getHandshakeTimeout(); + this.reqTimeout = cfg.getRequestTimeout(); this.sndBufSize = cfg.getSendBufferSize(); this.rcvBufSize = cfg.getReceiveBufferSize(); this.sslClientCertKeyStorePath = cfg.getSslClientCertificateKeyStorePath(); @@ -172,10 +176,17 @@ final class ClientChannelConfiguration { } /** - * @return Timeout. + * @return Handshake timeout. + */ + public int getHandshakeTimeout() { + return handshakeTimeout; + } + + /** + * @return Request timeout. */ - public int getTimeout() { - return timeout; + public int getRequestTimeout() { + return reqTimeout; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java index f8316faf64c..00a6c56c59e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java @@ -160,8 +160,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon /** Executor for async operation listeners. */ private final Executor asyncContinuationExecutor; - /** Send/receive timeout in milliseconds. */ - private final int timeout; + /** Handshake timeout in milliseconds. */ + private final int handshakeTimeout; + + /** Request timeout in milliseconds. */ + private final int reqTimeout; /** Heartbeat timer. */ private final Timer heartbeatTimer; @@ -195,7 +198,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon Executor cfgExec = cfg.getAsyncContinuationExecutor(); asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool(); - timeout = cfg.getTimeout(); + handshakeTimeout = cfg.getHandshakeTimeout(); + reqTimeout = cfg.getRequestTimeout(); List<InetSocketAddress> addrs = cfg.getAddresses(); @@ -419,7 +423,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon long startTimeNanos = pendingReq.startTimeNanos; try { - ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get(); + ByteBuffer payload = reqTimeout > 0 ? pendingReq.get(reqTimeout) : pendingReq.get(); T res = null; if (payload != null && payloadReader != null) @@ -713,9 +717,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon long reqId = -1L; long startTime = System.nanoTime(); - eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), - new ProtocolContext(ver).toString(), null)); - while (true) { ClientRequestFuture fut; @@ -733,10 +734,13 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon pendingReqsLock.readLock().unlock(); } + eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), + new ProtocolContext(ver).toString(), null)); + handshakeReq(ver, user, pwd, userAttrs); try { - ByteBuffer buf = timeout > 0 ? fut.get(timeout) : fut.get(); + ByteBuffer buf = handshakeTimeout > 0 ? fut.get(handshakeTimeout) : fut.get(); BinaryInputStream res = BinaryStreams.inputStream(buf); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java index d2f6895ac1d..a26a6f2dac7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java @@ -89,7 +89,7 @@ public class GridNioClientConnectionMultiplexer implements ClientConnectionMulti else filters = new GridNioFilter[] {codecFilter}; - connTimeout = cfg.getTimeout(); + connTimeout = cfg.getHandshakeTimeout(); try { srv = GridNioServer.<ByteBuffer>builder() diff --git a/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java index 8f882653b09..5648f99b42d 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/client/ClientConfigurationTest.java @@ -59,7 +59,8 @@ public class ClientConfigurationTest { public void testSerialization() throws IOException, ClassNotFoundException { ClientConfiguration target = new ClientConfiguration() .setAddresses("127.0.0.1:10800", "127.0.0.1:10801") - .setTimeout(123) + .setHandshakeTimeout(123) + .setRequestTimeout(123) .setBinaryConfiguration(new BinaryConfiguration() .setClassNames(Collections.singleton("Person")) ) diff --git a/modules/core/src/test/java/org/apache/ignite/client/Comparers.java b/modules/core/src/test/java/org/apache/ignite/client/Comparers.java index 4442196c40d..cb244400c79 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/Comparers.java +++ b/modules/core/src/test/java/org/apache/ignite/client/Comparers.java @@ -36,7 +36,8 @@ public final class Comparers { return Arrays.equals(a.getAddresses(), b.getAddresses()) && a.isTcpNoDelay() == b.isTcpNoDelay() && - a.getTimeout() == b.getTimeout() && + a.getHandshakeTimeout() == b.getHandshakeTimeout() && + a.getRequestTimeout() == b.getRequestTimeout() && a.getSendBufferSize() == b.getSendBufferSize() && a.getReceiveBufferSize() == b.getReceiveBufferSize(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java index bcb7627f8ec..d3f6d95bee2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java @@ -158,7 +158,8 @@ public class ThinClientEnpointsDiscoveryTest extends ThinClientAbstractPartition // Config has good server address, client discovery returns unreachable address. // We expect the client to connect to the good address and ignore the unreachable one. ClientConfiguration ccfg = new ClientConfiguration() - .setTimeout(2000) + .setHandshakeTimeout(2000) + .setRequestTimeout(2000) .setAddresses("127.0.0.1:" + DFLT_PORT); IgniteClient client = Ignition.startClient(ccfg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java index 882a4df96bf..9b3a08b2159 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java @@ -39,6 +39,7 @@ import org.apache.ignite.client.IgniteClient; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import org.apache.ignite.internal.binary.streams.BinaryStreams; @@ -67,7 +68,7 @@ public class TimeoutTest extends AbstractThinClientTest { /** {@inheritDoc} */ @Override protected ClientConfiguration getClientConfiguration() { - return super.getClientConfiguration().setTimeout(TIMEOUT); + return super.getClientConfiguration().setHandshakeTimeout(TIMEOUT).setRequestTimeout(TIMEOUT); } /** @@ -217,4 +218,103 @@ public class TimeoutTest extends AbstractThinClientTest { } } } + + /** + * Test that connection timeout is independent of request timeout during connection establishment. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testConnectionTimeoutIndependentOfRequest() throws Exception { + ServerSocket sock = new ServerSocket(); + sock.bind(new InetSocketAddress("127.0.0.1", DFLT_PORT)); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> { + Socket accepted = null; + + try { + accepted = sock.accept(); + + while (!Thread.currentThread().isInterrupted()) + Thread.sleep(10); + } + finally { + U.closeQuiet(accepted); + } + }); + + try { + ClientConfiguration cfg = new ClientConfiguration() + .setAddresses("127.0.0.1:" + DFLT_PORT) + .setHandshakeTimeout(500) + .setRequestTimeout(Integer.MAX_VALUE); + + GridTestUtils.assertThrowsWithCause( + () -> Ignition.startClient(cfg), + IgniteFutureTimeoutCheckedException.class + ); + } + finally { + fut.cancel(); + + U.closeQuiet(sock); + } + } + + /** + * Test that request timeout is independent of connection timeout during operations. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testRequestTimeoutIndependentOfConnection() throws Exception { + IgniteConfiguration igniteCfg = getConfiguration(getTestIgniteInstanceName()); + igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setHandshakeTimeout(Integer.MAX_VALUE)); + + try (Ignite ignite = startGrid(igniteCfg)) { + ClientConfiguration cfg = getClientConfiguration(ignite) + .setHandshakeTimeout(Integer.MAX_VALUE) + .setRequestTimeout(500); + + try (IgniteClient client = Ignition.startClient(cfg)) { + ClientCache<Object, Object> cache = client.getOrCreateCache("testTimeoutCache"); + + ClientCacheConfiguration txCacheCfg = new ClientCacheConfiguration() + .setName("txCache") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + ClientCache<Object, Object> txCache = client.getOrCreateCache(txCacheCfg); + + CyclicBarrier barrier = new CyclicBarrier(2); + + IgniteInternalFuture<?> blockingThread = GridTestUtils.runAsync(() -> { + try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + txCache.put(1, "blocked"); + + barrier.await(2, TimeUnit.SECONDS); + + // Wait for main thread to time out + barrier.await(2, TimeUnit.SECONDS); + } + catch (Exception e) { + throw new IgniteException(e); + } + }); + + barrier.await(2, TimeUnit.SECONDS); + + try (ClientTransaction ignored1 = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + GridTestUtils.assertThrowsWithCause( + () -> txCache.put(1, "should timeout"), + IgniteFutureTimeoutCheckedException.class + ); + } + + barrier.await(2, TimeUnit.SECONDS); + + cache.put(1, "still works"); + assertEquals("still works", cache.get(1)); + + blockingThread.get(); + } + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java index 8712b6985d3..d5502bc8918 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java @@ -90,8 +90,7 @@ public class IgniteClientConnectionEventListenerTest extends GridCommonAbstractT HandshakeStartEvent hsStartEv = (HandshakeStartEvent)evSet.get(HandshakeStartEvent.class); - assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + ProtocolVersion.LATEST_VER - + ", features=[]]"); + assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]"); assertEquals(LOCALHOST, hsStartEv.connectionDescription().remoteAddress().getAddress()); assertEquals(SRV_PORT, hsStartEv.connectionDescription().remoteAddress().getPort()); assertEquals(LOCALHOST, hsStartEv.connectionDescription().localAddress().getAddress()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java index 27bb599c861..a9546607a58 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientSessionOutboundQueueLimitTest.java @@ -68,7 +68,8 @@ public class ClientSessionOutboundQueueLimitTest extends GridCommonAbstractTest try ( IgniteClient cli = Ignition.startClient(new ClientConfiguration() .setAddresses("127.0.0.1:10800") - .setTimeout(5000) // Server will drop packets intended for the client. So client can hang on handshake during reconnect. + .setHandshakeTimeout(5000) // Server will drop packets intended for the client, it can hang on handshake during reconnect. + .setRequestTimeout(5000) .setRetryLimit(1) // Let's not retry operations if the channel was closed while waiting for a response. .setEventListeners(new ConnectionEventListener() { @Override public void onConnectionClosed(ConnectionClosedEvent event) {
