This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f935f99c09a IGNITE-26839 Add Message interface to
IgniteDhtPartitionCountersMap (#12467)
f935f99c09a is described below
commit f935f99c09a067d90c0b624285c6334425bf0094
Author: Dmitry Werner <[email protected]>
AuthorDate: Sat Nov 1 17:28:31 2025 +0500
IGNITE-26839 Add Message interface to IgniteDhtPartitionCountersMap
(#12467)
---
.../communication/GridIoMessageFactory.java | 19 +++-
.../preloader/CachePartitionFullCountersMap.java | 67 +++++++++++---
.../preloader/GridDhtPartitionsExchangeFuture.java | 4 +-
.../preloader/GridDhtPartitionsFullMessage.java | 50 ++--------
.../dht/preloader/GroupPartitionIdPair.java | 101 +++++++++++++++++++++
.../preloader/IgniteDhtPartitionCountersMap.java | 29 +++++-
.../IgniteDhtPartitionHistorySuppliersMap.java | 48 ++++++----
.../dht/preloader/PartitionReservationsMap.java | 75 +++++++++++++++
.../GridCacheDatabaseSharedManager.java | 3 +-
.../IgniteCacheDatabaseSharedManager.java | 4 +-
.../persistence/checkpoint/CheckpointHistory.java | 17 ++--
.../CacheExchangeMessageDuplicatedStateTest.java | 4 +-
.../persistence/WalPreloadingConcurrentTest.java | 4 +-
13 files changed, 329 insertions(+), 96 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ef9c9f1d733..09064df08e8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.codegen.CacheEntryPredicateAdapterSerializer;
import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
import org.apache.ignite.internal.codegen.CacheInvokeDirectResultSerializer;
+import
org.apache.ignite.internal.codegen.CachePartitionFullCountersMapSerializer;
import
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
@@ -101,9 +102,12 @@ import
org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
import org.apache.ignite.internal.codegen.GridTaskResultResponseSerializer;
import org.apache.ignite.internal.codegen.GridTaskSessionRequestSerializer;
+import org.apache.ignite.internal.codegen.GroupPartitionIdPairSerializer;
import org.apache.ignite.internal.codegen.HandshakeMessageSerializer;
import org.apache.ignite.internal.codegen.HandshakeWaitMessageSerializer;
import
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
+import
org.apache.ignite.internal.codegen.IgniteDhtPartitionCountersMapSerializer;
+import
org.apache.ignite.internal.codegen.IgniteDhtPartitionHistorySuppliersMapSerializer;
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
import
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -114,6 +118,7 @@ import
org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer
import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
import org.apache.ignite.internal.codegen.NodeIdMessageSerializer;
+import org.apache.ignite.internal.codegen.PartitionReservationsMapSerializer;
import
org.apache.ignite.internal.codegen.RecoveryLastReceivedMessageSerializer;
import
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
import org.apache.ignite.internal.codegen.ServiceDeploymentProcessIdSerializer;
@@ -192,6 +197,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -201,7 +207,11 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -435,7 +445,6 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
factory.register(StatisticsResponse.TYPE_CODE,
StatisticsResponse::new);
- // Enums
factory.register(CachePartitionPartialCountersMap.TYPE_CODE,
CachePartitionPartialCountersMap::new,
new CachePartitionPartialCountersMapSerializer());
factory.register(IgniteDhtDemandedPartitionsMap.TYPE_CODE,
IgniteDhtDemandedPartitionsMap::new,
@@ -447,6 +456,14 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(GridCacheOperationMessage.TYPE_CODE,
GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer());
factory.register(BinaryMetadataVersionInfo.TYPE_CODE,
BinaryMetadataVersionInfo::new,
new BinaryMetadataVersionInfoSerializer());
+ factory.register(CachePartitionFullCountersMap.TYPE_CODE,
CachePartitionFullCountersMap::new,
+ new CachePartitionFullCountersMapSerializer());
+ factory.register(IgniteDhtPartitionCountersMap.TYPE_CODE,
IgniteDhtPartitionCountersMap::new,
+ new IgniteDhtPartitionCountersMapSerializer());
+ factory.register(GroupPartitionIdPair.TYPE_CODE,
GroupPartitionIdPair::new, new GroupPartitionIdPairSerializer());
+ factory.register(PartitionReservationsMap.TYPE_CODE,
PartitionReservationsMap::new, new PartitionReservationsMapSerializer());
+ factory.register(IgniteDhtPartitionHistorySuppliersMap.TYPE_CODE,
IgniteDhtPartitionHistorySuppliersMap::new,
+ new IgniteDhtPartitionHistorySuppliersMapSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 80858a8a106..d5fb7ed1413 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -17,30 +17,40 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
*/
-public class CachePartitionFullCountersMap implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
+public class CachePartitionFullCountersMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 506;
/** */
- private long[] initialUpdCntrs;
+ @Order(value = 0, method = "initialUpdateCounters")
+ private long[] initUpdCntrs;
/** */
+ @Order(value = 1, method = "updateCounters")
private long[] updCntrs;
+ /**
+ * Default constructor.
+ */
+ public CachePartitionFullCountersMap() {
+ // No-op.
+ }
+
/**
* @param other Map to copy.
*/
public CachePartitionFullCountersMap(CachePartitionFullCountersMap other) {
- initialUpdCntrs = Arrays.copyOf(other.initialUpdCntrs,
other.initialUpdCntrs.length);
+ initUpdCntrs = Arrays.copyOf(other.initUpdCntrs,
other.initUpdCntrs.length);
updCntrs = Arrays.copyOf(other.updCntrs, other.updCntrs.length);
}
@@ -48,7 +58,7 @@ public class CachePartitionFullCountersMap implements
Serializable {
* @param partsCnt Total number of partitions.
*/
public CachePartitionFullCountersMap(int partsCnt) {
- initialUpdCntrs = new long[partsCnt];
+ initUpdCntrs = new long[partsCnt];
updCntrs = new long[partsCnt];
}
@@ -59,7 +69,7 @@ public class CachePartitionFullCountersMap implements
Serializable {
* @return Initial update counter for the partition with the given ID.
*/
public long initialUpdateCounter(int p) {
- return initialUpdCntrs[p];
+ return initUpdCntrs[p];
}
/**
@@ -76,10 +86,10 @@ public class CachePartitionFullCountersMap implements
Serializable {
* Sets an initial update counter by the partition ID.
*
* @param p Partition ID.
- * @param initialUpdCntr Initial update counter to set.
+ * @param initUpdCntr Initial update counter to set.
*/
- public void initialUpdateCounter(int p, long initialUpdCntr) {
- initialUpdCntrs[p] = initialUpdCntr;
+ public void initialUpdateCounter(int p, long initUpdCntr) {
+ initUpdCntrs[p] = initUpdCntr;
}
/**
@@ -108,7 +118,40 @@ public class CachePartitionFullCountersMap implements
Serializable {
* Clears full counters map.
*/
public void clear() {
- Arrays.fill(initialUpdCntrs, 0);
+ Arrays.fill(initUpdCntrs, 0);
Arrays.fill(updCntrs, 0);
}
+
+ /**
+ * @return Initial update counters.
+ */
+ public long[] initialUpdateCounters() {
+ return initUpdCntrs;
+ }
+
+ /**
+ * @param initUpdCntrs Initial update counters.
+ */
+ public void initialUpdateCounters(long[] initUpdCntrs) {
+ this.initUpdCntrs = initUpdCntrs;
+ }
+
+ /**
+ * @return Update counters.
+ */
+ public long[] updateCounters() {
+ return updCntrs;
+ }
+
+ /**
+ * @param updCntrs Update counters.
+ */
+ public void updateCounters(long[] updCntrs) {
+ this.updCntrs = updCntrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 35ac04f6385..0a1b955f0d0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2461,10 +2461,10 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
// Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(this, err);
- Map<T2<Integer, Integer>, Long> locReserved =
partHistSuppliers.getReservations(cctx.localNodeId());
+ PartitionReservationsMap locReserved =
partHistSuppliers.getReservations(cctx.localNodeId());
if (locReserved != null) {
- boolean success =
cctx.database().reserveHistoryForPreloading(locReserved);
+ boolean success =
cctx.database().reserveHistoryForPreloading(locReserved.reservations());
if (!success) {
log.warning("Could not reserve history for historical
rebalance " +
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 926a77c7644..469a85140f7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -75,20 +75,12 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/** Partitions update counters. */
@GridToStringInclude
- @GridDirectTransient
private IgniteDhtPartitionCountersMap partCntrs;
- /** Serialized partitions counters. */
- private byte[] partCntrsBytes;
-
/** Partitions history suppliers. */
@GridToStringInclude
- @GridDirectTransient
private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
- /** Serialized partitions history suppliers. */
- private byte[] partHistSuppliersBytes;
-
/** Partitions that must be cleared and re-loaded. */
@GridToStringInclude
@GridDirectTransient
@@ -184,9 +176,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
cp.dupPartsData = dupPartsData;
cp.partsBytes = partsBytes;
cp.partCntrs = partCntrs;
- cp.partCntrsBytes = partCntrsBytes;
cp.partHistSuppliers = partHistSuppliers;
- cp.partHistSuppliersBytes = partHistSuppliersBytes;
cp.partsToReload = partsToReload;
cp.partsToReloadBytes = partsToReloadBytes;
cp.partsSizesBytes = partsSizesBytes;
@@ -437,12 +427,10 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
- (partCntrs != null && !partCntrs.empty() && partCntrsBytes ==
null) ||
- (partHistSuppliers != null && partHistSuppliersBytes == null) ||
(partsToReload != null && partsToReloadBytes == null) ||
(!F.isEmpty(errs) && errsBytes == null);
@@ -455,12 +443,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (!F.isEmpty(parts) && partsBytes == null)
objectsToMarshall.add(parts);
- if (partCntrs != null && !partCntrs.empty() && partCntrsBytes ==
null)
- objectsToMarshall.add(partCntrs);
-
- if (partHistSuppliers != null && partHistSuppliersBytes == null)
- objectsToMarshall.add(partHistSuppliers);
-
if (partsToReload != null && partsToReloadBytes == null)
objectsToMarshall.add(partsToReload);
@@ -487,12 +469,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (!F.isEmpty(parts) && partsBytes == null)
partsBytes = iter.next();
- if (partCntrs != null && !partCntrs.empty() && partCntrsBytes ==
null)
- partCntrsBytes = iter.next();
-
- if (partHistSuppliers != null && partHistSuppliersBytes == null)
- partHistSuppliersBytes = iter.next();
-
if (partsToReload != null && partsToReloadBytes == null)
partsToReloadBytes = iter.next();
@@ -516,7 +492,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
@@ -529,12 +505,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (partsBytes != null && parts == null)
objectsToUnmarshall.add(partsBytes);
- if (partCntrsBytes != null && partCntrs == null)
- objectsToUnmarshall.add(partCntrsBytes);
-
- if (partHistSuppliersBytes != null && partHistSuppliers == null)
- objectsToUnmarshall.add(partHistSuppliersBytes);
-
if (partsToReloadBytes != null && partsToReload == null)
objectsToUnmarshall.add(partsToReloadBytes);
@@ -587,12 +557,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
}
}
- if (partCntrsBytes != null && partCntrs == null)
- partCntrs = (IgniteDhtPartitionCountersMap)iter.next();
-
- if (partHistSuppliersBytes != null && partHistSuppliers == null)
- partHistSuppliers =
(IgniteDhtPartitionHistorySuppliersMap)iter.next();
-
if (partsToReloadBytes != null && partsToReload == null)
partsToReload = (IgniteDhtPartitionsToReloadMap)iter.next();
@@ -667,13 +631,13 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
writer.incrementState();
case 12:
- if (!writer.writeByteArray(partCntrsBytes))
+ if (!writer.writeMessage(partCntrs))
return false;
writer.incrementState();
case 13:
- if (!writer.writeByteArray(partHistSuppliersBytes))
+ if (!writer.writeMessage(partHistSuppliers))
return false;
writer.incrementState();
@@ -770,7 +734,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
reader.incrementState();
case 12:
- partCntrsBytes = reader.readByteArray();
+ partCntrs = reader.readMessage();
if (!reader.isLastRead())
return false;
@@ -778,7 +742,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
reader.incrementState();
case 13:
- partHistSuppliersBytes = reader.readByteArray();
+ partHistSuppliers = reader.readMessage();
if (!reader.isLastRead())
return false;
@@ -878,8 +842,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
public void cleanUp() {
partsBytes = null;
partCntrs = null;
- partCntrsBytes = null;
- partHistSuppliersBytes = null;
partsToReloadBytes = null;
partsSizesBytes = null;
errsBytes = null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GroupPartitionIdPair.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GroupPartitionIdPair.java
new file mode 100644
index 00000000000..738dc90fc29
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GroupPartitionIdPair.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Objects;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Pair of group ID and partition ID. */
+public class GroupPartitionIdPair implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 508;
+
+ /** Group ID. */
+ @Order(value = 0, method = "groupId")
+ private int grpId;
+
+ /** Partition ID. */
+ @Order(value = 1, method = "partitionId")
+ private int partId;
+
+ /** Default constructor. */
+ public GroupPartitionIdPair() {
+ // No-op.
+ }
+
+ /**
+ * @param grpId Group ID.
+ * @param partId Partition ID.
+ */
+ public GroupPartitionIdPair(int grpId, int partId) {
+ this.grpId = grpId;
+ this.partId = partId;
+ }
+
+ /**
+ * @return Group ID.
+ */
+ public int groupId() {
+ return grpId;
+ }
+
+ /**
+ * @param grpId Group ID.
+ */
+ public void groupId(int grpId) {
+ this.grpId = grpId;
+ }
+
+ /**
+ * @return Partition ID.
+ */
+ public int partitionId() {
+ return partId;
+ }
+
+ /**
+ * @param partId Partition ID.
+ */
+ public void partitionId(int partId) {
+ this.partId = partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GroupPartitionIdPair that = (GroupPartitionIdPair)o;
+
+ return grpId == that.grpId && partId == that.partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(grpId, partId);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index e7954d960ce..9f08178dae7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -18,18 +18,20 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Partition counters map.
*/
-public class IgniteDhtPartitionCountersMap implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
+public class IgniteDhtPartitionCountersMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 507;
/** */
+ @Order(value = 0, method = "partitionCounters")
private Map<Integer, CachePartitionFullCountersMap> map;
/**
@@ -66,4 +68,23 @@ public class IgniteDhtPartitionCountersMap implements
Serializable {
return cntrMap;
}
+
+ /**
+ * @return Partition counters map.
+ */
+ public Map<Integer, CachePartitionFullCountersMap> partitionCounters() {
+ return map;
+ }
+
+ /**
+ * @param map Partition counters map.
+ */
+ public void partitionCounters(Map<Integer, CachePartitionFullCountersMap>
map) {
+ this.map = map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 427aad8344e..dda003ffd33 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -18,29 +18,30 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
+public class IgniteDhtPartitionHistorySuppliersMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 510;
/** */
private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new
IgniteDhtPartitionHistorySuppliersMap();
/** */
- private Map<UUID, Map<T2<Integer, Integer>, Long>> map;
+ @Order(value = 0, method = "historySuppliers")
+ private Map<UUID, PartitionReservationsMap> map;
/**
* @return Empty map.
@@ -61,10 +62,10 @@ public class IgniteDhtPartitionHistorySuppliersMap
implements Serializable {
List<UUID> suppliers = new ArrayList<>();
- for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e :
map.entrySet()) {
+ for (Map.Entry<UUID, PartitionReservationsMap> e : map.entrySet()) {
UUID supplierNode = e.getKey();
- Long historyCounter = e.getValue().get(new T2<>(grpId, partId));
+ Long historyCounter = e.getValue().get(new
GroupPartitionIdPair(grpId, partId));
if (historyCounter != null && historyCounter <= cntrSince)
suppliers.add(supplierNode);
@@ -77,7 +78,7 @@ public class IgniteDhtPartitionHistorySuppliersMap implements
Serializable {
* @param nodeId Node ID to check.
* @return Reservations for the given node.
*/
- @Nullable public synchronized Map<T2<Integer, Integer>, Long>
getReservations(UUID nodeId) {
+ @Nullable public synchronized PartitionReservationsMap
getReservations(UUID nodeId) {
if (map == null)
return null;
@@ -94,15 +95,9 @@ public class IgniteDhtPartitionHistorySuppliersMap
implements Serializable {
if (map == null)
map = new HashMap<>();
- Map<T2<Integer, Integer>, Long> nodeMap = map.get(nodeId);
-
- if (nodeMap == null) {
- nodeMap = new HashMap<>();
-
- map.put(nodeId, nodeMap);
- }
+ PartitionReservationsMap nodeMap = map.computeIfAbsent(nodeId, k ->
new PartitionReservationsMap());
- nodeMap.put(new T2<>(grpId, partId), cntr);
+ nodeMap.put(new GroupPartitionIdPair(grpId, partId), cntr);
}
/**
@@ -119,8 +114,27 @@ public class IgniteDhtPartitionHistorySuppliersMap
implements Serializable {
map = that.map;
}
+ /**
+ * @return Partition history suppliers map.
+ */
+ public Map<UUID, PartitionReservationsMap> historySuppliers() {
+ return map;
+ }
+
+ /**
+ * @param map Partition history suppliers map.
+ */
+ public void historySuppliers(Map<UUID, PartitionReservationsMap> map) {
+ this.map = map;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionReservationsMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionReservationsMap.java
new file mode 100644
index 00000000000..3a37229db9e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionReservationsMap.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Map for storing GroupPartitionIdPair and their respective history counter
values. */
+public class PartitionReservationsMap implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 509;
+
+ /** Mapping between GroupPartitionIdPair objects and their respective
history counter values. */
+ @Order(value = 0, method = "reservations")
+ private Map<GroupPartitionIdPair, Long> map;
+
+ /**
+ * @return Partition reservations map.
+ */
+ public Map<GroupPartitionIdPair, Long> reservations() {
+ return map;
+ }
+
+ /**
+ * @param map Partition reservations map.
+ */
+ public void reservations(Map<GroupPartitionIdPair, Long> map) {
+ this.map = map;
+ }
+
+ /**
+ * @param pair Pair of group ID and partition ID.
+ * @return History counter for this pair or null.
+ */
+ public @Nullable Long get(GroupPartitionIdPair pair) {
+ if (map == null)
+ return null;
+
+ return map.get(pair);
+ }
+
+ /**
+ * @param pair Pair of group ID and partition ID.
+ * @param counter History counter for this pair.
+ */
+ public void put(GroupPartitionIdPair pair, Long counter) {
+ if (map == null)
+ map = new HashMap<>();
+
+ map.put(pair, counter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index f3cf2b9ab26..23fcaddfb60 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -98,6 +98,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
@@ -1761,7 +1762,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override public boolean reserveHistoryForPreloading(Map<T2<Integer,
Integer>, Long> reservationMap) {
+ @Override public boolean
reserveHistoryForPreloading(Map<GroupPartitionIdPair, Long> reservationMap) {
Map<GroupPartitionId, CheckpointEntry> entries =
checkpointHistory().searchCheckpointEntry(reservationMap);
if (F.isEmpty(entries))
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index baeff9954fa..cae86127365 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker;
@@ -81,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.warmup.WarmUpStrategy;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1153,7 +1153,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
* @param reservationMap Map contains of counters for partitions of groups.
* @return True if successfully reserved.
*/
- public boolean reserveHistoryForPreloading(Map<T2<Integer, Integer>, Long> reservationMap) {
+ public boolean reserveHistoryForPreloading(Map<GroupPartitionIdPair, Long> reservationMap) {
return false;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index 6b69bf02acb..b30cd6da5b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -39,6 +39,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry.GroupState;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.EarliestCheckpointMapSnapshot.GroupStateSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -646,16 +647,14 @@ public class CheckpointHistory {
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
- * @param searchCntrMap Search map contains (Group Id, partition, counter).
+ * @param searchCntrMap Search map contains (Pair of group ID and partition ID, counter).
* @return Map of group-partition on checkpoint entry or empty map if nothing found.
*/
- public Map<GroupPartitionId, CheckpointEntry> searchCheckpointEntry(
- Map<T2<Integer, Integer>, Long> searchCntrMap
- ) {
+ public Map<GroupPartitionId, CheckpointEntry> searchCheckpointEntry(Map<GroupPartitionIdPair, Long> searchCntrMap) {
if (F.isEmpty(searchCntrMap))
return Collections.emptyMap();
- Map<T2<Integer, Integer>, Long> modifiedSearchMap = new HashMap<>(searchCntrMap);
+ Map<GroupPartitionIdPair, Long> modifiedSearchMap = new HashMap<>(searchCntrMap);
Map<GroupPartitionId, CheckpointEntry> res = new HashMap<>();
@@ -663,17 +662,17 @@ public class CheckpointHistory {
try {
CheckpointEntry cpEntry = entry(cpTs);
- Iterator<Map.Entry<T2<Integer, Integer>, Long>> iter = modifiedSearchMap.entrySet().iterator();
+ Iterator<Map.Entry<GroupPartitionIdPair, Long>> iter = modifiedSearchMap.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry<T2<Integer, Integer>, Long> entry = iter.next();
+ Map.Entry<GroupPartitionIdPair, Long> entry = iter.next();
- Long foundCntr = cpEntry.partitionCounter(wal, entry.getKey().get1(), entry.getKey().get2());
+ Long foundCntr = cpEntry.partitionCounter(wal, entry.getKey().groupId(), entry.getKey().partitionId());
if (foundCntr != null && foundCntr <= entry.getValue()) {
iter.remove();
- res.put(new GroupPartitionId(entry.getKey().get1(), entry.getKey().get2()), cpEntry);
+ res.put(new GroupPartitionId(entry.getKey().groupId(), entry.getKey().partitionId()), cpEntry);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
index 78db7aa4d2e..abbb7269066 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -234,8 +234,8 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
if (partCntrs != null) {
for (CachePartitionFullCountersMap cntrs : partCntrs.values()) {
- long[] initUpdCntrs = getFieldValue(cntrs, "initialUpdCntrs");
- long[] updCntrs = getFieldValue(cntrs, "updCntrs");
+ long[] initUpdCntrs = cntrs.initialUpdateCounters();
+ long[] updCntrs = cntrs.updateCounters();
for (int i = 0; i < initUpdCntrs.length; i++) {
assertEquals(0, initUpdCntrs[i]);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
index 89789d90b18..bc25ce8c9e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/WalPreloadingConcurrentTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -118,7 +118,7 @@ public class WalPreloadingConcurrentTest extends GridCommonAbstractTest {
while (!stop.get()) {
db.reserveHistoryForPreloading(Collections.singletonMap(
- new T2<Integer, Integer>(cache.context().groupId(), randomPart),
+ new GroupPartitionIdPair(cache.context().groupId(), randomPart),
0L
));
}