Repository: ignite Updated Branches: refs/heads/ignite-1093-2 94c9297ad -> a8b323d74
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8b323d7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8b323d7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8b323d7 Branch: refs/heads/ignite-1093-2 Commit: a8b323d74d57f33b6fa11a8ef629a1a26325e783 Parents: 94c9297 Author: Anton Vinogradov <[email protected]> Authored: Tue Sep 22 19:05:00 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Sep 22 19:05:00 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 48 +++++++++++++------- .../GridCacheRebalancingSyncSelfTest.java | 2 +- 2 files changed, 32 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a8b323d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 7f2dc48..bbb6a21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -256,6 +256,12 @@ public class GridDhtPartitionDemander { final SyncFuture oldFut = syncFut; + if (!oldFut.isDummy() && assigns.topologyVersion().compareTo(oldFut.topologyVersion()) < 0) { + U.log(log, "Skipping obsolete (dummy) exchange. [top=" + assigns.topologyVersion() + "]"); + + return; + } + final SyncFuture fut = new SyncFuture(assigns, cctx, log, oldFut.isDummy()); if (!oldFut.isDummy()) @@ -381,7 +387,7 @@ public class GridDhtPartitionDemander { U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + - ", topology=" + d.topologyVersion() + "]"); + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]"); //Check remote node rebalancing API version. if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) { @@ -411,19 +417,24 @@ public class GridDhtPartitionDemander { initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt)); try { - if (!topologyChanged(fut)) + if (!topologyChanged(fut)) { cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); - else + + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]"); + + } + else { fut.cancel(); + + return; + } } catch (IgniteCheckedException ex) { fut.cancel(); U.error(log, "Failed to send partition demand message to node", ex); } - - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]"); } } } @@ -619,12 +630,12 @@ public class GridDhtPartitionDemander { } else fut.cancel(); - } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) log.debug("Node left during rebalancing [node=" + node.id() + ", msg=" + e.getMessage() + ']'); + fut.cancel(); } catch (IgniteCheckedException ex) { @@ -990,20 +1001,23 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Completed sync future."); - Collection<Integer> m = new HashSet<>(); + if (cctx.affinity().affinityTopologyVersion().equals(topVer)) { - for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) { - if (e.getValue() != null && !e.getValue().isEmpty()) - m.addAll(e.getValue()); - } + Collection<Integer> m = new HashSet<>(); - if (!m.isEmpty()) { - U.log(log, ("Reassigning partitions that were missed: " + m)); + for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) { + if (e.getValue() != null && !e.getValue().isEmpty()) + m.addAll(e.getValue()); + } - cctx.shared().exchange().forceDummyExchange(true, exchFut); - } + if (!m.isEmpty()) { + U.log(log, ("Reassigning partitions that were missed: " + m)); - cctx.shared().exchange().scheduleResendPartitions(); + cctx.shared().exchange().forceDummyExchange(true, exchFut); + } + + cctx.shared().exchange().scheduleResendPartitions(); + } onDone(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a8b323d7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 712f3cd..42c1857 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -172,7 +172,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) : - "value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")"; + i + " value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")"; } }
