This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9a8cafc05bf IGNITE-27275 Use MessageSerializer for
ContinuousRoutineStartResultMessage (#12561)
9a8cafc05bf is described below
commit 9a8cafc05bf4675c7bca0c01548ef24390edfa5d
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Feb 5 18:03:17 2026 +0500
IGNITE-27275 Use MessageSerializer for ContinuousRoutineStartResultMessage
(#12561)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../ContinuousRoutineStartResultMessage.java | 145 ++++++---------------
.../continuous/GridContinuousProcessor.java | 88 ++++---------
3 files changed, 63 insertions(+), 173 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 83d8712dea7..919f816f62c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
import
org.apache.ignite.internal.codegen.ClusterMetricsUpdateMessageSerializer;
+import
org.apache.ignite.internal.codegen.ContinuousRoutineStartResultMessageSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
import org.apache.ignite.internal.codegen.ExchangeInfoSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -460,7 +461,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)132, UserAuthenticateResponseMessage::new, new
UserAuthenticateResponseMessageSerializer());
factory.register(ClusterMetricsUpdateMessage.TYPE_CODE,
ClusterMetricsUpdateMessage::new,
new ClusterMetricsUpdateMessageSerializer());
- factory.register((short)134, ContinuousRoutineStartResultMessage::new);
+ factory.register((short)134, ContinuousRoutineStartResultMessage::new,
new ContinuousRoutineStartResultMessageSerializer());
factory.register((short)135, LatchAckMessage::new, new
LatchAckMessageSerializer());
factory.register(CacheMetricsMessage.TYPE_CODE,
CacheMetricsMessage::new, new CacheMetricsMessageSerializer());
factory.register(NodeMetricsMessage.TYPE_CODE,
NodeMetricsMessage::new, new NodeMetricsMessageSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
index ad0a47c770b..bf12b906a30 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
@@ -17,13 +17,12 @@
package org.apache.ignite.internal.processors.continuous;
-import java.nio.ByteBuffer;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -31,19 +30,16 @@ import org.jetbrains.annotations.Nullable;
*/
public class ContinuousRoutineStartResultMessage implements Message {
/** */
- private static final int ERROR_FLAG = 0x01;
-
- /** */
+ @Order(0)
private UUID routineId;
/** */
- private byte[] errBytes;
+ @Order(value = 1, method = "errorMessage")
+ private @Nullable ErrorMessage errMsg;
/** */
- private byte[] cntrsMapBytes;
-
- /** */
- private int flags;
+ @Order(value = 2, method = "countersMap")
+ private CachePartitionPartialCountersMap cntrsMap;
/**
*
@@ -54,128 +50,61 @@ public class ContinuousRoutineStartResultMessage
implements Message {
/**
* @param routineId Routine ID.
- * @param cntrsMapBytes Marshalled {@link
CachePartitionPartialCountersMap}.
- * @param errBytes Error bytes.
- * @param err {@code True} if failed to start routine.
+ * @param cntrsMap Counters map.
+ * @param err Error.
*/
- ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes,
byte[] errBytes, boolean err) {
+ ContinuousRoutineStartResultMessage(
+ UUID routineId,
+ @Nullable CachePartitionPartialCountersMap cntrsMap,
+ @Nullable Throwable err
+ ) {
this.routineId = routineId;
- this.cntrsMapBytes = cntrsMapBytes;
- this.errBytes = errBytes;
+ this.cntrsMap = cntrsMap;
- if (err)
- flags |= ERROR_FLAG;
+ if (err != null)
+ errMsg = new ErrorMessage(err);
}
/**
- * @return Marshalled {@link CachePartitionPartialCountersMap}.
+ * @return Counters map.
*/
- @Nullable byte[] countersMapBytes() {
- return cntrsMapBytes;
+ public @Nullable CachePartitionPartialCountersMap countersMap() {
+ return cntrsMap;
}
/**
- * @return {@code True} if failed to start routine.
+ * @param cntrsMap Counters map.
*/
- boolean error() {
- return (flags & ERROR_FLAG) != 0;
+ public void countersMap(@Nullable CachePartitionPartialCountersMap
cntrsMap) {
+ this.cntrsMap = cntrsMap;
}
/**
* @return Routine ID.
*/
- UUID routineId() {
+ public UUID routineId() {
return routineId;
}
/**
- * @return Error bytes.
+ * @param routineId Routine ID.
*/
- @Nullable byte[] errorBytes() {
- return errBytes;
+ public void routineId(UUID routineId) {
+ this.routineId = routineId;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(cntrsMapBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeInt(flags))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeUuid(routineId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ /**
+ * @return Error message.
+ */
+ public @Nullable ErrorMessage errorMessage() {
+ return errMsg;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- cntrsMapBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- flags = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- routineId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ /**
+ * @param errMsg Error message.
+ */
+ public void errorMessage(@Nullable ErrorMessage errMsg) {
+ this.errMsg = errMsg;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3330c91037b..a757559f903 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
@@ -1545,7 +1546,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
IgnitePredicate<ClusterNode> nodeFilter = null;
- byte[] cntrs = null;
+ CachePartitionPartialCountersMap cntrsMap = null;
if (reqData.nodeFilterBytes() != null) {
try {
@@ -1621,12 +1622,8 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (proc != null) {
GridCacheAdapter cache =
ctx.cache().internalCache(hnd.cacheName());
- if (cache != null &&
cache.context().userCache()) {
- CachePartitionPartialCountersMap cntrsMap =
-
cache.context().topology().localUpdateCounters(false);
-
- cntrs = U.marshal(marsh, cntrsMap);
- }
+ if (cache != null &&
cache.context().userCache())
+ cntrsMap =
cache.context().topology().localUpdateCounters(false);
}
}
}
@@ -1639,7 +1636,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
}
}
- sendMessageStartResult(snd, msg.routineId(), cntrs, err);
+ sendMessageStartResult(snd, msg.routineId(), cntrsMap, err);
}
});
}
@@ -1647,32 +1644,17 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
/**
* @param node Target node.
* @param routineId Routine ID.
- * @param cntrsMapBytes Marshalled {@link
CachePartitionPartialCountersMap}.
+ * @param cntrsMap Counters map.
* @param err Start error if any.
*/
private void sendMessageStartResult(final ClusterNode node,
final UUID routineId,
- byte[] cntrsMapBytes,
+ CachePartitionPartialCountersMap cntrsMap,
@Nullable final Exception err
) {
- byte[] errBytes = null;
-
- if (err != null) {
- try {
- errBytes = U.marshal(marsh, err);
- }
- catch (Exception e) {
- U.error(log, "Failed to marshal routine start error: " + e, e);
- }
- }
-
- ContinuousRoutineStartResultMessage msg = new
ContinuousRoutineStartResultMessage(routineId,
- cntrsMapBytes,
- errBytes,
- err != null);
-
try {
- ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
+ ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, new
ContinuousRoutineStartResultMessage(routineId, cntrsMap, err),
+ SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
@@ -2561,7 +2543,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
resCollect = new
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage,
RoutineRegisterResults>(ctx) {
@Override protected RoutineRegisterResults
createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) {
- Map<UUID, Exception> errs = null;
+ Map<UUID, Throwable> errs = null;
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode =
null;
for (Map.Entry<UUID,
NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) {
@@ -2570,24 +2552,12 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (msg == null)
continue;
- if (msg.error()) {
- byte[] errBytes = msg.errorBytes();
-
- Exception err = null;
+ ErrorMessage errMsg = msg.errorMessage();
- if (errBytes != null) {
- try {
- err = U.unmarshal(marsh, errBytes,
U.resolveClassLoader(ctx.config()));
- }
- catch (Exception e) {
- U.warn(log, "Failed to unmarhal continuous
routine start error: " + e);
- }
- }
-
- if (err == null) {
- err = new IgniteCheckedException("Failed to
start continuous " +
- "routine on node: " + entry.getKey());
- }
+ if (errMsg != null) {
+ Throwable err = errMsg.error() == null
+ ? new IgniteCheckedException("Failed to start
continuous routine on node: " + entry.getKey())
+ : errMsg.error();
if (errs == null)
errs = new HashMap<>();
@@ -2595,23 +2565,13 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
errs.put(entry.getKey(), err);
}
else {
- byte[] cntrsMapBytes = msg.countersMapBytes();
-
- if (cntrsMapBytes != null) {
- try {
- CachePartitionPartialCountersMap cntrsMap
= U.unmarshal(
- marsh,
- cntrsMapBytes,
- U.resolveClassLoader(ctx.config()));
+ CachePartitionPartialCountersMap cntrsMap =
msg.countersMap();
- if (cntrsPerNode == null)
- cntrsPerNode = new HashMap<>();
+ if (cntrsMap != null) {
+ if (cntrsPerNode == null)
+ cntrsPerNode = new HashMap<>();
- cntrsPerNode.put(entry.getKey(),
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
- }
- catch (Exception e) {
- U.warn(log, "Failed to unmarhal continuous
query update counters: " + e);
- }
+ cntrsPerNode.put(entry.getKey(),
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
}
}
}
@@ -2637,7 +2597,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
*/
private void onAllRemoteRegistered(
AffinityTopologyVersion topVer,
- @Nullable Map<UUID, ? extends Exception> errs,
+ @Nullable Map<UUID, ? extends Throwable> errs,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
Map<Integer, T2<Long, Long>> cntrs) {
try {
@@ -2661,7 +2621,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
onRemoteRegistered();
}
else {
- Exception firstEx = F.first(errs.values());
+ Throwable firstEx = F.first(errs.values());
onDone(firstEx);
@@ -2729,7 +2689,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
private final AffinityTopologyVersion topVer;
/** */
- private final Map<UUID, ? extends Exception> errs;
+ private final Map<UUID, ? extends Throwable> errs;
/** */
private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode;
@@ -2740,7 +2700,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
* @param cntrsPerNode Update counters.
*/
RoutineRegisterResults(AffinityTopologyVersion topVer,
- Map<UUID, ? extends Exception> errs,
+ Map<UUID, ? extends Throwable> errs,
Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
this.topVer = topVer;
this.errs = errs;