This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 737c550270f IGNITE-28286 : Use MarshallableMessage for GridIoMessage and GridEventStorageMessage v2 (#12891)
737c550270f is described below

commit 737c550270f62ee791423a81aa433f3ad2d4cd9d
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Mar 19 20:38:57 2026 +0300

    IGNITE-28286 : Use MarshallableMessage for GridIoMessage and GridEventStorageMessage v2 (#12891)
---
 .../managers/communication/GridIoManager.java      | 24 ------------
 .../managers/communication/GridIoMessage.java      | 23 ++++++-----
 .../communication/GridIoMessageFactory.java        | 11 ++++--
 .../deployment/GridDeploymentCommunication.java    |  6 ---
 .../eventstorage/GridEventStorageManager.java      | 19 +---------
 .../eventstorage/GridEventStorageMessage.java      | 44 +++++++++++-----------
 .../ignite/p2p/GridP2PHotRedeploymentSelfTest.java | 16 --------
 7 files changed, 42 insertions(+), 101 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 446546b8962..7a164d1b6d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1199,15 +1199,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
                 return;
             }
 
-            if (initMsg.topic() == null) {
-                int topicOrd = initMsg.topicOrdinal();
-
-                if (topicOrd >= 0)
-                    initMsg.topic(GridTopic.fromOrdinal(topicOrd));
-                else
-                    initMsg.finishUnmarshal(marsh, 
U.resolveClassLoader(ctx.config()));
-            }
-
             byte plc = initMsg.policy();
 
             pools.poolForPolicy(plc).execute(new Runnable() {
@@ -1248,15 +1239,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
                 return;
             }
 
-            if (msg.topic() == null) {
-                int topicOrd = msg.topicOrdinal();
-
-                if (topicOrd >= 0)
-                    msg.topic(GridTopic.fromOrdinal(topicOrd));
-                else
-                    msg.finishUnmarshal(marsh, 
U.resolveClassLoader(ctx.config()));
-            }
-
             if (!started) {
                 lock.readLock().lock();
 
@@ -1980,9 +1962,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
         );
 
         try {
-            if (topicOrd < 0)
-                ioMsg.prepareMarshal(marsh);
-
             return 
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).openChannel(node, ioMsg);
         }
         catch (IgniteSpiException e) {
@@ -2054,9 +2033,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
                     ackC.apply(null);
             }
             else {
-                if (topicOrd < 0)
-                    ioMsg.prepareMarshal(marsh);
-
                 try {
                     if ((CommunicationSpi<?>)getSpi() instanceof 
TcpCommunicationSpi)
                         getTcpCommunicationSpi().sendMessage(node, ioMsg, 
ackC);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b42b8df962a..906df7c9ff6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.communication;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.ExecutorAwareMessage;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -27,13 +28,14 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper for all grid messages.
  */
-public class GridIoMessage implements Message, SpanTransport {
+public class GridIoMessage implements MarshallableMessage, SpanTransport {
     /** */
     public static final Integer STRIPE_DISABLED_PART = Integer.MIN_VALUE;
 
@@ -216,24 +218,21 @@ public class GridIoMessage implements Message, 
SpanTransport {
         return null;
     }
 
-    /**
-     * @param marsh Marshaller.
-     */
-    public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException 
{
-        if (topic != null && topicBytes == null)
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
+        if (topicOrd < 0 && topic != null)
             topicBytes = U.marshal(marsh, topic);
     }
 
-    /**
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     */
-    public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws 
IgniteCheckedException {
-        if (topicBytes != null && topic == null) {
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) 
throws IgniteCheckedException {
+        if (topicOrd < 0 && topicBytes != null) {
             topic = U.unmarshal(marsh, topicBytes, ldr);
 
             topicBytes = null;
         }
+        else if (topicOrd >= 0)
+            topic = GridTopic.fromOrdinal(topicOrd);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a3379034634..65480e0b48e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,7 +62,7 @@ import 
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyReque
 import 
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
 import 
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponseSerializer;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
-import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageSerializer;
+import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessageMarshallableSerializer;
 import 
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
 import 
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessageSerializer;
 import 
org.apache.ignite.internal.processors.authentication.UserAuthenticateResponseMessage;
@@ -405,12 +405,14 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)5, GridTaskCancelRequest::new, new 
GridTaskCancelRequestSerializer());
         factory.register((short)6, GridTaskSessionRequest::new, new 
GridTaskSessionRequestSerializer());
         factory.register((short)7, GridCheckpointRequest::new, new 
GridCheckpointRequestSerializer());
-        factory.register((short)8, GridIoMessage::new, new 
GridIoMessageSerializer());
+        factory.register((short)8, GridIoMessage::new,
+            new GridIoMessageMarshallableSerializer(cstDataMarshall, 
cstDataMarshallClsLdr));
         factory.register((short)9, GridIoUserMessage::new, new 
GridIoUserMessageSerializer());
         factory.register((short)10, GridDeploymentInfoBean::new, new 
GridDeploymentInfoBeanSerializer());
         factory.register((short)11, GridDeploymentRequest::new, new 
GridDeploymentRequestSerializer());
         factory.register((short)12, GridDeploymentResponse::new, new 
GridDeploymentResponseSerializer());
-        factory.register((short)13, GridEventStorageMessage::new, new 
GridEventStorageMessageSerializer());
+        factory.register((short)13, GridEventStorageMessage::new,
+            new GridEventStorageMessageMarshallableSerializer(cstDataMarshall, 
cstDataMarshallClsLdr));
         factory.register((short)16, GridCacheTxRecoveryRequest::new, new 
GridCacheTxRecoveryRequestSerializer());
         factory.register((short)17, GridCacheTxRecoveryResponse::new, new 
GridCacheTxRecoveryResponseSerializer());
         factory.register((short)18, IndexQueryResultMeta::new, new 
IndexQueryResultMetaSerializer());
@@ -517,7 +519,8 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)169, ServiceSingleNodeDeploymentResult::new, 
new ServiceSingleNodeDeploymentResultSerializer());
         factory.register(GridQueryKillRequest.TYPE_CODE, 
GridQueryKillRequest::new, new GridQueryKillRequestSerializer());
         factory.register(GridQueryKillResponse.TYPE_CODE, 
GridQueryKillResponse::new, new GridQueryKillResponseSerializer());
-        factory.register(GridIoSecurityAwareMessage.TYPE_CODE, 
GridIoSecurityAwareMessage::new, new GridIoSecurityAwareMessageSerializer());
+        factory.register(GridIoSecurityAwareMessage.TYPE_CODE, 
GridIoSecurityAwareMessage::new,
+            new 
GridIoSecurityAwareMessageMarshallableSerializer(cstDataMarshall, 
cstDataMarshallClsLdr));
         factory.register(SessionChannelMessage.TYPE_CODE, 
SessionChannelMessage::new, new SessionChannelMessageSerializer());
         factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
         factory.register((short)177, TcpInverseConnectionResponseMessage::new, 
new TcpInverseConnectionResponseMessageSerializer());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index c83d39e4bf9..f6c8b0c6117 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteNotPeerDeployable;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -73,9 +72,6 @@ class GridDeploymentCommunication {
     /** */
     private final GridBusyLock busyLock = new GridBusyLock();
 
-    /** */
-    private final Marshaller marsh;
-
     /**
      * Creates new instance of deployment communication.
      *
@@ -93,8 +89,6 @@ class GridDeploymentCommunication {
                 processDeploymentRequest(nodeId, msg);
             }
         };
-
-        marsh = ctx.marshaller();
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index fcfa3e5b37d..38a773de2bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1025,15 +1025,6 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
 
                 GridEventStorageMessage res = (GridEventStorageMessage)msg;
 
-                try {
-                    res.finishUnmarshal(marsh, 
U.resolveClassLoader(ctx.config()), null);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to unmarshal events query response: " 
+ msg, e);
-
-                    return;
-                }
-
                 synchronized (qryMux) {
                     if (uids.remove(nodeId)) {
                         if (res.events() != null)
@@ -1146,11 +1137,8 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
         if (locNode != null)
             ctx.io().sendToGridTopic(locNode, topic, msg, plc);
 
-        if (!rmtNodes.isEmpty()) {
-            msg.prepareMarshal(marsh);
-
+        if (!rmtNodes.isEmpty())
             ctx.io().sendToGridTopic(rmtNodes, topic, msg, plc);
-        }
     }
 
     /**
@@ -1216,7 +1204,7 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
                         throw new IgniteDeploymentCheckedException("Failed to 
obtain deployment for event filter " +
                             "(is peer class loading turned on?): " + req);
 
-                    req.finishUnmarshal(marsh, 
U.resolveClassLoader(ctx.config()), U.resolveClassLoader(dep.classLoader(), 
ctx.config()));
+                    req.finishUnmarshalFilters(marsh, 
U.resolveClassLoader(dep.classLoader(), ctx.config()));
 
                     filter = (IgnitePredicate<Event>)req.filter();
 
@@ -1252,9 +1240,6 @@ public class GridEventStorageManager extends 
GridManagerAdapter<EventStorageSpi>
                     if (log.isDebugEnabled())
                         log.debug("Sending event query response to node 
[nodeId=" + nodeId + "res=" + res + ']');
 
-                    if (!ctx.localNodeId().equals(nodeId))
-                        res.prepareMarshal(marsh);
-
                     ctx.io().sendToCustomTopic(node, req.responseTopic(), res, 
PUBLIC_POOL);
                 }
                 catch (ClusterTopologyCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
index dade283bc0d..263451596e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageMessage.java
@@ -32,13 +32,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Event storage message.
  */
-public class GridEventStorageMessage implements Message {
+public class GridEventStorageMessage implements MarshallableMessage {
     /** */
     private Object resTopic;
 
@@ -205,42 +205,42 @@ public class GridEventStorageMessage implements Message {
         return ErrorMessage.error(errMsg);
     }
 
-    /**
-     * @param marsh Marshaller.
-     */
-    public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException 
{
-        if (resTopic != null && resTopicBytes == null)
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
+        if (resTopic != null)
             resTopicBytes = U.marshal(marsh, resTopic);
 
-        if (filter != null && filterBytes == null)
+        if (filter != null)
             filterBytes = U.marshal(marsh, filter);
 
-        if (evts != null && evtsBytes == null)
+        if (evts != null)
             evtsBytes = U.marshal(marsh, evts);
     }
 
-    /**
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     * @param filterClsLdr Class loader for filter.
-     */
-    public void finishUnmarshal(Marshaller marsh, ClassLoader ldr, ClassLoader 
filterClsLdr) throws IgniteCheckedException {
-        if (resTopicBytes != null && resTopic == null) {
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) 
throws IgniteCheckedException {
+        if (resTopicBytes != null) {
             resTopic = U.unmarshal(marsh, resTopicBytes, ldr);
 
             resTopicBytes = null;
         }
 
-        if (filterBytes != null && filter == null && filterClsLdr != null) {
-            filter = U.unmarshal(marsh, filterBytes, filterClsLdr);
+        if (evtsBytes != null) {
+            evts = U.unmarshal(marsh, evtsBytes, ldr);
 
-            filterBytes = null;
+            evtsBytes = null;
         }
+    }
 
-        if (evtsBytes != null && evts == null) {
-            evts = U.unmarshal(marsh, evtsBytes, ldr);
+    /**
+     * @param marsh Marshaller.
+     * @param filterClsLdr Class loader for filter.
+     */
+    public void finishUnmarshalFilters(Marshaller marsh, ClassLoader 
filterClsLdr) throws IgniteCheckedException {
+        if (filterBytes != null && filter == null) {
+            filter = U.unmarshal(marsh, filterBytes, filterClsLdr);
 
-            evtsBytes = null;
+            filterBytes = null;
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java
index 70db1390a16..2cc6e5b9dee 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PHotRedeploymentSelfTest.java
@@ -122,18 +122,6 @@ public class GridP2PHotRedeploymentSelfTest extends 
GridCommonAbstractTest {
             assert taskCls1.getClassLoader() != taskCls2.getClassLoader();
             assert taskCls1 != taskCls2;
 
-//            final AtomicBoolean undeployed = new AtomicBoolean(false);
-//
-//            grid2.events().localListen(new GridLocalEventListener() {
-//                @Override public void onEvent(GridEvent evt) {
-//                    if (evt.type() == EVT_TASK_UNDEPLOYED) {
-//                        assert 
((GridDeploymentEvent)evt).alias().equals(TASK_NAME);
-//
-//                        undeployed.set(true);
-//                    }
-//                }
-//            }, EVT_TASK_UNDEPLOYED);
-
             ignite2.compute().localDeployTask(taskCls1, 
taskCls1.getClassLoader());
 
             Integer res1 = ignite1.compute().execute(taskCls1, 
Collections.singletonList(ignite2.cluster().localNode().id()));
@@ -149,10 +137,6 @@ public class GridP2PHotRedeploymentSelfTest extends 
GridCommonAbstractTest {
             info("Result2: " + res2);
 
             assert !res1.equals(res2);
-
-//            Thread.sleep(P2P_TIMEOUT * 2);
-//
-//            assert undeployed.get();
         }
         finally {
             stopGrid(2);

Reply via email to