This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6591914ba0a IGNITE-26515 Use MessageSerializer for
GridDhtPartitionDemandMessage (#12387)
6591914ba0a is described below
commit 6591914ba0a6cf513527a654ba8280bfa42634a7
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Oct 9 16:59:27 2025 +0500
IGNITE-26515 Use MessageSerializer for GridDhtPartitionDemandMessage
(#12387)
---
.../communication/GridIoMessageFactory.java | 12 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 2 +-
.../CachePartitionPartialCountersMap.java | 74 +++++++-
.../preloader/GridDhtPartitionDemandMessage.java | 203 +++------------------
.../dht/preloader/GridDhtPartitionDemander.java | 21 ++-
.../dht/preloader/GridDhtPartitionSupplier.java | 9 +-
.../dht/preloader/GridDhtPreloader.java | 4 +
.../dht/preloader/GridDhtPreloaderAssignments.java | 14 +-
.../preloader/IgniteDhtDemandedPartitionsMap.java | 34 +++-
.../apache/ignite/cache/CircledRebalanceTest.java | 2 +-
.../db/FullHistRebalanceOnClientStopTest.java | 2 +-
.../db/wal/HistoricalReservationTest.java | 2 +-
.../db/wal/WalRebalanceRestartTest.java | 2 +-
13 files changed, 170 insertions(+), 211 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7640cd1f4e9..48cbc18fece 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.internal.codegen.AtomicApplicationAttributesAwareReques
import
org.apache.ignite.internal.codegen.CacheContinuousQueryBatchAckSerializer;
import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
+import
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
import org.apache.ignite.internal.codegen.GridCacheEntryInfoSerializer;
@@ -43,6 +44,7 @@ import
org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridDhtPartitionDemandMessageSerializer;
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
@@ -65,6 +67,7 @@ import
org.apache.ignite.internal.codegen.GridQueryNextPageRequestSerializer;
import org.apache.ignite.internal.codegen.GridQueryNextPageResponseSerializer;
import org.apache.ignite.internal.codegen.GridTaskCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridTaskResultRequestSerializer;
+import
org.apache.ignite.internal.codegen.IgniteDhtDemandedPartitionsMapSerializer;
import org.apache.ignite.internal.codegen.IgniteTxKeySerializer;
import
org.apache.ignite.internal.codegen.IncrementalSnapshotAwareMessageSerializer;
import org.apache.ignite.internal.codegen.JobStealingRequestSerializer;
@@ -145,6 +148,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates;
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -153,6 +157,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -288,7 +293,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)41, GridNearAtomicUpdateResponse::new, new
GridNearAtomicUpdateResponseSerializer());
factory.register((short)42, GridDhtForceKeysRequest::new, new
GridDhtForceKeysRequestSerializer());
factory.register((short)43, GridDhtForceKeysResponse::new);
- factory.register((short)45, GridDhtPartitionDemandMessage::new);
+ factory.register((short)45, GridDhtPartitionDemandMessage::new, new
GridDhtPartitionDemandMessageSerializer());
factory.register((short)46, GridDhtPartitionsFullMessage::new);
factory.register((short)47, GridDhtPartitionsSingleMessage::new);
factory.register((short)48, GridDhtPartitionsSingleRequest::new, new
GridDhtPartitionsSingleRequestSerializer());
@@ -386,6 +391,11 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
factory.register(StatisticsResponse.TYPE_CODE,
StatisticsResponse::new);
+ factory.register(CachePartitionPartialCountersMap.TYPE_CODE,
CachePartitionPartialCountersMap::new,
+ new CachePartitionPartialCountersMapSerializer());
+ factory.register(IgniteDhtDemandedPartitionsMap.TYPE_CODE,
IgniteDhtDemandedPartitionsMap::new,
+ new IgniteDhtDemandedPartitionsMapSerializer());
+
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
// [-44, 0..2, 42, 200..204, 210, 302] - Use in tests.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 923eb153bef..bfcdd05db3f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -949,7 +949,7 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
Set<Integer> missing = new HashSet<>();
- for (Integer p : parts.fullSet()) {
+ for (Integer p : parts.full()) {
GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p,
topVer);
if (partIter == null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index bc9871bb5fb..eaf0593154e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -21,13 +21,18 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
*/
-public class CachePartitionPartialCountersMap implements Serializable {
+public class CachePartitionPartialCountersMap implements Serializable, Message
{
+ /** */
+ public static final short TYPE_CODE = 500;
+
/** */
private static final long serialVersionUID = 0L;
@@ -35,19 +40,23 @@ public class CachePartitionPartialCountersMap implements
Serializable {
public static final CachePartitionPartialCountersMap EMPTY = new
CachePartitionPartialCountersMap();
/** */
+ @Order(value = 0, method = "partitionIds")
private int[] partIds;
/** */
+ @Order(value = 1, method = "initialUpdateCounters")
private long[] initialUpdCntrs;
/** */
+ @Order(value = 2, method = "updateCounters")
private long[] updCntrs;
/** */
+ @Order(value = 3, method = "currentIndex")
private int curIdx;
/** */
- private CachePartitionPartialCountersMap() {
+ public CachePartitionPartialCountersMap() {
this(0);
}
@@ -226,4 +235,65 @@ public class CachePartitionPartialCountersMap implements
Serializable {
return sb.toString();
}
+
+ /**
+ * @return Partition IDs.
+ */
+ public int[] partitionIds() {
+ return partIds;
+ }
+
+ /**
+ * @param partIds Partition IDs.
+ */
+ public void partitionIds(int[] partIds) {
+ this.partIds = partIds;
+ }
+
+ /**
+ * @return Partition initial update counters.
+ */
+ public long[] initialUpdateCounters() {
+ return initialUpdCntrs;
+ }
+
+ /**
+ * @param initialUpdCntrs Partition initial update counters.
+ */
+ public void initialUpdateCounters(long[] initialUpdCntrs) {
+ this.initialUpdCntrs = initialUpdCntrs;
+ }
+
+ /**
+ * @return Partition update counters.
+ */
+ public long[] updateCounters() {
+ return updCntrs;
+ }
+
+ /**
+ * @param updCntrs Partition update counters.
+ */
+ public void updateCounters(long[] updCntrs) {
+ this.updCntrs = updCntrs;
+ }
+
+ /**
+ * @return Current index.
+ */
+ public int currentIndex() {
+ return curIdx;
+ }
+
+ /**
+ * @param curIdx Current index.
+ */
+ public void currentIndex(int curIdx) {
+ this.curIdx = curIdx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index d34af84101c..b06ce1df103 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -17,52 +17,34 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
-import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
/**
* Partition demand request.
*/
-@IgniteCodeGeneratingFail
public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
- /** Cache rebalance topic. */
- private static final Object REBALANCE_TOPIC =
GridCachePartitionExchangeManager.rebalanceTopic(0);
-
/** Rebalance id. */
+ @Order(4)
private long rebalanceId;
/** Partitions map. */
- @GridDirectTransient
+ @Order(value = 5, method = "partitions")
private IgniteDhtDemandedPartitionsMap parts;
- /** Serialized partitions map. */
- private byte[] partsBytes;
-
- /** Topic. */
- @GridDirectTransient
- private Object topic = REBALANCE_TOPIC;
-
- /** Serialized topic. */
- private byte[] topicBytes;
-
/** Timeout. */
+ @Order(6)
private long timeout;
/** Worker ID. */
+ @Order(7)
private int workerId = -1;
/** Topology version. */
+ @Order(value = 8, method = "topologyVersion")
private AffinityTopologyVersion topVer;
/**
@@ -105,7 +87,6 @@ public class GridDhtPartitionDemandMessage extends
GridCacheGroupIdMessage {
GridDhtPartitionDemandMessage cp = new GridDhtPartitionDemandMessage();
cp.grpId = grpId;
cp.rebalanceId = rebalanceId;
- cp.topic = topic;
cp.timeout = timeout;
cp.workerId = workerId;
cp.topVer = topVer;
@@ -114,58 +95,58 @@ public class GridDhtPartitionDemandMessage extends
GridCacheGroupIdMessage {
}
/**
- * @return Partition.
+ * @return Partitions.
*/
public IgniteDhtDemandedPartitionsMap partitions() {
return parts;
}
+ /**
+ * @param parts Partitions.
+ */
+ public void partitions(IgniteDhtDemandedPartitionsMap parts) {
+ this.parts = parts;
+ }
+
/**
* @param updateSeq Update sequence.
*/
- void rebalanceId(long updateSeq) {
- this.rebalanceId = updateSeq;
+ public void rebalanceId(long updateSeq) {
+ rebalanceId = updateSeq;
}
/**
* @return Unique rebalance session id.
*/
- long rebalanceId() {
+ public long rebalanceId() {
return rebalanceId;
}
/**
* @return Reply message timeout.
*/
- long timeout() {
+ public long timeout() {
return timeout;
}
/**
* @param timeout Timeout.
*/
- void timeout(long timeout) {
+ public void timeout(long timeout) {
this.timeout = timeout;
}
- /**
- * @return Topic.
- */
- Object topic() {
- return topic;
- }
-
/**
* @return Worker ID.
*/
- int workerId() {
+ public int workerId() {
return workerId;
}
/**
* @param workerId Worker ID.
*/
- void workerId(int workerId) {
+ public void workerId(int workerId) {
this.workerId = workerId;
}
@@ -176,26 +157,11 @@ public class GridDhtPartitionDemandMessage extends
GridCacheGroupIdMessage {
return topVer;
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (topic != null && topicBytes == null)
- topicBytes = U.marshal(ctx, topic);
-
- if (parts != null && partsBytes == null)
- partsBytes = U.marshal(ctx, parts);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (topicBytes != null && topic == null)
- topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
-
- if (partsBytes != null && parts == null)
- parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+ /**
+ * @param topVer Topology version for which demand message is sent.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
}
/** {@inheritDoc} */
@@ -203,123 +169,6 @@ public class GridDhtPartitionDemandMessage extends
GridCacheGroupIdMessage {
return false;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeByteArray(partsBytes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeLong(timeout))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeByteArray(topicBytes))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeLong(rebalanceId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeInt(workerId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- partsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- timeout = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- topicBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- rebalanceId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- workerId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 45;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2eea404a145..e6ff9257eec 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -96,6 +96,7 @@ import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static
org.apache.ignite.internal.processors.cache.CacheGroupMetricsImpl.CACHE_GROUP_METRICS_PREFIX;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL;
+import static
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.REBALANCE_TOPIC;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.PAGE_SNAPSHOT_TAKEN;
@@ -406,7 +407,7 @@ public class GridDhtPartitionDemander {
metrics.clearRebalanceCounters();
for (GridDhtPartitionDemandMessage msg :
assignments.values()) {
- for (Integer partId : msg.partitions().fullSet())
+ for (Integer partId : msg.partitions().full())
metrics.onRebalancingKeysCountEstimateReceived(grp.topology().globalPartSizes().get(partId));
CachePartitionPartialCountersMap histMap =
msg.partitions().historicalMap();
@@ -685,7 +686,7 @@ public class GridDhtPartitionDemander {
if (!fut.isDone()) {
// Send demand message.
try {
- ctx.io().sendOrderedMessage(node, d.topic(), d,
grp.ioPolicy(), grp.preloader().timeout());
+ ctx.io().sendOrderedMessage(node, REBALANCE_TOPIC, d,
grp.ioPolicy(), grp.preloader().timeout());
if (log.isDebugEnabled())
log.debug("Send next demand message [" +
demandRoutineInfo(nodeId, supplyMsg) + "]");
@@ -1062,13 +1063,13 @@ public class GridDhtPartitionDemander {
HashSet<Integer> parts = new HashSet<>(v.partitions().size());
parts.addAll(v.partitions().historicalSet());
- parts.addAll(v.partitions().fullSet());
+ parts.addAll(v.partitions().full());
rebalancingParts.put(k.id(), parts);
historical.addAll(v.partitions().historicalSet());
- Stream.concat(v.partitions().historicalSet().stream(),
v.partitions().fullSet().stream())
+ Stream.concat(v.partitions().historicalSet().stream(),
v.partitions().full().stream())
.forEach(
p -> {
queued.put(p, new LongAdder());
@@ -1181,11 +1182,11 @@ public class GridDhtPartitionDemander {
// Make sure partitions scheduled for full rebalancing are
cleared first.
// Clearing attempt is also required for in-memory caches
because some partitions can be switched
// from RENTING to MOVING state in the middle of clearing.
- final int fullSetSize = d.partitions().fullSet().size();
+ final int fullSetSize = d.partitions().full().size();
AtomicInteger waitCnt = new AtomicInteger(fullSetSize);
- for (Integer partId : d.partitions().fullSet()) {
+ for (Integer partId : d.partitions().full()) {
GridDhtLocalPartition part =
grp.topology().localPartition(partId);
// Due to rebalance cancellation it's possible for a
group to be already partially rebalanced,
@@ -1213,7 +1214,7 @@ public class GridDhtPartitionDemander {
}
// The special case for historical only rebalancing.
- if (d.partitions().fullSet().isEmpty() &&
!d.partitions().historicalSet().isEmpty())
+ if (d.partitions().full().isEmpty() &&
!d.partitions().historicalSet().isEmpty())
ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() ->
requestPartitions0(node, parts, d));
}
}
@@ -1237,11 +1238,11 @@ public class GridDhtPartitionDemander {
log.info("Starting rebalance routine [" +
grp.cacheOrGroupName() +
", topVer=" + topVer +
", supplier=" + supplierNode.id() +
- ", fullPartitions=" +
S.toStringSortedDistinct(parts.fullSet()) +
+ ", fullPartitions=" +
S.toStringSortedDistinct(parts.full()) +
", histPartitions=" +
S.toStringSortedDistinct(parts.historicalSet()) +
", rebalanceId=" + rebalanceId + ']');
- ctx.io().sendOrderedMessage(supplierNode, msg.topic(), msg,
grp.ioPolicy(), msg.timeout());
+ ctx.io().sendOrderedMessage(supplierNode, REBALANCE_TOPIC,
msg, grp.ioPolicy(), msg.timeout());
// Cleanup required in case partitions demanded in parallel
with cancellation.
synchronized (this) {
@@ -1676,7 +1677,7 @@ public class GridDhtPartitionDemander {
p0.addAll(partitions);
for (GridDhtPartitionDemandMessage msg : newAssignments.values()) {
- p1.addAll(msg.partitions().fullSet());
+ p1.addAll(msg.partitions().full());
p1.addAll(msg.partitions().historicalSet());
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index cc29d0f58bd..85180dff2ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -53,6 +53,7 @@ import org.apache.ignite.spi.IgniteSpiException;
import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
+import static
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.REBALANCE_TOPIC;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
@@ -267,7 +268,7 @@ public class GridDhtPartitionSupplier {
if (sctx == null) {
if (log.isDebugEnabled())
log.debug("Starting supplying rebalancing [" +
supplyRoutineInfo(topicId, nodeId, demandMsg) +
- ", fullPartitions=" +
S.toStringSortedDistinct(demandMsg.partitions().fullSet()) +
+ ", fullPartitions=" +
S.toStringSortedDistinct(demandMsg.partitions().full()) +
", histPartitions=" +
S.toStringSortedDistinct(demandMsg.partitions().historicalSet()) + "]");
}
else
@@ -275,7 +276,7 @@ public class GridDhtPartitionSupplier {
if (sctx == null || sctx.iterator == null) {
- remainingParts = new
HashSet<>(demandMsg.partitions().fullSet());
+ remainingParts = new HashSet<>(demandMsg.partitions().full());
CachePartitionPartialCountersMap histMap =
demandMsg.partitions().historicalMap();
@@ -456,7 +457,7 @@ public class GridDhtPartitionSupplier {
// Mark all remaining partitions as missed to trigger full
rebalance.
if (iter == null && F.isEmpty(remainingParts)) {
- remainingParts = new
HashSet<>(demandMsg.partitions().fullSet());
+ remainingParts = new
HashSet<>(demandMsg.partitions().full());
remainingParts.addAll(demandMsg.partitions().historicalSet());
}
@@ -518,7 +519,7 @@ public class GridDhtPartitionSupplier {
if (log.isDebugEnabled())
log.debug("Send next supply message [" +
supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
- grp.shared().io().sendOrderedMessage(demander, demandMsg.topic(),
supplyMsg, grp.ioPolicy(), demandMsg.timeout());
+ grp.shared().io().sendOrderedMessage(demander, REBALANCE_TOPIC,
supplyMsg, grp.ioPolicy(), demandMsg.timeout());
// Throttle preloading.
if (rebalanceThrottleOverride > 0)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a22f7a764d3..606ba1352ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -62,6 +63,9 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
+ /** Cache rebalance topic. */
+ static final Object REBALANCE_TOPIC =
GridCachePartitionExchangeManager.rebalanceTopic(0);
+
/** Disable rebalancing cancellation optimization. */
private final boolean disableRebalancingCancellationOptimization =
IgniteSystemProperties.getBoolean(
IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 73fbce85dee..d21f9ffa1b9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -148,22 +150,22 @@ public class GridDhtPreloaderAssignments extends
ConcurrentHashMap<ClusterNode,
}
}
- Set<Integer> curFullSet = cntrMap.fullSet();
+ Collection<Integer> curFull = cntrMap.full();
Set<Integer> newFullSet = null;
- if (!curFullSet.isEmpty()) {
+ if (!curFull.isEmpty()) {
int moving = 0;
// Fast-path check.
- for (Integer partId : curFullSet) {
+ for (Integer partId : curFull) {
if (top.localPartition(partId).state() == MOVING)
moving++;
}
- if (moving != curFullSet.size()) {
+ if (moving != curFull.size()) {
newFullSet = U.newHashSet(moving);
- for (Integer partId : curFullSet) {
+ for (Integer partId : curFull) {
if (top.localPartition(partId).state() == MOVING)
newFullSet.add(partId);
}
@@ -175,7 +177,7 @@ public class GridDhtPreloaderAssignments extends
ConcurrentHashMap<ClusterNode,
newHistMap = curHistMap;
if (newFullSet == null)
- newFullSet = curFullSet;
+ newFullSet = new HashSet<>(curFull);
IgniteDhtDemandedPartitionsMap newMap = new
IgniteDhtDemandedPartitionsMap(newHistMap, newFullSet);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
index c27d856970d..91004bb83be 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java
@@ -23,24 +23,31 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
* Map of partitions demanded during rebalancing.
*/
-public class IgniteDhtDemandedPartitionsMap implements Serializable {
+public class IgniteDhtDemandedPartitionsMap implements Serializable, Message {
+ /** */
+ public static final short TYPE_CODE = 501;
+
/** */
private static final long serialVersionUID = 0L;
/** Map of partitions that will be preloaded from history. (partId ->
(fromCntr, toCntr)). */
+ @Order(value = 0, method = "historicalMap")
private CachePartitionPartialCountersMap historical;
- /** Set of partitions that will be preloaded from all it's current data. */
+ /** Set of partitions that require full rebalancing. */
+ @Order(1)
@GridToStringInclude
- private Set<Integer> full;
+ private Collection<Integer> full;
/**
* @param historical Historical partition set.
@@ -153,11 +160,21 @@ public class IgniteDhtDemandedPartitionsMap implements
Serializable {
}
/** */
- public Set<Integer> fullSet() {
+ public void historicalMap(CachePartitionPartialCountersMap historical) {
+ this.historical = historical;
+ }
+
+ /** */
+ public Collection<Integer> full() {
if (full == null)
return Collections.emptySet();
- return Collections.unmodifiableSet(full);
+ return Collections.unmodifiableCollection(full);
+ }
+
+ /** */
+ public void full(Collection<Integer> full) {
+ this.full = full;
}
/** */
@@ -178,7 +195,7 @@ public class IgniteDhtDemandedPartitionsMap implements
Serializable {
/** */
public Collection<Integer> all() {
- return F.concat(false, fullSet(), historicalSet());
+ return F.concat(false, full(), historicalSet());
}
@@ -186,4 +203,9 @@ public class IgniteDhtDemandedPartitionsMap implements
Serializable {
@Override public String toString() {
return S.toString(IgniteDhtDemandedPartitionsMap.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
index 587ac90acdc..f8fc31de61a 100644
---
a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
@@ -169,7 +169,7 @@ public class CircledRebalanceTest extends
GridCommonAbstractTest {
if (msg instanceof GridDhtPartitionDemandMessage) {
GridDhtPartitionDemandMessage demandMsg =
(GridDhtPartitionDemandMessage)msg;
- hasFullRebalance.compareAndSet(false,
!F.isEmpty(demandMsg.partitions().fullSet()));
+ hasFullRebalance.compareAndSet(false,
!F.isEmpty(demandMsg.partitions().full()));
return true;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
index 7c84d32cacf..f95b8bb025b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
@@ -246,7 +246,7 @@ public class FullHistRebalanceOnClientStopTest extends
GridCommonAbstractTest {
}
}
- if (!map.fullSet().isEmpty()) {
+ if (!map.full().isEmpty()) {
synchronized (mux) {
topVersForFull = true;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
index 5478ec6e085..0c4c80211a3 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/HistoricalReservationTest.java
@@ -194,7 +194,7 @@ public class HistoricalReservationTest extends
GridCommonAbstractTest {
if (msg instanceof GridDhtPartitionDemandMessage) {
GridDhtPartitionDemandMessage demandMsg =
(GridDhtPartitionDemandMessage)msg;
- if (!F.isEmpty(demandMsg.partitions().fullSet()))
+ if (!F.isEmpty(demandMsg.partitions().full()))
hasFullRebalance.compareAndSet(false, true);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
index 19f6e9f67bd..6b4d5e2552b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
@@ -268,7 +268,7 @@ public class WalRebalanceRestartTest extends
GridCommonAbstractTest {
if (rebTopVer == null ||
rebTopVer.before(demandMsg.topologyVersion()))
rebTopVer = demandMsg.topologyVersion();
- if (!F.isEmpty(demandMsg.partitions().fullSet()))
+ if (!F.isEmpty(demandMsg.partitions().full()))
hasFullRebalance.compareAndSet(false, true);
}