This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b2fdcea5483 IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap (#12487)
b2fdcea5483 is described below
commit b2fdcea548339b4a13e636adb8cce4c0796421ec
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Nov 7 13:00:04 2025 +0500
IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap (#12487)
---
.../apache/ignite/internal/util/lang/GridFunc.java | 7 ++
.../communication/GridIoMessageFactory.java | 13 ++++
.../cache/GridCachePartitionExchangeManager.java | 9 ++-
.../dht/preloader/CachePartitionsToReloadMap.java | 75 ++++++++++++++++++++++
.../preloader/GridDhtPartitionsExchangeFuture.java | 16 +++--
.../preloader/GridDhtPartitionsFullMessage.java | 69 ++++----------------
.../preloader/IgniteDhtPartitionsToReloadMap.java | 57 ++++++++++------
.../dht/preloader/PartitionSizesMap.java | 64 ++++++++++++++++++
.../dht/preloader/PartitionsToReload.java | 62 ++++++++++++++++++
.../dht/topology/GridClientPartitionTopology.java | 2 +-
.../dht/topology/GridDhtPartitionTopology.java | 5 +-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 2 +-
12 files changed, 292 insertions(+), 89 deletions(-)
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 3af938392a4..8bf794062ef 100755
---
a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2146,4 +2146,11 @@ public class GridFunc {
public static <T> Collection<T> emptyIfNull(@Nullable Collection<T> col) {
return col == null ? Collections.emptySet() : col;
}
+
+ /**
+ * @param map Map.
+ */
+ public static <K, V> Map<K, V> emptyIfNull(@Nullable Map<K, V> map) {
+ return map == null ? Collections.emptyMap() : map;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index c3804b5beae..9028c55a0c8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
import
org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer;
import
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
+import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
@@ -110,6 +111,7 @@ import
org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
import
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
import
org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer;
import
org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer;
+import
org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
import
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -121,6 +123,8 @@ import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerialize
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
+import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
+import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
import
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -201,6 +205,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.Update
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -213,7 +218,10 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gro
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -466,6 +474,11 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(PartitionReservationsMap.TYPE_CODE,
PartitionReservationsMap::new, new PartitionReservationsMapSerializer());
factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE,
IgniteDhtPartitionHistorySuppliersMap::new,
new IgniteDhtPartitionHistorySuppliersMapSerializer());
+ factory.register(PartitionsToReload.TYPE_CODE,
PartitionsToReload::new, new PartitionsToReloadSerializer());
+ factory.register(CachePartitionsToReloadMap.TYPE_CODE,
CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
+ factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE,
IgniteDhtPartitionsToReloadMap::new,
+ new IgniteDhtPartitionsToReloadMapSerializer());
+ factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new,
new PartitionSizesMapSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6bff39ae692..9a6a8cfb045 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -90,6 +90,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1440,7 +1441,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
}
if (!partsSizes.isEmpty())
- m.partitionSizes(cctx, partsSizes);
+ m.partitionSizes(F.viewReadOnly(partsSizes,
PartitionSizesMap::new));
return m;
}
@@ -1771,7 +1772,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
boolean updated = false;
- Map<Integer, Map<Integer, Long>> partsSizes =
msg.partitionSizes(cctx);
+ Map<Integer, PartitionSizesMap> partsSizes =
F.emptyIfNull(msg.partitionSizes());
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry :
msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
@@ -1781,11 +1782,13 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
GridDhtPartitionTopology top = grp == null ?
clientTops.get(grpId) : grp.topology();
if (top != null) {
+ PartitionSizesMap sizesMap = partsSizes.get(grpId);
+
updated |= top.update(null,
entry.getValue(),
null,
msg.partsToReload(cctx.localNodeId(), grpId),
- partsSizes.getOrDefault(grpId,
Collections.emptyMap()),
+ sizesMap != null ?
F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
msg.topologyVersion(),
null,
null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java
new file mode 100644
index 00000000000..bcf6ab80c3a
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Partition reload map for cache. */
+public class CachePartitionsToReloadMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 512;
+
+ /** Partition reload map for cache. */
+ @Order(value = 0, method = "cachePartitions")
+ private Map<Integer, PartitionsToReload> map;
+
+ /**
+ * @return Partition reload map for cache.
+ */
+ public Map<Integer, PartitionsToReload> cachePartitions() {
+ return map;
+ }
+
+ /**
+ * @param map Partition reload map for cache.
+ */
+ public void cachePartitions(Map<Integer, PartitionsToReload> map) {
+ this.map = map;
+ }
+
+ /**
+ * @param cacheId Cache id.
+ * @return Partitions to reload for this cache.
+ */
+ public @Nullable PartitionsToReload get(int cacheId) {
+ if (map == null)
+ return null;
+
+ return map.get(cacheId);
+ }
+
+ /**
+ * @param cacheId Cache id.
+ * @param parts Partitions to reload.
+ */
+ public void put(int cacheId, PartitionsToReload parts) {
+ if (map == null)
+ map = new HashMap<>();
+
+ map.put(cacheId, parts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0a1b955f0d0..e40f6234a90 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3311,12 +3311,18 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (partMap == null)
continue;
+ Map<Integer, Long> grpPartSizes =
singleMsg.partitionSizes(top.groupId());
+
for (Map.Entry<Integer, GridDhtPartitionState> e0 :
partMap.entrySet()) {
int p = e0.getKey();
GridDhtPartitionState state = e0.getValue();
- if (state == GridDhtPartitionState.OWNING)
- partSizes.put(p,
singleMsg.partitionSizes(top.groupId()).get(p));
+ if (state == GridDhtPartitionState.OWNING) {
+ Long size = grpPartSizes.get(p);
+
+ if (size != null)
+ partSizes.put(p, size);
+ }
}
}
@@ -4654,7 +4660,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
try {
- Map<Integer, Map<Integer, Long>> partsSizes =
msg.partitionSizes(cctx);
+ Map<Integer, PartitionSizesMap> partsSizes =
F.emptyIfNull(msg.partitionSizes());
doInParallel(
parallelismLvl,
@@ -4665,11 +4671,13 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
if (grp != null) {
+ PartitionSizesMap sizesMap = partsSizes.get(grpId);
+
grp.topology().update(resTopVer,
msg.partitions().get(grpId),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
- partsSizes.getOrDefault(grpId,
Collections.emptyMap()),
+ sizesMap != null ?
F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
null,
this,
msg.lostPartitions(grpId));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 469a85140f7..15fef52e473 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -29,7 +29,6 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
@@ -83,14 +82,10 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/** Partitions that must be cleared and re-loaded. */
@GridToStringInclude
- @GridDirectTransient
private IgniteDhtPartitionsToReloadMap partsToReload;
- /** Serialized partitions that must be cleared and re-loaded. */
- private byte[] partsToReloadBytes;
-
- /** Serialized partitions sizes. */
- private byte[] partsSizesBytes;
+ /** Partition sizes. */
+ private Map<Integer, PartitionSizesMap> partsSizes;
/** Topology version. */
private AffinityTopologyVersion topVer;
@@ -178,8 +173,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
cp.partCntrs = partCntrs;
cp.partHistSuppliers = partHistSuppliers;
cp.partsToReload = partsToReload;
- cp.partsToReloadBytes = partsToReloadBytes;
- cp.partsSizesBytes = partsSizesBytes;
+ cp.partsSizes = partsSizes;
cp.topVer = topVer;
cp.errs = errs;
cp.errsBytes = errsBytes;
@@ -351,7 +345,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/**
*
*/
- public Set<Integer> partsToReload(UUID nodeId, int grpId) {
+ public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
if (partsToReload == null)
return Collections.emptySet();
@@ -361,41 +355,17 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/**
* Supplies partition sizes map for all cache groups.
*
- * @param ctx Cache context.
* @param partsSizes Partitions sizes map.
*/
- public void partitionSizes(GridCacheSharedContext ctx, Map<Integer,
Map<Integer, Long>> partsSizes) {
- try {
- byte[] marshalled = U.marshal(ctx, partsSizes);
-
- if (compressed())
- marshalled = U.zip(marshalled,
ctx.gridConfig().getNetworkCompressionLevel());
-
- partsSizesBytes = marshalled;
- }
- catch (IgniteCheckedException ex) {
- throw new IgniteException(ex);
- }
+ public void partitionSizes(Map<Integer, PartitionSizesMap> partsSizes) {
+ this.partsSizes = partsSizes;
}
/**
- * Returns partition sizes map for all cache groups.
- *
- * @param ctx Cache context.
* @return Partition sizes map (grpId, (partId, partSize)).
*/
- public Map<Integer, Map<Integer, Long>>
partitionSizes(GridCacheSharedContext ctx) {
- if (partsSizesBytes == null)
- return Collections.emptyMap();
-
- try {
- return compressed()
- ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes,
ctx.deploy().globalLoader())
- : U.unmarshal(ctx, partsSizesBytes,
ctx.deploy().globalLoader());
- }
- catch (IgniteCheckedException ex) {
- throw new IgniteException(ex);
- }
+ public Map<Integer, PartitionSizesMap> partitionSizes() {
+ return partsSizes;
}
/**
@@ -431,7 +401,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
super.prepareMarshal(ctx);
boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
- (partsToReload != null && partsToReloadBytes == null) ||
(!F.isEmpty(errs) && errsBytes == null);
if (marshal) {
@@ -443,9 +412,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (!F.isEmpty(parts) && partsBytes == null)
objectsToMarshall.add(parts);
- if (partsToReload != null && partsToReloadBytes == null)
- objectsToMarshall.add(partsToReload);
-
if (!F.isEmpty(errs) && errsBytes == null)
objectsToMarshall.add(errs);
@@ -469,9 +435,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (!F.isEmpty(parts) && partsBytes == null)
partsBytes = iter.next();
- if (partsToReload != null && partsToReloadBytes == null)
- partsToReloadBytes = iter.next();
-
if (!F.isEmpty(errs) && errsBytes == null)
errsBytes = iter.next();
}
@@ -505,9 +468,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (partsBytes != null && parts == null)
objectsToUnmarshall.add(partsBytes);
- if (partsToReloadBytes != null && partsToReload == null)
- objectsToUnmarshall.add(partsToReloadBytes);
-
if (errsBytes != null && errs == null)
objectsToUnmarshall.add(errsBytes);
@@ -557,9 +517,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
}
}
- if (partsToReloadBytes != null && partsToReload == null)
- partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next();
-
if (errsBytes != null && errs == null)
errs = (Map<UUID, Exception>)iter.next();
@@ -649,13 +606,13 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
writer.incrementState();
case 15:
- if (!writer.writeByteArray(partsSizesBytes))
+ if (!writer.writeMap(partsSizes,
MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 16:
- if (!writer.writeByteArray(partsToReloadBytes))
+ if (!writer.writeMessage(partsToReload))
return false;
writer.incrementState();
@@ -758,7 +715,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
reader.incrementState();
case 15:
- partsSizesBytes = reader.readByteArray();
+ partsSizes = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
@@ -766,7 +723,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
reader.incrementState();
case 16:
- partsToReloadBytes = reader.readByteArray();
+ partsToReload = reader.readMessage();
if (!reader.isLastRead())
return false;
@@ -842,8 +799,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
public void cleanUp() {
partsBytes = null;
partCntrs = null;
- partsToReloadBytes = null;
- partsSizesBytes = null;
errsBytes = null;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
index 8515004c3fd..bc965a49926 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
@@ -18,45 +18,47 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.io.Serializable;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Partition reload map.
*/
-public class IgniteDhtPartitionsToReloadMap implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
+public class IgniteDhtPartitionsToReloadMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 513;
/** */
- private Map<UUID, Map<Integer, Set<Integer>>> map;
+ @Order(value = 0, method = "partitionsToReload")
+ private Map<UUID, CachePartitionsToReloadMap> map;
/**
* @param nodeId Node ID.
* @param cacheId Cache ID.
* @return Collection of partitions to reload.
*/
- public synchronized Set<Integer> get(UUID nodeId, int cacheId) {
+ public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
if (map == null)
return Collections.emptySet();
- Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
+ CachePartitionsToReloadMap nodeMap = map.get(nodeId);
if (nodeMap == null)
return Collections.emptySet();
- Set<Integer> parts = nodeMap.get(cacheId);
+ PartitionsToReload partsToReload = nodeMap.get(cacheId);
- if (parts == null)
+ if (partsToReload == null)
return Collections.emptySet();
- return parts;
+ return F.emptyIfNull(partsToReload.partitions());
}
/**
@@ -68,18 +70,12 @@ public class IgniteDhtPartitionsToReloadMap implements
Serializable {
if (map == null)
map = new HashMap<>();
- Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
-
- if (nodeMap == null) {
- nodeMap = new HashMap<>();
-
- map.put(nodeId, nodeMap);
- }
+ CachePartitionsToReloadMap nodeMap = map.computeIfAbsent(nodeId, k ->
new CachePartitionsToReloadMap());
- Set<Integer> parts = nodeMap.get(cacheId);
+ PartitionsToReload parts = nodeMap.get(cacheId);
if (parts == null) {
- parts = new HashSet<>();
+ parts = new PartitionsToReload();
nodeMap.put(cacheId, parts);
}
@@ -98,4 +94,23 @@ public class IgniteDhtPartitionsToReloadMap implements
Serializable {
@Override public String toString() {
return S.toString(IgniteDhtPartitionsToReloadMap.class, this);
}
+
+ /**
+ * @return Partition reload map.
+ */
+ public Map<UUID, CachePartitionsToReloadMap> partitionsToReload() {
+ return map;
+ }
+
+ /**
+ * @param map Partition reload map.
+ */
+ public void partitionsToReload(Map<UUID, CachePartitionsToReloadMap> map) {
+ this.map = map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
new file mode 100644
index 00000000000..987fa9fe604
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Partition sizes map. */
+public class PartitionSizesMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 514;
+
+ /** Partition sizes map. */
+ @Order(value = 0, method = "partitionSizes")
+ private @Nullable Map<Integer, Long> partSizes;
+
+ /** Default constructor. */
+ public PartitionSizesMap() {
+ // No-op.
+ }
+
+ /**
+ * @param partSizes Partition sizes map.
+ */
+ public PartitionSizesMap(@Nullable Map<Integer, Long> partSizes) {
+ this.partSizes = partSizes;
+ }
+
+ /**
+ * @return Partition sizes map.
+ */
+ public @Nullable Map<Integer, Long> partitionSizes() {
+ return partSizes;
+ }
+
+ /**
+ * @param partSizes Partition sizes map.
+ */
+ public void partitionSizes(Map<Integer, Long> partSizes) {
+ this.partSizes = partSizes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java
new file mode 100644
index 00000000000..023b28ad03f
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Partitions to reload. */
+public class PartitionsToReload implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 511;
+
+ /** Collection of partitions to reload. */
+ @Order(value = 0, method = "partitions")
+ private Collection<Integer> parts;
+
+ /**
+ * @return Collection of partitions to reload.
+ */
+ public Collection<Integer> partitions() {
+ return parts;
+ }
+
+ /**
+ * @param parts Collection of partitions to reload.
+ */
+ public void partitions(Collection<Integer> parts) {
+ this.parts = parts;
+ }
+
+ /**
+ * @param partId Partition ID.
+ */
+ public void add(int partId) {
+ if (parts == null)
+ parts = new HashSet<>();
+
+ parts.add(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 9ca26f636ae..675f891be69 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -747,7 +747,7 @@ public class GridClientPartitionTopology implements
GridDhtPartitionTopology {
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap,
- Set<Integer> partsToReload,
+ Collection<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 0c2f415a7a1..7d6a079f044 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -297,7 +298,7 @@ public interface GridDhtPartitionTopology {
* means full map received is not related to exchange
* @param partMap Update partition map.
* @param cntrMap Partition update counters.
- * @param partsToReload Set of partitions that need to be reloaded.
+ * @param partsToReload Collection of partitions that need to be reloaded.
* @param partSizes Global partition sizes.
* @param msgTopVer Topology version from incoming message. This value is
not null only for case message is not
* related to exchange. Value should be not less than previous
'Topology version from exchange'.
@@ -309,7 +310,7 @@ public interface GridDhtPartitionTopology {
@Nullable AffinityTopologyVersion exchangeResVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap,
- Set<Integer> partsToReload,
+ Collection<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index b004053ca6f..8227e161728 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -1440,7 +1440,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap incomeCntrMap,
- Set<Integer> partsToReload,
+ Collection<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,