This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 533a636  IGNITE-13719 Java thin client: Fix timeout on idle connection - Fixes #8480.
533a636 is described below

commit 533a6365cece7896e6ade969f085ee41b690759d
Author: Aleksey Plekhanov <plehanov.a...@gmail.com>
AuthorDate: Thu Nov 19 16:47:54 2020 +0300

    IGNITE-13719 Java thin client: Fix timeout on idle connection - Fixes #8480.
    
    Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com>
---
 .../internal/client/thin/TcpClientChannel.java     |  15 +-
 .../client/thin/AbstractThinClientTest.java        |  20 +-
 .../ignite/internal/client/thin/TimeoutTest.java   | 220 +++++++++++++++++++++
 .../org/apache/ignite/client/ClientTestSuite.java  |   4 +-
 4 files changed, 254 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index c357672..7c79130 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -168,6 +168,9 @@ class TcpClientChannel implements ClientChannel {
     /** Receiver thread (processes incoming messages). */
     private Thread receiverThread;
 
+    /** Send/receive timeout in milliseconds. */
+    private final int timeout;
+
     /** Constructor. */
     TcpClientChannel(ClientChannelConfiguration cfg)
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
@@ -176,17 +179,23 @@ class TcpClientChannel implements ClientChannel {
         Executor cfgExec = cfg.getAsyncContinuationExecutor();
         asyncContinuationExecutor = cfgExec != null ? cfgExec : ForkJoinPool.commonPool();
 
+        timeout = cfg.getTimeout();
+
         try {
             sock = createSocket(cfg);
 
             out = sock.getOutputStream();
             dataInput = new ByteCountingDataInput(sock.getInputStream());
+
+            handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
+
+            // Disable timeout on socket after handshake, instead, get future result with timeout in "receive" method.
+            if (timeout > 0)
+                sock.setSoTimeout(0);
         }
         catch (IOException e) {
             throw handleIOError("addr=" + cfg.getAddress(), e);
         }
-
-        handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
     }
 
     /** {@inheritDoc} */
@@ -303,7 +312,7 @@ class TcpClientChannel implements ClientChannel {
     private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader)
         throws ClientException {
         try {
-            byte[] payload = pendingReq.get();
+            byte[] payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
 
             if (payload == null || payloadReader == null)
                 return null;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
index b985a77..2e2f21d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
@@ -39,6 +39,24 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Return thin client port for given node.
+     *
+     * @param node Node.
+     */
+    protected int clientPort(ClusterNode node) {
+        return node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
+    }
+
+    /**
+     * Return host for given node.
+     *
+     * @param node Node.
+     */
+    protected String clientHost(ClusterNode node) {
+        return F.first(node.addresses());
+    }
+
+    /**
      * Start thin client with configured endpoints to specified nodes.
      *
      * @param nodes Nodes to connect.
@@ -50,7 +68,7 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest {
         for (int i = 0; i < nodes.length; i++) {
             ClusterNode node = nodes[i];
 
-            addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
+            addrs[i] = clientHost(node) + ":" + clientPort(node);
         }
 
         return Ignition.startClient(getClientConfiguration().setAddresses(addrs));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java
new file mode 100644
index 0000000..2c7bf88
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/TimeoutTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Thin client timeouts tests.
+ */
+public class TimeoutTest extends AbstractThinClientTest {
+    /**
+     * Default timeout value.
+     */
+    private static final int TIMEOUT = 500;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
+            new ClientConnectorConfiguration().setHandshakeTimeout(TIMEOUT));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected ClientConfiguration getClientConfiguration() {
+        return super.getClientConfiguration().setTimeout(TIMEOUT);
+    }
+
+    /**
+     * Test that server closes thin client connection in case of handshake timeout.
+     */
+    @Test
+    public void testServerClosesThinClientConnectionOnHandshakeTimeout() {
+        try (Ignite ignite = startGrid(0)) {
+            long ts0 = System.currentTimeMillis();
+
+            Socket s = new Socket();
+
+            s.connect(new InetSocketAddress(clientHost(ignite.cluster().localNode()),
+                clientPort(ignite.cluster().localNode())), 0);
+
+            s.setSoTimeout(TIMEOUT * 2);
+
+            OutputStream os = s.getOutputStream();
+
+            try (BinaryOutputStream bos = new BinaryHeapOutputStream(32)) {
+                bos.writeInt(1000); // Size.
+
+                os.write(bos.arrayCopy());
+                os.flush();
+
+                InputStream is = s.getInputStream();
+
+                assertEquals(-1, is.read()); // Connection and stream closed by server after timeout.
+
+                long ts1 = System.currentTimeMillis();
+
+                assertTrue("Unexpected timeout [ts0=" + ts0 + ", ts1=" + ts1 + ']',
+                    ts1 - ts0 >= TIMEOUT && ts1 - ts0 < TIMEOUT * 2);
+            }
+            finally {
+                s.close();
+            }
+        }
+        catch (Exception e) {
+            fail("Exception while sending message: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Test client timeout on handshake.
+     */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testClientTimeoutOnHandshake() throws Exception {
+        ServerSocket sock = new ServerSocket();
+
+        sock.bind(new InetSocketAddress("127.0.0.1", DFLT_PORT));
+
+        AtomicBoolean connectionAccepted = new AtomicBoolean();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            try {
+                Socket accepted = sock.accept();
+
+                connectionAccepted.set(true);
+
+                latch.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
+
+                U.closeQuiet(accepted);
+            }
+            catch (Exception e) {
+                throw new IgniteException("Accept thread failed: " + e.getMessage(), e);
+            }
+        });
+
+        long ts0 = System.currentTimeMillis();
+
+        try {
+            GridTestUtils.assertThrowsWithCause(
+                (Runnable)() -> Ignition.startClient(getClientConfiguration().setAddresses("127.0.0.1:" + DFLT_PORT)),
+                ClientConnectionException.class);
+        }
+        finally {
+            latch.countDown();
+        }
+
+        U.closeQuiet(sock);
+
+        assertTrue(connectionAccepted.get());
+
+        long ts1 = System.currentTimeMillis();
+
+        assertTrue("Unexpected timeout [ts0=" + ts0 + ", ts1=" + ts1 + ']',
+            ts1 - ts0 >= TIMEOUT && ts1 - ts0 < TIMEOUT * 2);
+
+        fut.get();
+    }
+
+    /**
+     * Test client timeout on operation.
+     */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testClientTimeoutOnOperation() throws Exception {
+        try (Ignite ignite = startGrid(0)) {
+            try (IgniteClient client = startClient(0)) {
+                ClientCache<Object, Object> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+                    .setName("cache").setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+                doSleep(TIMEOUT * 2);
+
+                // Should not fail if connection is idle.
+                cache.put(0, 0);
+
+                CyclicBarrier barrier = new CyclicBarrier(2);
+
+                IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+                    try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache.put(0, 0);
+
+                        barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
+                        barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
+                    }
+                    catch (Exception e) {
+                        throw new IgniteException(e);
+                    }
+                });
+
+                // Wait for the key locked.
+                barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
+
+                long ts0 = System.currentTimeMillis();
+
+                try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    try {
+                        GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), ClientException.class);
+                    }
+                    finally {
+                        // To unlock another thread.
+                        barrier.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
+                    }
+                }
+
+                long ts1 = System.currentTimeMillis();
+
+                assertTrue("Unexpected timeout [ts0=" + ts0 + ", ts1=" + ts1 + ']',
+                    ts1 - ts0 >= TIMEOUT && ts1 - ts0 < TIMEOUT * 2);
+
+                fut.get();
+            }
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 55a2461..48d346f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscov
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
+import org.apache.ignite.internal.client.thin.TimeoutTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -60,7 +61,8 @@ import org.junit.runners.Suite;
     ThinClientPartitionAwarenessResourceReleaseTest.class,
     ThinClientPartitionAwarenessDiscoveryTest.class,
     ReliableChannelTest.class,
-    CacheAsyncTest.class
+    CacheAsyncTest.class,
+    TimeoutTest.class
 })
 public class ClientTestSuite {
     // No-op.

Reply via email to