This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a1823267f67 IGNITE-27722 Split TcpDiscoveryIoSession to fix race in
RingMessageWorker (#12731)
a1823267f67 is described below
commit a1823267f673a3ac71017347d1fd129e11a5c4cf
Author: Sergey Chugunov <[email protected]>
AuthorDate: Fri Feb 20 20:12:50 2026 +0400
IGNITE-27722 Split TcpDiscoveryIoSession to fix race in RingMessageWorker
(#12731)
---
.../ignite/spi/discovery/tcp/ServerImpl.java | 87 +++++++++++---
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 28 +----
.../tcp/TcpDiscoveryMessageSerializer.java | 71 ++++++++++++
.../spi/discovery/tcp/TestMetricUpdateFailure.java | 127 +++++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 3 +
5 files changed, 277 insertions(+), 39 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2f6d431fa9a..e6d3052e902 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -100,6 +100,7 @@ import
org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2836,6 +2837,16 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Socket. */
private Socket sock;
+        /**
+         * Serializer used only for messages sent to client nodes, which are a special case in the
+         * RingMessageWorker workflow.
+         * <p>
+         * Normally both serialization and deserialization of messages are handled by {@link TcpDiscoveryIoSession}.
+         * However, a session is not always available when messages still have to be sent to clients; a typical
+         * example is a single server node with one or more connected clients. {@link TcpDiscoveryMessageSerializer}
+         * reuses the serialization logic of {@link TcpDiscoveryIoSession} and can be created independently of any
+         * active session.
+         * <p>
+         * Messages other than {@code TcpDiscoveryNodeAddedMessage} are serialized once with this serializer and the
+         * resulting bytes are handed to every client message worker.
+         */
+        private final TcpDiscoveryMessageSerializer clientMsgSer = new TcpDiscoveryMessageSerializer(spi);
+
/** IO session. */
private TcpDiscoveryIoSession ses;
@@ -3243,19 +3254,36 @@ class ServerImpl extends TcpDiscoveryImpl {
if (spi.ensured(msg))
msgHist.add(msg);
+ if (clientMsgWorkers.isEmpty())
+ return;
+
+ byte[] msgBytes = null;
+
+ if (!(msg instanceof TcpDiscoveryNodeAddedMessage)) {
+ try {
+ msgBytes = clientMsgSer.serializeMessage(msg);
+ }
+ catch (IgniteCheckedException | IOException e) {
+ U.error(log, "Failed to serialize message: " + msg, e);
+
+ return;
+ }
+ }
+
for (ClientMessageWorker clientMsgWorker :
clientMsgWorkers.values()) {
+ TcpDiscoveryAbstractMessage msg0 = msg;
+
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
TcpDiscoveryNodeAddedMessage nodeAddedMsg =
(TcpDiscoveryNodeAddedMessage)msg;
if
(clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
- msg = new
TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
+ msg0 = new
TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
- prepareNodeAddedMessage(msg,
clientMsgWorker.clientNodeId, null);
+ prepareNodeAddedMessage(msg0,
clientMsgWorker.clientNodeId, null);
}
}
- // TODO Investigate possible optimizations:
https://issues.apache.org/jira/browse/IGNITE-27722
- clientMsgWorker.addMessage(msg);
+ clientMsgWorker.addMessage(msg0, msgBytes);
}
}
}
@@ -7588,12 +7616,20 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** */
- private class ClientMessageWorker extends
MessageWorker<TcpDiscoveryAbstractMessage> {
+ private class ClientMessageWorker extends
MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
/** Node ID. */
private final UUID clientNodeId;
+        // Sending and receiving messages to and from client nodes is a special case in ServerImpl,
+        // because the work is split into two separate components:
+        // ClientMessageWorker only sends messages to clients and does not process responses,
+        // while messages coming from clients are read by SocketReader.
+        // Because of this split, a full TcpDiscoveryIoSession is not needed here; only its message-writing part is.
+        // At the same time, reading and writing logic should stay encapsulated within TcpDiscoveryIoSession,
+        // so the writing part is provided by TcpDiscoveryMessageSerializer, a TcpDiscoveryIoSession subclass
+        // that can be created without a connected socket.
/** */
- private final TcpDiscoveryIoSession ses;
+ private final TcpDiscoveryMessageSerializer clientMsgSer;
/** Socket. */
private final Socket sock;
@@ -7623,7 +7659,7 @@ class ServerImpl extends TcpDiscoveryImpl {
this.sock = sock;
this.clientNodeId = clientNodeId;
- ses = createSession(sock);
+ clientMsgSer = new TcpDiscoveryMessageSerializer(spi);
lastMetricsUpdateMsgTimeNanos = System.nanoTime();
}
@@ -7655,10 +7691,20 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
void addMessage(TcpDiscoveryAbstractMessage msg) {
+ addMessage(msg, null);
+ }
+
+ /**
+ * @param msg Message.
+ * @param msgBytes Optional message bytes.
+ */
+ void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[]
msgBytes) {
+ T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg,
msgBytes);
+
if (msg.highPriority())
- queue.addFirst(msg);
+ queue.addFirst(t);
else
- queue.add(msg);
+ queue.add(t);
DebugLogger log = messageLogger(msg);
@@ -7667,14 +7713,14 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected void processMessage(TcpDiscoveryAbstractMessage
msg) {
+ @Override protected void
processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
boolean success = false;
+ TcpDiscoveryAbstractMessage msg = msgT.get1();
+
try {
assert msg.verified() : msg;
- byte[] msgBytes = ses.serializeMessage(msg);
-
DebugLogger msgLog = messageLogger(msg);
if (msg instanceof TcpDiscoveryClientAckResponse) {
@@ -7696,8 +7742,8 @@ class ServerImpl extends TcpDiscoveryImpl {
+ getLocalNodeId() + ", rmtNodeId=" +
clientNodeId + ", msg=" + msg + ']');
}
- spi.writeToSocket(sock, msg, msgBytes,
spi.failureDetectionTimeoutEnabled() ?
- spi.clientFailureDetectionTimeout() :
spi.getSocketTimeout());
+ writeToSocket(msgT,
spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() :
+ spi.getSocketTimeout());
}
}
else {
@@ -7708,7 +7754,7 @@ class ServerImpl extends TcpDiscoveryImpl {
assert topologyInitialized(msg) : msg;
- spi.writeToSocket(sock, msg, msgBytes,
spi.getEffectiveSocketTimeout(false));
+ writeToSocket(msgT, spi.getEffectiveSocketTimeout(false));
}
boolean clientFailed = msg instanceof
TcpDiscoveryNodeFailedMessage &&
@@ -7737,6 +7783,17 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+        /**
+         * Sends a queued message to the client socket, serializing it first if no pre-serialized bytes were queued.
+         *
+         * @param msgT Message paired with its already serialized bytes, or with {@code null} when the message
+         *      still has to be serialized by this worker.
+         * @param timeout Socket write timeout passed to {@code spi.writeToSocket}.
+         * @throws IgniteCheckedException Propagated from serialization or from {@code spi.writeToSocket}.
+         * @throws IOException Propagated from serialization or from {@code spi.writeToSocket}.
+         */
+        private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> msgT, long timeout)
+            throws IgniteCheckedException, IOException {
+            TcpDiscoveryAbstractMessage msg = msgT.get1();
+
+            byte[] msgBytes = msgT.get2();
+
+            // Bytes are absent when the producer could not share a single serialized form (e.g. per-client copies).
+            if (msgBytes == null)
+                msgBytes = clientMsgSer.serializeMessage(msg);
+
+            spi.writeToSocket(sock, msg, msgBytes, timeout);
+        }
+
/**
* @param msg Message.
* @return {@code True} if topology initialized.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index fa8d71e40f1..4658abb1456 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -71,7 +70,7 @@ public class TcpDiscoveryIoSession {
static final byte MESSAGE_SERIALIZATION = (byte)2;
/** */
- private final TcpDiscoverySpi spi;
+ final TcpDiscoverySpi spi;
/** Loads discovery messages classes during java deserialization. */
private final ClassLoader clsLdr;
@@ -80,7 +79,7 @@ public class TcpDiscoveryIoSession {
private final Socket sock;
/** */
- private final DirectMessageWriter msgWriter;
+ final DirectMessageWriter msgWriter;
/** */
private final DirectMessageReader msgReader;
@@ -92,7 +91,7 @@ public class TcpDiscoveryIoSession {
private final CompositeInputStream in;
/** Intermediate buffer for serializing discovery messages. */
- private final ByteBuffer msgBuf;
+ final ByteBuffer msgBuf;
/**
* Creates a new discovery I/O session bound to the given socket.
@@ -238,25 +237,6 @@ public class TcpDiscoveryIoSession {
}
}
- /**
- * Serializes a discovery message into a byte array.
- *
- * @param msg Discovery message to serialize.
- * @return Serialized byte array containing the message data.
- * @throws IgniteCheckedException If serialization fails.
- * @throws IOException If serialization fails.
- */
- byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws
IgniteCheckedException, IOException {
- if (!(msg instanceof Message))
- return U.marshal(spi.marshaller(), msg);
-
- try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- serializeMessage((Message)msg, out);
-
- return out.toByteArray();
- }
- }
-
/** @return Socket. */
public Socket socket() {
return sock;
@@ -269,7 +249,7 @@ public class TcpDiscoveryIoSession {
* @param out Output stream to write serialized message.
* @throws IOException If serialization fails.
*/
- private void serializeMessage(Message m, OutputStream out) throws
IOException {
+ void serializeMessage(Message m, OutputStream out) throws IOException {
MessageSerializer msgSer =
spi.messageFactory().serializer(m.directType());
msgWriter.reset();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
new file mode 100644
index 00000000000..1c9e1bcc69c
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+
+/**
+ * Serializes discovery messages into byte arrays without a connected {@link TcpDiscoveryIoSession}.
+ * <p>
+ * It covers a special case: a server that sends discovery messages to client nodes may not have a session
+ * to serialize them with (for example, a single server with connected clients). This class reuses the
+ * serialization logic of {@link TcpDiscoveryIoSession} by extending it with a stub socket, so it can be
+ * created independently of any active connection.
+ * <p>
+ * Messages implementing {@link Message} are serialized with the RU-ready {@link MessageSerializer} mechanism;
+ * all other messages fall back to the SPI marshaller.
+ * <p>
+ * Instances are not safe for concurrent use: serialization goes through the message writer and intermediate
+ * buffer inherited from {@link TcpDiscoveryIoSession}, which are shared by all calls on the same instance
+ * (the writer is reset at the start of each serialization). Use a separate instance per thread.
+ */
+class TcpDiscoveryMessageSerializer extends TcpDiscoveryIoSession {
+    /**
+     * Creates a serializer that is not bound to any connection.
+     *
+     * @param spi Discovery SPI instance.
+     */
+    public TcpDiscoveryMessageSerializer(TcpDiscoverySpi spi) {
+        // The parent constructor expects a socket. This instance is intended only for serialization,
+        // so an unconnected stub whose stream accessors return null is passed instead.
+        super(new Socket() {
+            @Override public OutputStream getOutputStream() {
+                return null;
+            }
+
+            @Override public InputStream getInputStream() {
+                return null;
+            }
+        }, spi);
+    }
+
+    /**
+     * Serializes a discovery message into a byte array.
+     * <p>
+     * {@link Message} instances are written with the serializer that the SPI message factory provides for their
+     * direct type; any other message is marshalled with the SPI marshaller.
+     *
+     * @param msg Discovery message to serialize.
+     * @return Serialized message bytes.
+     * @throws IgniteCheckedException If marshalling fails.
+     * @throws IOException If serialization fails.
+     */
+    byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException {
+        if (!(msg instanceof Message))
+            return U.marshal(spi.marshaller(), msg);
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            serializeMessage((Message)msg, out);
+
+            return out.toByteArray();
+        }
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestMetricUpdateFailure.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestMetricUpdateFailure.java
new file mode 100644
index 00000000000..f4d76276e5f
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestMetricUpdateFailure.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test verifies there is no data race in ServerImpl code between handling
{@link TcpDiscoveryMetricsUpdateMessage} messages
+ * and sending them to client nodes.
+ */
+public class TestMetricUpdateFailure extends GridCommonAbstractTest {
+ /** */
+ private static final int MESSAGES = 10_000;
+
+ /** */
+ private static final int METRICS_FREQ = 10;
+
+ /** */
+ public static final int SRV_CNT = 1;
+
+ /** */
+ public static final int CLN_CNT = 20;
+
+ /** */
+ private static final AtomicLong msgCnt = new AtomicLong();
+
+ /** */
+ private LogListener concLsnr =
LogListener.matches("ConcurrentModificationException").build();
+
+ /** */
+ private LogListener invalidLsnr = LogListener.matches("Invalid message
type").build();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ListeningTestLogger log = new ListeningTestLogger(log());
+
+ log.registerAllListeners(concLsnr, invalidLsnr);
+
+ return cfg.setMetricsUpdateFrequency(METRICS_FREQ)
+ .setGridLogger(log)
+ .setDiscoverySpi(new TestDiscoverySpi());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ msgCnt.set(MESSAGES);
+
+ assertFalse(concLsnr.check());
+ assertFalse(invalidLsnr.check());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ concLsnr.reset();
+ invalidLsnr.reset();
+
+ stopAllGrids(true);
+ }
+
+ /** */
+ @Test
+ public void test() throws Exception {
+ startGrids(SRV_CNT);
+ startClientGridsMultiThreaded(SRV_CNT, CLN_CNT);
+
+ waitForTopology(SRV_CNT + CLN_CNT);
+
+ IgniteCache<Integer, Integer> clnCache = grid(SRV_CNT).createCache(
+ new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+ .setStatisticsEnabled(true)
+ .setBackups(1));
+
+ IntStream.range(0, 10000)
+ .forEach(i -> clnCache.put(i, i));
+
+ GridTestUtils.waitForCondition(() -> msgCnt.get() <= 0 ||
concLsnr.check() || invalidLsnr.check(),
+ getTestTimeout());
+
+ assertFalse("Concurrent modification occured", concLsnr.check());
+ assertFalse("Invalid message type found", invalidLsnr.check());
+
+ checkTopology(SRV_CNT + CLN_CNT);
+ }
+
+ /** */
+ private static class TestDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ super.startMessageProcess(msg);
+
+ if (msg instanceof TcpDiscoveryMetricsUpdateMessage)
+ msgCnt.decrementAndGet();
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index e5320437a46..af3cd558edd 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -81,6 +81,7 @@ import
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedUntrustedTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithAddressFilterTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithWrongServerTest;
+import org.apache.ignite.spi.discovery.tcp.TestMetricUpdateFailure;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinderSelfTest;
@@ -152,6 +153,8 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
IgniteDiscoveryMassiveNodeFailTest.class,
TcpDiscoveryCoordinatorFailureTest.class,
+ TestMetricUpdateFailure.class,
+
// Client connect.
IgniteClientConnectTest.class,
IgniteClientConnectSslTest.class,