This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6591914ba0a IGNITE-26515 Use MessageSerializer for GridDhtPartitionDemandMessage (#12387)
6591914ba0a is described below

commit 6591914ba0a6cf513527a654ba8280bfa42634a7
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Oct 9 16:59:27 2025 +0500

    IGNITE-26515 Use MessageSerializer for GridDhtPartitionDemandMessage (#12387)
---
 .../communication/GridIoMessageFactory.java        |  12 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |   2 +-
 .../CachePartitionPartialCountersMap.java          |  74 +++++++-
 .../preloader/GridDhtPartitionDemandMessage.java   | 203 +++------------------
 .../dht/preloader/GridDhtPartitionDemander.java    |  21 ++-
 .../dht/preloader/GridDhtPartitionSupplier.java    |   9 +-
 .../dht/preloader/GridDhtPreloader.java            |   4 +
 .../dht/preloader/GridDhtPreloaderAssignments.java |  14 +-
 .../preloader/IgniteDhtDemandedPartitionsMap.java  |  34 +++-
 .../apache/ignite/cache/CircledRebalanceTest.java  |   2 +-
 .../db/FullHistRebalanceOnClientStopTest.java      |   2 +-
 .../db/wal/HistoricalReservationTest.java          |   2 +-
 .../db/wal/WalRebalanceRestartTest.java            |   2 +-
 13 files changed, 170 insertions(+), 211 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7640cd1f4e9..48cbc18fece 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.codegen.AtomicApplicationAttributesAwareReques
 import 
org.apache.ignite.internal.codegen.CacheContinuousQueryBatchAckSerializer;
 import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
 import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
+import 
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
 import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
 import 
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
 import org.apache.ignite.internal.codegen.GridCacheEntryInfoSerializer;
@@ -43,6 +44,7 @@ import 
org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
 import 
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerializer;
 import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
 import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
+import 
org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
 import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
 import 
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
 import 
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
@@ -65,6 +67,7 @@ import 
org.apache.ignite.internal.codegen.GridQueryNextPageRequestSerializer;
 import org.apache.ignite.internal.codegen.GridQueryNextPageResponseSerializer;
 import org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
 import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
+import 
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
 import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
 import 
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
 import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -145,6 +148,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -153,6 +157,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -288,7 +293,7 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)41, GridNearAtomicUpdateResponse::new, new 
GridNearAtomicUpdateResponseSerializer());
         factory.register((short)42, GridDhtForceKeysRequest::new, new 
GridDhtForceKeysRequestSerializer());
         factory.register((short)43, GridDhtForceKeysResponse::new);
-        factory.register((short)45, GridDhtPartitionDemandMessage::new);
+        factory.register((short)45, GridDhtPartitionDemandMessage::new, new 
GridDhtPartitionDemandMessageSerializer());
         factory.register((short)46, GridDhtPartitionsFullMessage::new);
         factory.register((short)47, GridDhtPartitionsSingleMessage::new);
         factory.register((short)48, GridDhtPartitionsSingleRequest::new, new 
GridDhtPartitionsSingleRequestSerializer());
@@ -386,6 +391,11 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
         factory.register(StatisticsResponse.TYPE_CODE, 
StatisticsResponse::new);
 
+        factory.register(CachePartitionPartialCountersMap.TYPE_CODE, 
CachePartitionPartialCountersMap::new,
+            new CachePartitionPartialCountersMapSerializer());
+        factory.register(IgniteDhtDemandedPartitionsMap.TYPE_CODE, 
IgniteDhtDemandedPartitionsMap::new,
+                new IgniteDhtDemandedPartitionsMapSerializer());
+
         // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
         // [120..123] - DR
         // [-44, 0..2, 42, 200..204, 210, 302] - Use in tests.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 923eb153bef..bfcdd05db3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -949,7 +949,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
         Set<Integer> missing = new HashSet<>();
 
