This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a8cafc05bf IGNITE-27275 Use MessageSerializer for 
ContinuousRoutineStartResultMessage (#12561)
9a8cafc05bf is described below

commit 9a8cafc05bf4675c7bca0c01548ef24390edfa5d
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Feb 5 18:03:17 2026 +0500

    IGNITE-27275 Use MessageSerializer for ContinuousRoutineStartResultMessage 
(#12561)
---
 .../communication/GridIoMessageFactory.java        |   3 +-
 .../ContinuousRoutineStartResultMessage.java       | 145 ++++++---------------
 .../continuous/GridContinuousProcessor.java        |  88 ++++---------
 3 files changed, 63 insertions(+), 173 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 83d8712dea7..919f816f62c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
 import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
 import 
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
 import 
org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer;
+import 
org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer;
 import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
 import org.apache.ignite.internal.codegen.ExchangeInfoSerializer;
 import 
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -460,7 +461,7 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)132, UserAuthenticateResponseMessage::new, new 
UserAuthenticateResponseMessageSerializer());
         factory.register(ClusterMetricsUpdateMessage.TYPE_CODE, 
ClusterMetricsUpdateMessage::new,
             new ClusterMetricsUpdateMessageSerializer());
-        factory.register((short)134, ContinuousRoutineStartResultMessage::new);
+        factory.register((short)134, ContinuousRoutineStartResultMessage::new, 
new ContinuousRoutineStartResultMessageSerializer());
         factory.register((short)135, LatchAckMessage::new, new 
LatchAckMessageSerializer());
         factory.register(CacheMetricsMessage.TYPE_CODE, 
CacheMetricsMessage::new, new CacheMetricsMessageSerializer());
         factory.register(NodeMetricsMessage.TYPE_CODE, 
NodeMetricsMessage::new, new NodeMetricsMessageSerializer());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
index ad0a47c770b..bf12b906a30 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.internal.processors.continuous;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -31,19 +30,16 @@ import org.jetbrains.annotations.Nullable;
  */
 public class ContinuousRoutineStartResultMessage implements Message {
     /** */
-    private static final int ERROR_FLAG = 0x01;
-
-    /** */
+    @Order(0)
     private UUID routineId;
 
     /** */
-    private byte[] errBytes;
+    @Order(value = 1, method = "errorMessage")
+    private @Nullable ErrorMessage errMsg;
 
     /** */
-    private byte[] cntrsMapBytes;
-
-    /** */
-    private int flags;
+    @Order(value = 2, method = "countersMap")
+    private CachePartitionPartialCountersMap cntrsMap;
 
     /**
      *
@@ -54,128 +50,61 @@ public class ContinuousRoutineStartResultMessage 
implements Message {
 
     /**
      * @param routineId Routine ID.
-     * @param cntrsMapBytes Marshalled {@link 
CachePartitionPartialCountersMap}.
-     * @param errBytes Error bytes.
-     * @param err {@code True} if failed to start routine.
+     * @param cntrsMap Counters map.
+     * @param err Error.
      */
-    ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, 
byte[] errBytes, boolean err) {
+    ContinuousRoutineStartResultMessage(
+        UUID routineId,
+        @Nullable CachePartitionPartialCountersMap cntrsMap,
+        @Nullable Throwable err
+    ) {
         this.routineId = routineId;
-        this.cntrsMapBytes = cntrsMapBytes;
-        this.errBytes = errBytes;
+        this.cntrsMap = cntrsMap;
 
-        if (err)
-            flags |= ERROR_FLAG;
+        if (err != null)
+            errMsg = new ErrorMessage(err);
     }
 
     /**
-     * @return Marshalled {@link CachePartitionPartialCountersMap}.
+     * @return Counters map.
      */
-    @Nullable byte[] countersMapBytes() {
-        return cntrsMapBytes;
+    public @Nullable CachePartitionPartialCountersMap countersMap() {
+        return cntrsMap;
     }
 
     /**
-     * @return {@code True} if failed to start routine.
+     * @param cntrsMap Counters map.
      */
-    boolean error() {
-        return (flags & ERROR_FLAG) != 0;
+    public void countersMap(@Nullable CachePartitionPartialCountersMap 
cntrsMap) {
+        this.cntrsMap = cntrsMap;
     }
 
     /**
      * @return Routine ID.
      */
-    UUID routineId() {
+    public UUID routineId() {
         return routineId;
     }
 
     /**
-     * @return Error bytes.
+     * @param routineId Routine ID.
      */
-    @Nullable byte[] errorBytes() {
-        return errBytes;
+    public void routineId(UUID routineId) {
+        this.routineId = routineId;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(cntrsMapBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeByteArray(errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeInt(flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeUuid(routineId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
+    /**
+     * @return Error message.
+     */
+    public @Nullable ErrorMessage errorMessage() {
+        return errMsg;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                cntrsMapBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                errBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                flags = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 3:
-                routineId = reader.readUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
+    /**
+     * @param errMsg Error message.
+     */
+    public void errorMessage(@Nullable ErrorMessage errMsg) {
+        this.errMsg = errMsg;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3330c91037b..a757559f903 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
@@ -1545,7 +1546,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                 IgnitePredicate<ClusterNode> nodeFilter = null;
 
-                byte[] cntrs = null;
+                CachePartitionPartialCountersMap cntrsMap = null;
 
                 if (reqData.nodeFilterBytes() != null) {
                     try {
@@ -1621,12 +1622,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                             if (proc != null) {
                                 GridCacheAdapter cache = 
ctx.cache().internalCache(hnd.cacheName());
 
-                                if (cache != null && 
cache.context().userCache()) {
-                                    CachePartitionPartialCountersMap cntrsMap =
-                                        
cache.context().topology().localUpdateCounters(false);
-
-                                    cntrs = U.marshal(marsh, cntrsMap);
-                                }
+                                if (cache != null && 
cache.context().userCache())
+                                    cntrsMap = 
cache.context().topology().localUpdateCounters(false);
                             }
                         }
                     }
@@ -1639,7 +1636,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     }
                 }
 
-                sendMessageStartResult(snd, msg.routineId(), cntrs, err);
+                sendMessageStartResult(snd, msg.routineId(), cntrsMap, err);
             }
         });
     }
@@ -1647,32 +1644,17 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /**
      * @param node Target node.
      * @param routineId Routine ID.
-     * @param cntrsMapBytes Marshalled {@link 
CachePartitionPartialCountersMap}.
+     * @param cntrsMap Counters map.
      * @param err Start error if any.
      */
     private void sendMessageStartResult(final ClusterNode node,
         final UUID routineId,
-        byte[] cntrsMapBytes,
+        CachePartitionPartialCountersMap cntrsMap,
         @Nullable final Exception err
     ) {
-        byte[] errBytes = null;
-
-        if (err != null) {
-            try {
-                errBytes = U.marshal(marsh, err);
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to marshal routine start error: " + e, e);
-            }
-        }
-
-        ContinuousRoutineStartResultMessage msg = new 
ContinuousRoutineStartResultMessage(routineId,
-            cntrsMapBytes,
-            errBytes,
-            err != null);
-
         try {
-            ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
+            ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, new 
ContinuousRoutineStartResultMessage(routineId, cntrsMap, err),
+                SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
@@ -2561,7 +2543,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
             resCollect = new 
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, 
RoutineRegisterResults>(ctx) {
                 @Override protected RoutineRegisterResults 
createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) {
-                    Map<UUID, Exception> errs = null;
+                    Map<UUID, Throwable> errs = null;
                     Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = 
null;
 
                     for (Map.Entry<UUID, 
NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) {
@@ -2570,24 +2552,12 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                         if (msg == null)
                             continue;
 
-                        if (msg.error()) {
-                            byte[] errBytes = msg.errorBytes();
-
-                            Exception err = null;
+                        ErrorMessage errMsg = msg.errorMessage();
 
-                            if (errBytes != null) {
-                                try {
-                                    err = U.unmarshal(marsh, errBytes, 
U.resolveClassLoader(ctx.config()));
-                                }
-                                catch (Exception e) {
-                                    U.warn(log, "Failed to unmarhal continuous 
routine start error: " + e);
-                                }
-                            }
-
-                            if (err == null) {
-                                err = new IgniteCheckedException("Failed to 
start continuous " +
-                                    "routine on node: " + entry.getKey());
-                            }
+                        if (errMsg != null) {
+                            Throwable err = errMsg.error() == null
+                                ? new IgniteCheckedException("Failed to start 
continuous routine on node: " + entry.getKey())
+                                : errMsg.error();
 
                             if (errs == null)
                                 errs = new HashMap<>();
@@ -2595,23 +2565,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                             errs.put(entry.getKey(), err);
                         }
                         else {
-                            byte[] cntrsMapBytes = msg.countersMapBytes();
-
-                            if (cntrsMapBytes != null) {
-                                try {
-                                    CachePartitionPartialCountersMap cntrsMap 
= U.unmarshal(
-                                        marsh,
-                                        cntrsMapBytes,
-                                        U.resolveClassLoader(ctx.config()));
+                            CachePartitionPartialCountersMap cntrsMap = 
msg.countersMap();
 
-                                    if (cntrsPerNode == null)
-                                        cntrsPerNode = new HashMap<>();
+                            if (cntrsMap != null) {
+                                if (cntrsPerNode == null)
+                                    cntrsPerNode = new HashMap<>();
 
-                                    cntrsPerNode.put(entry.getKey(), 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
-                                }
-                                catch (Exception e) {
-                                    U.warn(log, "Failed to unmarhal continuous 
query update counters: " + e);
-                                }
+                                cntrsPerNode.put(entry.getKey(), 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
                             }
                         }
                     }
@@ -2637,7 +2597,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
          */
         private void onAllRemoteRegistered(
             AffinityTopologyVersion topVer,
-            @Nullable Map<UUID, ? extends Exception> errs,
+            @Nullable Map<UUID, ? extends Throwable> errs,
             Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
             Map<Integer, T2<Long, Long>> cntrs) {
             try {
@@ -2661,7 +2621,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     onRemoteRegistered();
                 }
                 else {
-                    Exception firstEx = F.first(errs.values());
+                    Throwable firstEx = F.first(errs.values());
 
                     onDone(firstEx);
 
@@ -2729,7 +2689,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         private final AffinityTopologyVersion topVer;
 
         /** */
-        private final Map<UUID, ? extends Exception> errs;
+        private final Map<UUID, ? extends Throwable> errs;
 
         /** */
         private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode;
@@ -2740,7 +2700,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
          * @param cntrsPerNode Update counters.
          */
         RoutineRegisterResults(AffinityTopologyVersion topVer,
-            Map<UUID, ? extends Exception> errs,
+            Map<UUID, ? extends Throwable> errs,
             Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
             this.topVer = topVer;
             this.errs = errs;

Reply via email to