This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 075a76a144f IGNITE-18826 Java thin client: fixed the pending requests 
race on close (#10549)
075a76a144f is described below

commit 075a76a144fbf3a02b928208aeee60aa7084a930
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Feb 20 13:50:55 2023 +0300

    IGNITE-18826 Java thin client: fixed the pending requests race on close 
(#10549)
---
 .../internal/client/thin/TcpClientChannel.java     |  53 ++++++++--
 .../GridNioClientConnectionMultiplexer.java        |   4 +-
 ...ientPartitionAwarenessUnstableTopologyTest.java | 117 +++++++++++++++++++++
 3 files changed, 162 insertions(+), 12 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 062d4bd0731..cdf894d9495 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -135,6 +135,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** Pending requests. */
     private final Map<Long, ClientRequestFuture> pendingReqs = new 
ConcurrentHashMap<>();
 
+    /** Lock to safely close pending requests. */
+    private final ReadWriteLock pendingReqsLock = new ReentrantReadWriteLock();
+
     /** Topology change listeners. */
     private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new 
CopyOnWriteArrayList<>();
 
@@ -273,8 +276,15 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
             U.closeQuiet(sock);
 
-            for (ClientRequestFuture pendingReq : pendingReqs.values())
-                pendingReq.onDone(new ClientConnectionException("Channel is 
closed", cause));
+            pendingReqsLock.writeLock().lock();
+
+            try {
+                for (ClientRequestFuture pendingReq : pendingReqs.values())
+                    pendingReq.onDone(new ClientConnectionException("Channel 
is closed", cause));
+            }
+            finally {
+                pendingReqsLock.writeLock().unlock();
+            }
 
             notificationLsnrsGuard.readLock().lock();
 
@@ -333,17 +343,26 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         PayloadOutputChannel payloadCh = new PayloadOutputChannel(this);
 
         try {
-            if (closed()) {
-                ClientConnectionException err = new 
ClientConnectionException("Channel is closed");
+            ClientRequestFuture fut;
 
-                eventListener.onRequestFail(connDesc, id, op.code(), 
op.name(), System.nanoTime() - startTimeNanos, err);
+            pendingReqsLock.readLock().lock();
 
-                throw err;
-            }
+            try {
+                if (closed()) {
+                    ClientConnectionException err = new 
ClientConnectionException("Channel is closed");
+
+                    eventListener.onRequestFail(connDesc, id, op.code(), 
op.name(), System.nanoTime() - startTimeNanos, err);
+
+                    throw err;
+                }
 
-            ClientRequestFuture fut = new ClientRequestFuture(id, op, 
startTimeNanos);
+                fut = new ClientRequestFuture(id, op, startTimeNanos);
 
-            pendingReqs.put(id, fut);
+                pendingReqs.put(id, fut);
+            }
+            finally {
+                pendingReqsLock.readLock().unlock();
+            }
 
             eventListener.onRequestStart(connDesc, id, op.code(), op.name());
 
@@ -691,9 +710,21 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             new ProtocolContext(ver).toString(), null));
 
         while (true) {
-            ClientRequestFuture fut = new ClientRequestFuture(requestId, 
ClientOperation.HANDSHAKE);
+            ClientRequestFuture fut;
+
+            pendingReqsLock.readLock().lock();
 
-            pendingReqs.put(requestId, fut);
+            try {
+                if (closed())
+                    throw new ClientConnectionException("Channel is closed");
+
+                fut = new ClientRequestFuture(requestId, 
ClientOperation.HANDSHAKE);
+
+                pendingReqs.put(requestId, fut);
+            }
+            finally {
+                pendingReqsLock.readLock().unlock();
+            }
 
             handshakeReq(ver, user, pwd, userAttrs);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
index 8fe96e91c7a..7cf78735cf2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.net.ssl.SSLContext;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -156,6 +155,9 @@ public class GridNioClientConnectionMultiplexer implements 
ClientConnectionMulti
 
             GridNioFuture<GridNioSession> sesFut = srv.createSession(ch, meta, 
false, null);
 
+            if (sesFut.error() != null)
+                sesFut.get();
+
             if (sslHandshakeFut != null)
                 sslHandshakeFut.get();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index dfe93a97c69..a413d627a57 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -17,17 +17,50 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.SslMode;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * Test partition awareness of thin client on unstable topology.
  */
+@RunWith(Parameterized.class)
 public class ThinClientPartitionAwarenessUnstableTopologyTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** */
+    @Parameterized.Parameter
+    public boolean sslEnabled;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "sslEnabled={0}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {{false}, {true}});
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -35,6 +68,33 @@ public class 
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
         stopAllGrids();
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (sslEnabled) {
+            cfg.setClientConnectorConfiguration(new 
ClientConnectorConfiguration()
+                .setSslEnabled(true)
+                .setSslClientAuth(true)
+                .setUseIgniteSslContextFactory(false)
+                .setSslContextFactory(GridTestUtils.sslFactory()));
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected ClientConfiguration getClientConfiguration(int... 
nodeIdxs) {
+        ClientConfiguration cfg = super.getClientConfiguration(nodeIdxs);
+
+        if (sslEnabled) {
+            cfg.setSslMode(SslMode.REQUIRED)
+                .setSslContextFactory(GridTestUtils.sslFactory());
+        }
+
+        return cfg;
+    }
+
     /**
      * Test that join of the new node is detected by the client and affects 
partition awareness.
      */
@@ -203,4 +263,61 @@ public class 
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
             assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
         }
     }
+
+    /** */
+    @Test
+    public void testSessionCloseBeforeHandshake() throws Exception {
+        startGrid(0);
+
+        ClientConfiguration cliCfg = getClientConfiguration(0)
+            .setEventListeners(new ConnectionEventListener() {
+                @Override public void onHandshakeStart(HandshakeStartEvent 
event) {
+                    // Close connection.
+                    stopAllGrids();
+                }
+            });
+
+        GridTestUtils.assertThrowsWithCause(() -> {
+            try (IgniteClient client = Ignition.startClient(cliCfg)) {
+                return client;
+            }
+        }, ClientConnectionException.class);
+    }
+
+    /** */
+    @Test
+    public void testCreateSessionAfterClose() throws Exception {
+        startGrids(2);
+
+        CountDownLatch srvStopped = new CountDownLatch(1);
+
+        AtomicBoolean dfltInited = new AtomicBoolean();
+
+        // The client should close pending requests on closing without waiting.
+        try (TcpIgniteClient client = new TcpIgniteClient((cfg, connMgr) -> {
+            // Skip default channel to successful client start.
+            if (!dfltInited.compareAndSet(false, true)) {
+                try {
+                    // Connection manager should be stopped before opening a 
new connection.
+                    srvStopped.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException ignored) {
+                    // No-op.
+                }
+            }
+
+            return new TcpClientChannel(cfg, connMgr);
+        }, getClientConfiguration(0))) {
+            GridNioServer<ByteBuffer> srv = 
getFieldValue(client.reliableChannel(), "connMgr", "srv");
+
+            // Make sure handshake data will not be recieved.
+            setFieldValue(srv, "skipRead", true);
+
+            GridTestUtils.runAsync(() -> {
+                assertTrue(waitForCondition(() -> getFieldValue(srv, 
"closed"), getTestTimeout()));
+
+                srvStopped.countDown();
+            });
+        }
+    }
 }

Reply via email to