This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a1823267f67 IGNITE-27722 Split TcpDiscoveryIoSession to fix race in RingMessageWorker (#12731)
a1823267f67 is described below

commit a1823267f673a3ac71017347d1fd129e11a5c4cf
Author: Sergey Chugunov <[email protected]>
AuthorDate: Fri Feb 20 20:12:50 2026 +0400

    IGNITE-27722 Split TcpDiscoveryIoSession to fix race in RingMessageWorker (#12731)
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  87 +++++++++++---
 .../spi/discovery/tcp/TcpDiscoveryIoSession.java   |  28 +----
 .../tcp/TcpDiscoveryMessageSerializer.java         |  71 ++++++++++++
 .../spi/discovery/tcp/TestMetricUpdateFailure.java | 127 +++++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java           |   3 +
 5 files changed, 277 insertions(+), 39 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2f6d431fa9a..e6d3052e902 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2836,6 +2837,16 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        // This serializer is used exclusively for serializing messages sent to clients,
+        // as it represents a special case within the RingMessageWorker workflow.
+        // Generally, both serialization and deserialization of messages should be handled by TcpDiscoveryIoSession.
+        // However, there are scenarios where the session is not available, yet messages still need to be sent to clients.
+        // A typical example is a single server with one or more connected clients.
+        // To address this, we use TcpDiscoveryMessageSerializer, which includes some code copied from TcpDiscoveryIoSession
+        // and can be instantiated independently of any active session.
+        /** */
+        private final TcpDiscoveryMessageSerializer clientMsgSer = new 
TcpDiscoveryMessageSerializer(spi);
+
         /** IO session. */
         private TcpDiscoveryIoSession ses;
 
@@ -3243,19 +3254,36 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (spi.ensured(msg))
                     msgHist.add(msg);
 
+                if (clientMsgWorkers.isEmpty())
+                    return;
+
+                byte[] msgBytes = null;
+
+                if (!(msg instanceof TcpDiscoveryNodeAddedMessage)) {
+                    try {
+                        msgBytes = clientMsgSer.serializeMessage(msg);
+                    }
+                    catch (IgniteCheckedException | IOException e) {
+                        U.error(log, "Failed to serialize message: " + msg, e);
+
+                        return;
+                    }
+                }
+
                 for (ClientMessageWorker clientMsgWorker : 
clientMsgWorkers.values()) {
+                    TcpDiscoveryAbstractMessage msg0 = msg;
+
                     if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                         TcpDiscoveryNodeAddedMessage nodeAddedMsg = 
(TcpDiscoveryNodeAddedMessage)msg;
 
                         if 
(clientMsgWorker.clientNodeId.equals(nodeAddedMsg.node().id())) {
-                            msg = new 
TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
+                            msg0 = new 
TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
 
-                            prepareNodeAddedMessage(msg, 
clientMsgWorker.clientNodeId, null);
+                            prepareNodeAddedMessage(msg0, 
clientMsgWorker.clientNodeId, null);
                         }
                     }
 
-                    // TODO Investigate possible optimizations: 
https://issues.apache.org/jira/browse/IGNITE-27722
-                    clientMsgWorker.addMessage(msg);
+                    clientMsgWorker.addMessage(msg0, msgBytes);
                 }
             }
         }