-        for (Integer p : parts.fullSet()) {
+        for (Integer p : parts.full()) {
             GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, 
topVer);
 
             if (partIter == null) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index bc9871bb5fb..eaf0593154e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -21,13 +21,18 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
  */
-public class CachePartitionPartialCountersMap implements Serializable {
+public class CachePartitionPartialCountersMap implements Serializable, Message 
{
+    /** */
+    public static final short TYPE_CODE = 500;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -35,19 +40,23 @@ public class CachePartitionPartialCountersMap implements 
Serializable {
     public static final CachePartitionPartialCountersMap EMPTY = new 
CachePartitionPartialCountersMap();
 
     /** */
+    @Order(value = 0, method = "partitionIds")
     private int[] partIds;
 
     /** */
+    @Order(value = 1, method = "initialUpdateCounters")
     private long[] initialUpdCntrs;
 
     /** */
+    @Order(value = 2, method = "updateCounters")
     private long[] updCntrs;
 
     /** */
+    @Order(value = 3, method = "currentIndex")
     private int curIdx;
 
     /** */
-    private CachePartitionPartialCountersMap() {
+    public CachePartitionPartialCountersMap() {
         this(0);
     }
 
@@ -226,4 +235,65 @@ public class CachePartitionPartialCountersMap implements 
Serializable {
 
         return sb.toString();
     }
+
+    /**
+     * @return Partition IDs.
+     */
+    public int[] partitionIds() {
+        return partIds;
+    }
+
+    /**
+     * @param partIds Partition IDs.
+     */
+    public void partitionIds(int[] partIds) {
+        this.partIds = partIds;
+    }
+
+    /**
+     * @return Partition initial update counters.
+     */
+    public long[] initialUpdateCounters() {
+        return initialUpdCntrs;
+    }
+
+    /**
+     * @param initialUpdCntrs Partition initial update counters.
+     */
+    public void initialUpdateCounters(long[] initialUpdCntrs) {
+        this.initialUpdCntrs = initialUpdCntrs;
+    }
+
+    /**
+     * @return Partition update counters.
+     */
+    public long[] updateCounters() {
+        return updCntrs;
+    }
+
+    /**
+     * @param updCntrs Partition update counters.
+     */
+    public void updateCounters(long[] updCntrs) {
+        this.updCntrs = updCntrs;
+    }
+
+    /**
+     * @return Current index.
+     */
+    public int currentIndex() {
+        return curIdx;
+    }
+
+    /**
+     * @param curIdx Current index.
+     */
+    public void currentIndex(int curIdx) {
+        this.curIdx = curIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index d34af84101c..b06ce1df103 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -17,52 +17,34 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
-import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Partition demand request.
  */
-@IgniteCodeGeneratingFail
 public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
-    /** Cache rebalance topic. */
-    private static final Object REBALANCE_TOPIC = 
GridCachePartitionExchangeManager.rebalanceTopic(0);
-
     /** Rebalance id. */
+    @Order(4)
     private long rebalanceId;
 
     /** Partitions map. */
-    @GridDirectTransient
+    @Order(value = 5, method = "partitions")
     private IgniteDhtDemandedPartitionsMap parts;
 
-    /** Serialized partitions map. */
-    private byte[] partsBytes;
-
-    /** Topic. */
-    @GridDirectTransient
-    private Object topic = REBALANCE_TOPIC;
-
-    /** Serialized topic. */
-    private byte[] topicBytes;
-
     /** Timeout. */
+    @Order(6)
     private long timeout;
 
     /** Worker ID. */
+    @Order(7)
     private int workerId = -1;
 
     /** Topology version. */
+    @Order(value = 8, method = "topologyVersion")
     private AffinityTopologyVersion topVer;
 
     /**
@@ -105,7 +87,6 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheGroupIdMessage {
         GridDhtPartitionDemandMessage cp = new GridDhtPartitionDemandMessage();
         cp.grpId = grpId;
         cp.rebalanceId = rebalanceId;
-        cp.topic = topic;
         cp.timeout = timeout;
         cp.workerId = workerId;
         cp.topVer = topVer;
@@ -114,58 +95,58 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheGroupIdMessage {
     }
 
     /**
-     * @return Partition.
+     * @return Partitions.
      */
     public IgniteDhtDemandedPartitionsMap partitions() {
         return parts;
     }
 
+    /**
+     * @param parts Partitions.
+     */
+    public void partitions(IgniteDhtDemandedPartitionsMap parts) {
+        this.parts = parts;
+    }
+
     /**
      * @param updateSeq Update sequence.
      */
-    void rebalanceId(long updateSeq) {
-        this.rebalanceId = updateSeq;
+    public void rebalanceId(long updateSeq) {
+        rebalanceId = updateSeq;
     }
 
     /**
      * @return Unique rebalance session id.
      */
-    long rebalanceId() {
+    public long rebalanceId() {
         return rebalanceId;
     }
 
     /**
      * @return Reply message timeout.
      */
-    long timeout() {
+    public long timeout() {
         return timeout;
     }
 
     /**
      * @param timeout Timeout.
      */
-    void timeout(long timeout) {
+    public void timeout(long timeout) {
         this.timeout = timeout;
     }
 
-    /**
-     * @return Topic.
-     */
-    Object topic() {
-        return topic;
-    }
-
     /**
      * @return Worker ID.
      */
-    int workerId() {
+    public int workerId() {
         return workerId;
     }
 
     /**
      * @param workerId Worker ID.
      */
-    void workerId(int workerId) {
+    public void workerId(int workerId) {
         this.workerId = workerId;
     }
 
@@ -176,26 +157,11 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheGroupIdMessage {
         return topVer;
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (topic != null && topicBytes == null)
-            topicBytes = U.marshal(ctx, topic);
-
-        if (parts != null && partsBytes == null)
-            partsBytes = U.marshal(ctx, parts);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (topicBytes != null && topic == null)
-            topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
-
-        if (partsBytes != null && parts == null)
-            parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
+    /**
+     * @param topVer Topology version for which demand message is sent.
+     */
+    public void topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -203,123 +169,6 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheGroupIdMessage {
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 4:
-                if (!writer.writeByteArray(partsBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeLong(timeout))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeAffinityTopologyVersion(topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeByteArray(topicBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeLong(rebalanceId))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeInt(workerId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 4:
-                partsBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                timeout = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                topVer = reader.readAffinityTopologyVersion();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                topicBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                rebalanceId = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                workerId = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
     /** {@inheritDoc} */
     @Override public short directType() {
         return 45;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2eea404a145..e6ff9257eec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -96,6 +96,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 import static 
org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.REBALANCE_TOPIC;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.PAGE_SNAPSHOT_TAKEN;
@@ -406,7 +407,7 @@ public class GridDhtPartitionDemander {
                     metrics.clearRebalanceCounters();
 
                     for (GridDhtPartitionDemandMessage msg : 
assignments.values()) {
-                        for (Integer partId : msg.partitions().fullSet())
+                        for (Integer partId : msg.partitions().full())
                             
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
 
                         CachePartitionPartialCountersMap histMap = 
msg.partitions().historicalMap();
@@ -685,7 +686,7 @@ public class GridDhtPartitionDemander {
                 if (!fut.isDone()) {
                     // Send demand message.
                     try {
-                        ctx.io().sendOrderedMessage(node, d.topic(), d, 
grp.ioPolicy(), grp.preloader().timeout());
+                        ctx.io().sendOrderedMessage(node, REBALANCE_TOPIC, d, 
grp.ioPolicy(), grp.preloader().timeout());
 
                         if (log.isDebugEnabled())
                             log.debug("Send next demand message [" + 
demandRoutineInfo(nodeId, supplyMsg) + "]");
@@ -1062,13 +1063,13 @@ public class GridDhtPartitionDemander {
                 HashSet<Integer> parts = new HashSet<>(v.partitions().size());
 
                 parts.addAll(v.partitions().historicalSet());
-                parts.addAll(v.partitions().fullSet());
+                parts.addAll(v.partitions().full());
 
                 rebalancingParts.put(k.id(), parts);
 
                 historical.addAll(v.partitions().historicalSet());
 
-                Stream.concat(v.partitions().historicalSet().stream(), 
v.partitions().fullSet().stream())
+                Stream.concat(v.partitions().historicalSet().stream(), 
v.partitions().full().stream())
                     .forEach(
                         p -> {
                             queued.put(p, new LongAdder());
@@ -1181,11 +1182,11 @@ public class GridDhtPartitionDemander {
                     // Make sure partitions scheduled for full rebalancing are 
cleared first.
                     // Clearing attempt is also required for in-memory caches 
because some partitions can be switched
                     // from RENTING to MOVING state in the middle of clearing.
-                    final int fullSetSize = d.partitions().fullSet().size();
+                    final int fullSetSize = d.partitions().full().size();
 
                     AtomicInteger waitCnt = new AtomicInteger(fullSetSize);
 
-                    for (Integer partId : d.partitions().fullSet()) {
+                    for (Integer partId : d.partitions().full()) {
                         GridDhtLocalPartition part = 
grp.topology().localPartition(partId);
 
                         // Due to rebalance cancellation it's possible for a 
group to be already partially rebalanced,
@@ -1213,7 +1214,7 @@ public class GridDhtPartitionDemander {
                     }
 
                     // The special case for historical only rebalancing.
-                    if (d.partitions().fullSet().isEmpty() && 
!d.partitions().historicalSet().isEmpty())
+                    if (d.partitions().full().isEmpty() && 
!d.partitions().historicalSet().isEmpty())
                         
ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> 
requestPartitions0(node, parts, d));
                 }
             }
@@ -1237,11 +1238,11 @@ public class GridDhtPartitionDemander {
                     log.info("Starting rebalance routine [" + 
grp.cacheOrGroupName() +
                         ", topVer=" + topVer +
                         ", supplier=" + supplierNode.id() +
-                        ", fullPartitions=" + 
S.toStringSortedDistinct(parts.fullSet()) +
+                        ", fullPartitions=" + 
S.toStringSortedDistinct(parts.full()) +
                         ", histPartitions=" + 
S.toStringSortedDistinct(parts.historicalSet()) +
                         ", rebalanceId=" + rebalanceId + ']');
 
-                ctx.io().sendOrderedMessage(supplierNode, msg.topic(), msg, 
grp.ioPolicy(), msg.timeout());
+                ctx.io().sendOrderedMessage(supplierNode, REBALANCE_TOPIC, 
msg, grp.ioPolicy(), msg.timeout());
 
                 // Cleanup required in case partitions demanded in parallel 
with cancellation.
                 synchronized (this) {
@@ -1676,7 +1677,7 @@ public class GridDhtPartitionDemander {
                 p0.addAll(partitions);
 
             for (GridDhtPartitionDemandMessage msg : newAssignments.values()) {
-                p1.addAll(msg.partitions().fullSet());
+                p1.addAll(msg.partitions().full());
                 p1.addAll(msg.partitions().historicalSet());
             }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index cc29d0f58bd..85180dff2ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -53,6 +53,7 @@ import org.apache.ignite.spi.IgniteSpiException;
 
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
+import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.REBALANCE_TOPIC;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 
 /**
@@ -267,7 +268,7 @@ public class GridDhtPartitionSupplier {
             if (sctx == null) {
                 if (log.isDebugEnabled())
                     log.debug("Starting supplying rebalancing [" + 
supplyRoutineInfo(topicId, nodeId, demandMsg) +
-                        ", fullPartitions=" + 
S.toStringSortedDistinct(demandMsg.partitions().fullSet()) +
+                        ", fullPartitions=" + 
S.toStringSortedDistinct(demandMsg.partitions().full()) +
                         ", histPartitions=" + 
S.toStringSortedDistinct(demandMsg.partitions().historicalSet()) + "]");
             }
             else
@@ -275,7 +276,7 @@ public class GridDhtPartitionSupplier {
 
             if (sctx == null || sctx.iterator == null) {
 
-                remainingParts = new 
HashSet<>(demandMsg.partitions().fullSet());
+                remainingParts = new HashSet<>(demandMsg.partitions().full());
 
                 CachePartitionPartialCountersMap histMap = 
demandMsg.partitions().historicalMap();
 
@@ -456,7 +457,7 @@ public class GridDhtPartitionSupplier {
 
                     // Mark all remaining partitions as missed to trigger full 
rebalance.
                     if (iter == null && F.isEmpty(remainingParts)) {
-                        remainingParts = new 
HashSet<>(demandMsg.partitions().fullSet());
+                        remainingParts = new 
HashSet<>(demandMsg.partitions().full());
                         
remainingParts.addAll(demandMsg.partitions().historicalSet());
                     }
 
@@ -518,7 +519,7 @@ public class GridDhtPartitionSupplier {
             if (log.isDebugEnabled())
                 log.debug("Send next supply message [" + 
supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
 
-            grp.shared().io().sendOrderedMessage(demander, demandMsg.topic(), 
supplyMsg, grp.ioPolicy(), demandMsg.timeout());
+            grp.shared().io().sendOrderedMessage(demander, REBALANCE_TOPIC, 
supplyMsg, grp.ioPolicy(), demandMsg.timeout());
 
             // Throttle preloading.
             if (rebalanceThrottleOverride > 0)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a22f7a764d3..606ba1352ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -62,6 +63,9 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
+    /** Cache rebalance topic. */
+    static final Object REBALANCE_TOPIC = 
GridCachePartitionExchangeManager.rebalanceTopic(0);
+
     /** Disable rebalancing cancellation optimization. */
     private final boolean disableRebalancingCancellationOptimization = 
IgniteSystemProperties.getBoolean(
         IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 73fbce85dee..d21f9ffa1b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -148,22 +150,22 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
                 }
             }
 
-            Set<Integer> curFullSet = cntrMap.fullSet();
+            Collection<Integer> curFull = cntrMap.full();
             Set<Integer> newFullSet = null;
 
-            if (!curFullSet.isEmpty()) {
+            if (!curFull.isEmpty()) {
                 int moving = 0;
 
                 // Fast-path check.
-                for (Integer partId : curFullSet) {
+                for (Integer partId : curFull) {
                     if (top.localPartition(partId).state() == MOVING)
                         moving++;
                 }
 
-                if (moving != curFullSet.size()) {
+                if (moving != curFull.size()) {
                     newFullSet = U.newHashSet(moving);
 
-                    for (Integer partId : curFullSet) {
+                    for (Integer partId : curFull) {
                         if (top.localPartition(partId).state() == MOVING)
                             newFullSet.add(partId);
                     }
@@ -175,7 +177,7 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
                     newHistMap = curHistMap;
 
                 if (newFullSet == null)
-                    newFullSet = curFullSet;
+                    newFullSet = new HashSet<>(curFull);
 
                 IgniteDhtDemandedPartitionsMap newMap = new IgniteDhtDemandedPartitionsMap(newHistMap, newFullSet);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
index c27d856970d..91004bb83be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
@@ -23,24 +23,31 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Map of partitions demanded during rebalancing.
  */
-public class IgniteDhtDemandedPartitionsMap implements Serializable {
+public class IgniteDhtDemandedPartitionsMap implements Serializable, Message {
+    /** */
+    public static final short TYPE_CODE = 501;
+
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Map of partitions that will be preloaded from history. (partId -> (fromCntr, toCntr)). */
+    @Order(value = 0, method = "historicalMap")
     private CachePartitionPartialCountersMap historical;
 
-    /** Set of partitions that will be preloaded from all it's current data. */
+    /** Set of partitions that require full rebalancing. */
+    @Order(1)
     @GridToStringInclude
-    private Set<Integer> full;
+    private Collection<Integer> full;
 
     /**
      * @param historical Historical partition set.
@@ -153,11 +160,21 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable {
     }
 
     /** */
-    public Set<Integer> fullSet() {
+    public void historicalMap(CachePartitionPartialCountersMap historical) {
+        this.historical = historical;
+    }
+
+    /** */
+    public Collection<Integer> full() {
         if (full == null)
             return Collections.emptySet();
 
-        return Collections.unmodifiableSet(full);
+        return Collections.unmodifiableCollection(full);
+    }
+
+    /** */
+    public void full(Collection<Integer> full) {
+        this.full = full;
     }
 
     /** */
@@ -178,7 +195,7 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable {
 
     /** */
     public Collection<Integer> all() {
-        return F.concat(false, fullSet(), historicalSet());
+        return F.concat(false, full(), historicalSet());
     }
 
 
@@ -186,4 +203,9 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable {
     @Override public String toString() {
         return S.toString(IgniteDhtDemandedPartitionsMap.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
index 587ac90acdc..f8fc31de61a 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
@@ -169,7 +169,7 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
             if (msg instanceof GridDhtPartitionDemandMessage) {
                 GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
 
-                hasFullRebalance.compareAndSet(false, !F.isEmpty(demandMsg.partitions().fullSet()));
+                hasFullRebalance.compareAndSet(false, !F.isEmpty(demandMsg.partitions().full()));
 
                 return true;
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
index 7c84d32cacf..f95b8bb025b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
@@ -246,7 +246,7 @@ public class FullHistRebalanceOnClientStopTest extends GridCommonAbstractTest {
                     }
                 }
 
-                if (!map.fullSet().isEmpty()) {
+                if (!map.full().isEmpty()) {
                     synchronized (mux) {
                         topVersForFull = true;
                     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
index 5478ec6e085..0c4c80211a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
@@ -194,7 +194,7 @@ public class HistoricalReservationTest extends GridCommonAbstractTest {
             if (msg instanceof GridDhtPartitionDemandMessage) {
                 GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
 
-                if (!F.isEmpty(demandMsg.partitions().fullSet()))
+                if (!F.isEmpty(demandMsg.partitions().full()))
                     hasFullRebalance.compareAndSet(false, true);
             }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
index 19f6e9f67bd..6b4d5e2552b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
@@ -268,7 +268,7 @@ public class WalRebalanceRestartTest extends GridCommonAbstractTest {
                     if (rebTopVer == null || rebTopVer.before(demandMsg.topologyVersion()))
                         rebTopVer = demandMsg.topologyVersion();
 
-                    if (!F.isEmpty(demandMsg.partitions().fullSet()))
+                    if (!F.isEmpty(demandMsg.partitions().full()))
                         hasFullRebalance.compareAndSet(false, true);
                 }
 


Reply via email to