This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b2fdcea5483 IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap (#12487)
b2fdcea5483 is described below

commit b2fdcea548339b4a13e636adb8cce4c0796421ec
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Nov 7 13:00:04 2025 +0500

    IGNITE-26866 Add Message interface to IgniteDhtPartitionsToReloadMap (#12487)
---
 .../apache/ignite/internal/util/lang/GridFunc.java |  7 ++
 .../communication/GridIoMessageFactory.java        | 13 ++++
 .../cache/GridCachePartitionExchangeManager.java   |  9 ++-
 .../dht/preloader/CachePartitionsToReloadMap.java  | 75 ++++++++++++++++++++++
 .../preloader/GridDhtPartitionsExchangeFuture.java | 16 +++--
 .../preloader/GridDhtPartitionsFullMessage.java    | 69 ++++----------------
 .../preloader/IgniteDhtPartitionsToReloadMap.java  | 57 ++++++++++------
 .../dht/preloader/PartitionSizesMap.java           | 64 ++++++++++++++++++
 .../dht/preloader/PartitionsToReload.java          | 62 ++++++++++++++++++
 .../dht/topology/GridClientPartitionTopology.java  |  2 +-
 .../dht/topology/GridDhtPartitionTopology.java     |  5 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  2 +-
 12 files changed, 292 insertions(+), 89 deletions(-)

diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 3af938392a4..8bf794062ef 100755
--- a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2146,4 +2146,11 @@ public class GridFunc {
     public static <T> Collection<T> emptyIfNull(@Nullable Collection<T> col) {
         return col == null ? Collections.emptySet() : col;
     }
+
+    /**
+     * @param map Map.
+     */
+    public static <K, V> Map<K, V> emptyIfNull(@Nullable Map<K, V> map) {
+        return map == null ? Collections.emptyMap() : map;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index c3804b5beae..9028c55a0c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
 import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
 import 
org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer;
 import 
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
+import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer;
 import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
 import 
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
 import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
@@ -110,6 +111,7 @@ import 
org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
 import 
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
 import 
org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer;
 import 
org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer;
+import 
org.apache.ignite.internal.codegen.IgniteDhtPartitionsToReloadMapSerializer;
 import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
 import 
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
 import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -121,6 +123,8 @@ import 
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerialize
 import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
 import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
 import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
+import org.apache.ignite.internal.codegen.PartitionSizesMapSerializer;
+import org.apache.ignite.internal.codegen.PartitionsToReloadSerializer;
 import 
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
 import 
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
 import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -201,6 +205,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.Update
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -213,7 +218,10 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gro
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -466,6 +474,11 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register(PartitionReservationsMap.TYPE_CODE, 
PartitionReservationsMap::new, new PartitionReservationsMapSerializer());
         factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE, 
IgniteDhtPartitionHistorySuppliersMap::new,
             new IgniteDhtPartitionHistorySuppliersMapSerializer());
+        factory.register(PartitionsToReload.TYPE_CODE, 
PartitionsToReload::new, new PartitionsToReloadSerializer());
+        factory.register(CachePartitionsToReloadMap.TYPE_CODE, 
CachePartitionsToReloadMap::new, new CachePartitionsToReloadMapSerializer());
+        factory.register(IgniteDhtPartitionsToReloadMap.TYPE_CODE, 
IgniteDhtPartitionsToReloadMap::new,
+            new IgniteDhtPartitionsToReloadMapSerializer());
+        factory.register(PartitionSizesMap.TYPE_CODE, PartitionSizesMap::new, 
new PartitionSizesMapSerializer());
 
         // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
         // [120..123] - DR
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6bff39ae692..9a6a8cfb045 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -90,6 +90,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionSizesMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1440,7 +1441,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         }
 
         if (!partsSizes.isEmpty())
-            m.partitionSizes(cctx, partsSizes);
+            m.partitionSizes(F.viewReadOnly(partsSizes, 
PartitionSizesMap::new));
 
         return m;
     }
@@ -1771,7 +1772,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                 boolean updated = false;
 
-                Map<Integer, Map<Integer, Long>> partsSizes = 
msg.partitionSizes(cctx);
+                Map<Integer, PartitionSizesMap> partsSizes = 
F.emptyIfNull(msg.partitionSizes());
 
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
msg.partitions().entrySet()) {
                     Integer grpId = entry.getKey();
@@ -1781,11 +1782,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     GridDhtPartitionTopology top = grp == null ? 
clientTops.get(grpId) : grp.topology();
 
                     if (top != null) {
+                        PartitionSizesMap sizesMap = partsSizes.get(grpId);
+
                         updated |= top.update(null,
                             entry.getValue(),
                             null,
                             msg.partsToReload(cctx.localNodeId(), grpId),
-                            partsSizes.getOrDefault(grpId, 
Collections.emptyMap()),
+                            sizesMap != null ? 
F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
                             msg.topologyVersion(),
                             null,
                             null);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java
new file mode 100644
index 00000000000..bcf6ab80c3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionsToReloadMap.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Partition reload map for cache. */
+public class CachePartitionsToReloadMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 512;
+
+    /** Partition reload map for cache. */
+    @Order(value = 0, method = "cachePartitions")
+    private Map<Integer, PartitionsToReload> map;
+
+    /**
+     * @return Partition reload map for cache.
+     */
+    public Map<Integer, PartitionsToReload> cachePartitions() {
+        return map;
+    }
+
+    /**
+     * @param map Partition reload map for cache.
+     */
+    public void cachePartitions(Map<Integer, PartitionsToReload> map) {
+        this.map = map;
+    }
+
+    /**
+     * @param cacheId Cache id.
+     * @return Partitions to reload for this cache.
+     */
+    public @Nullable PartitionsToReload get(int cacheId) {
+        if (map == null)
+            return null;
+
+        return map.get(cacheId);
+    }
+
+    /**
+     * @param cacheId Cache id.
+     * @param parts Partitions to reload.
+     */
+    public void put(int cacheId, PartitionsToReload parts) {
+        if (map == null)
+            map = new HashMap<>();
+
+        map.put(cacheId, parts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0a1b955f0d0..e40f6234a90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3311,12 +3311,18 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (partMap == null)
                 continue;
 
+            Map<Integer, Long> grpPartSizes = 
singleMsg.partitionSizes(top.groupId());
+
             for (Map.Entry<Integer, GridDhtPartitionState> e0 : 
partMap.entrySet()) {
                 int p = e0.getKey();
                 GridDhtPartitionState state = e0.getValue();
 
-                if (state == GridDhtPartitionState.OWNING)
-                    partSizes.put(p, 
singleMsg.partitionSizes(top.groupId()).get(p));
+                if (state == GridDhtPartitionState.OWNING) {
+                    Long size = grpPartSizes.get(p);
+
+                    if (size != null)
+                        partSizes.put(p, size);
+                }
             }
         }
 
@@ -4654,7 +4660,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2);
 
         try {
-            Map<Integer, Map<Integer, Long>> partsSizes = 
msg.partitionSizes(cctx);
+            Map<Integer, PartitionSizesMap> partsSizes = 
F.emptyIfNull(msg.partitionSizes());
 
             doInParallel(
                 parallelismLvl,
@@ -4665,11 +4671,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                     if (grp != null) {
+                        PartitionSizesMap sizesMap = partsSizes.get(grpId);
+
                         grp.topology().update(resTopVer,
                             msg.partitions().get(grpId),
                             cntrMap,
                             msg.partsToReload(cctx.localNodeId(), grpId),
-                            partsSizes.getOrDefault(grpId, 
Collections.emptyMap()),
+                            sizesMap != null ? 
F.emptyIfNull(sizesMap.partitionSizes()) : Collections.emptyMap(),
                             null,
                             this,
                             msg.lostPartitions(grpId));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 469a85140f7..15fef52e473 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -29,7 +29,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -83,14 +82,10 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
     /** Partitions that must be cleared and re-loaded. */
     @GridToStringInclude
-    @GridDirectTransient
     private IgniteDhtPartitionsToReloadMap partsToReload;
 
-    /** Serialized partitions that must be cleared and re-loaded. */
-    private byte[] partsToReloadBytes;
-
-    /** Serialized partitions sizes. */
-    private byte[] partsSizesBytes;
+    /** Partition sizes. */
+    private Map<Integer, PartitionSizesMap> partsSizes;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -178,8 +173,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         cp.partCntrs = partCntrs;
         cp.partHistSuppliers = partHistSuppliers;
         cp.partsToReload = partsToReload;
-        cp.partsToReloadBytes = partsToReloadBytes;
-        cp.partsSizesBytes = partsSizesBytes;
+        cp.partsSizes = partsSizes;
         cp.topVer = topVer;
         cp.errs = errs;
         cp.errsBytes = errsBytes;
@@ -351,7 +345,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /**
      *
      */
-    public Set<Integer> partsToReload(UUID nodeId, int grpId) {
+    public Collection<Integer> partsToReload(UUID nodeId, int grpId) {
         if (partsToReload == null)
             return Collections.emptySet();
 
@@ -361,41 +355,17 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /**
      * Supplies partition sizes map for all cache groups.
      *
-     * @param ctx Cache context.
      * @param partsSizes Partitions sizes map.
      */
-    public void partitionSizes(GridCacheSharedContext ctx, Map<Integer, 
Map<Integer, Long>> partsSizes) {
-        try {
-            byte[] marshalled = U.marshal(ctx, partsSizes);
-
-            if (compressed())
-                marshalled = U.zip(marshalled, 
ctx.gridConfig().getNetworkCompressionLevel());
-
-            partsSizesBytes = marshalled;
-        }
-        catch (IgniteCheckedException ex) {
-            throw new IgniteException(ex);
-        }
+    public void partitionSizes(Map<Integer, PartitionSizesMap> partsSizes) {
+        this.partsSizes = partsSizes;
     }
 
     /**
-     * Returns partition sizes map for all cache groups.
-     *
-     * @param ctx Cache context.
      * @return Partition sizes map (grpId, (partId, partSize)).
      */
-    public Map<Integer, Map<Integer, Long>> 
partitionSizes(GridCacheSharedContext ctx) {
-        if (partsSizesBytes == null)
-            return Collections.emptyMap();
-
-        try {
-            return compressed()
-                ? U.unmarshalZip(ctx.marshaller(), partsSizesBytes, 
ctx.deploy().globalLoader())
-                : U.unmarshal(ctx, partsSizesBytes, 
ctx.deploy().globalLoader());
-        }
-        catch (IgniteCheckedException ex) {
-            throw new IgniteException(ex);
-        }
+    public Map<Integer, PartitionSizesMap> partitionSizes() {
+        return partsSizes;
     }
 
     /**
@@ -431,7 +401,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         super.prepareMarshal(ctx);
 
         boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
-            (partsToReload != null && partsToReloadBytes == null) ||
             (!F.isEmpty(errs) && errsBytes == null);
 
         if (marshal) {
@@ -443,9 +412,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             if (!F.isEmpty(parts) && partsBytes == null)
                 objectsToMarshall.add(parts);
 
-            if (partsToReload != null && partsToReloadBytes == null)
-                objectsToMarshall.add(partsToReload);
-
             if (!F.isEmpty(errs) && errsBytes == null)
                 objectsToMarshall.add(errs);
 
@@ -469,9 +435,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             if (!F.isEmpty(parts) && partsBytes == null)
                 partsBytes = iter.next();
 
-            if (partsToReload != null && partsToReloadBytes == null)
-                partsToReloadBytes = iter.next();
-
             if (!F.isEmpty(errs) && errsBytes == null)
                 errsBytes = iter.next();
         }
@@ -505,9 +468,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         if (partsBytes != null && parts == null)
             objectsToUnmarshall.add(partsBytes);
 
-        if (partsToReloadBytes != null && partsToReload == null)
-            objectsToUnmarshall.add(partsToReloadBytes);
-
         if (errsBytes != null && errs == null)
             objectsToUnmarshall.add(errsBytes);
 
@@ -557,9 +517,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             }
         }
 
-        if (partsToReloadBytes != null && partsToReload == null)
-            partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next();
-
         if (errsBytes != null && errs == null)
             errs = (Map<UUID, Exception>)iter.next();
 
@@ -649,13 +606,13 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeByteArray(partsSizesBytes))
+                if (!writer.writeMap(partsSizes, 
MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeByteArray(partsToReloadBytes))
+                if (!writer.writeMessage(partsToReload))
                     return false;
 
                 writer.incrementState();
@@ -758,7 +715,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 15:
-                partsSizesBytes = reader.readByteArray();
+                partsSizes = reader.readMap(MessageCollectionItemType.INT, 
MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -766,7 +723,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 16:
-                partsToReloadBytes = reader.readByteArray();
+                partsToReload = reader.readMessage();
 
                 if (!reader.isLastRead())
                     return false;
@@ -842,8 +799,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     public void cleanUp() {
         partsBytes = null;
         partCntrs = null;
-        partsToReloadBytes = null;
-        partsSizesBytes = null;
         errsBytes = null;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
index 8515004c3fd..bc965a49926 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
@@ -18,45 +18,47 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * Partition reload map.
  */
-public class IgniteDhtPartitionsToReloadMap implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
+public class IgniteDhtPartitionsToReloadMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 513;
 
     /** */
-    private Map<UUID, Map<Integer, Set<Integer>>> map;
+    @Order(value = 0, method = "partitionsToReload")
+    private Map<UUID, CachePartitionsToReloadMap> map;
 
     /**
      * @param nodeId Node ID.
      * @param cacheId Cache ID.
      * @return Collection of partitions to reload.
      */
-    public synchronized Set<Integer> get(UUID nodeId, int cacheId) {
+    public synchronized Collection<Integer> get(UUID nodeId, int cacheId) {
         if (map == null)
             return Collections.emptySet();
 
-        Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
+        CachePartitionsToReloadMap nodeMap = map.get(nodeId);
 
         if (nodeMap == null)
             return Collections.emptySet();
 
-        Set<Integer> parts = nodeMap.get(cacheId);
+        PartitionsToReload partsToReload = nodeMap.get(cacheId);
 
-        if (parts == null)
+        if (partsToReload == null)
             return Collections.emptySet();
 
-        return parts;
+        return F.emptyIfNull(partsToReload.partitions());
     }
 
     /**
@@ -68,18 +70,12 @@ public class IgniteDhtPartitionsToReloadMap implements 
Serializable {
         if (map == null)
             map = new HashMap<>();
 
-        Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
-
-        if (nodeMap == null) {
-            nodeMap = new HashMap<>();
-
-            map.put(nodeId, nodeMap);
-        }
+        CachePartitionsToReloadMap nodeMap = map.computeIfAbsent(nodeId, k -> 
new CachePartitionsToReloadMap());
 
-        Set<Integer> parts = nodeMap.get(cacheId);
+        PartitionsToReload parts = nodeMap.get(cacheId);
 
         if (parts == null) {
-            parts = new HashSet<>();
+            parts = new PartitionsToReload();
 
             nodeMap.put(cacheId, parts);
         }
@@ -98,4 +94,23 @@ public class IgniteDhtPartitionsToReloadMap implements 
Serializable {
     @Override public String toString() {
         return S.toString(IgniteDhtPartitionsToReloadMap.class, this);
     }
+
+    /**
+     * @return Partition reload map.
+     */
+    public Map<UUID, CachePartitionsToReloadMap> partitionsToReload() {
+        return map;
+    }
+
+    /**
+     * @param map Partition reload map.
+     */
+    public void partitionsToReload(Map<UUID, CachePartitionsToReloadMap> map) {
+        this.map = map;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
new file mode 100644
index 00000000000..987fa9fe604
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionSizesMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Partition sizes map. */
+public class PartitionSizesMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 514;
+
+    /** Partition sizes map. */
+    @Order(value = 0, method = "partitionSizes")
+    private @Nullable Map<Integer, Long> partSizes;
+
+    /** Default constructor. */
+    public PartitionSizesMap() {
+        // No-op.
+    }
+
+    /**
+     * @param partSizes Partition sizes map.
+     */
+    public PartitionSizesMap(@Nullable Map<Integer, Long> partSizes) {
+        this.partSizes = partSizes;
+    }
+
+    /**
+     * @return Partition sizes map.
+     */
+    public @Nullable Map<Integer, Long> partitionSizes() {
+        return partSizes;
+    }
+
+    /**
+     * @param partSizes Partition sizes map.
+     */
+    public void partitionSizes(Map<Integer, Long> partSizes) {
+        this.partSizes = partSizes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java
new file mode 100644
index 00000000000..023b28ad03f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsToReload.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Message carrying a collection of IDs of partitions to reload; see {@link #TYPE_CODE}. */
+public class PartitionsToReload implements Message {
+    /** Direct type code returned by {@link #directType()}. */
+    public static final short TYPE_CODE = 511;
+
+    /** Partitions to reload; {@code null} until set or until the first {@link #add(int)} call. */
+    @Order(value = 0, method = "partitions")
+    private Collection<Integer> parts;
+
+    /**
+     * @return Stored collection of partitions to reload (not a copy); {@code null} if nothing was set or added.
+     */
+    public Collection<Integer> partitions() {
+        return parts;
+    }
+
+    /**
+     * @param parts Collection of partitions to reload; replaces the stored one and is kept by reference.
+     */
+    public void partitions(Collection<Integer> parts) {
+        this.parts = parts;
+    }
+
+    /**
+     * @param partId Partition ID to add; a {@link HashSet} is created first if no collection is stored yet.
+     */
+    public void add(int partId) {
+        if (parts == null)
+            parts = new HashSet<>();
+
+        parts.add(partId);
+    }
+
+    /** {@inheritDoc} Always returns {@link #TYPE_CODE}. */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 9ca26f636ae..675f891be69 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -747,7 +747,7 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap cntrMap,
-        Set<Integer> partsToReload,
+        Collection<Integer> partsToReload,
         @Nullable Map<Integer, Long> partSizes,
         @Nullable AffinityTopologyVersion msgTopVer,
         @Nullable GridDhtPartitionsExchangeFuture exchFut,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index 0c2f415a7a1..7d6a079f044 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -297,7 +298,7 @@ public interface GridDhtPartitionTopology {
      *      means full map received is not related to exchange
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
-     * @param partsToReload Set of partitions that need to be reloaded.
+     * @param partsToReload Collection of partitions that need to be reloaded.
      * @param partSizes Global partition sizes.
      * @param msgTopVer Topology version from incoming message. This value is 
not null only for case message is not
      *      related to exchange. Value should be not less than previous 
'Topology version from exchange'.
@@ -309,7 +310,7 @@ public interface GridDhtPartitionTopology {
         @Nullable AffinityTopologyVersion exchangeResVer,
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap cntrMap,
-        Set<Integer> partsToReload,
+        Collection<Integer> partsToReload,
         @Nullable Map<Integer, Long> partSizes,
         @Nullable AffinityTopologyVersion msgTopVer,
         @Nullable GridDhtPartitionsExchangeFuture exchFut,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index b004053ca6f..8227e161728 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -1440,7 +1440,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap incomeCntrMap,
-        Set<Integer> partsToReload,
+        Collection<Integer> partsToReload,
         @Nullable Map<Integer, Long> partSizes,
         @Nullable AffinityTopologyVersion msgTopVer,
         @Nullable GridDhtPartitionsExchangeFuture exchFut,


Reply via email to