This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f935f99c09a IGNITE-26839 Add Message interface to 
IgniteDhtPartitionCountersMap  (#12467)
f935f99c09a is described below

commit f935f99c09a067d90c0b624285c6334425bf0094
Author: Dmitry Werner <[email protected]>
AuthorDate: Sat Nov 1 17:28:31 2025 +0500

    IGNITE-26839 Add Message interface to IgniteDhtPartitionCountersMap  
(#12467)
---
 .../communication/GridIoMessageFactory.java        |  19 +++-
 .../preloader/CachePartitionFullCountersMap.java   |  67 +++++++++++---
 .../preloader/GridDhtPartitionsExchangeFuture.java |   4 +-
 .../preloader/GridDhtPartitionsFullMessage.java    |  50 ++--------
 .../dht/preloader/GroupPartitionIdPair.java        | 101 +++++++++++++++++++++
 .../preloader/IgniteDhtPartitionCountersMap.java   |  29 +++++-
 .../IgniteDhtPartitionHistorySuppliersMap.java     |  48 ++++++----
 .../dht/preloader/PartitionReservationsMap.java    |  75 +++++++++++++++
 .../GridCacheDatabaseSharedManager.java            |   3 +-
 .../IgniteCacheDatabaseSharedManager.java          |   4 +-
 .../persistence/checkpoint/CheckpointHistory.java  |  17 ++--
 .../CacheExchangeMessageDuplicatedStateTest.java   |   4 +-
 .../persistence/WalPreloadingConcurrentTest.java   |   4 +-
 13 files changed, 329 insertions(+), 96 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ef9c9f1d733..09064df08e8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.codegen.CacheEntryPredicateAdapterSerializer;
 import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
 import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
 import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
+import 
org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer;
 import 
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
 import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
 import 
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
@@ -101,9 +102,12 @@ import 
org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
 import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
 import org.apache.ignite.internal.codegen.GridTaskResultResponseSerializer;
 import org.apache.ignite.internal.codegen.GridTaskSessionRequestSerializer;
+import org.apache.ignite.internal.codegen.GroupPartitionIdPairSerializer;
 import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
 import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
 import 
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
+import 
org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer;
+import 
org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer;
 import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
 import 
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
 import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -114,6 +118,7 @@ import 
org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer
 import 
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
 import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
 import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
+import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
 import 
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
 import 
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
 import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -192,6 +197,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -201,7 +207,11 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -435,7 +445,6 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
         factory.register(StatisticsResponse.TYPE_CODE, 
StatisticsResponse::new);
 
-        // Enums
         factory.register(CachePartitionPartialCountersMap.TYPE_CODE, 
CachePartitionPartialCountersMap::new,
             new CachePartitionPartialCountersMapSerializer());
         factory.register(IgniteDhtDemandedPartitionsMap.TYPE_CODE, 
IgniteDhtDemandedPartitionsMap::new,
@@ -447,6 +456,14 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register(GridCacheOperationMessage.TYPE_CODE, 
GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer());
         factory.register(BinaryMetadataVersionInfo.TYPE_CODE, 
BinaryMetadataVersionInfo::new,
             new BinaryMetadataVersionInfoSerializer());
+        factory.register(CachePartitionFullCountersMap.TYPE_CODE, 
CachePartitionFullCountersMap::new,
+            new CachePartitionFullCountersMapSerializer());
+        factory.register(IgniteDhtPartitionCountersMap.TYPE_CODE, 
IgniteDhtPartitionCountersMap::new,
+            new IgniteDhtPartitionCountersMapSerializer());
+        factory.register(GroupPartitionIdPair.TYPE_CODE, 
GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer());
+        factory.register(PartitionReservationsMap.TYPE_CODE, 
PartitionReservationsMap::new, new PartitionReservationsMapSerializer());
+        factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE, 
IgniteDhtPartitionHistorySuppliersMap::new,
+            new IgniteDhtPartitionHistorySuppliersMapSerializer());
 
         // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
         // [120..123] - DR
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 80858a8a106..d5fb7ed1413 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -17,30 +17,40 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
  */
-public class CachePartitionFullCountersMap implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
+public class CachePartitionFullCountersMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 506;
 
     /** */
-    private long[] initialUpdCntrs;
+    @Order(value = 0, method = "initialUpdateCounters")
+    private long[] initUpdCntrs;
 
     /** */
+    @Order(value = 1, method = "updateCounters")
     private long[] updCntrs;
 
+    /**
+     * Default constructor.
+     */
+    public CachePartitionFullCountersMap() {
+        // No-op.
+    }
+
     /**
      * @param other Map to copy.
      */
     public CachePartitionFullCountersMap(CachePartitionFullCountersMap other) {
-        initialUpdCntrs = Arrays.copyOf(other.initialUpdCntrs, 
other.initialUpdCntrs.length);
+        initUpdCntrs = Arrays.copyOf(other.initUpdCntrs, 
other.initUpdCntrs.length);
         updCntrs = Arrays.copyOf(other.updCntrs, other.updCntrs.length);
     }
 
@@ -48,7 +58,7 @@ public class CachePartitionFullCountersMap implements 
Serializable {
      * @param partsCnt Total number of partitions.
      */
     public CachePartitionFullCountersMap(int partsCnt) {
-        initialUpdCntrs = new long[partsCnt];
+        initUpdCntrs = new long[partsCnt];
         updCntrs = new long[partsCnt];
     }
 
@@ -59,7 +69,7 @@ public class CachePartitionFullCountersMap implements 
Serializable {
      * @return Initial update counter for the partition with the given ID.
      */
     public long initialUpdateCounter(int p) {
-        return initialUpdCntrs[p];
+        return initUpdCntrs[p];
     }
 
     /**
@@ -76,10 +86,10 @@ public class CachePartitionFullCountersMap implements 
Serializable {
      * Sets an initial update counter by the partition ID.
      *
      * @param p Partition ID.
-     * @param initialUpdCntr Initial update counter to set.
+     * @param initUpdCntr Initial update counter to set.
      */
-    public void initialUpdateCounter(int p, long initialUpdCntr) {
-        initialUpdCntrs[p] = initialUpdCntr;
+    public void initialUpdateCounter(int p, long initUpdCntr) {
+        initUpdCntrs[p] = initUpdCntr;
     }
 
     /**
@@ -108,7 +118,40 @@ public class CachePartitionFullCountersMap implements 
Serializable {
      * Clears full counters map.
      */
     public void clear() {
-        Arrays.fill(initialUpdCntrs, 0);
+        Arrays.fill(initUpdCntrs, 0);
         Arrays.fill(updCntrs, 0);
     }
+
+    /**
+     * @return Initial update counters.
+     */
+    public long[] initialUpdateCounters() {
+        return initUpdCntrs;
+    }
+
+    /**
+     * @param initUpdCntrs Initial update counters.
+     */
+    public void initialUpdateCounters(long[] initUpdCntrs) {
+        this.initUpdCntrs = initUpdCntrs;
+    }
+
+    /**
+     * @return Update counters.
+     */
+    public long[] updateCounters() {
+        return updCntrs;
+    }
+
+    /**
+     * @param updCntrs Update counters.
+     */
+    public void updateCounters(long[] updCntrs) {
+        this.updCntrs = updCntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 35ac04f6385..0a1b955f0d0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2461,10 +2461,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             // Create and destroy caches and cache proxies.
             cctx.cache().onExchangeDone(this, err);
 
-            Map<T2<Integer, Integer>, Long> locReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
+            PartitionReservationsMap locReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
 
             if (locReserved != null) {
-                boolean success = 
cctx.database().reserveHistoryForPreloading(locReserved);
+                boolean success = 
cctx.database().reserveHistoryForPreloading(locReserved.reservations());
 
                 if (!success) {
                     log.warning("Could not reserve history for historical 
rebalance " +
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 926a77c7644..469a85140f7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -75,20 +75,12 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
     /** Partitions update counters. */
     @GridToStringInclude
-    @GridDirectTransient
     private IgniteDhtPartitionCountersMap partCntrs;
 
-    /** Serialized partitions counters. */
-    private byte[] partCntrsBytes;
-
     /** Partitions history suppliers. */
     @GridToStringInclude
-    @GridDirectTransient
     private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
 
-    /** Serialized partitions history suppliers. */
-    private byte[] partHistSuppliersBytes;
-
     /** Partitions that must be cleared and re-loaded. */
     @GridToStringInclude
     @GridDirectTransient
@@ -184,9 +176,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         cp.dupPartsData = dupPartsData;
         cp.partsBytes = partsBytes;
         cp.partCntrs = partCntrs;
-        cp.partCntrsBytes = partCntrsBytes;
         cp.partHistSuppliers = partHistSuppliers;
-        cp.partHistSuppliersBytes = partHistSuppliersBytes;
         cp.partsToReload = partsToReload;
         cp.partsToReloadBytes = partsToReloadBytes;
         cp.partsSizesBytes = partsSizesBytes;
@@ -437,12 +427,10 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
+    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
         boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
-            (partCntrs != null && !partCntrs.empty() && partCntrsBytes == 
null) ||
-            (partHistSuppliers != null && partHistSuppliersBytes == null) ||
             (partsToReload != null && partsToReloadBytes == null) ||
             (!F.isEmpty(errs) && errsBytes == null);
 
@@ -455,12 +443,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             if (!F.isEmpty(parts) && partsBytes == null)
                 objectsToMarshall.add(parts);
 
-            if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == 
null)
-                objectsToMarshall.add(partCntrs);
-
-            if (partHistSuppliers != null && partHistSuppliersBytes == null)
-                objectsToMarshall.add(partHistSuppliers);
-
             if (partsToReload != null && partsToReloadBytes == null)
                 objectsToMarshall.add(partsToReload);
 
@@ -487,12 +469,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             if (!F.isEmpty(parts) && partsBytes == null)
                 partsBytes = iter.next();
 
-            if (partCntrs != null && !partCntrs.empty() && partCntrsBytes == 
null)
-                partCntrsBytes = iter.next();
-
-            if (partHistSuppliers != null && partHistSuppliersBytes == null)
-                partHistSuppliersBytes = iter.next();
-
             if (partsToReload != null && partsToReloadBytes == null)
                 partsToReloadBytes = iter.next();
 
@@ -516,7 +492,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+    @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
         ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
@@ -529,12 +505,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         if (partsBytes != null && parts == null)
             objectsToUnmarshall.add(partsBytes);
 
-        if (partCntrsBytes != null && partCntrs == null)
-            objectsToUnmarshall.add(partCntrsBytes);
-
-        if (partHistSuppliersBytes != null && partHistSuppliers == null)
-            objectsToUnmarshall.add(partHistSuppliersBytes);
-
         if (partsToReloadBytes != null && partsToReload == null)
             objectsToUnmarshall.add(partsToReloadBytes);
 
@@ -587,12 +557,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             }
         }
 
-        if (partCntrsBytes != null && partCntrs == null)
-            partCntrs = (IgniteDhtPartitionCountersMap)iter.next();
-
-        if (partHistSuppliersBytes != null && partHistSuppliers == null)
-            partHistSuppliers = 
(IgniteDhtPartitionHistorySuppliersMap)iter.next();
-
         if (partsToReloadBytes != null && partsToReload == null)
             partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next();
 
@@ -667,13 +631,13 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByteArray(partCntrsBytes))
+                if (!writer.writeMessage(partCntrs))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeByteArray(partHistSuppliersBytes))
+                if (!writer.writeMessage(partHistSuppliers))
                     return false;
 
                 writer.incrementState();
@@ -770,7 +734,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 12:
-                partCntrsBytes = reader.readByteArray();
+                partCntrs = reader.readMessage();
 
                 if (!reader.isLastRead())
                     return false;
@@ -778,7 +742,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 13:
-                partHistSuppliersBytes = reader.readByteArray();
+                partHistSuppliers = reader.readMessage();
 
                 if (!reader.isLastRead())
                     return false;
@@ -878,8 +842,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     public void cleanUp() {
         partsBytes = null;
         partCntrs = null;
-        partCntrsBytes = null;
-        partHistSuppliersBytes = null;
         partsToReloadBytes = null;
         partsSizesBytes = null;
         errsBytes = null;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GroupPartitionIdPair.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GroupPartitionIdPair.java
new file mode 100644
index 00000000000..738dc90fc29
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GroupPartitionIdPair.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Objects;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Pair of group ID and partition ID. */
+public class GroupPartitionIdPair implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 508;
+
+    /** Group ID. */
+    @Order(value = 0, method = "groupId")
+    private int grpId;
+
+    /** Partition ID. */
+    @Order(value = 1, method = "partitionId")
+    private int partId;
+
+    /** Default constructor. */
+    public GroupPartitionIdPair() {
+        // No-op.
+    }
+
+    /**
+     * @param grpId Group ID.
+     * @param partId Partition ID.
+     */
+    public GroupPartitionIdPair(int grpId, int partId) {
+        this.grpId = grpId;
+        this.partId = partId;
+    }
+
+    /**
+     * @return Group ID.
+     */
+    public int groupId() {
+        return grpId;
+    }
+
+    /**
+     * @param grpId Group ID.
+     */
+    public void groupId(int grpId) {
+        this.grpId = grpId;
+    }
+
+    /**
+     * @return Partition ID.
+     */
+    public int partitionId() {
+        return partId;
+    }
+
+    /**
+     * @param partId Partition ID.
+     */
+    public void partitionId(int partId) {
+        this.partId = partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        GroupPartitionIdPair that = (GroupPartitionIdPair)o;
+
+        return grpId == that.grpId && partId == that.partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(grpId, partId);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index e7954d960ce..9f08178dae7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -18,18 +18,20 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * Partition counters map.
  */
-public class IgniteDhtPartitionCountersMap implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
+public class IgniteDhtPartitionCountersMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 507;
 
     /** */
+    @Order(value = 0, method = "partitionCounters")
     private Map<Integer, CachePartitionFullCountersMap> map;
 
     /**
@@ -66,4 +68,23 @@ public class IgniteDhtPartitionCountersMap implements 
Serializable {
 
         return cntrMap;
     }
+
+    /**
+     * @return Partition counters map.
+     */
+    public Map<Integer, CachePartitionFullCountersMap> partitionCounters() {
+        return map;
+    }
+
+    /**
+     * @param map Partition counters map.
+     */
+    public void partitionCounters(Map<Integer, CachePartitionFullCountersMap> 
map) {
+        this.map = map;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 427aad8344e..dda003ffd33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -18,29 +18,30 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
+public class IgniteDhtPartitionHistorySuppliersMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 510;
 
     /** */
     private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new 
IgniteDhtPartitionHistorySuppliersMap();
 
     /** */
-    private Map<UUID, Map<T2<Integer, Integer>, Long>> map;
+    @Order(value = 0, method = "historySuppliers")
+    private Map<UUID, PartitionReservationsMap> map;
 
     /**
      * @return Empty map.
@@ -61,10 +62,10 @@ public class IgniteDhtPartitionHistorySuppliersMap 
implements Serializable {
 
         List<UUID> suppliers = new ArrayList<>();
 
-        for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : 
map.entrySet()) {
+        for (Map.Entry<UUID, PartitionReservationsMap> e : map.entrySet()) {
             UUID supplierNode = e.getKey();
 
-            Long historyCounter = e.getValue().get(new T2<>(grpId, partId));
+            Long historyCounter = e.getValue().get(new 
GroupPartitionIdPair(grpId, partId));
 
             if (historyCounter != null && historyCounter <= cntrSince)
                 suppliers.add(supplierNode);
@@ -77,7 +78,7 @@ public class IgniteDhtPartitionHistorySuppliersMap implements 
Serializable {
      * @param nodeId Node ID to check.
      * @return Reservations for the given node.
      */
-    @Nullable public synchronized Map<T2<Integer, Integer>, Long> 
getReservations(UUID nodeId) {
+    @Nullable public synchronized PartitionReservationsMap 
getReservations(UUID nodeId) {
         if (map == null)
             return null;
 
@@ -94,15 +95,9 @@ public class IgniteDhtPartitionHistorySuppliersMap 
implements Serializable {
         if (map == null)
             map = new HashMap<>();
 
-        Map<T2<Integer, Integer>, Long> nodeMap = map.get(nodeId);
-
-        if (nodeMap == null) {
-            nodeMap = new HashMap<>();
-
-            map.put(nodeId, nodeMap);
-        }
+        PartitionReservationsMap nodeMap = map.computeIfAbsent(nodeId, k -> 
new PartitionReservationsMap());
 
-        nodeMap.put(new T2<>(grpId, partId), cntr);
+        nodeMap.put(new GroupPartitionIdPair(grpId, partId), cntr);
     }
 
     /**
@@ -119,8 +114,27 @@ public class IgniteDhtPartitionHistorySuppliersMap 
implements Serializable {
         map = that.map;
     }
 
+    /**
+     * @return Partition history suppliers map.
+     */
+    public Map<UUID, PartitionReservationsMap> historySuppliers() {
+        return map;
+    }
+
+    /**
+     * @param map Partition history suppliers map.
+     */
+    public void historySuppliers(Map<UUID, PartitionReservationsMap> map) {
+        this.map = map;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionReservationsMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionReservationsMap.java
new file mode 100644
index 00000000000..3a37229db9e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionReservationsMap.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Map for storing GroupPartitionIdPair and their respective history counter 
values. */
+public class PartitionReservationsMap implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 509;
+
+    /** Mapping between GroupPartitionIdPair objects and their respective 
history counter values. */
+    @Order(value = 0, method = "reservations")
+    private Map<GroupPartitionIdPair, Long> map;
+
+    /**
+     * @return Partition reservations map.
+     */
+    public Map<GroupPartitionIdPair, Long> reservations() {
+        return map;
+    }
+
+    /**
+     * @param map Partition reservations map.
+     */
+    public void reservations(Map<GroupPartitionIdPair, Long> map) {
+        this.map = map;
+    }
+
+    /**
+     * @param pair Pair of group ID and partition ID.
+     * @return History counter for this pair or null.
+     */
+    public @Nullable Long get(GroupPartitionIdPair pair) {
+        if (map == null)
+            return null;
+
+        return map.get(pair);
+    }
+
+    /**
+     * @param pair Pair of group ID and partition ID.
+     * @param counter History counter for this pair.
+     */
+    public void put(GroupPartitionIdPair pair, Long counter) {
+        if (map == null)
+            map = new HashMap<>();
+
+        map.put(pair, counter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index f3cf2b9ab26..23fcaddfb60 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -98,6 +98,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
@@ -1761,7 +1762,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /** {@inheritDoc} */
-    @Override public boolean reserveHistoryForPreloading(Map<T2<Integer, 
Integer>, Long> reservationMap) {
+    @Override public boolean 
reserveHistoryForPreloading(Map<GroupPartitionIdPair, Long> reservationMap) {
         Map<GroupPartitionId, CheckpointEntry> entries = 
checkpointHistory().searchCheckpointEntry(reservationMap);
 
         if (F.isEmpty(entries))
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index baeff9954fa..cae86127365 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -61,6 +61,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
 import 
org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker;
@@ -81,7 +82,6 @@ import 
org.apache.ignite.internal.processors.cache.warmup.WarmUpStrategy;
 import 
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1153,7 +1153,7 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
      * @param reservationMap Map contains of counters for partitions of groups.
      * @return True if successfully reserved.
      */
-    public boolean reserveHistoryForPreloading(Map<T2<Integer, Integer>, Long> 
reservationMap) {
+    public boolean reserveHistoryForPreloading(Map<GroupPartitionIdPair, Long> 
reservationMap) {
         return false;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 6b69bf02acb..b30cd6da5b5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry.GroupState;
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.EarliestCheckpointMapSnapshot.GroupStateSnapshot;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -646,16 +647,14 @@ public class CheckpointHistory {
     /**
      * Tries to search for a WAL pointer for the given partition counter start.
      *
-     * @param searchCntrMap Search map contains (Group Id, partition, counter).
+     * @param searchCntrMap Search map contains (Pair of group ID and 
partition ID, counter).
      * @return Map of group-partition on checkpoint entry or empty map if 
nothing found.
      */
-    public Map<GroupPartitionId, CheckpointEntry> searchCheckpointEntry(
-        Map<T2<Integer, Integer>, Long> searchCntrMap
-    ) {
+    public Map<GroupPartitionId, CheckpointEntry> 
searchCheckpointEntry(Map<GroupPartitionIdPair, Long> searchCntrMap) {
         if (F.isEmpty(searchCntrMap))
             return Collections.emptyMap();
 
-        Map<T2<Integer, Integer>, Long> modifiedSearchMap = new 
HashMap<>(searchCntrMap);
+        Map<GroupPartitionIdPair, Long> modifiedSearchMap = new 
HashMap<>(searchCntrMap);
 
         Map<GroupPartitionId, CheckpointEntry> res = new HashMap<>();
 
@@ -663,17 +662,17 @@ public class CheckpointHistory {
             try {
                 CheckpointEntry cpEntry = entry(cpTs);
 
-                Iterator<Map.Entry<T2<Integer, Integer>, Long>> iter = 
modifiedSearchMap.entrySet().iterator();
+                Iterator<Map.Entry<GroupPartitionIdPair, Long>> iter = 
modifiedSearchMap.entrySet().iterator();
 
                 while (iter.hasNext()) {
-                    Map.Entry<T2<Integer, Integer>, Long> entry = iter.next();
+                    Map.Entry<GroupPartitionIdPair, Long> entry = iter.next();
 
-                    Long foundCntr = cpEntry.partitionCounter(wal, 
entry.getKey().get1(), entry.getKey().get2());
+                    Long foundCntr = cpEntry.partitionCounter(wal, 
entry.getKey().groupId(), entry.getKey().partitionId());
 
                     if (foundCntr != null && foundCntr <= entry.getValue()) {
                         iter.remove();
 
-                        res.put(new GroupPartitionId(entry.getKey().get1(), 
entry.getKey().get2()), cpEntry);
+                        res.put(new GroupPartitionId(entry.getKey().groupId(), 
entry.getKey().partitionId()), cpEntry);
                     }
                 }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
index 78db7aa4d2e..abbb7269066 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -234,8 +234,8 @@ public class CacheExchangeMessageDuplicatedStateTest 
extends GridCommonAbstractT
 
         if (partCntrs != null) {
             for (CachePartitionFullCountersMap cntrs : partCntrs.values()) {
-                long[] initUpdCntrs = getFieldValue(cntrs, "initialUpdCntrs");
-                long[] updCntrs = getFieldValue(cntrs, "updCntrs");
+                long[] initUpdCntrs = cntrs.initialUpdateCounters();
+                long[] updCntrs = cntrs.updateCounters();
 
                 for (int i = 0; i < initUpdCntrs.length; i++) {
                     assertEquals(0, initUpdCntrs[i]);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
index 89789d90b18..bc25ce8c9e3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.util.typedef.T2;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -118,7 +118,7 @@ public class WalPreloadingConcurrentTest extends 
GridCommonAbstractTest {
 
                 while (!stop.get()) {
                     db.reserveHistoryForPreloading(Collections.singletonMap(
-                        new T2<Integer, Integer>(cache.context().groupId(), 
randomPart),
+                        new GroupPartitionIdPair(cache.context().groupId(), 
randomPart),
                         0L
                     ));
                 }


Reply via email to