This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch ignite-27722-alt-0 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit a5fd27e33a0297b8e22450426498330137cf4787 Author: Sergey Chugunov <[email protected]> AuthorDate: Fri Feb 6 10:46:44 2026 +0300 IGNITE-27722 Alternative implementation --- .../ignite/spi/discovery/tcp/ServerImpl.java | 34 +++++++++++----------- .../ignite/spi/discovery/tcp/TcpDiscoveryImpl.java | 7 ++++- .../spi/discovery/tcp/TcpDiscoveryIoSession.java | 23 +++++++++++---- ...zer.java => TcpDiscoveryMessageMarshaller.java} | 26 ++++++----------- 4 files changed, 50 insertions(+), 40 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index cf5182fca33..36df73065eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2835,6 +2835,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Socket. */ private Socket sock; + /** */ + private TcpDiscoveryMessageMarshaller msgMarsh = new TcpDiscoveryMessageMarshaller(spi); + /** IO session. */ private TcpDiscoveryIoSession ses; @@ -3245,22 +3248,21 @@ class ServerImpl extends TcpDiscoveryImpl { if (clientMsgWorkers.isEmpty()) return; - byte[] msgBytes; + byte[] msgBytes = null; - TcpDiscoveryIoSerializer ser = ses != null ? ses : new TcpDiscoveryIoSerializer(spi); - - try { - msgBytes = ser.serializeMessage(msg); - } - catch (IgniteCheckedException | IOException e) { - U.error(log, "Failed to serialize message: " + msg, e); + if (!(msg instanceof TcpDiscoveryNodeAddedMessage)) { + try { + msgBytes = msgMarsh.marshal(msg); + } + catch (IgniteCheckedException | IOException e) { + U.error(log, "Failed to serialize message: " + msg, e); - return; + return; + } } for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { TcpDiscoveryAbstractMessage msg0 = msg; - byte[] msgBytes0 = msgBytes; if (msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; @@ -3269,12 +3271,10 @@ class ServerImpl extends TcpDiscoveryImpl { msg0 = new TcpDiscoveryNodeAddedMessage(nodeAddedMsg); prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); - - msgBytes0 = null; } } - clientMsgWorker.addMessage(msg0, msgBytes0); + clientMsgWorker.addMessage(msg0, msgBytes); } } } @@ -3398,7 +3398,7 @@ class ServerImpl extends TcpDiscoveryImpl { try { sock = spi.openSocket(addr, timeoutHelper); - ses = createSession(sock); + ses = createSession(sock, msgMarsh); openSock = true; @@ -7608,7 +7608,7 @@ class ServerImpl extends TcpDiscoveryImpl { private final UUID clientNodeId; /** */ - private final TcpDiscoveryIoSession ses; + private final TcpDiscoveryMessageMarshaller msgMarsh; /** Socket. */ private final Socket sock; @@ -7638,7 +7638,7 @@ class ServerImpl extends TcpDiscoveryImpl { this.sock = sock; this.clientNodeId = clientNodeId; - ses = createSession(sock); + msgMarsh = new TcpDiscoveryMessageMarshaller(spi); lastMetricsUpdateMsgTimeNanos = System.nanoTime(); } @@ -7768,7 +7768,7 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void writeToSocket(T2<TcpDiscoveryAbstractMessage, byte[]> msgT, long timeout) throws IgniteCheckedException, IOException { - byte[] msgBytes = msgT.get2() == null ? ses.serializeMessage(msgT.get1()) : msgT.get2(); + byte[] msgBytes = msgT.get2() == null ? msgMarsh.marshal(msgT.get1()) : msgT.get2(); spi.writeToSocket(sock, msgT.get1(), msgBytes, timeout); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index e115a3cca03..b2e5055800e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -484,7 +484,12 @@ abstract class TcpDiscoveryImpl { * @return IO session for writing and reading {@link TcpDiscoveryAbstractMessage}. */ TcpDiscoveryIoSession createSession(Socket sock) { - return new TcpDiscoveryIoSession(sock, spi); + return createSession(sock, null); + } + + /** */ + TcpDiscoveryIoSession createSession(Socket sock, TcpDiscoveryMessageMarshaller msgMarsh) { + return new TcpDiscoveryIoSession(sock, msgMarsh, spi); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 131c228c0af..8a99c4a5456 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.StreamCorruptedException; import java.net.Socket; +import java.nio.ByteBuffer; import java.security.cert.Certificate; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocket; @@ -53,7 +54,7 @@ import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMe * </ul> * A leading byte is used to distinguish between the modes. The byte will be removed in future. */ -public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer { +public class TcpDiscoveryIoSession { /** Default size of buffer used for buffering socket in/out. */ private static final int DFLT_SOCK_BUFFER_SIZE = 8192; @@ -67,6 +68,15 @@ public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer { /** */ private final Socket sock; + /** */ + private final TcpDiscoveryMessageMarshaller msgMarsh; + + /** */ + private final TcpDiscoverySpi spi; + + /** */ + private final ClassLoader clsLdr; + /** */ private final DirectMessageReader msgReader; @@ -83,10 +93,11 @@ public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer { * @param spi Discovery SPI instance owning this session. * @throws IgniteException If an I/O error occurs while initializing buffers. */ - TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) { - super(spi); - + TcpDiscoveryIoSession(Socket sock, TcpDiscoveryMessageMarshaller msgMarshaller, TcpDiscoverySpi spi) { this.sock = sock; + msgMarsh = msgMarshaller; + this.spi = spi; + clsLdr = U.resolveClassLoader(spi.ignite().configuration()); msgReader = new DirectMessageReader(spi.messageFactory(), null); @@ -120,7 +131,7 @@ public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer { try { out.write(MESSAGE_SERIALIZATION); - serializeMessage((Message)msg, out); + msgMarsh.marshal((Message)msg, out); out.flush(); } @@ -157,6 +168,8 @@ public class TcpDiscoveryIoSession extends TcpDiscoveryIoSerializer { Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); + ByteBuffer msgBuf = msgMarsh.msgBuf; + msgReader.reset(); msgReader.setBuffer(msgBuf); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageMarshaller.java similarity index 81% rename from modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java rename to modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageMarshaller.java index d8cc0564f0f..00b915fc1f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageMarshaller.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.ignite.spi.discovery.tcp; import java.io.ByteArrayOutputStream; @@ -28,21 +27,16 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -/** - * Serializer of messages. Converts discovery messages into bytes. - */ -public class TcpDiscoveryIoSerializer { +/** */ +public class TcpDiscoveryMessageMarshaller { /** Size for an intermediate buffer for serializing discovery messages. */ - static final int MSG_BUFFER_SIZE = 100; + static final int MSG_BUFFER_SIZE = 128; /** */ - final TcpDiscoverySpi spi; - - /** Loads discovery messages classes during java deserialization. */ - final ClassLoader clsLdr; + private final TcpDiscoverySpi spi; /** */ - final DirectMessageWriter msgWriter; + private final DirectMessageWriter msgWriter; /** Intermediate buffer for serializing discovery messages. */ final ByteBuffer msgBuf; @@ -50,11 +44,9 @@ public class TcpDiscoveryIoSerializer { /** * @param spi Discovery SPI instance. */ - public TcpDiscoveryIoSerializer(TcpDiscoverySpi spi) { + public TcpDiscoveryMessageMarshaller(TcpDiscoverySpi spi) { this.spi = spi; - clsLdr = U.resolveClassLoader(spi.ignite().configuration()); - msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); msgWriter = new DirectMessageWriter(spi.messageFactory()); @@ -68,12 +60,12 @@ public class TcpDiscoveryIoSerializer { * @throws IgniteCheckedException If serialization fails. * @throws IOException If serialization fails. */ - byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { + byte[] marshal(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { if (!(msg instanceof Message)) return U.marshal(spi.marshaller(), msg); try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - serializeMessage((Message)msg, out); + marshal((Message)msg, out); return out.toByteArray(); } @@ -87,7 +79,7 @@ public class TcpDiscoveryIoSerializer { * @param out Output stream to write serialized message. * @throws IOException If serialization fails. */ - void serializeMessage(Message m, OutputStream out) throws IOException { + void marshal(Message m, OutputStream out) throws IOException { MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); msgWriter.reset();
