This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 533a636 IGNITE-13719 Java thin client: Fix timeout on idle connection - Fixes #8480. 533a636 is described below commit 533a6365cece7896e6ade969f085ee41b690759d Author: Aleksey Plekhanov <plehanov.a...@gmail.com> AuthorDate: Thu Nov 19 16:47:54 2020 +0300 IGNITE-13719 Java thin client: Fix timeout on idle connection - Fixes #8480. Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com> --- .../internal/client/thin/TcpClientChannel.java | 15 +- .../client/thin/AbstractThinClientTest.java | 20 +- .../ignite/internal/client/thin/TimeoutTest.java | 220 +++++++++++++++++++++ .../org/apache/ignite/client/ClientTestSuite.java | 4 +- 4 files changed, 254 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java index c357672..7c79130 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java @@ -168,6 +168,9 @@ class TcpClientChannel implements ClientChannel { /** Receiver thread (processes incoming messages). */ private Thread receiverThread; + /** Send/receive timeout in milliseconds. */ + private final int timeout; + /** Constructor. */ TcpClientChannel(ClientChannelConfiguration cfg) throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { @@ -176,17 +179,23 @@ class TcpClientChannel implements ClientChannel { Executor cfgExec = cfg.getAsyncContinuationExecutor(); asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool(); + timeout = cfg.getTimeout(); + try { sock = createSocket(cfg); out = sock.getOutputStream(); dataInput = new ByteCountingDataInput(sock.getInputStream()); + + handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes()); + + // Disable timeout on socket after handshake, instead, get future result with timeout in "receive" method. + if (timeout > 0) + sock.setSoTimeout(0); } catch (IOException e) { throw handleIOError("addr=" + cfg.getAddress(), e); } - - handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes()); } /** {@inheritDoc} */ @@ -303,7 +312,7 @@ class TcpClientChannel implements ClientChannel { private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader) throws ClientException { try { - byte[] payload = pendingReq.get(); + byte[] payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get(); if (payload == null || payloadReader == null) return null; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java index b985a77..2e2f21d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java @@ -39,6 +39,24 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest { } /** + * Return thin client port for given node. + * + * @param node Node. + */ + protected int clientPort(ClusterNode node) { + return node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT); + } + + /** + * Return host for given node. + * + * @param node Node. + */ + protected String clientHost(ClusterNode node) { + return F.first(node.addresses()); + } + + /** * Start thin client with configured endpoints to specified nodes. * * @param nodes Nodes to connect. @@ -50,7 +68,7 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest { for (int i = 0; i < nodes.length; i++) { ClusterNode node = nodes[i]; - addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT); + addrs[i] = clientHost(node) + ":" + clientPort(node); } return Ignition.startClient(getClientConfiguration().setAddresses(addrs)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java new file mode 100644 index 0000000..2c7bf88 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.ClientCacheConfiguration; +import org.apache.ignite.client.ClientConnectionException; +import org.apache.ignite.client.ClientException; +import org.apache.ignite.client.ClientTransaction; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.binary.streams.BinaryOutputStream; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Thin client timeouts tests. + */ +public class TimeoutTest extends AbstractThinClientTest { + /** + * Default timeout value. + */ + private static final int TIMEOUT = 500; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration( + new ClientConnectorConfiguration().setHandshakeTimeout(TIMEOUT)); + } + + /** {@inheritDoc} */ + @Override protected ClientConfiguration getClientConfiguration() { + return super.getClientConfiguration().setTimeout(TIMEOUT); + } + + /** + * Test that server closes thin client connection in case of handshake timeout. + */ + @Test + public void testServerClosesThinClientConnectionOnHandshakeTimeout() { + try (Ignite ignite = startGrid(0)) { + long ts0 = System.currentTimeMillis(); + + Socket s = new Socket(); + + s.connect(new InetSocketAddress(clientHost(ignite.cluster().localNode()), + clientPort(ignite.cluster().localNode())), 0); + + s.setSoTimeout(TIMEOUT * 2); + + OutputStream os = s.getOutputStream(); + + try (BinaryOutputStream bos = new BinaryHeapOutputStream(32)) { + bos.writeInt(1000); // Size. + + os.write(bos.arrayCopy()); + os.flush(); + + InputStream is = s.getInputStream(); + + assertEquals(-1, is.read()); // Connection and stream closed by server after timeout. + + long ts1 = System.currentTimeMillis(); + + assertTrue("Unexpected timeout [ts0=" + ts0 + ", ts1=" + ts1 + ']', + ts1 - ts0 >= TIMEOUT && ts1 - ts0 < TIMEOUT * 2); + } + finally { + s.close(); + } + } + catch (Exception e) { + fail("Exception while sending message: " + e.getMessage()); + } + } + + /** + * Test client timeout on handshake. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testClientTimeoutOnHandshake() throws Exception { + ServerSocket sock = new ServerSocket(); + + sock.bind(new InetSocketAddress("127.0.0.1", DFLT_PORT)); + + AtomicBoolean connectionAccepted = new AtomicBoolean(); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> { + try { + Socket accepted = sock.accept(); + + connectionAccepted.set(true); + + latch.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + + U.closeQuiet(accepted); + } + catch (Exception e) { + throw new IgniteException("Accept thread failed: " + e.getMessage(), e); + } + }); + + long ts0 = System.currentTimeMillis(); + + try { + GridTestUtils.assertThrowsWithCause( + (Runnable)() -> Ignition.startClient(getClientConfiguration().setAddresses("127.0.0.1:" + DFLT_PORT)), + ClientConnectionException.class); + } + finally { + latch.countDown(); + } + + U.closeQuiet(sock); + + assertTrue(connectionAccepted.get()); + + long ts1 = System.currentTimeMillis(); + + assertTrue("Unexpected timeout [ts0=" + ts0 + ", ts1=" + ts1 + ']', + ts1 - ts0 >= TIMEOUT && ts1 - ts0 < TIMEOUT * 2); + + fut.get(); + } + + /** + * Test client timeout on operation. + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testClientTimeoutOnOperation() throws Exception { + try (Ignite ignite = startGrid(0)) { + try (IgniteClient client = startClient(0)) { + ClientCache<Object, Object> cache = client.getOrCreateCache(new ClientCacheConfiguration() + .setName("cache").setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + doSleep(TIMEOUT * 2); + + // Should not fail if connection is idle. + cache.put(0, 0); + + CyclicBarrier barrier = new CyclicBarrier(2); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> { + try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(0, 0); + + barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + throw new IgniteException(e); + } + }); + + // Wait for the key locked. + barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + + long ts0 = System.currentTimeMillis(); + + try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + try { + GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), ClientException.class); + } + finally { + // To unlock another thread. + barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + } + } + + long ts1 = System.currentTimeMillis(); + + assertTrue("Unexpected timeout [ts0=" + ts0 + ", ts1=" + ts1 + ']', + ts1 - ts0 >= TIMEOUT && ts1 - ts0 < TIMEOUT * 2); + + fut.get(); + } + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java index 55a2461..48d346f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscov import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest; import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest; +import org.apache.ignite.internal.client.thin.TimeoutTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -60,7 +61,8 @@ import org.junit.runners.Suite; ThinClientPartitionAwarenessResourceReleaseTest.class, ThinClientPartitionAwarenessDiscoveryTest.class, ReliableChannelTest.class, - CacheAsyncTest.class + CacheAsyncTest.class, + TimeoutTest.class }) public class ClientTestSuite { // No-op.