IGNITE-4941: Removed old GridDhtPartitionSupplyMessage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeacad6b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeacad6b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeacad6b Branch: refs/heads/ignite-4929 Commit: aeacad6b87ac95dd2f5da525573d6fa58f4e51db Parents: edfa353 Author: devozerov <[email protected]> Authored: Tue Apr 11 12:18:52 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Apr 11 12:18:52 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 8 +- .../GridCachePartitionExchangeManager.java | 7 +- .../processors/cache/GridCachePreloader.java | 4 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../dht/preloader/GridDhtPartitionDemander.java | 2 +- .../dht/preloader/GridDhtPartitionSupplier.java | 310 +-------------- .../GridDhtPartitionSupplyMessage.java | 99 ++--- .../GridDhtPartitionSupplyMessageV2.java | 384 ------------------- .../dht/preloader/GridDhtPreloader.java | 3 +- .../resources/META-INF/classnames.properties | 1 - .../CacheLateAffinityAssignmentTest.java | 6 +- .../IgniteCacheReadFromBackupTest.java | 6 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 3 +- 13 files changed, 60 insertions(+), 777 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 737d047..8aac56d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; @@ -498,11 +497,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 45: - msg = new GridDhtPartitionSupplyMessage(); - - break; - case 46: msg = new GridDhtPartitionsFullMessage(); @@ -824,7 +818,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 114: - msg = new GridDhtPartitionSupplyMessageV2(); + msg = new GridDhtPartitionSupplyMessage(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 231dff8..885106d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -67,8 +67,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -394,9 +393,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId); if (cacheCtx != null) { - if (m instanceof GridDhtPartitionSupplyMessageV2) + if (m instanceof GridDhtPartitionSupplyMessage) cacheCtx.preloader().handleSupplyMessage( - idx, id, (GridDhtPartitionSupplyMessageV2)m); + idx, id, (GridDhtPartitionSupplyMessage)m); else if (m instanceof GridDhtPartitionDemandMessage) cacheCtx.preloader().handleDemandMessage( idx, id, (GridDhtPartitionDemandMessage)m); http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 0c28691..df0d71d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -168,7 +168,7 @@ public interface GridCachePreloader { * @param id Node Id. * @param s Supply message. */ - public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s); + public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessage s); /** * Handles Demand message. http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index d7ec288..ac3b6cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -134,7 +134,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) { + @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessage s) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index d5f2246..f8114cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -541,7 +541,7 @@ public class GridDhtPartitionDemander { public void handleSupplyMessage( int idx, final UUID id, - final GridDhtPartitionSupplyMessageV2 supply + final GridDhtPartitionSupplyMessage supply ) { AffinityTopologyVersion topVer = supply.topologyVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 9942423..7c2599a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -87,13 +86,6 @@ class GridDhtPartitionSupplier { /** * */ - void start() { - startOldListeners(); - } - - /** - * - */ void stop() { synchronized (scMap) { Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); @@ -106,8 +98,6 @@ class GridDhtPartitionSupplier { it.remove(); } } - - stopOldListeners(); } /** @@ -146,6 +136,7 @@ class GridDhtPartitionSupplier { * * @param topVer Topology version. */ + @SuppressWarnings("ConstantConditions") public void onTopologyChanged(AffinityTopologyVersion topVer) { synchronized (scMap) { Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); @@ -179,6 +170,7 @@ class GridDhtPartitionSupplier { * @param idx Index. * @param id Node uuid. */ + @SuppressWarnings("unchecked") public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { assert d != null; assert id != null; @@ -208,7 +200,7 @@ class GridDhtPartitionSupplier { log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop + ", from=" + id + ", idx=" + idx + "]"); - GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2( + GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage( d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); ClusterNode node = cctx.discovery().node(id); @@ -338,7 +330,7 @@ class GridDhtPartitionSupplier { if (!reply(node, d, s, scId)) return; - s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + s = new GridDhtPartitionSupplyMessage(d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); } } @@ -424,7 +416,7 @@ class GridDhtPartitionSupplier { if (!reply(node, d, s, scId)) return; - s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + s = new GridDhtPartitionSupplyMessage(d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); } } @@ -545,7 +537,7 @@ class GridDhtPartitionSupplier { if (!reply(node, d, s, scId)) return; - s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + s = new GridDhtPartitionSupplyMessage(d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); } } @@ -605,7 +597,7 @@ class GridDhtPartitionSupplier { */ private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, - GridDhtPartitionSupplyMessageV2 s, + GridDhtPartitionSupplyMessage s, T3<UUID, Integer, AffinityTopologyVersion> scId) throws IgniteCheckedException { @@ -744,294 +736,6 @@ class GridDhtPartitionSupplier { } } - @Deprecated//Backward compatibility. To be removed in future. - public void startOldListeners() { - if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) { - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processOldDemandMessage(m, id); - } - }); - } - } - - @Deprecated//Backward compatibility. To be removed in future. - public void stopOldListeners() { - if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) - cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class); - } - - /** - * @param d D. - * @param id Id. - */ - @Deprecated//Backward compatibility. To be removed in future. - private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) { - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled()); - - ClusterNode node = cctx.node(id); - - if (node == null) - return; - - long preloadThrottle = cctx.config().getRebalanceThrottle(); - - boolean ack = false; - - try { - for (int part : d.partitions()) { - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - - if (loc == null || loc.state() != OWNING || !loc.reserve()) { - // Reply with partition of "-1" to let sender know that - // this node is no longer an owner. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Requested partition is not owned by local node [part=" + part + - ", demander=" + id + ']'); - - continue; - } - - GridCacheEntryInfoCollectSwapListener swapLsnr = null; - - try { - if (cctx.isSwapOrOffheapEnabled()) { - swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); - - cctx.swap().addOffHeapListener(part, swapLsnr); - cctx.swap().addSwapListener(part, swapLsnr); - } - - boolean partMissing = false; - - for (GridCacheEntryEx e : loc.allEntries()) { - if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition [part=" + part + - ", nodeId=" + id + ']'); - - partMissing = true; - - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!replyOld(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId(), cctx.deploymentEnabled()); - } - - GridCacheEntryInfo info = e.info(); - - if (info != null && !info.isNew()) { - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - if (partMissing) - continue; - - if (cctx.isSwapOrOffheapEnabled()) { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = - cctx.swap().iterator(part); - - // Iterator may be null if space does not exist. - if (iter != null) { - try { - boolean prepared = false; - - for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { - if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + id + ']'); - - partMissing = true; - - break; // For. - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!replyOld(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled()); - } - - GridCacheSwapEntry swapEntry = e.getValue(); - - GridCacheEntryInfo info = new GridCacheEntryInfo(); - - info.keyBytes(e.getKey()); - info.ttl(swapEntry.ttl()); - info.expireTime(swapEntry.expireTime()); - info.version(swapEntry.version()); - info.value(swapEntry.value()); - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); - else { - if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not send " + - "cache entry): " + info); - - continue; - } - - // Need to manually prepare cache message. - if (depEnabled && !prepared) { - ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : - swapEntry.valueClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : - null; - - if (ldr == null) - continue; - - if (ldr instanceof GridDeploymentInfo) { - s.prepare((GridDeploymentInfo)ldr); - - prepared = true; - } - } - } - - if (partMissing) - continue; - } - finally { - iter.close(); - } - } - } - - // Stop receiving promote notifications. - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - - if (swapLsnr != null) { - Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); - - swapLsnr = null; - - for (GridCacheEntryInfo info : entries) { - if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + id + ']'); - - // No need to continue iteration over swap entries. - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!replyOld(node, d, s)) - return; - - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), - cctx.cacheId(), - cctx.deploymentEnabled()); - } - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - // Mark as last supply message. - s.last(part); - - if (ack) { - s.markAck(); - - break; // Partition for loop. - } - } - finally { - loc.release(); - - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - } - } - - replyOld(node, d, s); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition supply message to node: " + node.id(), e); - } - } - - /** - * @param n Node. - * @param d Demand message. - * @param s Supply message. - * @return {@code True} if message was sent, {@code false} if recipient left grid. - * @throws IgniteCheckedException If failed. - */ - @Deprecated//Backward compatibility. To be removed in future. - private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) - throws IgniteCheckedException { - try { - if (log.isDebugEnabled()) - log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); - - return true; - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition supply message because node left grid: " + n.id()); - - return false; - } - } - /** * Dumps debug information. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index cc30321..a01be28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; @@ -48,14 +49,11 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** */ private static final long serialVersionUID = 0L; - /** Worker ID. */ - private int workerId = -1; - /** Update sequence. */ private long updateSeq; - /** Acknowledgement flag. */ - private boolean ack; + /** Topology version. */ + private AffinityTopologyVersion topVer; /** Partitions that have been fully sent. */ @GridDirectCollection(int.class) @@ -68,27 +66,26 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** Entries. */ @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class) - private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>(); + private Map<Integer, CacheEntryInfoCollection> infos; /** Message size. */ @GridDirectTransient private int msgSize; /** - * @param workerId Worker ID. * @param updateSeq Update sequence for this node. * @param cacheId Cache ID. + * @param topVer Topology version. * @param addDepInfo Deployment info flag. */ - GridDhtPartitionSupplyMessage(int workerId, long updateSeq, int cacheId, boolean addDepInfo) { - assert workerId >= 0; - assert updateSeq > 0; - + GridDhtPartitionSupplyMessage(long updateSeq, + int cacheId, + AffinityTopologyVersion topVer, + boolean addDepInfo) { this.cacheId = cacheId; this.updateSeq = updateSeq; - this.workerId = workerId; - this.addDepInfo = addDepInfo; - } + this.topVer = topVer; + this.addDepInfo = addDepInfo; } /** * Empty constructor required for {@link Externalizable}. @@ -103,13 +100,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G } /** - * @return Worker ID. - */ - int workerId() { - return workerId; - } - - /** * @return Update sequence. */ long updateSequence() { @@ -117,17 +107,10 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G } /** - * Marks this message for acknowledgment. + * @return Topology version for which demand message is sent. */ - void markAck() { - ack = true; - } - - /** - * @return Acknowledgement flag. - */ - boolean ack() { - return ack; + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; } /** @@ -148,12 +131,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G msgSize += 4; // If partition is empty, we need to add it. - if (!infos.containsKey(p)) { + if (!infos().containsKey(p)) { CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection(); infoCol.init(); - infos.put(p, infoCol); + infos().put(p, infoCol); } } } @@ -180,6 +163,9 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G * @return Entries. */ Map<Integer, CacheEntryInfoCollection> infos() { + if (infos == null) + infos = new HashMap<>(); + return infos; } @@ -203,12 +189,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G msgSize += info.marshalledSize(ctx); - CacheEntryInfoCollection infoCol = infos.get(p); + CacheEntryInfoCollection infoCol = infos().get(p); if (infoCol == null) { msgSize += 4; - infos.put(p, infoCol = new CacheEntryInfoCollection()); + infos().put(p, infoCol = new CacheEntryInfoCollection()); infoCol.init(); } @@ -232,12 +218,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G msgSize += info.marshalledSize(ctx); - CacheEntryInfoCollection infoCol = infos.get(p); + CacheEntryInfoCollection infoCol = infos().get(p); if (infoCol == null) { msgSize += 4; - infos.put(p, infoCol = new CacheEntryInfoCollection()); + infos().put(p, infoCol = new CacheEntryInfoCollection()); infoCol.init(); } @@ -253,7 +239,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G GridCacheContext cacheCtx = ctx.cacheContext(cacheId); for (CacheEntryInfoCollection col : infos().values()) { - List<GridCacheEntryInfo> entries = col.infos(); + List<GridCacheEntryInfo> entries = col.infos(); for (int i = 0; i < entries.size(); i++) entries.get(i).unmarshal(cacheCtx, ldr); @@ -269,7 +255,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G * @return Number of entries in message. */ public int size() { - return infos.size(); + return infos().size(); } /** {@inheritDoc} */ @@ -288,25 +274,25 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G switch (writer.state()) { case 3: - if (!writer.writeBoolean("ack", ack)) + if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 4: - if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); @@ -317,12 +303,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G writer.incrementState(); - case 8: - if (!writer.writeInt("workerId", workerId)) - return false; - - writer.incrementState(); - } return true; @@ -340,7 +320,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G switch (reader.state()) { case 3: - ack = reader.readBoolean("ack"); + infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -348,7 +328,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 4: - infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); + last = reader.readCollection("last", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -356,7 +336,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 5: - last = reader.readCollection("last", MessageCollectionItemType.INT); + missed = reader.readCollection("missed", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -364,7 +344,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 6: - missed = reader.readCollection("missed", MessageCollectionItemType.INT); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -379,14 +359,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); - case 8: - workerId = reader.readInt("workerId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class); @@ -394,18 +366,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** {@inheritDoc} */ @Override public short directType() { - return 45; + return 114; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 8; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtPartitionSupplyMessage.class, this, "size", size(), + "parts", infos().keySet(), "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java deleted file mode 100644 index 2294582..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Partition supply message. - */ -public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable { - /** */ - private static final long serialVersionUID = 0L; - - /** Update sequence. */ - private long updateSeq; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - - /** Partitions that have been fully sent. */ - @GridDirectCollection(int.class) - private Collection<Integer> last; - - /** Partitions which were not found. */ - @GridToStringInclude - @GridDirectCollection(int.class) - private Collection<Integer> missed; - - /** Entries. */ - @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class) - private Map<Integer, CacheEntryInfoCollection> infos; - - /** Message size. */ - @GridDirectTransient - private int msgSize; - - /** - * @param updateSeq Update sequence for this node. - * @param cacheId Cache ID. - * @param topVer Topology version. - * @param addDepInfo Deployment info flag. - */ - GridDhtPartitionSupplyMessageV2(long updateSeq, - int cacheId, - AffinityTopologyVersion topVer, - boolean addDepInfo) { - this.cacheId = cacheId; - this.updateSeq = updateSeq; - this.topVer = topVer; - this.addDepInfo = addDepInfo; } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtPartitionSupplyMessageV2() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean ignoreClassErrors() { - return true; - } - - /** - * @return Update sequence. - */ - long updateSequence() { - return updateSeq; - } - - /** - * @return Topology version for which demand message is sent. - */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * @return Flag to indicate last message for partition. - */ - Collection<Integer> last() { - return last == null ? Collections.<Integer>emptySet() : last; - } - - /** - * @param p Partition which was fully sent. - */ - void last(int p) { - if (last == null) - last = new HashSet<>(); - - if (last.add(p)) { - msgSize += 4; - - // If partition is empty, we need to add it. - if (!infos().containsKey(p)) { - CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection(); - - infoCol.init(); - - infos().put(p, infoCol); - } - } - } - - /** - * @param p Missed partition. - */ - void missed(int p) { - if (missed == null) - missed = new HashSet<>(); - - if (missed.add(p)) - msgSize += 4; - } - - /** - * @return Missed partitions. - */ - Collection<Integer> missed() { - return missed == null ? Collections.<Integer>emptySet() : missed; - } - - /** - * @return Entries. - */ - Map<Integer, CacheEntryInfoCollection> infos() { - if (infos == null) - infos = new HashMap<>(); - - return infos; - } - - /** - * @return Message size. - */ - int messageSize() { - return msgSize; - } - - /** - * @param p Partition. - * @param info Entry to add. - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ - void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { - assert info != null; - - marshalInfo(info, ctx); - - msgSize += info.marshalledSize(ctx); - - CacheEntryInfoCollection infoCol = infos().get(p); - - if (infoCol == null) { - msgSize += 4; - - infos().put(p, infoCol = new CacheEntryInfoCollection()); - - infoCol.init(); - } - - infoCol.add(info); - } - - /** - * @param p Partition. - * @param info Entry to add. - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ - void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { - assert info != null; - assert (info.key() != null || info.keyBytes() != null); - assert info.value() != null; - - // Need to call this method to initialize info properly. - marshalInfo(info, ctx); - - msgSize += info.marshalledSize(ctx); - - CacheEntryInfoCollection infoCol = infos().get(p); - - if (infoCol == null) { - msgSize += 4; - - infos().put(p, infoCol = new CacheEntryInfoCollection()); - - infoCol.init(); - } - - infoCol.add(info); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - GridCacheContext cacheCtx = ctx.cacheContext(cacheId); - - for (CacheEntryInfoCollection col : infos().values()) { - List<GridCacheEntryInfo> entries = col.infos(); - - for (int i = 0; i < entries.size(); i++) - entries.get(i).unmarshal(cacheCtx, ldr); - } - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** - * @return Number of entries in message. - */ - public int size() { - return infos().size(); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeLong("updateSeq", updateSeq)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - last = reader.readCollection("last", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - missed = reader.readCollection("missed", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - updateSeq = reader.readLong("updateSeq"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 114; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 8; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtPartitionSupplyMessageV2.class, this, - "size", size(), - "parts", infos().keySet(), - "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index dc988bd..a5dcd8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -187,7 +187,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { supplier = new GridDhtPartitionSupplier(cctx); demander = new GridDhtPartitionDemander(cctx); - supplier.start(); demander.start(); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); @@ -380,7 +379,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) { + public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessage s) { if (!enterBusy()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 9cce826..8c5a72e 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -743,7 +743,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2 http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index e482a93..5582fdd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -65,7 +65,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -2016,10 +2016,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) { spi.blockMessages(new IgnitePredicate<GridIoMessage>() { @Override public boolean apply(GridIoMessage ioMsg) { - if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class)) + if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class)) return false; - GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message(); + GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message(); return msg.cacheId() == CU.cacheId(cacheName); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java index 89fcf6b..29c2af6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java @@ -38,7 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -197,10 +197,10 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { spi.blockMessages(new IgnitePredicate<GridIoMessage>() { @Override public boolean apply(GridIoMessage ioMsg) { - if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class)) + if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class)) return false; - GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message(); + GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message(); return msg.cacheId() == CU.cacheId(ccfg.getName()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aeacad6b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index eda030c..dfb3f65 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -106,7 +105,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { @Override public boolean apply(GridIoMessage msg) { Object msg0 = msg.message(); - return (msg0 instanceof GridDhtPartitionSupplyMessage || msg0 instanceof GridDhtPartitionSupplyMessageV2) + return (msg0 instanceof GridDhtPartitionSupplyMessage) && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE); } });
