This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d9b614bf8 IGNITE-28061 Migrate ZookeeperDiscoverySpi to new 
serialization framework for discovery custom messages (#12860)
40d9b614bf8 is described below

commit 40d9b614bf876b7b59d29e199ab8c018504f3d32
Author: Nikita Amelchev <[email protected]>
AuthorDate: Wed Mar 11 17:22:19 2026 +0300

    IGNITE-28061 Migrate ZookeeperDiscoverySpi to new serialization framework 
for discovery custom messages (#12860)
---
 .../zk/internal/DiscoveryMessageParser.java        | 167 +++++++++++++++++++++
 .../zk/internal/ZkDiscoveryCustomEventData.java    |  21 ++-
 .../zk/internal/ZookeeperDiscoveryImpl.java        |  59 +++-----
 3 files changed, 208 insertions(+), 39 deletions(-)

diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
new file mode 100644
index 00000000000..bd2dd5857b5
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import 
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.spi.IgniteSpiException;
+
+import static 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
+
+/**
+ * Serializes and deserializes discovery custom messages: the RU-ready {@link MessageSerializer} mechanism is used for {@link Message} instances, the supplied {@link Marshaller} for all others.
+ */
+public class DiscoveryMessageParser {
+    /** Leading byte for messages that use {@link JdkMarshaller} for serialization. */
+    // TODO: remove these flags after refactoring all discovery messages.
+    private static final byte JAVA_SERIALIZATION = (byte)1;
+
+    /** Leading byte for messages that use {@link MessageSerializer} for serialization. */
+    private static final byte MESSAGE_SERIALIZATION = (byte)2;
+
+    /** Size in bytes of the intermediate buffer used to serialize and deserialize discovery messages. */
+    private static final int MSG_BUFFER_SIZE = 100;
+
+    /** Message factory used to create messages by type and to obtain their serializers. */
+    private final MessageFactory msgFactory;
+
+    /** Marshaller used for messages that are not {@link Message} instances. */
+    private final Marshaller marsh;
+
+    /** @param marsh Marshaller used for messages written in {@link #JAVA_SERIALIZATION} mode. */
+    public DiscoveryMessageParser(Marshaller marsh) {
+        this.marsh = marsh;
+        this.msgFactory = new IgniteMessageFactoryImpl(
+            new MessageFactoryProvider[] { new DiscoveryMessageFactory(null, 
null) });
+    }
+
+    /** Marshals a discovery message into a deflate-compressed byte array whose first uncompressed byte marks the serialization mode. */
+    public byte[] marshalZip(DiscoveryCustomMessage msg) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
+            if (msg instanceof Message) {
+                out.write(MESSAGE_SERIALIZATION);
+
+                serializeMessage((Message)msg, out);
+            }
+            else {
+                out.write(JAVA_SERIALIZATION);
+
+                U.marshal(marsh, msg, out);
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException("Failed to serialize message: " + 
msg, e);
+        }
+
+        return baos.toByteArray();
+    }
+
+    /** Unmarshals a discovery message from a byte array in the format produced by {@link #marshalZip(DiscoveryCustomMessage)}. */
+    public DiscoveryCustomMessage unmarshalZip(byte[] bytes) {
+        try (
+            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+            InflaterInputStream in = new InflaterInputStream(bais)
+        ) {
+            byte mode = (byte)in.read();
+
+            if (mode == JAVA_SERIALIZATION)
+                return U.unmarshal(marsh, in, U.gridClassLoader());
+
+            if (MESSAGE_SERIALIZATION != mode)
+                throw new IOException("Received unexpected byte while reading 
discovery message: " + mode);
+
+            return (DiscoveryCustomMessage)deserializeMessage(in);
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException("Failed to deserialize message.", e);
+        }
+    }
+
+    /** Writes the message to the stream in chunks of at most {@link #MSG_BUFFER_SIZE} bytes until its serializer reports completion. */
+    private void serializeMessage(Message m, OutputStream out) throws 
IOException {
+        DirectMessageWriter msgWriter = new DirectMessageWriter(msgFactory);
+        ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
+
+        msgWriter.setBuffer(msgBuf);
+
+        MessageSerializer msgSer = msgFactory.serializer(m.directType());
+
+        boolean finished;
+
+        do {
+            msgBuf.clear();
+
+            finished = msgSer.writeTo(m, msgWriter);
+
+            out.write(msgBuf.array(), 0, msgBuf.position());
+        }
+        while (!finished);
+    }
+
+    /** Reads the two-byte message type, creates the message and feeds stream data to its serializer until the message is fully read. */
+    private Message deserializeMessage(InputStream in) throws IOException {
+        DirectMessageReader msgReader = new DirectMessageReader(msgFactory, 
null);
+        ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
+
+        msgReader.setBuffer(msgBuf);
+
+        Message msg = msgFactory.create(makeMessageType((byte)in.read(), 
(byte)in.read()));
+        MessageSerializer msgSer = msgFactory.serializer(msg.directType());
+
+        boolean finished;
+
+        do {
+            int read = in.read(msgBuf.array(), msgBuf.position(), 
msgBuf.remaining());
+
+            if (read == -1)
+                throw new EOFException("Stream closed before message was fully 
read.");
+
+            msgBuf.limit(msgBuf.position() + read);
+            msgBuf.rewind();
+
+            finished = msgSer.readFrom(msg, msgReader);
+
+            if (!finished)
+                msgBuf.compact();
+        }
+        while (!finished);
+
+        return msg;
+    }
+}
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index a4db36079e8..87754e169c3 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.util.UUID;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 
 /**
  *
@@ -38,8 +37,8 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData 
{
     /** */
     final String evtPath;
 
-    /** Message instance (can be marshalled as part of 
ZkDiscoveryCustomEventData or stored in separate znode. */
-    DiscoverySpiCustomMessage msg;
+    /** Serialized message (can be marshalled as part of ZkDiscoveryCustomEventData or stored in a separate znode). */
+    byte[] msgBytes;
 
     /** Unmarshalled message. */
     transient DiscoveryCustomMessage resolvedMsg;
@@ -57,7 +56,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData 
{
         long origEvtId,
         long topVer,
         UUID sndNodeId,
-        DiscoverySpiCustomMessage msg,
+        DiscoveryCustomMessage msg,
         String evtPath
     ) {
         super(evtId, ZK_EVT_CUSTOM_EVT, topVer);
@@ -66,11 +65,23 @@ class ZkDiscoveryCustomEventData extends 
ZkDiscoveryEventData {
         assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
 
         this.origEvtId = origEvtId;
-        this.msg = msg;
+        this.resolvedMsg = msg;
         this.sndNodeId = sndNodeId;
         this.evtPath = evtPath;
     }
 
+    /** Serializes {@link #resolvedMsg} into {@link #msgBytes} with the given parser; does nothing if no resolved message is set. */
+    public void prepareMarshal(DiscoveryMessageParser parser) {
+        if (resolvedMsg != null)
+            msgBytes = parser.marshalZip(resolvedMsg);
+    }
+
+    /** Restores {@link #resolvedMsg} from {@link #msgBytes} with the given parser; does nothing if no message bytes are present. */
+    public void finishUnmarshal(DiscoveryMessageParser parser) {
+        if (msgBytes != null)
+            resolvedMsg = parser.unmarshalZip(msgBytes);
+    }
+
     /**
      * @return {@code True} for custom event ack message.
      */
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index fd414cf3da0..ddc79f498b0 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -216,6 +216,9 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private final ZookeeperDiscoveryStatistics stats;
 
+    /** */
+    private final DiscoveryMessageParser msgParser;
+
     /**
      * @param spi Discovery SPI.
      * @param igniteInstanceName Instance name.
@@ -262,6 +265,8 @@ public class ZookeeperDiscoveryImpl {
         this.evtsAckThreshold = evtsAckThreshold;
 
         this.stats = stats;
+
+        msgParser = new DiscoveryMessageParser(marsh);
     }
 
     /**
@@ -666,14 +671,7 @@ public class ZookeeperDiscoveryImpl {
         if (!hasServerNode)
             throw new IgniteException("Failed to send custom message: no 
server nodes in topology.");
 
-        byte[] msgBytes;
-
-        try {
-            msgBytes = marshalZip(msg);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to marshal custom message: " 
+ msg, e);
-        }
+        byte[] msgBytes = msgParser.marshalZip(msg);
 
         while (!busyLock.enterBusy())
             checkState();
@@ -1486,6 +1484,8 @@ public class ZookeeperDiscoveryImpl {
             new ZkNoServersMessage(),
             null);
 
+        evtData.prepareMarshal(msgParser);
+
         Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
 
         evtsData.addEvent(nodesToAck, evtData);
@@ -1514,11 +1514,13 @@ public class ZookeeperDiscoveryImpl {
             if (evtData instanceof ZkDiscoveryCustomEventData) {
                 ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
 
+                evtData0.finishUnmarshal(msgParser);
+
                 // It is possible previous coordinator failed before finished 
cleanup.
-                if (evtData0.msg instanceof 
ZkCommunicationErrorResolveFinishMessage) {
+                if (evtData0.resolvedMsg instanceof 
ZkCommunicationErrorResolveFinishMessage) {
                     try {
                         ZkCommunicationErrorResolveFinishMessage msg =
-                            
(ZkCommunicationErrorResolveFinishMessage)evtData0.msg;
+                            
(ZkCommunicationErrorResolveFinishMessage)evtData0.resolvedMsg;
 
                         ZkCommunicationErrorResolveResult res = unmarshalZip(
                             
ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, 
msg.futId));
@@ -2472,7 +2474,7 @@ public class ZookeeperDiscoveryImpl {
                 DiscoveryCustomMessage msg;
 
                 try {
-                    msg = unmarshalZip(evtBytes);
+                    msg = msgParser.unmarshalZip(evtBytes);
                 }
                 catch (Exception e) {
                     U.error(log, "Failed to unmarshal custom discovery 
message: " + e, e);
@@ -2559,11 +2561,9 @@ public class ZookeeperDiscoveryImpl {
             0L,
             evtsData.topVer,
             sndNode.id(),
-            null,
+            msg,
             evtPath);
 
-        evtData.resolvedMsg = msg;
-
         if (log.isDebugEnabled())
             log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + 
msg + ']');
 
@@ -2746,19 +2746,16 @@ public class ZookeeperDiscoveryImpl {
                         if (evtData0.ackEvent() && evtData0.topologyVersion() 
< locNode.order())
                             break;
 
-                        DiscoveryCustomMessage msg;
+                        evtData0.finishUnmarshal(msgParser);
 
-                        if (rtState.crd) {
+                        if (rtState.crd)
                             assert evtData0.resolvedMsg != null : evtData0;
-
-                            msg = evtData0.resolvedMsg;
-                        }
                         else {
-                            if (evtData0.msg == null) {
+                            if (evtData0.resolvedMsg == null) {
                                 if (evtData0.ackEvent()) {
                                     String path = 
zkPaths.ackEventDataPath(evtData0.origEvtId);
 
-                                    msg = unmarshalZip(zkClient.getData(path));
+                                    evtData0.resolvedMsg = 
msgParser.unmarshalZip(zkClient.getData(path));
                                 }
                                 else {
                                     assert evtData0.evtPath != null : evtData0;
@@ -2767,19 +2764,15 @@ public class ZookeeperDiscoveryImpl {
                                         evtData0.evtPath,
                                         evtData0.sndNodeId);
 
-                                    msg = unmarshalZip(msgBytes);
+                                    evtData0.resolvedMsg = 
msgParser.unmarshalZip(msgBytes);
                                 }
                             }
-                            else
-                                msg = evtData0.msg;
-
-                            evtData0.resolvedMsg = msg;
                         }
 
-                        if (msg instanceof ZkInternalMessage)
-                            processInternalMessage(evtData0, 
(ZkInternalMessage)msg);
+                        if (evtData0.resolvedMsg instanceof ZkInternalMessage)
+                            processInternalMessage(evtData0, 
(ZkInternalMessage)evtData0.resolvedMsg);
                         else {
-                            notifyCustomEvent(evtData0, msg);
+                            notifyCustomEvent(evtData0, evtData0.resolvedMsg);
 
                             if (!evtData0.ackEvent())
                                 updateNodeInfo = true;
@@ -3455,7 +3448,7 @@ public class ZookeeperDiscoveryImpl {
             msg,
             null);
 
-        evtData.resolvedMsg = msg;
+        evtData.prepareMarshal(msgParser);
 
         evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
 
@@ -3770,7 +3763,7 @@ public class ZookeeperDiscoveryImpl {
 
         long evtId = rtState.evtsData.evtIdGen;
 
-        byte[] ackBytes = marshalZip(ack);
+        byte[] ackBytes = msgParser.marshalZip(ack);
 
         String path = zkPaths.ackEventDataPath(origEvt.eventId());
 
@@ -3788,11 +3781,9 @@ public class ZookeeperDiscoveryImpl {
             origEvt.eventId(),
             rtState.evtsData.topVer, // Use actual topology version because 
topology version must be growing.
             locNode.id(),
-            null,
+            ack,
             null);
 
-        ackEvtData.resolvedMsg = ack;
-
         if (log.isDebugEnabled()) {
             log.debug("Generated CUSTOM event ack [origEvtId=" + 
origEvt.eventId() +
                 ", evt=" + ackEvtData +

Reply via email to