This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0bac455c329 IGNITE-26434 Refactor GridDhtPartitionSupplyMessage
(#12355)
0bac455c329 is described below
commit 0bac455c32934957de87beb16a3ef2c8bab51faf
Author: Ilya Shishkov <[email protected]>
AuthorDate: Mon Sep 29 15:33:08 2025 +0300
IGNITE-26434 Refactor GridDhtPartitionSupplyMessage (#12355)
---
.../communication/GridIoMessageFactory.java | 5 +-
.../dht/preloader/GridDhtPartitionDemander.java | 20 +-
.../dht/preloader/GridDhtPartitionSupplier.java | 28 +-
.../GridDhtPartitionSupplyErrorMessage.java | 138 -------
.../preloader/GridDhtPartitionSupplyMessage.java | 402 +++++++--------------
.../main/resources/META-INF/classnames.properties | 1 -
.../cache/CacheNoAffinityExchangeTest.java | 4 +-
7 files changed, 152 insertions(+), 446 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 163d9d211f1..50f6d2df1a3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerial
import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtPartitionExchangeIdSerializer;
+import
org.apache.ignite.internal.codegen.GridDhtPartitionSupplyMessageSerializer;
import
org.apache.ignite.internal.codegen.GridDhtPartitionsSingleRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDhtTxOnePhaseCommitAckRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtTxPrepareRequestSerializer;
@@ -145,7 +146,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -337,7 +337,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)109, GridQueryNextPageResponse::new, new
GridQueryNextPageResponseSerializer());
factory.register((short)112, GridCacheSqlQuery::new, new
GridCacheSqlQuerySerializer());
// 113 - BinaryObjectImpl
- factory.register((short)114, GridDhtPartitionSupplyMessage::new);
+ factory.register((short)114, GridDhtPartitionSupplyMessage::new, new
GridDhtPartitionSupplyMessageSerializer());
factory.register((short)115, UUIDCollectionMessage::new);
factory.register((short)116, GridNearSingleGetRequest::new);
factory.register((short)117, GridNearSingleGetResponse::new);
@@ -356,7 +356,6 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)134, ContinuousRoutineStartResultMessage::new);
factory.register((short)135, LatchAckMessage::new, new
LatchAckMessageSerializer());
factory.register((short)157, PartitionUpdateCountersMessage::new);
- factory.register((short)158, GridDhtPartitionSupplyErrorMessage::new);
factory.register((short)162, GenerateEncryptionKeyRequest::new, new
GenerateEncryptionKeyRequestSerializer());
factory.register((short)163, GenerateEncryptionKeyResponse::new);
factory.register((short)167, ServiceDeploymentProcessId::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 16a68ea4703..2eea404a145 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -472,7 +472,7 @@ public class GridDhtPartitionDemander {
if (fut.isActual(supplyMsg.rebalanceId())) {
boolean historical = false;
- for (Integer p : supplyMsg.infos().keySet()) {
+ for (Integer p : supplyMsg.getInfosSafe().keySet()) {
fut.queued.get(p).increment();
if (fut.historical.contains(p))
@@ -546,7 +546,7 @@ public class GridDhtPartitionDemander {
if (msgExc != null) {
GridDhtPartitionMap partMap = top.localPartitionMap();
- Set<Integer> unstableParts =
supplyMsg.infos().keySet().stream()
+ Set<Integer> unstableParts =
supplyMsg.getInfosSafe().keySet().stream()
.filter(p -> partMap.get(p) == MOVING)
.collect(Collectors.toSet());
@@ -563,12 +563,6 @@ public class GridDhtPartitionDemander {
for (GridCacheContext cctx : grp.caches()) {
if (cctx.statisticsEnabled()) {
- long keysCnt = grp.sharedGroup() ?
supplyMsg.keysForCache(cctx.cacheId()) :
- supplyMsg.estimatedKeysCount();
-
- if (keysCnt != -1)
-
cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
-
// Can not be calculated per cache.
cctx.cache().metrics0().onRebalanceBatchReceived(supplyMsg.messageSize());
}
@@ -578,7 +572,7 @@ public class GridDhtPartitionDemander {
AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
// Preload.
- for (Map.Entry<Integer, CacheEntryInfoCollection> e :
supplyMsg.infos().entrySet()) {
+ for (Map.Entry<Integer, CacheEntryInfoCollection> e :
supplyMsg.getInfosSafe().entrySet()) {
int p = e.getKey();
if (aff.get(p).contains(ctx.localNode())) {
@@ -604,7 +598,7 @@ public class GridDhtPartitionDemander {
assert part != null;
- boolean last = supplyMsg.last().containsKey(p);
+ boolean last = supplyMsg.last() != null &&
supplyMsg.last().containsKey(p);
if (part.state() == MOVING) {
boolean reserved = part.reserve();
@@ -670,13 +664,15 @@ public class GridDhtPartitionDemander {
}
}
+ Collection<Integer> missed = supplyMsg.missed() == null ?
Collections.emptyList() : supplyMsg.missed();
+
// Only request partitions based on latest topology version.
- for (Integer miss : supplyMsg.missed()) {
+ for (Integer miss : missed) {
if (aff.get(miss).contains(ctx.localNode()))
fut.partitionMissed(nodeId, miss);
}
- for (Integer miss : supplyMsg.missed())
+ for (Integer miss : missed)
fut.partitionDone(nodeId, miss, false);
GridDhtPartitionDemandMessage d = new
GridDhtPartitionDemandMessage(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 6f6ccdc1337..cc29d0f58bd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -40,7 +40,6 @@ import
org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -287,27 +286,6 @@ public class GridDhtPartitionSupplier {
}
iter = grp.offheap().rebalanceIterator(demandMsg.partitions(),
demandMsg.topologyVersion());
-
- for (Integer part : demandMsg.partitions().fullSet()) {
- if (iter.isPartitionMissing(part))
- continue;
-
- GridDhtLocalPartition loc = top.localPartition(part,
demandMsg.topologyVersion(), false);
-
- assert loc != null && loc.state() ==
GridDhtPartitionState.OWNING
- : "Partition should be in OWNING state: " + loc;
-
-
supplyMsg.addEstimatedKeysCount(loc.dataStore().fullSize());
- }
-
- for (int i = 0; i < histMap.size(); i++) {
- int p = histMap.partitionAt(i);
-
- if (iter.isPartitionMissing(p))
- continue;
-
- supplyMsg.addEstimatedKeysCount(histMap.updateCounterAt(i)
- histMap.initialUpdateCounterAt(i));
- }
}
else {
iter = sctx.iterator;
@@ -381,7 +359,7 @@ public class GridDhtPartitionSupplier {
supplyMsg.addEntry0(part, iter.historical(part), info,
grp.shared(), grp.cacheObjectContext());
if (iter.isPartitionDone(part)) {
- supplyMsg.last(part, loc.updateCounter());
+ supplyMsg.addLast(part, loc.updateCounter());
remainingParts.remove(part);
@@ -401,7 +379,7 @@ public class GridDhtPartitionSupplier {
assert loc != null
: "Supply partition is gone: grp=" +
grp.cacheOrGroupName() + ", p=" + p;
- supplyMsg.last(p, loc.updateCounter());
+ supplyMsg.addLast(p, loc.updateCounter());
remainingIter.remove();
@@ -488,7 +466,7 @@ public class GridDhtPartitionSupplier {
errMsg = supplyMsg;
}
else {
- errMsg = new GridDhtPartitionSupplyErrorMessage(
+ errMsg = new GridDhtPartitionSupplyMessage(
demandMsg.rebalanceId(),
grp.groupId(),
demandMsg.topologyVersion(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyErrorMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyErrorMessage.java
deleted file mode 100644
index 89a44bd69df..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyErrorMessage.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Supply message with supplier error transfer support.
- */
-public class GridDhtPartitionSupplyErrorMessage extends
GridDhtPartitionSupplyMessage {
- /** Supplying process error. */
- @GridDirectTransient
- private Throwable err;
-
- /** Supplying process error bytes. */
- private byte[] errBytes;
-
- /**
- * Default constructor.
- */
- public GridDhtPartitionSupplyErrorMessage() {
- }
-
- /**
- * @param rebalanceId Rebalance id.
- * @param grpId Group id.
- * @param topVer Topology version.
- * @param addDepInfo Add dep info.
- * @param err Supply process error.
- */
- public GridDhtPartitionSupplyErrorMessage(
- long rebalanceId,
- int grpId,
- AffinityTopologyVersion topVer,
- boolean addDepInfo,
- Throwable err
- ) {
- super(rebalanceId, grpId, topVer, addDepInfo);
-
- this.err = err;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx, err);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (errBytes != null && err == null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 13:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 13:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Throwable error() {
- return err;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 158;
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index c894a368113..c01319b763f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -17,17 +17,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -38,48 +34,46 @@ import
org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
/**
* Partition supply message.
*/
-@IgniteCodeGeneratingFail
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage
implements GridCacheDeployable {
/** An unique (per demander) rebalance id. */
+ @Order(4)
private long rebalanceId;
- /** Topology version. */
+ /** Topology version for which demand message is sent. */
+ @Order(value = 5, method = "topologyVersion")
private AffinityTopologyVersion topVer;
/** Partitions that have been fully sent. */
- @GridDirectMap(keyType = int.class, valueType = long.class)
+ @Order(6)
private Map<Integer, Long> last;
/** Partitions which were not found. */
@GridToStringInclude
- @GridDirectCollection(int.class)
+ @Order(7)
private Collection<Integer> missed;
- /** Partitions for which we were able to get historical iterator. */
- @GridToStringInclude
- @GridDirectCollection(int.class)
- private Collection<Integer> clean;
-
/** Entries. */
- @GridDirectMap(keyType = int.class, valueType =
CacheEntryInfoCollection.class)
+ @Order(8)
private Map<Integer, CacheEntryInfoCollection> infos;
/** Message size. */
+ @Order(value = 9, method = "messageSize")
private int msgSize;
- /** Estimated keys count. */
- private long estimatedKeysCnt = -1;
+ /** Supplying process error. */
+ private Throwable err;
- /** Estimated keys count per cache in case the message is for shared
group. */
- @GridDirectMap(keyType = int.class, valueType = long.class)
- private Map<Integer, Long> keysPerCache;
+ // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-26523
+ /** Serialized form of supplying process error. */
+ @Order(10)
+ private byte[] errBytes;
/**
* @param rebalanceId Rebalance id.
@@ -99,6 +93,26 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
this.addDepInfo = addDepInfo;
}
+ /**
+ * @param rebalanceId Rebalance id.
+ * @param grpId Cache group ID.
+ * @param topVer Topology version.
+ * @param addDepInfo Deployment info flag.
+ */
+ GridDhtPartitionSupplyMessage(
+ long rebalanceId,
+ int grpId,
+ AffinityTopologyVersion topVer,
+ boolean addDepInfo,
+ Throwable err
+ ) {
+ this.grpId = grpId;
+ this.rebalanceId = rebalanceId;
+ this.topVer = topVer;
+ this.addDepInfo = addDepInfo;
+ this.err = err;
+ }
+
/**
* Empty constructor.
*/
@@ -112,12 +126,19 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
}
/**
- * @return Rebalance id.
+ * @return An unique (per demander) rebalance id.
*/
- long rebalanceId() {
+ public long rebalanceId() {
return rebalanceId;
}
+ /**
+ * @param rebalanceId New unique (per demander) rebalance id.
+ */
+ public void rebalanceId(long rebalanceId) {
+ this.rebalanceId = rebalanceId;
+ }
+
/**
* @return Topology version for which demand message is sent.
*/
@@ -126,16 +147,30 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
}
/**
- * @return Flag to indicate last message for partition.
+ * @param topVer New topology version for which demand message is sent.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Partitions that have been fully sent.
+ */
+ public Map<Integer, Long> last() {
+ return last;
+ }
+
+ /**
+ * @param last New map of partitions that have been fully sent.
*/
- Map<Integer, Long> last() {
- return last == null ? Collections.<Integer, Long>emptyMap() : last;
+ public void last(Map<Integer, Long> last) {
+ this.last = last;
}
/**
* @param p Partition which was fully sent.
*/
- void last(int p, long cntr) {
+ void addLast(int p, long cntr) {
if (last == null)
last = new HashMap<>();
@@ -143,35 +178,16 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
msgSize += 12;
// If partition is empty, we need to add it.
- if (!infos().containsKey(p)) {
+ if (!getInfosSafe().containsKey(p)) {
CacheEntryInfoCollection infoCol = new
CacheEntryInfoCollection();
infoCol.init();
- infos().put(p, infoCol);
+ getInfosSafe().put(p, infoCol);
}
}
}
- /**
- * @param p Partition to clean.
- */
- void clean(int p) {
- if (clean == null)
- clean = new HashSet<>();
-
- if (clean.add(p))
- msgSize += 4;
- }
-
- /**
- * @param p Partition to check.
- * @return Check result.
- */
- boolean isClean(int p) {
- return clean != null && clean.contains(p);
- }
-
/**
* @param p Missed partition.
*/
@@ -186,27 +202,74 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
/**
* @return Missed partitions.
*/
- Collection<Integer> missed() {
- return missed == null ? Collections.<Integer>emptySet() : missed;
+ public Collection<Integer> missed() {
+ return missed;
+ }
+
+ /**
+ * @param missed New partitions which were not found.
+ */
+ public void missed(Collection<Integer> missed) {
+ this.missed = missed;
}
/**
* @return Entries.
*/
- Map<Integer, CacheEntryInfoCollection> infos() {
+ public Map<Integer, CacheEntryInfoCollection> getInfosSafe() {
if (infos == null)
infos = new HashMap<>();
return infos;
}
+ /**
+ * @return Entries.
+ */
+ public Map<Integer, CacheEntryInfoCollection> infos() {
+ return infos;
+ }
+
+ /**
+ * @param infos New entries.
+ */
+ public void infos(Map<Integer, CacheEntryInfoCollection> infos) {
+ this.infos = infos;
+ }
+
+ /** Supplying process error. */
+ @Nullable @Override public Throwable error() {
+ return err;
+ }
+
+ /**
+ * @return Serialized form of supplying process error.
+ */
+ public byte[] errBytes() {
+ return errBytes;
+ }
+
+ /**
+ * @param errBytes New serialized form of supplying process error.
+ */
+ public void errBytes(byte[] errBytes) {
+ this.errBytes = errBytes;
+ }
+
/**
* @return Message size.
*/
- int messageSize() {
+ public int messageSize() {
return msgSize;
}
+ /**
+ * @param msgSize New message size.
+ */
+ public void messageSize(int msgSize) {
+ this.msgSize = msgSize;
+ }
+
/**
* @param p Partition.
* @param historical {@code True} if partition rebalancing using WAL
history.
@@ -215,7 +278,7 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
* @param cacheObjCtx Cache object context.
* @throws IgniteCheckedException If failed.
*/
- void addEntry0(int p, boolean historical, GridCacheEntryInfo info,
GridCacheSharedContext ctx,
+ void addEntry0(int p, boolean historical, GridCacheEntryInfo info,
GridCacheSharedContext<?, ?> ctx,
CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
assert info != null;
assert info.key() != null : info;
@@ -226,12 +289,12 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
msgSize += info.marshalledSize(cacheObjCtx);
- CacheEntryInfoCollection infoCol = infos().get(p);
+ CacheEntryInfoCollection infoCol = getInfosSafe().get(p);
if (infoCol == null) {
msgSize += 4;
- infos().put(p, infoCol = new CacheEntryInfoCollection());
+ getInfosSafe().put(p, infoCol = new CacheEntryInfoCollection());
infoCol.init();
}
@@ -240,8 +303,16 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
}
/** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-26523
+ if (err != null && errBytes == null)
+ errBytes = U.marshal(ctx, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
@@ -249,12 +320,16 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
if (grp == null)
return;
- for (CacheEntryInfoCollection col : infos().values()) {
+ for (CacheEntryInfoCollection col : getInfosSafe().values()) {
List<GridCacheEntryInfo> entries = col.infos();
for (int i = 0; i < entries.size(); i++)
entries.get(i).unmarshal(grp.cacheObjectContext(), ldr);
}
+
+ // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-26523
+ if (errBytes != null && err == null)
+ err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
}
/** {@inheritDoc} */
@@ -266,168 +341,7 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
* @return Number of entries in message.
*/
public int size() {
- return infos().size();
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeCollection(clean,
MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeLong(estimatedKeysCnt))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMap(infos, MessageCollectionItemType.INT,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMap(keysPerCache,
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeMap(last, MessageCollectionItemType.INT,
MessageCollectionItemType.LONG))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeCollection(missed,
MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeInt(msgSize))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- // Keep 'updateSeq' name for compatibility.
- if (!writer.writeLong(rebalanceId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- clean = reader.readCollection(MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- estimatedKeysCnt = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- infos = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.MSG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- keysPerCache = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.LONG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- last = reader.readMap(MessageCollectionItemType.INT,
MessageCollectionItemType.LONG, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- missed = reader.readCollection(MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- msgSize = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- // Keep 'updateSeq' name for compatibility.
- rebalanceId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ return getInfosSafe().size();
}
/** {@inheritDoc} */
@@ -435,53 +349,11 @@ public class GridDhtPartitionSupplyMessage extends
GridCacheGroupIdMessage imple
return 114;
}
- /**
- * @return Estimated keys count.
- */
- public long estimatedKeysCount() {
- return -1;
- }
-
- /**
- * @param cnt Keys count to add.
- */
- public void addEstimatedKeysCount(long cnt) {
- this.estimatedKeysCnt += cnt;
- }
-
- /**
- * @return Estimated keys count for a given cache ID.
- */
- public long keysForCache(int cacheId) {
- return -1;
- }
-
- /**
- * @param cacheId Cache ID.
- * @param cnt Keys count.
- */
- public void addKeysForCache(int cacheId, long cnt) {
- assert cacheId != 0 && cnt >= 0;
-
- if (keysPerCache == null)
- keysPerCache = new HashMap<>();
-
- Long cnt0 = keysPerCache.get(cacheId);
-
- if (cnt0 == null) {
- keysPerCache.put(cacheId, cnt);
-
- msgSize += 12;
- }
- else
- keysPerCache.put(cacheId, cnt0 + cnt);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionSupplyMessage.class, this,
"size", size(),
- "parts", infos().keySet(),
+ "parts", getInfosSafe().keySet(),
"super", super.toString());
}
}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 319e2991b35..eb5449d73e3 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1189,7 +1189,6 @@
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
index 48851c0de24..6c092d77e6c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyErrorMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -290,7 +290,7 @@ public class CacheNoAffinityExchangeTest extends
GridCommonAbstractTest {
TestRecordingCommunicationSpi.spi(ig).blockMessages(new
IgniteBiPredicate<ClusterNode, Message>() {
@Override public boolean apply(ClusterNode node, Message msg) {
- return msg instanceof GridDhtPartitionSupplyErrorMessage;
+ return msg instanceof GridDhtPartitionSupplyMessage &&
((GridCacheMessage)msg).error() != null;
}
});