This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch ignite-27722-alt-0
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit a5fd27e33a0297b8e22450426498330137cf4787
Author: Sergey Chugunov <[email protected]>
AuthorDate: Fri Feb 6 10:46:44 2026 +0300

    IGNITE-27722 Alternative implementation
---
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 34 +++++++++++-----------
 .../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java |  7 ++++-
 .../spi/discovery/tcp/TcpDiscoveryIoSession.java   | 23 +++++++++++----
 ...zer.java => TcpDiscoveryMessageMarshaller.java} | 26 ++++++-----------
 4 files changed, 50 insertions(+), 40 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index cf5182fca33..36df73065eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2835,6 +2835,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Socket. */
         private Socket sock;
 
+        /** */
+        private TcpDiscoveryMessageMarshaller msgMarsh = new 
TcpDiscoveryMessageMarshaller(spi);
+
         /** IO session. */
         private TcpDiscoveryIoSession ses;
 
@@ -3245,22 +3248,21 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (clientMsgWorkers.isEmpty())
                     return;
 
-                byte[] msgBytes;
+                byte[] msgBytes = null;
 
-                TcpDiscoveryIoSerializer ser = ses != null ? ses : new 
TcpDiscoveryIoSerializer(spi);
-
-                try {
-                    msgBytes = ser.serializeMessage(msg);
-                }
-                catch (IgniteCheckedException | IOException e) {
-                    U.error(log, "Failed to serialize message: " + msg, e);
+                if (!(msg instanceof TcpDiscoveryNodeAddedMessage)) {
+                    try {
+                        msgBytes = msgMarsh.marshal(msg);
+                    }
+                    catch (IgniteCheckedException | IOException e) {
+                        U.error(log, "Failed to serialize message: " + msg, e);
 
-                    return;
+                        return;
+                    }
                 }
 
                 for (ClientMessageWorker clientMsgWorker : 
clientMsgWorkers.values()) {
                     TcpDiscoveryAbstractMessage msg0 = msg;
-                    byte[] msgBytes0 = msgBytes;
 
                     if (msg instanceof TcpDiscoveryNodeAddedMessage) {
                         TcpDiscoveryNodeAddedMessage nodeAddedMsg = 
(TcpDiscoveryNodeAddedMessage)msg;
@@ -3269,12 +3271,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                             msg0 = new 
TcpDiscoveryNodeAddedMessage(nodeAddedMsg);
 
                             prepareNodeAddedMessage(msg0, 
clientMsgWorker.clientNodeId, null);
-
-                            msgBytes0 = null;
                         }
                     }
 
-                    clientMsgWorker.addMessage(msg0, msgBytes0);
+                    clientMsgWorker.addMessage(msg0, msgBytes);
                 }
             }
         }
@@ -3398,7 +3398,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             try {
                                 sock = spi.openSocket(addr, timeoutHelper);
 
-                                ses = createSession(sock);
+                                ses = createSession(sock, msgMarsh);
 
                                 openSock = true;
 
@@ -7608,7 +7608,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private final UUID clientNodeId;
 
         /** */
-        private final TcpDiscoveryIoSession ses;
+        private final TcpDiscoveryMessageMarshaller msgMarsh;
 
         /** Socket. */
         private final Socket sock;
@@ -7638,7 +7638,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.sock = sock;
             this.clientNodeId = clientNodeId;
 
-            ses = createSession(sock);
+            msgMarsh = new TcpDiscoveryMessageMarshaller(spi);
 
             lastMetricsUpdateMsgTimeNanos = System.nanoTime();
         }