@@ -7588,12 +7616,20 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** */
-    private class ClientMessageWorker extends 
MessageWorker<TcpDiscoveryAbstractMessage> {
+    private class ClientMessageWorker extends 
MessageWorker<T2<TcpDiscoveryAbstractMessage, byte[]>> {
         /** Node ID. */
         private final UUID clientNodeId;
 
+        // The code responsible for sending and receiving messages to and from client nodes represents a special case in ServerImpl,
+        // as it is split into two separate components.
+        // One part, ClientMessageWorker, handles only message sending to clients and does not process responses.
+        // The other part, which reads messages from clients, is implemented in SocketReader.
+        // Due to this separation, we don't require a full TcpDiscoveryIoSession here
+        // and can instead extract just the message-writing functionality.
+        // At the same time, we aim to keep both reading and writing logic encapsulated within TcpDiscoveryIoSession.
+        // As a result, we need to copy some code from TcpDiscoveryIoSession into the new class, TcpDiscoveryMessageSerializer.
         /** */
-        private final TcpDiscoveryIoSession ses;
+        private final TcpDiscoveryMessageSerializer clientMsgSer;
 
         /** Socket. */
         private final Socket sock;
@@ -7623,7 +7659,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.sock = sock;
             this.clientNodeId = clientNodeId;
 
-            ses = createSession(sock);
+            clientMsgSer = new TcpDiscoveryMessageSerializer(spi);
 
             lastMetricsUpdateMsgTimeNanos = System.nanoTime();
         }
@@ -7655,10 +7691,20 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         void addMessage(TcpDiscoveryAbstractMessage msg) {
+            addMessage(msg, null);
+        }
+
+        /**
+         * @param msg Message.
+         * @param msgBytes Optional message bytes.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] 
msgBytes) {
+            T2<TcpDiscoveryAbstractMessage, byte[]> t = new T2<>(msg, 
msgBytes);
+
             if (msg.highPriority())
-                queue.addFirst(msg);
+                queue.addFirst(t);
             else
-                queue.add(msg);
+                queue.add(t);
 
             DebugLogger log = messageLogger(msg);
 
@@ -7667,14 +7713,14 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /** {@inheritDoc} */
-        @Override protected void processMessage(TcpDiscoveryAbstractMessage 
msg) {
+        @Override protected void 
processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
             boolean success = false;
 
+            TcpDiscoveryAbstractMessage msg = msgT.get1();
+
             try {
                 assert msg.verified() : msg;
 
-                byte[] msgBytes = ses.serializeMessage(msg);
-
                 DebugLogger msgLog = messageLogger(msg);
 
                 if (msg instanceof TcpDiscoveryClientAckResponse) {
@@ -7696,8 +7742,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 + getLocalNodeId() + ", rmtNodeId=" + 
clientNodeId + ", msg=" + msg + ']');
                         }
 
-                        spi.writeToSocket(sock, msg, msgBytes, 
spi.failureDetectionTimeoutEnabled() ?
-                            spi.clientFailureDetectionTimeout() : 
spi.getSocketTimeout());
+                        writeToSocket(msgT, 
spi.failureDetectionTimeoutEnabled() ? spi.clientFailureDetectionTimeout() :
+                            spi.getSocketTimeout());
                     }
                 }
                 else {
@@ -7708,7 +7754,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     assert topologyInitialized(msg) : msg;
 
-                    spi.writeToSocket(sock, msg, msgBytes, 
spi.getEffectiveSocketTimeout(false));
+                    writeToSocket(msgT, spi.getEffectiveSocketTimeout(false));
                 }
 
                 boolean clientFailed = msg instanceof 
TcpDiscoveryNodeFailedMessage &&
@@ -7737,6 +7783,17 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /**
+         * @param msgT Message tuple.
+         * @param timeout Timeout.
+         */
+        private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> 
msgT, long timeout)
+            throws IgniteCheckedException, IOException {
+            byte[] msgBytes = msgT.get2() == null ? 
clientMsgSer.serializeMessage(msgT.get1()) : msgT.get2();
+
+            spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout);
+        }
+
         /**
          * @param msg Message.
          * @return {@code True} if topology initialized.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index fa8d71e40f1..4658abb1456 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.tcp;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -71,7 +70,7 @@ public class TcpDiscoveryIoSession {
     static final byte MESSAGE_SERIALIZATION = (byte)2;
 
     /** */
-    private final TcpDiscoverySpi spi;
+    final TcpDiscoverySpi spi;
 
     /** Loads discovery messages classes during java deserialization. */
     private final ClassLoader clsLdr;
