This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 999dc1fc88d IGNITE-26889 Use MessageSerializer for
GridDhtPartitionsFullMessage (#12508)
999dc1fc88d is described below
commit 999dc1fc88d8f85bacf2fa229f8644ac423ac1c3
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Nov 20 18:02:48 2025 +0500
IGNITE-26889 Use MessageSerializer for GridDhtPartitionsFullMessage (#12508)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../processors/cache/ExchangeFailureMessage.java | 8 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 17 +-
.../preloader/GridDhtPartitionsFullMessage.java | 399 ++++++++-------------
.../cluster/GridClusterStateProcessor.java | 4 +-
.../cluster/IGridClusterStateProcessor.java | 2 +-
6 files changed, 161 insertions(+), 272 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 4bdc2102d44..a91f8a7a996 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -65,6 +65,7 @@ import
org.apache.ignite.internal.codegen.GridDhtLockResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
+import
org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtTxFinishRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtTxFinishResponseSerializer;
@@ -362,7 +363,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)42, GridDhtForceKeysRequest::new, new
GridDhtForceKeysRequestSerializer());
factory.register((short)43, GridDhtForceKeysResponse::new, new
GridDhtForceKeysResponseSerializer());
factory.register((short)45, GridDhtPartitionDemandMessage::new, new
GridDhtPartitionDemandMessageSerializer());
- factory.register((short)46, GridDhtPartitionsFullMessage::new);
+ factory.register((short)46, GridDhtPartitionsFullMessage::new, new
GridDhtPartitionsFullMessageSerializer());
factory.register((short)47, GridDhtPartitionsSingleMessage::new);
factory.register((short)48, GridDhtPartitionsSingleRequest::new, new
GridDhtPartitionsSingleRequestSerializer());
factory.register((short)49, GridNearGetRequest::new, new
GridNearGetRequestSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
index b29505e5ece..764bd717d1c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeFailureMessage.java
@@ -53,7 +53,7 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
/** */
@GridToStringInclude
- private final Map<UUID, Exception> exchangeErrors;
+ private final Map<UUID, Throwable> exchangeErrors;
/** Actions to be done to rollback changes done before the exchange
failure. */
private transient ExchangeActions exchangeRollbackActions;
@@ -68,7 +68,7 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
public ExchangeFailureMessage(
ClusterNode locNode,
GridDhtPartitionExchangeId exchId,
- Map<UUID, Exception> exchangeErrors,
+ Map<UUID, Throwable> exchangeErrors,
Collection<String> cacheNames
) {
assert exchId != null;
@@ -94,7 +94,7 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
}
/** */
- public Map<UUID, Exception> exchangeErrors() {
+ public Map<UUID, Throwable> exchangeErrors() {
return exchangeErrors;
}
@@ -123,7 +123,7 @@ public class ExchangeFailureMessage implements
DiscoveryCustomMessage {
public IgniteCheckedException createFailureCompoundException() {
IgniteCheckedException ex = new IgniteCheckedException("Failed to
complete exchange process.");
- for (Map.Entry<UUID, Exception> entry : exchangeErrors.entrySet())
+ for (Map.Entry<UUID, Throwable> entry : exchangeErrors.entrySet())
U.addSuppressed(ex, entry.getValue());
return ex;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e40f6234a90..dd1ded01ed4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -306,7 +306,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
private volatile Exception exchangeLocE;
/** Exchange exceptions from all participating nodes. */
- private final Map<UUID, Exception> exchangeGlobalExceptions = new
ConcurrentHashMap<>();
+ private final Map<UUID, Throwable> exchangeGlobalExceptions = new
ConcurrentHashMap<>();
/** Used to track the fact that {@link ExchangeFailureMessage} was sent. */
private volatile boolean isExchangeFailureMsgSent;
@@ -2214,7 +2214,13 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
// Prepare full message for newly joined nodes with affinity request.
final GridDhtPartitionsFullMessage fullMsgWithAff = singleMsgWithAffReq
.filter(singleMessage -> affinityForJoinedNodes != null)
- .map(singleMessage ->
fullMsg.copy().joinedNodeAffinity(affinityForJoinedNodes))
+ .map(singleMessage -> {
+ GridDhtPartitionsFullMessage copy = fullMsg.copy();
+
+ copy.joinedNodeAffinity(affinityForJoinedNodes);
+
+ return copy;
+ })
.orElse(null);
// Prepare and send full messages for given nodes.
@@ -4252,7 +4258,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
catch (IllegalStateException e) {
// Cannot create affinity message.
- Map<UUID, Exception> errs = Collections.singletonMap(
+ Map<UUID, Throwable> errs = Collections.singletonMap(
nodeId,
node.isClient() ? new IgniteNeedReconnectException(node,
e) : new IgniteCheckedException(e));
@@ -4517,7 +4523,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
else {
if (!F.isEmpty(msg.getErrorsMap())) {
- Exception e =
msg.getErrorsMap().get(cctx.localNodeId());
+ Throwable e =
msg.getErrorsMap().get(cctx.localNodeId());
if (e instanceof
IgniteNeedReconnectException) {
onDone(e);
@@ -4654,7 +4660,8 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
assert partHistSuppliers.isEmpty();
- partHistSuppliers.putAll(msg.partitionHistorySuppliers());
+ partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ?
msg.partitionHistorySuppliers() :
+ IgniteDhtPartitionHistorySuppliersMap.empty());
// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 15fef52e473..0894a8ba48f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -30,8 +29,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -45,9 +44,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -62,58 +58,73 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/** grpId -> FullMap */
@GridToStringInclude
- @GridDirectTransient
private Map<Integer, GridDhtPartitionFullMap> parts;
- /** */
- @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
- private Map<Integer, Integer> dupPartsData;
+ /**
+ * Serialized local partitions.
+ * <p>
+ * TODO Remove this field after completing task IGNITE-26976.
+ */
+ @Order(value = 6, method = "partitionBytes")
+ private byte[] partsBytes;
/** */
- private byte[] partsBytes;
+ @Order(value = 7, method = "duplicatedPartitionsData")
+ private Map<Integer, Integer> dupPartsData;
/** Partitions update counters. */
+ @Order(value = 8, method = "partitionCounters")
@GridToStringInclude
private IgniteDhtPartitionCountersMap partCntrs;
/** Partitions history suppliers. */
+ @Order(value = 9, method = "partitionHistorySuppliers")
@GridToStringInclude
private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
/** Partitions that must be cleared and re-loaded. */
+ @Order(value = 10, method = "partitionsToReload")
@GridToStringInclude
private IgniteDhtPartitionsToReloadMap partsToReload;
/** Partition sizes. */
+ @Order(value = 11, method = "partitionSizes")
private Map<Integer, PartitionSizesMap> partsSizes;
/** Topology version. */
+ @Order(value = 12, method = "topologyVersion")
private AffinityTopologyVersion topVer;
/** Exceptions. */
@GridToStringInclude
- @GridDirectTransient
- private Map<UUID, Exception> errs;
+ private Map<UUID, Throwable> errs;
- /** */
- private byte[] errsBytes;
+ /**
+ * Used as a stub for serialization of {@link #errs}.
+ * All logic resides within getter and setter.
+ */
+ @Order(value = 13, method = "errorMessages")
+ @SuppressWarnings("unused")
+ private Map<UUID, ErrorMessage> errMsgs;
/** */
+ @Order(value = 14, method = "resultTopologyVersion")
private AffinityTopologyVersion resTopVer;
/** */
- @GridDirectMap(keyType = Integer.class, valueType =
CacheGroupAffinityMessage.class)
+ @Order(value = 15, method = "joinedNodeAffinity")
private Map<Integer, CacheGroupAffinityMessage> joinedNodeAff;
/** */
- @GridDirectMap(keyType = Integer.class, valueType =
CacheGroupAffinityMessage.class)
+ @Order(value = 16, method = "idealAffinityDiff")
private Map<Integer, CacheGroupAffinityMessage> idealAffDiff;
/** */
+ @Order(value = 17, method = "rebalancedFlags")
private byte flags;
/** */
- @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+ @Order(value = 18, method = "lostPartitions")
@GridToStringExclude
private Map<Integer, int[]> lostParts;
@@ -176,7 +187,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
cp.partsSizes = partsSizes;
cp.topVer = topVer;
cp.errs = errs;
- cp.errsBytes = errsBytes;
cp.resTopVer = resTopVer;
cp.joinedNodeAff = joinedNodeAff;
cp.idealAffDiff = idealAffDiff;
@@ -219,10 +229,8 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/**
* @param joinedNodeAff Caches affinity for joining nodes.
*/
- GridDhtPartitionsFullMessage joinedNodeAffinity(Map<Integer,
CacheGroupAffinityMessage> joinedNodeAff) {
+ public void joinedNodeAffinity(Map<Integer, CacheGroupAffinityMessage>
joinedNodeAff) {
this.joinedNodeAff = joinedNodeAff;
-
- return this;
}
/**
@@ -235,7 +243,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/**
* @param idealAffDiff Difference with ideal affinity.
*/
- void idealAffinityDiff(Map<Integer, CacheGroupAffinityMessage>
idealAffDiff) {
+ public void idealAffinityDiff(Map<Integer, CacheGroupAffinityMessage>
idealAffDiff) {
this.idealAffDiff = idealAffDiff;
}
@@ -249,6 +257,20 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
return parts;
}
+ /**
+ * @return Serialized local partitions.
+ */
+ public byte[] partitionBytes() {
+ return partsBytes;
+ }
+
+ /**
+ * @param partsBytes Serialized local partitions.
+ */
+ public void partitionBytes(byte[] partsBytes) {
+ this.partsBytes = partsBytes;
+ }
+
/**
* @param grpId Cache group ID.
* @return {@code True} if message contains full map for given cache.
@@ -333,15 +355,19 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
}
/**
- *
+ * @return Partitions history suppliers.
*/
public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
- if (partHistSuppliers == null)
- return IgniteDhtPartitionHistorySuppliersMap.empty();
-
return partHistSuppliers;
}
+ /**
+ * @param partHistSuppliers Partitions history suppliers.
+ */
+ public void
partitionHistorySuppliers(IgniteDhtPartitionHistorySuppliersMap
partHistSuppliers) {
+ this.partHistSuppliers = partHistSuppliers;
+ }
+
/**
*
*/
@@ -371,17 +397,31 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/**
* @return Errors map.
*/
- @Nullable Map<UUID, Exception> getErrorsMap() {
+ @Nullable Map<UUID, Throwable> getErrorsMap() {
return errs;
}
/**
* @param errs Errors map.
*/
- void setErrorsMap(Map<UUID, Exception> errs) {
+ void setErrorsMap(Map<UUID, Throwable> errs) {
this.errs = new HashMap<>(errs);
}
+ /**
+ * @return Error messages map.
+ */
+ public Map<UUID, ErrorMessage> errorMessages() {
+ return errs == null ? null : F.viewReadOnly(errs, ErrorMessage::new);
+ }
+
+ /**
+ * @param errMsgs Error messages map.
+ */
+ public void errorMessages(Map<UUID, ErrorMessage> errMsgs) {
+ errs = errMsgs == null ? null : F.viewReadOnly(errMsgs, e ->
e.error());
+ }
+
/**
* Rebalance finished.
*/
@@ -396,12 +436,81 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
flags = rebalanced ? (byte)(flags | REBALANCED_FLAG_MASK) :
(byte)(flags & ~REBALANCED_FLAG_MASK);
}
+ /**
+ * @return Duplicated partitions data.
+ */
+ public Map<Integer, Integer> duplicatedPartitionsData() {
+ return dupPartsData;
+ }
+
+ /**
+ * @param dupPartsData Duplicated partitions data.
+ */
+ public void duplicatedPartitionsData(Map<Integer, Integer> dupPartsData) {
+ this.dupPartsData = dupPartsData;
+ }
+
+ /**
+ * @return Partitions update counters.
+ */
+ public IgniteDhtPartitionCountersMap partitionCounters() {
+ return partCntrs;
+ }
+
+ /**
+ * @param partCntrs Partitions update counters.
+ */
+ public void partitionCounters(IgniteDhtPartitionCountersMap partCntrs) {
+ this.partCntrs = partCntrs;
+ }
+
+ /**
+ * @return Partitions that must be cleared and re-loaded.
+ */
+ public IgniteDhtPartitionsToReloadMap partitionsToReload() {
+ return partsToReload;
+ }
+
+ /**
+ * @param partsToReload Partitions that must be cleared and re-loaded.
+ */
+ public void partitionsToReload(IgniteDhtPartitionsToReloadMap
partsToReload) {
+ this.partsToReload = partsToReload;
+ }
+
+ /**
+ * @return Rebalanced flags.
+ */
+ public byte rebalancedFlags() {
+ return flags;
+ }
+
+ /**
+ * @param flags Rebalanced flags.
+ */
+ public void rebalancedFlags(byte flags) {
+ this.flags = flags;
+ }
+
+ /**
+ * @return Lost partitions.
+ */
+ public Map<Integer, int[]> lostPartitions() {
+ return lostParts;
+ }
+
+ /**
+ * @param lostParts Lost partitions.
+ */
+ public void lostPartitions(Map<Integer, int[]> lostParts) {
+ this.lostParts = lostParts;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
- boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
- (!F.isEmpty(errs) && errsBytes == null);
+ boolean marshal = !F.isEmpty(parts) && partsBytes == null;
if (marshal) {
// Reserve at least 2 threads for system operations.
@@ -412,9 +521,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (!F.isEmpty(parts) && partsBytes == null)
objectsToMarshall.add(parts);
- if (!F.isEmpty(errs) && errsBytes == null)
- objectsToMarshall.add(errs);
-
Collection<byte[]> marshalled = U.doInParallel(
parallelismLvl,
ctx.kernalContext().pools().getSystemExecutorService(),
@@ -434,9 +540,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (!F.isEmpty(parts) && partsBytes == null)
partsBytes = iter.next();
-
- if (!F.isEmpty(errs) && errsBytes == null)
- errsBytes = iter.next();
}
}
@@ -468,9 +571,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (partsBytes != null && parts == null)
objectsToUnmarshall.add(partsBytes);
- if (errsBytes != null && errs == null)
- objectsToUnmarshall.add(errsBytes);
-
Collection<Object> unmarshalled = U.doInParallel(
parallelismLvl,
ctx.kernalContext().pools().getSystemExecutorService(),
@@ -517,9 +617,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
}
}
- if (errsBytes != null && errs == null)
- errs = (Map<UUID, Exception>)iter.next();
-
if (parts == null)
parts = new HashMap<>();
@@ -536,221 +633,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
errs = new HashMap<>();
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 6:
- if (!writer.writeMap(dupPartsData,
MessageCollectionItemType.INT, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeByteArray(errsBytes))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeMap(idealAffDiff,
MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMap(joinedNodeAff,
MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMap(lostParts, MessageCollectionItemType.INT,
MessageCollectionItemType.INT_ARR))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeMessage(partCntrs))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeMessage(partHistSuppliers))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeByteArray(partsBytes))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeMap(partsSizes,
MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeMessage(partsToReload))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeAffinityTopologyVersion(resTopVer))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 6:
- dupPartsData = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.INT, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- errsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- idealAffDiff = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.MSG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- joinedNodeAff = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.MSG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- lostParts = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.INT_ARR, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- partCntrs = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- partHistSuppliers = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- partsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- partsSizes = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.MSG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- partsToReload = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- resTopVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 46;
@@ -799,6 +681,5 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
public void cleanUp() {
partsBytes = null;
partCntrs = null;
- errsBytes = null;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 0032af700d8..51419ab5ecc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -1378,7 +1378,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
}
/** {@inheritDoc} */
- @Override public void onStateChangeError(Map<UUID, Exception> errs,
StateChangeRequest req) {
+ @Override public void onStateChangeError(Map<UUID, Throwable> errs,
StateChangeRequest req) {
assert !F.isEmpty(errs);
// Revert caches start if activation request fail.
@@ -1408,7 +1408,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
if (fut != null) {
IgniteCheckedException e = new IgniteCheckedException("Failed to "
+ prettyStr(req.state()), null, false);
- for (Map.Entry<UUID, Exception> entry : errs.entrySet())
+ for (Map.Entry<UUID, Throwable> entry : errs.entrySet())
e.addSuppressed(entry.getValue());
fut.onDone(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
index fbd1df1bb9f..dee6702d7c0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IGridClusterStateProcessor.java
@@ -128,7 +128,7 @@ public interface IGridClusterStateProcessor extends
GridProcessor {
* @param errs Errors.
* @param req State change request.
*/
- void onStateChangeError(Map<UUID, Exception> errs, StateChangeRequest req);
+ void onStateChangeError(Map<UUID, Throwable> errs, StateChangeRequest req);
/**
* @param exchangeActions Exchange actions.