@@ -7768,7 +7768,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> 
msgT, long timeout)
             throws IgniteCheckedException, IOException {
-            byte[] msgBytes = msgT.get2() == null ? 
ses.serializeMessage(msgT.get1()) : msgT.get2();
+            byte[] msgBytes = msgT.get2() == null ? 
msgMarsh.marshal(msgT.get1()) : msgT.get2();
 
             spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout);
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e115a3cca03..b2e5055800e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -484,7 +484,12 @@ abstract class TcpDiscoveryImpl {
      * @return IO session for writing and reading {@link 
TcpDiscoveryAbstractMessage}.
      */
     TcpDiscoveryIoSession createSession(Socket sock) {
-        return new TcpDiscoveryIoSession(sock, spi);
+        return createSession(sock, null);
+    }
+
+    /** */
+    TcpDiscoveryIoSession createSession(Socket sock, 
TcpDiscoveryMessageMarshaller msgMarsh) {
+        return new TcpDiscoveryIoSession(sock, msgMarsh, spi);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 131c228c0af..8a99c4a5456 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.StreamCorruptedException;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.security.cert.Certificate;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSocket;
@@ -53,7 +54,7 @@ import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMe
  * </ul>
  * A leading byte is used to distinguish between the modes. The byte will be 
removed in future.
  */
-public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer {
+public class TcpDiscoveryIoSession {
     /** Default size of buffer used for buffering socket in/out. */
     private static final int DFLT_SOCK_BUFFER_SIZE = 8192;
 
@@ -67,6 +68,15 @@ public class TcpDiscoveryIoSession extends 
TcpDiscoveryIoSerializer {
     /** */
     private final Socket sock;
 
+    /** */
+    private final TcpDiscoveryMessageMarshaller msgMarsh;
+
+    /** */
+    private final TcpDiscoverySpi spi;
+
+    /** */
+    private final ClassLoader clsLdr;
+
     /** */
     private final DirectMessageReader msgReader;
 
@@ -83,10 +93,11 @@ public class TcpDiscoveryIoSession extends 
TcpDiscoveryIoSerializer {
      * @param spi  Discovery SPI instance owning this session.
      * @throws IgniteException If an I/O error occurs while initializing 
buffers.
      */
-    TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) {
-        super(spi);
-
+    TcpDiscoveryIoSession(Socket sock, TcpDiscoveryMessageMarshaller 
msgMarshaller, TcpDiscoverySpi spi) {
         this.sock = sock;
+        msgMarsh = msgMarshaller;
+        this.spi = spi;
+        clsLdr = U.resolveClassLoader(spi.ignite().configuration());
 
         msgReader = new DirectMessageReader(spi.messageFactory(), null);
 
@@ -120,7 +131,7 @@ public class TcpDiscoveryIoSession extends 
TcpDiscoveryIoSerializer {
         try {
             out.write(MESSAGE_SERIALIZATION);
 
-            serializeMessage((Message)msg, out);
+            msgMarsh.marshal((Message)msg, out);
 
             out.flush();
         }
@@ -157,6 +168,8 @@ public class TcpDiscoveryIoSession extends 
TcpDiscoveryIoSerializer {
 
             Message msg = 
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
 
+            ByteBuffer msgBuf = msgMarsh.msgBuf;
+
             msgReader.reset();
             msgReader.setBuffer(msgBuf);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageMarshaller.java
similarity index 81%
rename from 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java
rename to 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageMarshaller.java
index d8cc0564f0f..00b915fc1f7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageMarshaller.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ignite.spi.discovery.tcp;
 
 import java.io.ByteArrayOutputStream;
@@ -28,21 +27,16 @@ import 
org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 
-/**
- * Serializer of messages. Converts discovery messages into bytes.
- */
-public class TcpDiscoveryIoSerializer {
+/** */
+public class TcpDiscoveryMessageMarshaller {
     /** Size for an intermediate buffer for serializing discovery messages. */
-    static final int MSG_BUFFER_SIZE = 100;
+    static final int MSG_BUFFER_SIZE = 128;
 
     /** */
-    final TcpDiscoverySpi spi;
-
-    /** Loads discovery messages classes during java deserialization. */
-    final ClassLoader clsLdr;
+    private final TcpDiscoverySpi spi;
 
     /** */
-    final DirectMessageWriter msgWriter;
+    private final DirectMessageWriter msgWriter;
 
     /** Intermediate buffer for serializing discovery messages. */
     final ByteBuffer msgBuf;
@@ -50,11 +44,9 @@ public class TcpDiscoveryIoSerializer {
     /**
      * @param spi Discovery SPI instance.
      */
-    public TcpDiscoveryIoSerializer(TcpDiscoverySpi spi) {
+    public TcpDiscoveryMessageMarshaller(TcpDiscoverySpi spi) {
         this.spi = spi;
 
-        clsLdr = U.resolveClassLoader(spi.ignite().configuration());
-
         msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
 
         msgWriter = new DirectMessageWriter(spi.messageFactory());
@@ -68,12 +60,12 @@ public class TcpDiscoveryIoSerializer {
      * @throws IgniteCheckedException If serialization fails.
      * @throws IOException If serialization fails.
      */
-    byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
+    byte[] marshal(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
         if (!(msg instanceof Message))
             return U.marshal(spi.marshaller(), msg);
 
         try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-            serializeMessage((Message)msg, out);
+            marshal((Message)msg, out);
 
             return out.toByteArray();
         }
@@ -87,7 +79,7 @@ public class TcpDiscoveryIoSerializer {
      * @param out Output stream to write serialized message.
      * @throws IOException If serialization fails.
      */
-    void serializeMessage(Message m, OutputStream out) throws IOException {
+    void marshal(Message m, OutputStream out) throws IOException {
         MessageSerializer msgSer = 
spi.messageFactory().serializer(m.directType());
 
         msgWriter.reset();

Reply via email to