This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1858e222559 IGNITE-27968 Use message serializer for
ExchangeFailureMessage (#12963)
1858e222559 is described below
commit 1858e2225597f4534fd59c7be5767d7529a257a9
Author: Nikita Amelchev <[email protected]>
AuthorDate: Wed Apr 1 12:29:35 2026 +0300
IGNITE-27968 Use message serializer for ExchangeFailureMessage (#12963)
---
.../discovery/DiscoveryMessageFactory.java | 6 ++++
.../processors/cache/ExchangeFailureMessage.java | 34 ++++++++++++++++------
2 files changed, 31 insertions(+), 9 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 5e05b7d8757..4960fb374c8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -43,6 +43,8 @@ import
org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscove
import
org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import
org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatchMarshallableSerializer;
+import org.apache.ignite.internal.processors.cache.ExchangeFailureMessage;
+import
org.apache.ignite.internal.processors.cache.ExchangeFailureMessageSerializer;
import
org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
import
org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessageSerializer;
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
@@ -55,6 +57,8 @@ import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposed
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
import
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeIdSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResultSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerifyResult;
@@ -306,6 +310,7 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register(38, ChangeCacheEncryptionRequest::new, new
ChangeCacheEncryptionRequestSerializer());
factory.register(86, GridCacheVersion::new, new
GridCacheVersionSerializer());
+ factory.register(87, GridDhtPartitionExchangeId::new, new
GridDhtPartitionExchangeIdSerializer());
factory.register(167, ServiceDeploymentProcessId::new, new
ServiceDeploymentProcessIdSerializer());
factory.register(169, ServiceSingleNodeDeploymentResult::new, new
ServiceSingleNodeDeploymentResultSerializer());
@@ -361,5 +366,6 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register(537, ServiceDeploymentRequest::new,
new ServiceDeploymentRequestMarshallableSerializer(marsh, clsLdr));
factory.register(538, ServiceUndeploymentRequest::new, new
ServiceUndeploymentRequestSerializer());
+ factory.register(539, ExchangeFailureMessage::new, new
ExchangeFailureMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
index 8d6c3d3310f..e46e0cf6dcf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
@@ -18,10 +18,14 @@
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -32,32 +36,43 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/**
* This class represents discovery message that is used to provide information
about dynamic cache start failure.
*/
-public class ExchangeFailureMessage implements DiscoveryCustomMessage {
+public class ExchangeFailureMessage implements DiscoveryCustomMessage, Message
{
/** */
private static final long serialVersionUID = 0L;
/** Cache names. */
@GridToStringInclude
- private final Collection<String> cacheNames;
+ @Order(0)
+ Collection<String> cacheNames;
/** Custom message ID. */
- private final IgniteUuid id;
+ @Order(1)
+ IgniteUuid id;
/** */
- private final GridDhtPartitionExchangeId exchId;
+ @Order(2)
+ GridDhtPartitionExchangeId exchId;
/** */
@GridToStringInclude
- private final Map<UUID, Throwable> exchangeErrors;
+ @Order(3)
+ Map<UUID, ErrorMessage> exchangeErrors;
/** Actions to be done to rollback changes done before the exchange
failure. */
private transient ExchangeActions exchangeRollbackActions;
+ /** Default constructor for {@link MessageFactory}. */
+ public ExchangeFailureMessage() {
+ // No-op.
+ }
+
/**
* Creates new DynamicCacheChangeFailureMessage instance.
*
@@ -78,7 +93,8 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
this.id = IgniteUuid.fromUuid(locNode.id());
this.exchId = exchId;
this.cacheNames = cacheNames;
- this.exchangeErrors = exchangeErrors;
+ this.exchangeErrors = exchangeErrors.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e -> new
ErrorMessage(e.getValue()), (a, b) -> a, HashMap::new));
}
/** {@inheritDoc} */
@@ -95,7 +111,7 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
/** */
public Map<UUID, Throwable> exchangeErrors() {
- return exchangeErrors;
+ return F.viewReadOnly(exchangeErrors, e -> ErrorMessage.error(e));
}
/**
@@ -123,8 +139,8 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
public IgniteCheckedException createFailureCompoundException() {
IgniteCheckedException ex = new IgniteCheckedException("Failed to
complete exchange process.");
- for (Map.Entry<UUID, Throwable> entry : exchangeErrors.entrySet())
- U.addSuppressed(ex, entry.getValue());
+ for (Throwable err : exchangeErrors().values())
+ U.addSuppressed(ex, err);
return ex;
}