1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5de124cd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5de124cd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5de124cd Branch: refs/heads/ignite-1093-2 Commit: 5de124cd9a270446c301b6782a0530c48758f035 Parents: f0f7c32 Author: Anton Vinogradov <[email protected]> Authored: Thu Oct 1 18:43:46 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Thu Oct 1 18:43:46 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 24 +++++---- .../dht/preloader/GridDhtPartitionSupplier.java | 56 ++++++++++++-------- 2 files changed, 47 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5de124cd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index d1d475c..56a9c9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -199,8 +199,8 @@ public class GridDhtPartitionDemander { private boolean topologyChanged(SyncFuture fut) { return !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. - fut != syncFut || // Same topology, but dummy exchange forced because of missing partitions. - cctx.shared().exchange().hasPendingExchange(); // New topology pending. + fut != syncFut || // Same topology, but dummy exchange forced because of missing partitions. + cctx.shared().exchange().hasPendingExchange(); // New topology pending. } /** @@ -262,12 +262,6 @@ public class GridDhtPartitionDemander { final SyncFuture oldFut = syncFut; - if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology. - U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]"); - - return; - } - final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy(), ++updateSeq); if (!oldFut.isDummy()) @@ -281,6 +275,14 @@ public class GridDhtPartitionDemander { syncFut = fut; + if (cctx.shared().exchange().hasPendingExchange()) { // Will rebalance at actual topology. + U.log(log, "Skipping obsolete exchange. [top=" + assigns.topologyVersion() + "]"); + + fut.cancel(); + + return; + } + if (assigns.isEmpty()) { fut.doneIfEmpty(); @@ -397,7 +399,9 @@ public class GridDhtPartitionDemander { ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + ", topology=" + d.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - fut.appendPartitions(node.id(), d.partitions()); + Collection<Integer> parts = new HashSet<>(d.partitions()); + + fut.appendPartitions(node.id(), parts); int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); @@ -406,7 +410,7 @@ public class GridDhtPartitionDemander { for (int cnt = 0; cnt < lsnrCnt; cnt++) sParts.add(new HashSet<Integer>()); - Iterator<Integer> it = d.partitions().iterator(); + Iterator<Integer> it = parts.iterator(); int cnt = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/5de124cd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index b5bb25d..98946f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -46,6 +46,7 @@ import org.apache.ignite.lang.IgnitePredicate; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** @@ -104,7 +105,7 @@ class GridDhtPartitionSupplier { clearContext(sctx, log); - U.log(log, "Supply context removed for failed node [node=" + t.get1() + "]"); + U.log(log, "Supply context removed for node failed or left [node=" + t.get1() + "]"); scMap.remove(t, sctx); } @@ -116,7 +117,7 @@ class GridDhtPartitionSupplier { } }; - cctx.events().addListener(lsnr, EVT_NODE_FAILED); + cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); startOldListeners(); } @@ -301,12 +302,12 @@ class GridDhtPartitionSupplier { swapLsnr = null; loc = null; - reply(node, d, s); + reply(node, d, s, scId); return; } else { - if (!reply(node, d, s)) + if (!reply(node, d, s, scId)) return; s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), @@ -388,12 +389,12 @@ class GridDhtPartitionSupplier { swapLsnr = null; loc = null; - reply(node, d, s); + reply(node, d, s, scId); return; } else { - if (!reply(node, d, s)) + if (!reply(node, d, s, scId)) return; s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), @@ -510,12 +511,12 @@ class GridDhtPartitionSupplier { loc = null; - reply(node, d, s); + reply(node, d, s, scId); return; } else { - if (!reply(node, d, s)) + if (!reply(node, d, s, scId)) return; s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), @@ -553,7 +554,7 @@ class GridDhtPartitionSupplier { scMap.remove(scId); - reply(node, d, s); + reply(node, d, s, scId); } catch (IgniteCheckedException e) { U.error(log, "Failed to send partition supply message to node: " + id, e); @@ -567,7 +568,10 @@ class GridDhtPartitionSupplier { * @return {@code True} if message was sent, {@code false} if recipient left grid. * @throws IgniteCheckedException If failed. */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s) + private boolean reply(ClusterNode n, + GridDhtPartitionDemandMessage d, + GridDhtPartitionSupplyMessageV2 s, + T2<UUID, Integer> scId) throws IgniteCheckedException { try { @@ -575,8 +579,11 @@ class GridDhtPartitionSupplier { log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()) || // Topology already changed. - cctx.shared().exchange().hasPendingExchange()) // New topology pending. - return true; + cctx.shared().exchange().hasPendingExchange()) { // New topology pending. + clearContext(scMap.remove(scId), log); + + return false; + } cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); @@ -590,6 +597,8 @@ class GridDhtPartitionSupplier { if (log.isDebugEnabled()) log.debug("Failed to send partition supply message because node left grid: " + n.id()); + clearContext(scMap.remove(scId), log); + return false; } } @@ -611,14 +620,15 @@ class GridDhtPartitionSupplier { GridDhtLocalPartition loc, AffinityTopologyVersion topVer, long updateSeq) { - SupplyContext old = scMap.putIfAbsent(t, new SupplyContext(phase, - partIt, - entryIt, - swapLsnr, - part, - loc, - topVer, - updateSeq)); + SupplyContext old = scMap.putIfAbsent(t, + new SupplyContext(phase, + partIt, + entryIt, + swapLsnr, + part, + loc, + topVer, + updateSeq)); assert old == null; } @@ -643,13 +653,13 @@ class GridDhtPartitionSupplier { private final int part; /** Local partition. */ - GridDhtLocalPartition loc; + private final GridDhtLocalPartition loc; /** Topology version. */ - AffinityTopologyVersion topVer; + private final AffinityTopologyVersion topVer; /** Update seq. */ - long updateSeq; + private final long updateSeq; /** * @param phase Phase.
