This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 92074fd27b1 IGNITE-26517 Use MessageSerializer for GridDhtPartitionsSingleMessage (#12523)
92074fd27b1 is described below
commit 92074fd27b1f28ff99750d210433e96b1965e11c
Author: Dmitry Werner <[email protected]>
AuthorDate: Wed Nov 26 12:50:24 2025 +0500
IGNITE-26517 Use MessageSerializer for GridDhtPartitionsSingleMessage (#12523)
---
.../communication/GridIoMessageFactory.java | 9 +-
.../cache/GridCachePartitionExchangeManager.java | 10 +-
.../GridDhtPartitionsAbstractMessage.java | 80 ----
.../preloader/GridDhtPartitionsExchangeFuture.java | 9 +-
.../preloader/GridDhtPartitionsFullMessage.java | 6 +-
.../preloader/GridDhtPartitionsSingleMessage.java | 446 +++++++--------------
.../{PartitionSizesMap.java => IntLongMap.java} | 33 +-
...teDynamicCacheStartCoordinatorFailoverTest.java | 2 +-
8 files changed, 175 insertions(+), 420 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a91f8a7a996..1536030b251 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -66,6 +66,7 @@ import
org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerialize
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionsFullMessageSerializer;
+import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleMessageSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtTxFinishRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtTxFinishResponseSerializer;
@@ -118,6 +119,7 @@ import
org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapS
import
org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
import
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
+import org.apache.ignite.internal.codegen.IntLongMapSerializer;
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
import org.apache.ignite.internal.codegen.LatchAckMessageSerializer;
import org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
@@ -127,7 +129,6 @@ import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerialize
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
-import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
import
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
@@ -223,8 +224,8 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IntLongMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
@@ -364,7 +365,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)43, GridDhtForceKeysResponse::new, new
GridDhtForceKeysResponseSerializer());
factory.register((short)45, GridDhtPartitionDemandMessage::new, new
GridDhtPartitionDemandMessageSerializer());
factory.register((short)46, GridDhtPartitionsFullMessage::new, new
GridDhtPartitionsFullMessageSerializer());
- factory.register((short)47, GridDhtPartitionsSingleMessage::new);
+ factory.register((short)47, GridDhtPartitionsSingleMessage::new, new
GridDhtPartitionsSingleMessageSerializer());
factory.register((short)48, GridDhtPartitionsSingleRequest::new, new
GridDhtPartitionsSingleRequestSerializer());
factory.register((short)49, GridNearGetRequest::new, new
GridNearGetRequestSerializer());
factory.register((short)50, GridNearGetResponse::new, new
GridNearGetResponseSerializer());
@@ -482,7 +483,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(CachePartitionsToReloadMap.TYPE_CODE,
CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE,
IgniteDhtPartitionsToReloadMap::new,
new IgniteDhtPartitionsToReloadMapSerializer());
- factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new,
new PartitionSizesMapSerializer());
+ factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new
IntLongMapSerializer());
factory.register(DeploymentModeMessage.TYPE_CODE,
DeploymentModeMessage::new, new DeploymentModeMessageSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 9a6a8cfb045..bbc2e78bbb4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -90,7 +90,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IntLongMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1441,7 +1441,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
}
if (!partsSizes.isEmpty())
- m.partitionSizes(F.viewReadOnly(partsSizes,
PartitionSizesMap::new));
+ m.partitionSizes(F.viewReadOnly(partsSizes, IntLongMap::new));
return m;
}
@@ -1772,7 +1772,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
boolean updated = false;
- Map<Integer, PartitionSizesMap> partsSizes =
F.emptyIfNull(msg.partitionSizes());
+ Map<Integer, IntLongMap> partsSizes =
F.emptyIfNull(msg.partitionSizes());
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry :
msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -1782,13 +1782,13 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
GridDhtPartitionTopology top = grp == null ?
clientTops.get(grpId) : grp.topology();
if (top != null) {
- PartitionSizesMap sizesMap = partsSizes.get(grpId);
+ IntLongMap sizesMap = partsSizes.get(grpId);
updated |= top.update(null,
entry.getValue(),
null,
msg.partsToReload(cctx.localNodeId(), grpId),
- sizesMap != null ?
F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
+ sizesMap != null ? F.emptyIfNull(sizesMap.map()) :
Collections.emptyMap(),
msg.topologyVersion(),
null,
null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 15a015d1114..df9672c02e5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -17,14 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -159,83 +156,6 @@ public abstract class GridDhtPartitionsAbstractMessage
extends GridCacheMessage
return (flags & RESTORE_STATE_FLAG_MASK) != 0;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- // TODO: Remove #writeTo() after all inheritors have migrated to the
new ser/der scheme (IGNITE-25490).
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeMessage(exchId))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage(lastVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- // TODO: Remove #readFrom() after all inheritors have migrated to the
new ser/der scheme (IGNITE-25490).
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- exchId = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- lastVer = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionsAbstractMessage.class, this,
super.toString());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index dd1ded01ed4..3456d33cbab 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4004,7 +4004,8 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
GridDhtPartitionsSingleMessage msg,
Map<Integer, CacheGroupAffinityMessage> messageAccumulator
) {
- msg.partitionUpdateCounters().forEach((grpId, updCntrs) ->
partitionTopology(grpId).collectUpdateCounters(updCntrs));
+ F.emptyIfNull(msg.partitionUpdateCounters()).forEach((grpId, updCntrs)
->
+ partitionTopology(grpId).collectUpdateCounters(updCntrs));
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
@@ -4667,7 +4668,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
try {
- Map<Integer, PartitionSizesMap> partsSizes =
F.emptyIfNull(msg.partitionSizes());
+ Map<Integer, IntLongMap> partsSizes =
F.emptyIfNull(msg.partitionSizes());
doInParallel(
parallelismLvl,
@@ -4678,13 +4679,13 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
if (grp != null) {
- PartitionSizesMap sizesMap = partsSizes.get(grpId);
+ IntLongMap sizesMap = partsSizes.get(grpId);
grp.topology().update(resTopVer,
msg.partitions().get(grpId),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
- sizesMap != null ?
F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
+ sizesMap != null ? F.emptyIfNull(sizesMap.map()) :
Collections.emptyMap(),
null,
this,
msg.lostPartitions(grpId));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0894a8ba48f..5655ba4a028 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -89,7 +89,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/** Partition sizes. */
@Order(value = 11, method = "partitionSizes")
- private Map<Integer, PartitionSizesMap> partsSizes;
+ private Map<Integer, IntLongMap> partsSizes;
/** Topology version. */
@Order(value = 12, method = "topologyVersion")
@@ -383,14 +383,14 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
*
* @param partsSizes Partitions sizes map.
*/
- public void partitionSizes(Map<Integer, PartitionSizesMap> partsSizes) {
+ public void partitionSizes(Map<Integer, IntLongMap> partsSizes) {
this.partsSizes = partsSizes;
}
/**
* @return Partition sizes map (grpId, (partId, partSize)).
*/
- public Map<Integer, PartitionSizesMap> partitionSizes() {
+ public Map<Integer, IntLongMap> partitionSizes() {
return partsSizes;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 868867a2316..9266a486c7c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,15 +17,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -33,9 +31,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -46,62 +41,57 @@ import org.jetbrains.annotations.Nullable;
public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMessage {
/** Local partitions. Serialized as {@link #partsBytes}, may be
compressed. */
@GridToStringInclude
- @GridDirectTransient
private Map<Integer, GridDhtPartitionMap> parts;
+ /**
+ * Serialized local partitions. Unmarshalled to {@link #parts}.
+ * <p>
+ * TODO Remove this field after completing task IGNITE-26976.
+ */
+ @Order(value = 6, method = "partitionBytes")
+ private byte[] partsBytes;
+
/** */
- @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+ @Order(value = 7, method = "duplicatedPartitionsData")
private Map<Integer, Integer> dupPartsData;
- /** Serialized local partitions. Unmarshalled to {@link #parts}. */
- private byte[] partsBytes;
-
/** Partitions update counters. */
+ @Order(value = 8, method = "partitionUpdateCounters")
@GridToStringInclude
- @GridDirectTransient
private Map<Integer, CachePartitionPartialCountersMap> partCntrs;
- /** Serialized partitions counters. */
- private byte[] partCntrsBytes;
-
/** Partitions sizes. */
+ @Order(value = 9, method = "partitionSizesMap")
@GridToStringInclude
- @GridDirectTransient
- private Map<Integer, Map<Integer, Long>> partsSizes;
-
- /** Serialized partitions counters. */
- private byte[] partsSizesBytes;
+ private Map<Integer, IntLongMap> partsSizes;
/** Partitions history reservation counters. */
+ @Order(value = 10, method = "partitionHistoryCountersMap")
@GridToStringInclude
- @GridDirectTransient
- private Map<Integer, Map<Integer, Long>> partHistCntrs;
-
- /** Serialized partitions history reservation counters. */
- private byte[] partHistCntrsBytes;
+ private Map<Integer, IntLongMap> partHistCntrs;
- /** Exception. */
+ /** Error message. */
+ @Order(value = 11, method = "errorMessage")
@GridToStringInclude
- @GridDirectTransient
- private Exception err;
-
- /** */
- private byte[] errBytes;
+ private ErrorMessage errMsg;
/** */
+ @Order(12)
private boolean client;
/** */
- @GridDirectCollection(Integer.class)
- private Collection<Integer> grpsAffRequest;
+ @Order(value = 13, method = "cacheGroupsAffinityRequest")
+ private Collection<Integer> grpsAffReq;
/** Start time of exchange on node which sent this message in nanoseconds.
*/
+ @Order(14)
private long exchangeStartTime;
/**
* Exchange finish message, sent to new coordinator when it tries to
restore state after previous coordinator failed
* during exchange.
*/
+ @Order(value = 15, method = "finishMessage")
private GridDhtPartitionsFullMessage finishMsg;
/**
@@ -132,29 +122,36 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
/**
* @param finishMsg Exchange finish message (used to restore exchange
state on new coordinator).
*/
- void finishMessage(GridDhtPartitionsFullMessage finishMsg) {
+ public void finishMessage(GridDhtPartitionsFullMessage finishMsg) {
this.finishMsg = finishMsg;
}
/**
* @return Exchange finish message (used to restore exchange state on new
coordinator).
*/
- GridDhtPartitionsFullMessage finishMessage() {
+ public GridDhtPartitionsFullMessage finishMessage() {
return finishMsg;
}
/**
- * @param grpsAffRequest Cache groups to get affinity for (affinity is
requested when node joins cluster).
+ * @param grpsAffReq Cache groups to get affinity for (affinity is
requested when node joins cluster).
*/
- void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) {
- this.grpsAffRequest = grpsAffRequest;
+ public void cacheGroupsAffinityRequest(Collection<Integer> grpsAffReq) {
+ this.grpsAffReq = grpsAffReq;
}
/**
* @return Cache groups to get affinity for (affinity is requested when
node joins cluster).
*/
@Nullable public Collection<Integer> cacheGroupsAffinityRequest() {
- return grpsAffRequest;
+ return grpsAffReq;
+ }
+
+ /**
+ * @param client {@code True} if sent from client node.
+ */
+ public void client(boolean client) {
+ this.client = client;
}
/**
@@ -164,6 +161,20 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
return client;
}
+ /**
+ * @return Duplicated partitions data.
+ */
+ public Map<Integer, Integer> duplicatedPartitionsData() {
+ return dupPartsData;
+ }
+
+ /**
+ * @param dupPartsData Duplicated partitions data.
+ */
+ public void duplicatedPartitionsData(Map<Integer, Integer> dupPartsData) {
+ this.dupPartsData = dupPartsData;
+ }
+
/**
* @param cacheId Cache ID to add local partition for.
* @param locMap Local partition map.
@@ -198,9 +209,16 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
partCntrs.put(grpId, cntrMap);
}
+ /**
+ * @param partCntrs Partition update counters per cache group.
+ */
+ public void partitionUpdateCounters(Map<Integer,
CachePartitionPartialCountersMap> partCntrs) {
+ this.partCntrs = partCntrs;
+ }
+
/** @return Partition update counters per cache group. */
public Map<Integer, CachePartitionPartialCountersMap>
partitionUpdateCounters() {
- return partCntrs == null ? Collections.emptyMap() :
Collections.unmodifiableMap(partCntrs);
+ return partCntrs;
}
/**
@@ -226,7 +244,7 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
if (partsSizes == null)
partsSizes = new HashMap<>();
- partsSizes.put(grpId, partSizesMap);
+ partsSizes.put(grpId, new IntLongMap(partSizesMap));
}
/**
@@ -239,29 +257,54 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
if (partsSizes == null)
return Collections.emptyMap();
- return partsSizes.getOrDefault(grpId, Collections.emptyMap());
+ IntLongMap sizesMap = partsSizes.get(grpId);
+
+ return sizesMap != null ? F.emptyIfNull(sizesMap.map()) :
Collections.emptyMap();
}
/**
- * @param grpId Cache group ID.
- * @param cntrMap Partition history counters.
+ * @return Partitions sizes.
*/
- public void partitionHistoryCounters(int grpId, Map<Integer, Long>
cntrMap) {
- if (cntrMap.isEmpty())
- return;
+ public Map<Integer, IntLongMap> partitionSizesMap() {
+ return partsSizes;
+ }
+
+ /**
+ * @param partsSizes Partitions sizes.
+ */
+ public void partitionSizesMap(Map<Integer, IntLongMap> partsSizes) {
+ this.partsSizes = partsSizes;
+ }
- if (partHistCntrs == null)
- partHistCntrs = new HashMap<>();
+ /**
+ * @return Partitions history reservation counters.
+ */
+ public Map<Integer, IntLongMap> partitionHistoryCountersMap() {
+ return partHistCntrs;
+ }
- partHistCntrs.put(grpId, cntrMap);
+ /**
+ * @param partHistCntrs Partitions history reservation counters.
+ */
+ public void partitionHistoryCountersMap(Map<Integer, IntLongMap>
partHistCntrs) {
+ this.partHistCntrs = partHistCntrs;
}
/**
* @param cntrMap Partition history counters.
*/
void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) {
- for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet())
- partitionHistoryCounters(e.getKey(), e.getValue());
+ for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) {
+ Map<Integer, Long> historyCntrs = e.getValue();
+
+ if (historyCntrs.isEmpty())
+ continue;
+
+ if (partHistCntrs == null)
+ partHistCntrs = new HashMap<>();
+
+ partHistCntrs.put(e.getKey(), new IntLongMap(historyCntrs));
+ }
}
/**
@@ -270,9 +313,9 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
*/
Map<Integer, Long> partitionHistoryCounters(int grpId) {
if (partHistCntrs != null) {
- Map<Integer, Long> res = partHistCntrs.get(grpId);
+ IntLongMap res = partHistCntrs.get(grpId);
- return res != null ? res : Collections.<Integer, Long>emptyMap();
+ return res != null ? F.emptyIfNull(res.map()) :
Collections.emptyMap();
}
return Collections.emptyMap();
@@ -288,18 +331,46 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
return parts;
}
+ /**
+ * @return Serialized local partitions.
+ */
+ public byte[] partitionBytes() {
+ return partsBytes;
+ }
+
+ /**
+ * @param partsBytes Serialized local partitions.
+ */
+ public void partitionBytes(byte[] partsBytes) {
+ this.partsBytes = partsBytes;
+ }
+
+ /**
+ * @return Error message.
+ */
+ public ErrorMessage errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * @param errMsg Error message.
+ */
+ public void errorMessage(ErrorMessage errMsg) {
+ this.errMsg = errMsg;
+ }
+
/**
* @param ex Exception.
*/
- public void setError(Exception ex) {
- this.err = ex;
+ public void setError(Throwable ex) {
+ errMsg = new ErrorMessage(ex);
}
/**
* @return Not null exception if exchange processing failed.
*/
- @Nullable public Exception getError() {
- return err;
+ @Nullable public Throwable getError() {
+ return ErrorMessage.error(errMsg);
}
/**
@@ -317,50 +388,15 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
- boolean marshal = (parts != null && partsBytes == null) ||
- (partCntrs != null && partCntrsBytes == null) ||
- (partHistCntrs != null && partHistCntrsBytes == null) ||
- (partsSizes != null && partsSizesBytes == null) ||
- (err != null && errBytes == null);
-
- if (marshal) {
- byte[] partsBytes0 = null;
- byte[] partCntrsBytes0 = null;
- byte[] partHistCntrsBytes0 = null;
- byte[] partsSizesBytes0 = null;
- byte[] errBytes0 = null;
-
- if (parts != null && partsBytes == null)
- partsBytes0 = U.marshal(ctx, parts);
-
- if (partCntrs != null && partCntrsBytes == null)
- partCntrsBytes0 = U.marshal(ctx, partCntrs);
-
- if (partHistCntrs != null && partHistCntrsBytes == null)
- partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
-
- if (partsSizes != null && partsSizesBytes == null)
- partsSizesBytes0 = U.marshal(ctx, partsSizes);
-
- if (err != null && errBytes == null)
- errBytes0 = U.marshal(ctx, err);
+ if (parts != null && partsBytes == null) {
+ byte[] partsBytes0 = U.marshal(ctx, parts);
if (compressed()) {
try {
- byte[] partsBytesZip = U.zip(partsBytes0);
- byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
- byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
- byte[] partsSizesBytesZip = U.zip(partsSizesBytes0);
- byte[] exBytesZip = U.zip(errBytes0);
-
- partsBytes0 = partsBytesZip;
- partCntrsBytes0 = partCntrsBytesZip;
- partHistCntrsBytes0 = partHistCntrsBytesZip;
- partsSizesBytes0 = partsSizesBytesZip;
- errBytes0 = exBytesZip;
+ partsBytes0 = U.zip(partsBytes0);
}
catch (IgniteCheckedException e) {
U.error(ctx.logger(getClass()), "Failed to compress
partitions data: " + e, e);
@@ -368,50 +404,17 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
}
partsBytes = partsBytes0;
- partCntrsBytes = partCntrsBytes0;
- partHistCntrsBytes = partHistCntrsBytes0;
- partsSizesBytes = partsSizesBytes0;
- errBytes = errBytes0;
}
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
if (partsBytes != null && parts == null) {
- if (compressed())
- parts = U.unmarshalZip(ctx.marshaller(), partsBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- else
- parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
- }
-
- if (partCntrsBytes != null && partCntrs == null) {
- if (compressed())
- partCntrs = U.unmarshalZip(ctx.marshaller(), partCntrsBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- else
- partCntrs = U.unmarshal(ctx, partCntrsBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- if (partHistCntrsBytes != null && partHistCntrs == null) {
- if (compressed())
- partHistCntrs = U.unmarshalZip(ctx.marshaller(),
partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- else
- partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- if (partsSizesBytes != null && partsSizes == null) {
- if (compressed())
- partsSizes = U.unmarshalZip(ctx.marshaller(), partsSizesBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- else
- partsSizes = U.unmarshal(ctx, partsSizesBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- if (errBytes != null && err == null) {
- if (compressed())
- err = U.unmarshalZip(ctx.marshaller(), errBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
- else
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+ parts = compressed()
+ ? U.unmarshalZip(ctx.marshaller(), partsBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()))
+ : U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
}
if (dupPartsData != null) {
@@ -435,179 +438,6 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
}
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 6:
- if (!writer.writeBoolean(client))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMap(dupPartsData,
MessageCollectionItemType.INT, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeLong(exchangeStartTime))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage(finishMsg))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeCollection(grpsAffRequest,
MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeByteArray(partCntrsBytes))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeByteArray(partHistCntrsBytes))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeByteArray(partsBytes))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeByteArray(partsSizesBytes))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 6:
- client = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- dupPartsData = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.INT, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- exchangeStartTime = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- finishMsg = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- grpsAffRequest =
reader.readCollection(MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- partCntrsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- partHistCntrsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- partsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- partsSizesBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 47;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IntLongMap.java
similarity index 67%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IntLongMap.java
index 987fa9fe604..fba057b5c4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IntLongMap.java
@@ -22,39 +22,42 @@ import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
-/** Partition sizes map. */
-public class PartitionSizesMap implements Message {
+/**
+ * Map for storing integer to long value mapping (e.g. partition size or partition history counter for
+ * a partition of a given id).
+ */
+public class IntLongMap implements Message {
/** Type code. */
public static final short TYPE_CODE = 514;
- /** Partition sizes map. */
- @Order(value = 0, method = "partitionSizes")
- private @Nullable Map<Integer, Long> partSizes;
+ /** Map. */
+ @Order(0)
+ private @Nullable Map<Integer, Long> map;
/** Default constructor. */
- public PartitionSizesMap() {
+ public IntLongMap() {
// No-op.
}
/**
- * @param partSizes Partition sizes map.
+ * @param map Map.
*/
- public PartitionSizesMap(@Nullable Map<Integer, Long> partSizes) {
- this.partSizes = partSizes;
+ public IntLongMap(@Nullable Map<Integer, Long> map) {
+ this.map = map;
}
/**
- * @return Partition sizes map.
+ * @return Map.
*/
- public @Nullable Map<Integer, Long> partitionSizes() {
- return partSizes;
+ public @Nullable Map<Integer, Long> map() {
+ return map;
}
/**
- * @param partSizes Partition sizes map.
+ * @param map Map.
*/
- public void partitionSizes(Map<Integer, Long> partSizes) {
- this.partSizes = partSizes;
+ public void map(Map<Integer, Long> map) {
+ this.map = map;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java
index 0c34afbcc84..a7e9841931c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java
@@ -193,7 +193,7 @@ public class IgniteDynamicCacheStartCoordinatorFailoverTest extends GridCommonAb
GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg0.message();
- Exception err = singleMsg.getError();
+ Throwable err = singleMsg.getError();
if (Boolean.TRUE.equals(attr) && err != null) {
// skip message