This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bac455c329 IGNITE-26434 Refactor GridDhtPartitionSupplyMessage (#12355)
0bac455c329 is described below

commit 0bac455c32934957de87beb16a3ef2c8bab51faf
Author: Ilya Shishkov <[email protected]>
AuthorDate: Mon Sep 29 15:33:08 2025 +0300

    IGNITE-26434 Refactor GridDhtPartitionSupplyMessage (#12355)
---
 .../communication/GridIoMessageFactory.java        |   5 +-
 .../dht/preloader/GridDhtPartitionDemander.java    |  20 +-
 .../dht/preloader/GridDhtPartitionSupplier.java    |  28 +-
 .../GridDhtPartitionSupplyErrorMessage.java        | 138 -------
 .../preloader/GridDhtPartitionSupplyMessage.java   | 402 +++++++--------------
 .../main/resources/META-INF/classnames.properties  |   1 -
 .../cache/CacheNoAffinityExchangeTest.java         |   4 +-
 7 files changed, 152 insertions(+), 446 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 163d9d211f1..50f6d2df1a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerial
 import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
 import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
 import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
+import 
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
 import 
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
 import 
org.apache.ignite.internal.codegen.GridDhtTxOnePhaseCommitAckRequestSerializer;
 import org.apache.ignite.internal.codegen.GridDhtTxPrepareRequestSerializer;
@@ -145,7 +146,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -337,7 +337,7 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)109, GridQueryNextPageResponse::new, new 
GridQueryNextPageResponseSerializer());
         factory.register((short)112, GridCacheSqlQuery::new, new 
GridCacheSqlQuerySerializer());
         // 113 - BinaryObjectImpl
-        factory.register((short)114, GridDhtPartitionSupplyMessage::new);
+        factory.register((short)114, GridDhtPartitionSupplyMessage::new, new 
GridDhtPartitionSupplyMessageSerializer());
         factory.register((short)115, UUIDCollectionMessage::new);
         factory.register((short)116, GridNearSingleGetRequest::new);
         factory.register((short)117, GridNearSingleGetResponse::new);
@@ -356,7 +356,6 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)134, ContinuousRoutineStartResultMessage::new);
         factory.register((short)135, LatchAckMessage::new, new 
LatchAckMessageSerializer());
         factory.register((short)157, PartitionUpdateCountersMessage::new);
