Repository: ignite Updated Branches: refs/heads/ignite-1093-2 049918089 -> e97b5818a
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e97b5818 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e97b5818 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e97b5818 Branch: refs/heads/ignite-1093-2 Commit: e97b5818a371ac3d71281f00820d2a025b55b7b6 Parents: 0499180 Author: Anton Vinogradov <[email protected]> Authored: Mon Sep 7 19:03:36 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Sep 7 19:03:36 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 37 ++++++-------------- .../dht/preloader/GridDhtPartitionSupplier.java | 2 +- 2 files changed, 11 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e97b5818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index b260501..7a0a94c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -40,7 +40,6 @@ import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; @@ -78,7 +77,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; @@ -142,6 +140,8 @@ public class GridDhtPartitionDemander { * */ void stop() { + syncFut.onCancel(); + lastExchangeFut = null; lastTimeoutObj.set(null); @@ -193,7 +193,7 @@ public class GridDhtPartitionDemander { * @return {@code True} if topology changed. */ private boolean topologyChanged(AffinityTopologyVersion topVer) { - return !cctx.affinity().affinityTopologyVersion().equals(topVer); + return cctx.affinity().affinityTopologyVersion().topologyVersion() != topVer.topologyVersion(); } /** @@ -334,8 +334,6 @@ public class GridDhtPartitionDemander { } }); - fut.setDemandThread(thread); - thread.start(); } else if (delay > 0) { @@ -373,7 +371,7 @@ public class GridDhtPartitionDemander { AffinityTopologyVersion topVer = fut.topologyVersion(); for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { - if (topologyChanged(topVer) || Thread.interrupted()) { + if (topologyChanged(topVer)) { fut.onCancel(); return; @@ -778,8 +776,6 @@ public class GridDhtPartitionDemander { /** Started. */ private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>(); - private volatile IgniteThread thread; - /** Lock. */ private Lock lock = new ReentrantLock(); @@ -812,24 +808,14 @@ public class GridDhtPartitionDemander { * @param assigns Assigns. */ void init(GridDhtPreloaderAssignments assigns) { - final SyncFuture fut = this; - - lsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - fut.onCancel(); - } - }; - - cctx.events().addListener(lsnr, EVT_NODE_FAILED); - this.assigns = assigns; - } - /** - * @param thread - */ - void setDemandThread(IgniteThread thread) { - this.thread = thread; + cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen( + new CI1<IgniteInternalFuture<Long>>() { + @Override public void apply(IgniteInternalFuture<Long> future) { + SyncFuture.this.onCancel(); + } + }); } /** @@ -1028,9 +1014,6 @@ public class GridDhtPartitionDemander { if (lsnr != null) cctx.events().removeListener(lsnr); - if (thread != null) - thread.interrupt(); - onDone(completed); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e97b5818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 0686376..49e89ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -104,7 +104,7 @@ class GridDhtPartitionSupplier { assert d != null; assert id != null; - if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) + if (cctx.affinity().affinityTopologyVersion().topologyVersion() != d.topologyVersion().topologyVersion()) return; GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),