@@ -80,7 +79,7 @@ public class TcpDiscoveryIoSession {
     private final Socket sock;
 
     /** */
-    private final DirectMessageWriter msgWriter;
+    final DirectMessageWriter msgWriter;
 
     /** */
     private final DirectMessageReader msgReader;
@@ -92,7 +91,7 @@ public class TcpDiscoveryIoSession {
     private final CompositeInputStream in;
 
     /** Intermediate buffer for serializing discovery messages. */
-    private final ByteBuffer msgBuf;
+    final ByteBuffer msgBuf;
 
     /**
      * Creates a new discovery I/O session bound to the given socket.
@@ -238,25 +237,6 @@ public class TcpDiscoveryIoSession {
         }
     }
 
-    /**
-     * Serializes a discovery message into a byte array.
-     *
-     * @param msg Discovery message to serialize.
-     * @return Serialized byte array containing the message data.
-     * @throws IgniteCheckedException If serialization fails.
-     * @throws IOException If serialization fails.
-     */
-    byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
-        if (!(msg instanceof Message))
-            return U.marshal(spi.marshaller(), msg);
-
-        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-            serializeMessage((Message)msg, out);
-
-            return out.toByteArray();
-        }
-    }
-
     /** @return Socket. */
     public Socket socket() {
         return sock;
@@ -269,7 +249,7 @@ public class TcpDiscoveryIoSession {
      * @param out Output stream to write serialized message.
      * @throws IOException If serialization fails.
      */
-    private void serializeMessage(Message m, OutputStream out) throws 
IOException {
+    void serializeMessage(Message m, OutputStream out) throws IOException {
         MessageSerializer msgSer = 
spi.messageFactory().serializer(m.directType());
 
         msgWriter.reset();
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
new file mode 100644
index 00000000000..1c9e1bcc69c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+
+/**
+ * Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
+ * <p>
+ * It is used in a special case: when server wants to send discovery messages to clients, it may not have a {@link TcpDiscoveryIoSession}
+ * to serialize the messages.
+ * This class enables server to serialize discovery messages anyway, duplicating serialization code from {@link TcpDiscoveryIoSession}.
+ */
+class TcpDiscoveryMessageSerializer extends TcpDiscoveryIoSession {
+    /**
+     * @param spi Discovery SPI instance.
+     */
+    public TcpDiscoveryMessageSerializer(TcpDiscoverySpi spi) {
+        super(new Socket() {
+            @Override public OutputStream getOutputStream() throws IOException 
{
+                return null;
+            }
+
+            @Override public InputStream getInputStream() throws IOException {
+                return null;
+            }
+        }, spi);
+    }
+
+    /**
+     * Serializes a discovery message into a byte array.
+     *
+     * @param msg Discovery message to serialize.
+     * @return Serialized byte array containing the message data.
+     * @throws IgniteCheckedException If serialization fails.
+     * @throws IOException If serialization fails.
+     */
+    byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
+        if (!(msg instanceof Message))
+            return U.marshal(spi.marshaller(), msg);
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            serializeMessage((Message)msg, out);
+
+            return out.toByteArray();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestMetricUpdateFailure.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestMetricUpdateFailure.java
new file mode 100644
index 00000000000..f4d76276e5f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestMetricUpdateFailure.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test verifies there is no data race in ServerImpl code between handling {@link TcpDiscoveryMetricsUpdateMessage} messages
+ * and sending them to client nodes.
+ */
+public class TestMetricUpdateFailure extends GridCommonAbstractTest {
+    /** */
+    private static final int MESSAGES = 10_000;
+
+    /** */
+    private static final int METRICS_FREQ = 10;
+
+    /** */
+    public static final int SRV_CNT = 1;
+
+    /** */
+    public static final int CLN_CNT = 20;
+
+    /** */
+    private static final AtomicLong msgCnt = new AtomicLong();
+
+    /** */
+    private LogListener concLsnr = 
LogListener.matches("ConcurrentModificationException").build();
+
+    /** */
+    private LogListener invalidLsnr = LogListener.matches("Invalid message 
type").build();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ListeningTestLogger log = new ListeningTestLogger(log());
+
+        log.registerAllListeners(concLsnr, invalidLsnr);
+
+        return cfg.setMetricsUpdateFrequency(METRICS_FREQ)
+            .setGridLogger(log)
+            .setDiscoverySpi(new TestDiscoverySpi());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        msgCnt.set(MESSAGES);
+
+        assertFalse(concLsnr.check());
+        assertFalse(invalidLsnr.check());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        concLsnr.reset();
+        invalidLsnr.reset();
+
+        stopAllGrids(true);
+    }
+
+    /** */
+    @Test
+    public void test() throws Exception {
+        startGrids(SRV_CNT);
+        startClientGridsMultiThreaded(SRV_CNT, CLN_CNT);
+
+        waitForTopology(SRV_CNT + CLN_CNT);
+
+        IgniteCache<Integer, Integer> clnCache = grid(SRV_CNT).createCache(
+            new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+                .setStatisticsEnabled(true)
+                .setBackups(1));
+
+        IntStream.range(0, 10000)
+            .forEach(i -> clnCache.put(i, i));
+
+        GridTestUtils.waitForCondition(() -> msgCnt.get() <= 0 || 
concLsnr.check() || invalidLsnr.check(),
+            getTestTimeout());
+
+        assertFalse("Concurrent modification occured", concLsnr.check());
+        assertFalse("Invalid message type found", invalidLsnr.check());
+
+        checkTopology(SRV_CNT + CLN_CNT);
+    }
+
+    /** */
+    private static class TestDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            super.startMessageProcess(msg);
+
+            if (msg instanceof TcpDiscoveryMetricsUpdateMessage)
+                msgCnt.decrementAndGet();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index e5320437a46..af3cd558edd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -81,6 +81,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedUntrustedTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithAddressFilterTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithWrongServerTest;
+import org.apache.ignite.spi.discovery.tcp.TestMetricUpdateFailure;
 import 
org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
 import 
org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
 import 
org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinderSelfTest;
@@ -152,6 +153,8 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
     IgniteDiscoveryMassiveNodeFailTest.class,
     TcpDiscoveryCoordinatorFailureTest.class,
 
+    TestMetricUpdateFailure.class,
+
     // Client connect.
     IgniteClientConnectTest.class,
     IgniteClientConnectSslTest.class,

Reply via email to