-        factory.register((short)158, GridDhtPartitionSupplyErrorMessage::new);
         factory.register((short)162, GenerateEncryptionKeyRequest::new, new 
GenerateEncryptionKeyRequestSerializer());
         factory.register((short)163, GenerateEncryptionKeyResponse::new);
         factory.register((short)167, ServiceDeploymentProcessId::new);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 16a68ea4703..2eea404a145 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -472,7 +472,7 @@ public class GridDhtPartitionDemander {
         if (fut.isActual(supplyMsg.rebalanceId())) {
             boolean historical = false;
 
-            for (Integer p : supplyMsg.infos().keySet()) {
+            for (Integer p : supplyMsg.getInfosSafe().keySet()) {
                 fut.queued.get(p).increment();
 
                 if (fut.historical.contains(p))
@@ -546,7 +546,7 @@ public class GridDhtPartitionDemander {
             if (msgExc != null) {
                 GridDhtPartitionMap partMap = top.localPartitionMap();
 
-                Set<Integer> unstableParts = 
supplyMsg.infos().keySet().stream()
+                Set<Integer> unstableParts = 
supplyMsg.getInfosSafe().keySet().stream()
                     .filter(p -> partMap.get(p) == MOVING)
                     .collect(Collectors.toSet());
 
@@ -563,12 +563,6 @@ public class GridDhtPartitionDemander {
 
             for (GridCacheContext cctx : grp.caches()) {
                 if (cctx.statisticsEnabled()) {
-                    long keysCnt = grp.sharedGroup() ? 
supplyMsg.keysForCache(cctx.cacheId()) :
-                        supplyMsg.estimatedKeysCount();
-
-                    if (keysCnt != -1)
-                        
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
-
                     // Can not be calculated per cache.
                     
cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
                 }
@@ -578,7 +572,7 @@ public class GridDhtPartitionDemander {
                 AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
                 // Preload.
-                for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supplyMsg.infos().entrySet()) {
+                for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supplyMsg.getInfosSafe().entrySet()) {
                     int p = e.getKey();
 
                     if (aff.get(p).contains(ctx.localNode())) {
@@ -604,7 +598,7 @@ public class GridDhtPartitionDemander {
 
                         assert part != null;
 
-                        boolean last = supplyMsg.last().containsKey(p);
+                        boolean last = supplyMsg.last() != null && 
supplyMsg.last().containsKey(p);
 
                         if (part.state() == MOVING) {
                             boolean reserved = part.reserve();
@@ -670,13 +664,15 @@ public class GridDhtPartitionDemander {
                     }
                 }
 
+                Collection<Integer> missed = supplyMsg.missed() == null ? 
Collections.emptyList() : supplyMsg.missed();
+
                 // Only request partitions based on latest topology version.
-                for (Integer miss : supplyMsg.missed()) {
+                for (Integer miss : missed) {
                     if (aff.get(miss).contains(ctx.localNode()))
                         fut.partitionMissed(nodeId, miss);
                 }
 
-                for (Integer miss : supplyMsg.missed())
+                for (Integer miss : missed)
                     fut.partitionDone(nodeId, miss, false);
 
                 GridDhtPartitionDemandMessage d = new 
GridDhtPartitionDemandMessage(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 6f6ccdc1337..cc29d0f58bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -40,7 +40,6 @@ import 
org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -287,27 +286,6 @@ public class GridDhtPartitionSupplier {
                 }
 
                 iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), 
demandMsg.topologyVersion());
-
-                for (Integer part : demandMsg.partitions().fullSet()) {
-                    if (iter.isPartitionMissing(part))
-                        continue;
-
-                    GridDhtLocalPartition loc = top.localPartition(part, 
demandMsg.topologyVersion(), false);
-
-                    assert loc != null && loc.state() == 
GridDhtPartitionState.OWNING
-                        : "Partition should be in OWNING state: " + loc;
-
-                    
supplyMsg.addEstimatedKeysCount(loc.dataStore().fullSize());
-                }
-
-                for (int i = 0; i < histMap.size(); i++) {
-                    int p = histMap.partitionAt(i);
-
-                    if (iter.isPartitionMissing(p))
-                        continue;
-
-                    supplyMsg.addEstimatedKeysCount(histMap.updateCounterAt(i) 
- histMap.initialUpdateCounterAt(i));
-                }
             }
             else {
                 iter = sctx.iterator;
@@ -381,7 +359,7 @@ public class GridDhtPartitionSupplier {
                 supplyMsg.addEntry0(part, iter.historical(part), info, 
grp.shared(), grp.cacheObjectContext());
 
                 if (iter.isPartitionDone(part)) {
-                    supplyMsg.last(part, loc.updateCounter());
+                    supplyMsg.addLast(part, loc.updateCounter());
 
                     remainingParts.remove(part);
 
@@ -401,7 +379,7 @@ public class GridDhtPartitionSupplier {
                     assert loc != null
                         : "Supply partition is gone: grp=" + 
grp.cacheOrGroupName() + ", p=" + p;
 
-                    supplyMsg.last(p, loc.updateCounter());
+                    supplyMsg.addLast(p, loc.updateCounter());
 
                     remainingIter.remove();
 
@@ -488,7 +466,7 @@ public class GridDhtPartitionSupplier {
                     errMsg = supplyMsg;
                 }
                 else {
-                    errMsg = new GridDhtPartitionSupplyErrorMessage(
+                    errMsg = new GridDhtPartitionSupplyMessage(
                         demandMsg.rebalanceId(),
                         grp.groupId(),
                         demandMsg.topologyVersion(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyErrorMessage.java
deleted file mode 100644
index 89a44bd69df..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyErrorMessage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Supply message with supplier error transfer support.
- */
-public class GridDhtPartitionSupplyErrorMessage extends 
GridDhtPartitionSupplyMessage {
-    /** Supplying process error. */
-    @GridDirectTransient
-    private Throwable err;
-
-    /** Supplying process error bytes. */
-    private byte[] errBytes;
-
-    /**
-     * Default constructor.
-     */
-    public GridDhtPartitionSupplyErrorMessage() {
-    }
-
-    /**
-     * @param rebalanceId Rebalance id.
-     * @param grpId Group id.
-     * @param topVer Topology version.
-     * @param addDepInfo Add dep info.
-     * @param err Supply process error.
-     */
-    public GridDhtPartitionSupplyErrorMessage(
-        long rebalanceId,
-        int grpId,
-        AffinityTopologyVersion topVer,
-        boolean addDepInfo,
-        Throwable err
-    ) {
-        super(rebalanceId, grpId, topVer, addDepInfo);
-
-        this.err = err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (err != null && errBytes == null)
-            errBytes = U.marshal(ctx, err);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (errBytes != null && err == null)
-            err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 13:
-                if (!writer.writeByteArray(errBytes))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 13:
-                errBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Throwable error() {
-        return err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 158;
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index c894a368113..c01319b763f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -17,17 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -38,48 +34,46 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Partition supply message.
  */
-@IgniteCodeGeneratingFail
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
 public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage 
implements GridCacheDeployable {
     /** An unique (per demander) rebalance id. */
+    @Order(4)
     private long rebalanceId;
 
-    /** Topology version. */
+    /** Topology version for which demand message is sent. */
+    @Order(value = 5, method = "topologyVersion")
     private AffinityTopologyVersion topVer;
 
     /** Partitions that have been fully sent. */
-    @GridDirectMap(keyType = int.class, valueType = long.class)
+    @Order(6)
     private Map<Integer, Long> last;
 
     /** Partitions which were not found. */
     @GridToStringInclude
-    @GridDirectCollection(int.class)
+    @Order(7)
     private Collection<Integer> missed;
 
-    /** Partitions for which we were able to get historical iterator. */
-    @GridToStringInclude
-    @GridDirectCollection(int.class)
-    private Collection<Integer> clean;
-
     /** Entries. */
-    @GridDirectMap(keyType = int.class, valueType = 
CacheEntryInfoCollection.class)
+    @Order(8)
     private Map<Integer, CacheEntryInfoCollection> infos;
 
     /** Message size. */
+    @Order(value = 9, method = "messageSize")
     private int msgSize;
 
-    /** Estimated keys count. */
-    private long estimatedKeysCnt = -1;
+    /** Supplying process error. */
+    private Throwable err;
 
-    /** Estimated keys count per cache in case the message is for shared 
group. */
-    @GridDirectMap(keyType = int.class, valueType = long.class)
-    private Map<Integer, Long> keysPerCache;
+    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-26523
+    /** Serialized form of supplying process error. */
+    @Order(10)
+    private byte[] errBytes;
 
     /**
      * @param rebalanceId Rebalance id.
@@ -99,6 +93,26 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
         this.addDepInfo = addDepInfo;
     }
 
+    /**
+     * @param rebalanceId Rebalance id.
+     * @param grpId Cache group ID.
+     * @param topVer Topology version.
+     * @param addDepInfo Deployment info flag.
+     */
+    GridDhtPartitionSupplyMessage(
+        long rebalanceId,
+        int grpId,
+        AffinityTopologyVersion topVer,
+        boolean addDepInfo,
+        Throwable err
+    ) {
+        this.grpId = grpId;
+        this.rebalanceId = rebalanceId;
+        this.topVer = topVer;
+        this.addDepInfo = addDepInfo;
+        this.err = err;
+    }
+
     /**
      * Empty constructor.
      */
@@ -112,12 +126,19 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     }
 
     /**
-     * @return Rebalance id.
+     * @return An unique (per demander) rebalance id.
      */
-    long rebalanceId() {
+    public long rebalanceId() {
         return rebalanceId;
     }
 
+    /**
+     * @param rebalanceId New unique (per demander) rebalance id.
+     */
+    public void rebalanceId(long rebalanceId) {
+        this.rebalanceId = rebalanceId;
+    }
+
     /**
      * @return Topology version for which demand message is sent.
      */
@@ -126,16 +147,30 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     }
 
     /**
-     * @return Flag to indicate last message for partition.
+     * @param topVer New topology version for which demand message is sent.
+     */
+    public void topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Partitions that have been fully sent.
+     */
+    public Map<Integer, Long> last() {
+        return last;
+    }
+
+    /**
+     * @param last New map of partitions that have been fully sent.
      */
-    Map<Integer, Long> last() {
-        return last == null ? Collections.<Integer, Long>emptyMap() : last;
+    public void last(Map<Integer, Long> last) {
+        this.last = last;
     }
 
     /**
      * @param p Partition which was fully sent.
      */
-    void last(int p, long cntr) {
+    void addLast(int p, long cntr) {
         if (last == null)
             last = new HashMap<>();
 
@@ -143,35 +178,16 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
             msgSize += 12;
 
             // If partition is empty, we need to add it.
-            if (!infos().containsKey(p)) {
+            if (!getInfosSafe().containsKey(p)) {
                 CacheEntryInfoCollection infoCol = new 
CacheEntryInfoCollection();
 
                 infoCol.init();
 
-                infos().put(p, infoCol);
+                getInfosSafe().put(p, infoCol);
             }
         }
     }
 
-    /**
-     * @param p Partition to clean.
-     */
-    void clean(int p) {
-        if (clean == null)
-            clean = new HashSet<>();
-
-        if (clean.add(p))
-            msgSize += 4;
-    }
-
-    /**
-     * @param p Partition to check.
-     * @return Check result.
-     */
-    boolean isClean(int p) {
-        return clean != null && clean.contains(p);
-    }
-
     /**
      * @param p Missed partition.
      */
@@ -186,27 +202,74 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     /**
      * @return Missed partitions.
      */
-    Collection<Integer> missed() {
-        return missed == null ? Collections.<Integer>emptySet() : missed;
+    public Collection<Integer> missed() {
+        return missed;
+    }
+
+    /**
+     * @param missed New partitions which were not found.
+     */
+    public void missed(Collection<Integer> missed) {
+        this.missed = missed;
     }
 
     /**
      * @return Entries.
      */
-    Map<Integer, CacheEntryInfoCollection> infos() {
+    public Map<Integer, CacheEntryInfoCollection> getInfosSafe() {
         if (infos == null)
             infos = new HashMap<>();
 
         return infos;
     }
 
+    /**
+     * @return Entries.
+     */
+    public Map<Integer, CacheEntryInfoCollection> infos() {
+        return infos;
+    }
+
+    /**
+     * @param infos New entries.
+     */
+    public void infos(Map<Integer, CacheEntryInfoCollection> infos) {
+        this.infos = infos;
+    }
+
+    /** Supplying process error. */
+    @Nullable @Override public Throwable error() {
+        return err;
+    }
+
+    /**
+     * @return Serialized form of supplying process error.
+     */
+    public byte[] errBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @param errBytes New serialized form of supplying process error.
+     */
+    public void errBytes(byte[] errBytes) {
+        this.errBytes = errBytes;
+    }
+
     /**
      * @return Message size.
      */
-    int messageSize() {
+    public int messageSize() {
         return msgSize;
     }
 
+    /**
+     * @param msgSize New message size.
+     */
+    public void messageSize(int msgSize) {
+        this.msgSize = msgSize;
+    }
+
     /**
      * @param p Partition.
      * @param historical {@code True} if partition rebalancing using WAL 
history.
@@ -215,7 +278,7 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
      * @param cacheObjCtx Cache object context.
      * @throws IgniteCheckedException If failed.
      */
-    void addEntry0(int p, boolean historical, GridCacheEntryInfo info, 
GridCacheSharedContext ctx,
+    void addEntry0(int p, boolean historical, GridCacheEntryInfo info, 
GridCacheSharedContext<?, ?> ctx,
         CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert info != null;
         assert info.key() != null : info;
@@ -226,12 +289,12 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
 
         msgSize += info.marshalledSize(cacheObjCtx);
 
-        CacheEntryInfoCollection infoCol = infos().get(p);
+        CacheEntryInfoCollection infoCol = getInfosSafe().get(p);
 
         if (infoCol == null) {
             msgSize += 4;
 
-            infos().put(p, infoCol = new CacheEntryInfoCollection());
+            getInfosSafe().put(p, infoCol = new CacheEntryInfoCollection());
 
             infoCol.init();
         }
@@ -240,8 +303,16 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-26523
+        if (err != null && errBytes == null)
+            errBytes = U.marshal(ctx, err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
         CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
@@ -249,12 +320,16 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
         if (grp == null)
             return;
 
-        for (CacheEntryInfoCollection col : infos().values()) {
+        for (CacheEntryInfoCollection col : getInfosSafe().values()) {
             List<GridCacheEntryInfo> entries = col.infos();
 
             for (int i = 0; i < entries.size(); i++)
                 entries.get(i).unmarshal(grp.cacheObjectContext(), ldr);
         }
+
+        // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-26523
+        if (errBytes != null && err == null)
+            err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
     }
 
     /** {@inheritDoc} */
@@ -266,168 +341,7 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
      * @return Number of entries in message.
      */
     public int size() {
-        return infos().size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 4:
-                if (!writer.writeCollection(clean, 
MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeLong(estimatedKeysCnt))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeMap(infos, MessageCollectionItemType.INT, 
MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeMap(keysPerCache, 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeMap(last, MessageCollectionItemType.INT, 
MessageCollectionItemType.LONG))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeCollection(missed, 
MessageCollectionItemType.INT))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeInt(msgSize))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeAffinityTopologyVersion(topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                // Keep 'updateSeq' name for compatibility.
-                if (!writer.writeLong(rebalanceId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 4:
-                clean = reader.readCollection(MessageCollectionItemType.INT);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                estimatedKeysCnt = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                infos = reader.readMap(MessageCollectionItemType.INT, 
MessageCollectionItemType.MSG, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                keysPerCache = reader.readMap(MessageCollectionItemType.INT, 
MessageCollectionItemType.LONG, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                last = reader.readMap(MessageCollectionItemType.INT, 
MessageCollectionItemType.LONG, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                missed = reader.readCollection(MessageCollectionItemType.INT);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                msgSize = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                topVer = reader.readAffinityTopologyVersion();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                // Keep 'updateSeq' name for compatibility.
-                rebalanceId = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
+        return getInfosSafe().size();
     }
 
     /** {@inheritDoc} */
@@ -435,53 +349,11 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
         return 114;
     }
 
-    /**
-     * @return Estimated keys count.
-     */
-    public long estimatedKeysCount() {
-        return -1;
-    }
-
-    /**
-     * @param cnt Keys count to add.
-     */
-    public void addEstimatedKeysCount(long cnt) {
-        this.estimatedKeysCnt += cnt;
-    }
-
-    /**
-     * @return Estimated keys count for a given cache ID.
-     */
-    public long keysForCache(int cacheId) {
-        return -1;
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param cnt Keys count.
-     */
-    public void addKeysForCache(int cacheId, long cnt) {
-        assert cacheId != 0 && cnt >= 0;
-
-        if (keysPerCache == null)
-            keysPerCache = new HashMap<>();
-
-        Long cnt0 = keysPerCache.get(cacheId);
-
-        if (cnt0 == null) {
-            keysPerCache.put(cacheId, cnt);
-
-            msgSize += 12;
-        }
-        else
-            keysPerCache.put(cacheId, cnt0 + cnt);
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtPartitionSupplyMessage.class, this,
             "size", size(),
-            "parts", infos().keySet(),
+            "parts", getInfosSafe().keySet(),
             "super", super.toString());
     }
 }
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 319e2991b35..eb5449d73e3 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1189,7 +1189,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
index 48851c0de24..6c092d77e6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -290,7 +290,7 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest {
 
             TestRecordingCommunicationSpi.spi(ig).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
                 @Override public boolean apply(ClusterNode node, Message msg) {
-                    return msg instanceof GridDhtPartitionSupplyErrorMessage;
+                    return msg instanceof GridDhtPartitionSupplyMessage && ((GridCacheMessage)msg).error() != null;
                 }
             });
 


Reply via email to