1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/feea8f98 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/feea8f98 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/feea8f98 Branch: refs/heads/ignite-1093-3 Commit: feea8f983e5eab7b742e2bd1f2ec5c1b8d1ec6d4 Parents: d78e4cd Author: Anton Vinogradov <[email protected]> Authored: Wed Oct 28 12:55:43 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed Oct 28 12:55:43 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCachePreloader.java | 7 + .../cache/GridCachePreloaderAdapter.java | 5 + .../GridDhtPartitionDemandMessage.java | 2 - .../dht/preloader/GridDhtPartitionDemander.java | 44 +++++- .../dht/preloader/GridDhtPartitionSupplier.java | 156 +++++++++---------- .../GridDhtPartitionSupplyMessageV2.java | 2 - .../GridDhtPartitionsExchangeFuture.java | 2 + .../dht/preloader/GridDhtPreloader.java | 5 + .../GridCacheRebalancingSyncSelfTest.java | 12 +- 9 files changed, 139 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 79861a2..b2bb8f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -171,4 +171,11 @@ public interface GridCachePreloader { * @param part Partition. */ public void evictPartitionAsync(GridDhtLocalPartition part); + + /** + * Handles new topology. + * + * @param topVer Topology version. + */ + public void onTopologyChanged(AffinityTopologyVersion topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index b784383..d465950 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -173,4 +173,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { @Override public void evictPartitionAsync(GridDhtLocalPartition part) { // No-op. } + + /** {@inheritDoc} */ + @Override public void onTopologyChanged(AffinityTopologyVersion topVer) { + // No-op. + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 4ac644a..e99fa9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -68,8 +68,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { * @param topVer Topology version. */ GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) { - assert updateSeq > 0; - this.cacheId = cacheId; this.updateSeq = updateSeq; this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 6479542..deedf21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -377,8 +377,12 @@ public class GridDhtPartitionDemander { initD.updateSequence(fut.updateSeq); initD.timeout(cctx.config().getRebalanceTimeout()); - cctx.io().sendOrderedMessage(node, - GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout()); + synchronized (fut) { + if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened. + // New requests will not be covered by failovers. + cctx.io().sendOrderedMessage(node, + GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout()); + } if (log.isDebugEnabled()) log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + @@ -810,11 +814,15 @@ public class GridDhtPartitionDemander { if (isDone()) return true; - remaining.clear(); - U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name() + ", topology=" + topologyVersion()); + for (UUID nodeId : remaining.keySet()) { + cleanupRemoteContexts(nodeId); + } + + remaining.clear(); + checkIsDone(true /* cancelled */); } @@ -833,6 +841,8 @@ public class GridDhtPartitionDemander { ", fromNode=" + nodeId + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); + cleanupRemoteContexts(nodeId); + remaining.remove(nodeId); checkIsDone(); @@ -856,6 +866,32 @@ public class GridDhtPartitionDemander { } } + private void cleanupRemoteContexts(UUID nodeId) { + ClusterNode node = cctx.discovery().node(nodeId); + + //Check remote node rebalancing API version. + if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) { + + GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( + -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); + + d.timeout(cctx.config().getRebalanceTimeout()); + + try { + for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); + + cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + } + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send failover context cleanup request to node"); + } + } + } + /** * @param nodeId Node id. * @param p P. http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index f5ae93b..9db2dc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -18,17 +18,15 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -40,13 +38,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; -import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** @@ -68,12 +63,8 @@ class GridDhtPartitionSupplier { /** Preload predicate. */ private IgnitePredicate<GridCacheEntryInfo> preloadPred; - /** Supply context map. T2: nodeId, idx. */ - private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap = - new ConcurrentHashMap8<>(); - - /** Rebalancing listener. */ - private GridLocalEventListener lsnr; + /** Supply context map. T2: nodeId, idx, topVer. */ + private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>(); /** * @param cctx Cache context. @@ -94,32 +85,6 @@ class GridDhtPartitionSupplier { * */ void start() { - lsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - if (evt instanceof DiscoveryEvent) { - for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) { - T2<UUID, Integer> t = entry.getKey(); - - if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) { - SupplyContext sctx = entry.getValue(); - - clearContext(sctx, log); - - if (log.isDebugEnabled()) - log.debug("Supply context removed for failed or left node [node=" + t.get1() + "]"); - - scMap.remove(t, sctx); - } - } - } - else { - assert false; - } - } - }; - - cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); - startOldListeners(); } @@ -127,11 +92,16 @@ class GridDhtPartitionSupplier { * */ void stop() { - if (lsnr != null) - cctx.events().removeListener(lsnr); + synchronized (scMap) { + Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); + + while (it.hasNext()) { + T3<UUID, Integer, AffinityTopologyVersion> t = it.next(); - for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) { - clearContext(entry.getValue(), log); + clearContext(scMap.get(t), log); + + it.remove(); + } } stopOldListeners(); @@ -152,10 +122,7 @@ class GridDhtPartitionSupplier { if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { try { - synchronized (it) { - if (!((GridCloseableIterator)it).isClosed()) - ((GridCloseableIterator)it).close(); - } + ((GridCloseableIterator)it).close(); } catch (IgniteCheckedException e) { log.error("Iterator close failed.", e); @@ -164,12 +131,34 @@ class GridDhtPartitionSupplier { final GridDhtLocalPartition loc = sc.loc; - if (loc != null && loc.reservations() > 0) { - synchronized (loc) { - if (loc.reservations() > 0) - loc.release(); - } + if (loc != null) { + assert loc.reservations() > 0; + loc.release(); + } + } + } + + /** + * Handles new topology. + * + * @param topVer Topology version. + */ + public void onTopologyChanged(AffinityTopologyVersion topVer) { + synchronized (scMap) { + Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); + + while (it.hasNext()) { + T3<UUID, Integer, AffinityTopologyVersion> t = it.next(); + + if (topVer.compareTo(t.get3()) > 0) {// Clear all obsolete contexts. + clearContext(scMap.get(t), log); + + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Supply context removed [node=" + t.get1() + "]"); + } } } } @@ -195,6 +184,16 @@ class GridDhtPartitionSupplier { AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); AffinityTopologyVersion demTop = d.topologyVersion(); + T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop); + + if (d.updateSequence() == -1) {//Demand node requested context cleanup. + synchronized (scMap) { + clearContext(scMap.remove(scId), log); + + return; + } + } + if (cutTop.compareTo(demTop) > 0) { if (log.isDebugEnabled()) log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop + @@ -212,16 +211,13 @@ class GridDhtPartitionSupplier { ClusterNode node = cctx.discovery().node(id); - T2<UUID, Integer> scId = new T2<>(id, idx); - try { - SupplyContext sctx = scMap.remove(scId); + SupplyContext sctx; - // Context will be cleaned in case topology changed. - if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) { - clearContext(sctx, log); + synchronized (scMap) { + sctx = scMap.remove(scId); - sctx = null; + assert sctx == null || d.updateSequence() == sctx.updateSeq; } // Initial demand request should contain partitions list. @@ -371,7 +367,6 @@ class GridDhtPartitionSupplier { swapLsnr, part, loc, - d.topologyVersion(), d.updateSequence()); } } @@ -497,7 +492,6 @@ class GridDhtPartitionSupplier { null, part, loc, - d.topologyVersion(), d.updateSequence()); } } @@ -579,8 +573,6 @@ class GridDhtPartitionSupplier { } } - scMap.remove(scId); - reply(node, d, s, scId); if (log.isDebugEnabled()) @@ -604,7 +596,7 @@ class GridDhtPartitionSupplier { private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s, - T2<UUID, Integer> scId) + T3<UUID, Integer, AffinityTopologyVersion> scId) throws IgniteCheckedException { try { @@ -623,7 +615,9 @@ class GridDhtPartitionSupplier { if (log.isDebugEnabled()) log.debug("Failed to send partition supply message because node left grid: " + n.id()); - clearContext(scMap.remove(scId), log); + synchronized (scMap) { + clearContext(scMap.remove(scId), log); + } return false; } @@ -638,7 +632,7 @@ class GridDhtPartitionSupplier { * @param swapLsnr Swap listener. */ private void saveSupplyContext( - T2<UUID, Integer> t, + T3<UUID, Integer, AffinityTopologyVersion> t, int phase, Iterator<Integer> partIt, int part, @@ -646,17 +640,20 @@ class GridDhtPartitionSupplier { GridDhtLocalPartition loc, AffinityTopologyVersion topVer, long updateSeq) { - SupplyContext old = scMap.putIfAbsent(t, - new SupplyContext(phase, - partIt, - entryIt, - swapLsnr, - part, - loc, - topVer, - updateSeq)); - - assert old == null; + synchronized (scMap) { + if (cctx.affinity().affinityTopologyVersion().equals(topVer)) { + assert scMap.get(t) == null; + + scMap.put(t, + new SupplyContext(phase, + partIt, + entryIt, + swapLsnr, + part, + loc, + updateSeq)); + } + } } /** @@ -681,9 +678,6 @@ class GridDhtPartitionSupplier { /** Local partition. */ private final GridDhtLocalPartition loc; - /** Topology version. */ - private final AffinityTopologyVersion topVer; - /** Update seq. */ private final long updateSeq; @@ -700,7 +694,6 @@ class GridDhtPartitionSupplier { GridCacheEntryInfoCollectSwapListener swapLsnr, int part, GridDhtLocalPartition loc, - AffinityTopologyVersion topVer, long updateSeq) { this.phase = phase; this.partIt = partIt; @@ -708,7 +701,6 @@ class GridDhtPartitionSupplier { this.swapLsnr = swapLsnr; this.part = part; this.loc = loc; - this.topVer = topVer; this.updateSeq = updateSeq; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java index d68e417..502620c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -81,8 +81,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements * @param addDepInfo Deployment info flag. */ GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) { - assert updateSeq > 0; - this.cacheId = cacheId; this.updateSeq = updateSeq; this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 77e47a7..5c7190b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -742,6 +742,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // Must initialize topology after we get discovery event. initTopology(cacheCtx); + cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion()); + cacheCtx.preloader().updateLastExchangeFuture(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 64d5a19..36e0c9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -272,6 +272,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ + @Override public void onTopologyChanged(AffinityTopologyVersion topVer) { + supplier.onTopologyChanged(topVer); + } + + /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. GridDhtPartitionTopology top = cctx.dht().topology(); http://git-wip-us.apache.org/repos/asf/ignite/blob/feea8f98/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index cea7808..b17588f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -316,7 +316,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { Map map = U.field(supplier, "scMap"); - assert map.isEmpty(); + synchronized (map) { + assert map.isEmpty(); + } } } } @@ -357,8 +359,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 5, 0); waitForRebalancing(4, 5, 0); - checkSupplyContextMapIsEmpty(); - //New cache should start rebalancing. CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(); @@ -431,14 +431,14 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { t4.start(); - stopGrid(0); + stopGrid(1); - waitForRebalancing(1, 6); + waitForRebalancing(0, 6); waitForRebalancing(2, 6); waitForRebalancing(3, 6); waitForRebalancing(4, 6); - stopGrid(1); + stopGrid(0); waitForRebalancing(2, 7); waitForRebalancing(3, 7);
