This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 075a76a144f IGNITE-18826 Java thin client: fixed the pending requests
race on close (#10549)
075a76a144f is described below
commit 075a76a144fbf3a02b928208aeee60aa7084a930
Author: Nikita Amelchev <[email protected]>
AuthorDate: Mon Feb 20 13:50:55 2023 +0300
IGNITE-18826 Java thin client: fixed the pending requests race on close
(#10549)
---
.../internal/client/thin/TcpClientChannel.java | 53 ++++++++--
.../GridNioClientConnectionMultiplexer.java | 4 +-
...ientPartitionAwarenessUnstableTopologyTest.java | 117 +++++++++++++++++++++
3 files changed, 162 insertions(+), 12 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 062d4bd0731..cdf894d9495 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -135,6 +135,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Pending requests. */
private final Map<Long, ClientRequestFuture> pendingReqs = new
ConcurrentHashMap<>();
+ /** Lock to safely close pending requests. */
+ private final ReadWriteLock pendingReqsLock = new ReentrantReadWriteLock();
+
/** Topology change listeners. */
private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new
CopyOnWriteArrayList<>();
@@ -273,8 +276,15 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
U.closeQuiet(sock);
- for (ClientRequestFuture pendingReq : pendingReqs.values())
- pendingReq.onDone(new ClientConnectionException("Channel is
closed", cause));
+ pendingReqsLock.writeLock().lock();
+
+ try {
+ for (ClientRequestFuture pendingReq : pendingReqs.values())
+ pendingReq.onDone(new ClientConnectionException("Channel
is closed", cause));
+ }
+ finally {
+ pendingReqsLock.writeLock().unlock();
+ }
notificationLsnrsGuard.readLock().lock();
@@ -333,17 +343,26 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
PayloadOutputChannel payloadCh = new PayloadOutputChannel(this);
try {
- if (closed()) {
- ClientConnectionException err = new
ClientConnectionException("Channel is closed");
+ ClientRequestFuture fut;
- eventListener.onRequestFail(connDesc, id, op.code(),
op.name(), System.nanoTime() - startTimeNanos, err);
+ pendingReqsLock.readLock().lock();
- throw err;
- }
+ try {
+ if (closed()) {
+ ClientConnectionException err = new
ClientConnectionException("Channel is closed");
+
+ eventListener.onRequestFail(connDesc, id, op.code(),
op.name(), System.nanoTime() - startTimeNanos, err);
+
+ throw err;
+ }
- ClientRequestFuture fut = new ClientRequestFuture(id, op,
startTimeNanos);
+ fut = new ClientRequestFuture(id, op, startTimeNanos);
- pendingReqs.put(id, fut);
+ pendingReqs.put(id, fut);
+ }
+ finally {
+ pendingReqsLock.readLock().unlock();
+ }
eventListener.onRequestStart(connDesc, id, op.code(), op.name());
@@ -691,9 +710,21 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
new ProtocolContext(ver).toString(), null));
while (true) {
- ClientRequestFuture fut = new ClientRequestFuture(requestId,
ClientOperation.HANDSHAKE);
+ ClientRequestFuture fut;
+
+ pendingReqsLock.readLock().lock();
- pendingReqs.put(requestId, fut);
+ try {
+ if (closed())
+ throw new ClientConnectionException("Channel is closed");
+
+ fut = new ClientRequestFuture(requestId,
ClientOperation.HANDSHAKE);
+
+ pendingReqs.put(requestId, fut);
+ }
+ finally {
+ pendingReqsLock.readLock().unlock();
+ }
handshakeReq(ver, user, pwd, userAttrs);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
index 8fe96e91c7a..7cf78735cf2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -156,6 +155,9 @@ public class GridNioClientConnectionMultiplexer implements
ClientConnectionMulti
GridNioFuture<GridNioSession> sesFut = srv.createSession(ch, meta,
false, null);
+ if (sesFut.error() != null)
+ sesFut.get();
+
if (sslHandshakeFut != null)
sslHandshakeFut.get();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index dfe93a97c69..a413d627a57 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -17,17 +17,50 @@
package org.apache.ignite.internal.client.thin;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.SslMode;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
+import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Test partition awareness of thin client on unstable topology.
*/
+@RunWith(Parameterized.class)
public class ThinClientPartitionAwarenessUnstableTopologyTest extends
ThinClientAbstractPartitionAwarenessTest {
+ /** */
+ @Parameterized.Parameter
+ public boolean sslEnabled;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "sslEnabled={0}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -35,6 +68,33 @@ public class
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
stopAllGrids();
}
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (sslEnabled) {
+ cfg.setClientConnectorConfiguration(new
ClientConnectorConfiguration()
+ .setSslEnabled(true)
+ .setSslClientAuth(true)
+ .setUseIgniteSslContextFactory(false)
+ .setSslContextFactory(GridTestUtils.sslFactory()));
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientConfiguration getClientConfiguration(int...
nodeIdxs) {
+ ClientConfiguration cfg = super.getClientConfiguration(nodeIdxs);
+
+ if (sslEnabled) {
+ cfg.setSslMode(SslMode.REQUIRED)
+ .setSslContextFactory(GridTestUtils.sslFactory());
+ }
+
+ return cfg;
+ }
+
/**
* Test that join of the new node is detected by the client and affects
partition awareness.
*/
@@ -203,4 +263,61 @@ public class
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
}
}
+
+ /** */
+ @Test
+ public void testSessionCloseBeforeHandshake() throws Exception {
+ startGrid(0);
+
+ ClientConfiguration cliCfg = getClientConfiguration(0)
+ .setEventListeners(new ConnectionEventListener() {
+ @Override public void onHandshakeStart(HandshakeStartEvent
event) {
+ // Close connection.
+ stopAllGrids();
+ }
+ });
+
+ GridTestUtils.assertThrowsWithCause(() -> {
+ try (IgniteClient client = Ignition.startClient(cliCfg)) {
+ return client;
+ }
+ }, ClientConnectionException.class);
+ }
+
+ /** */
+ @Test
+ public void testCreateSessionAfterClose() throws Exception {
+ startGrids(2);
+
+ CountDownLatch srvStopped = new CountDownLatch(1);
+
+ AtomicBoolean dfltInited = new AtomicBoolean();
+
+ // The client should close pending requests on closing without waiting.
+ try (TcpIgniteClient client = new TcpIgniteClient((cfg, connMgr) -> {
+ // Skip default channel to successful client start.
+ if (!dfltInited.compareAndSet(false, true)) {
+ try {
+ // Connection manager should be stopped before opening a
new connection.
+ srvStopped.await(getTestTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+ }
+
+ return new TcpClientChannel(cfg, connMgr);
+ }, getClientConfiguration(0))) {
+ GridNioServer<ByteBuffer> srv =
getFieldValue(client.reliableChannel(), "connMgr", "srv");
+
+ // Make sure handshake data will not be recieved.
+ setFieldValue(srv, "skipRead", true);
+
+ GridTestUtils.runAsync(() -> {
+ assertTrue(waitForCondition(() -> getFieldValue(srv,
"closed"), getTestTimeout()));
+
+ srvStopped.countDown();
+ });
+ }
+ }
}