Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 049918089 -> e97b5818a


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e97b5818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e97b5818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e97b5818

Branch: refs/heads/ignite-1093-2
Commit: e97b5818a371ac3d71281f00820d2a025b55b7b6
Parents: 0499180
Author: Anton Vinogradov <[email protected]>
Authored: Mon Sep 7 19:03:36 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Mon Sep 7 19:03:36 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 37 ++++++--------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e97b5818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b260501..7a0a94c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -40,7 +40,6 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -78,7 +77,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
@@ -142,6 +140,8 @@ public class GridDhtPartitionDemander {
      *
      */
     void stop() {
+        syncFut.onCancel();
+
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
@@ -193,7 +193,7 @@ public class GridDhtPartitionDemander {
      * @return {@code True} if topology changed.
      */
     private boolean topologyChanged(AffinityTopologyVersion topVer) {
-        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+        return cctx.affinity().affinityTopologyVersion().topologyVersion() != topVer.topologyVersion();
     }
 
     /**
@@ -334,8 +334,6 @@ public class GridDhtPartitionDemander {
                 }
             });
 
-            fut.setDemandThread(thread);
-
             thread.start();
         }
         else if (delay > 0) {
@@ -373,7 +371,7 @@ public class GridDhtPartitionDemander {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
-            if (topologyChanged(topVer) || Thread.interrupted()) {
+            if (topologyChanged(topVer)) {
                 fut.onCancel();
 
                 return;
@@ -778,8 +776,6 @@ public class GridDhtPartitionDemander {
         /** Started. */
         private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
 
-        private volatile IgniteThread thread;
-
         /** Lock. */
         private Lock lock = new ReentrantLock();
 
@@ -812,24 +808,14 @@ public class GridDhtPartitionDemander {
          * @param assigns Assigns.
          */
         void init(GridDhtPreloaderAssignments assigns) {
-            final SyncFuture fut = this;
-
-            lsnr = new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    fut.onCancel();
-                }
-            };
-
-            cctx.events().addListener(lsnr, EVT_NODE_FAILED);
-
             this.assigns = assigns;
-        }
 
-        /**
-         * @param thread
-         */
-        void setDemandThread(IgniteThread thread) {
-            this.thread = thread;
+            cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
+                new CI1<IgniteInternalFuture<Long>>() {
+                    @Override public void apply(IgniteInternalFuture<Long> future) {
+                        SyncFuture.this.onCancel();
+                    }
+                });
         }
 
         /**
@@ -1028,9 +1014,6 @@ public class GridDhtPartitionDemander {
                 if (lsnr != null)
                     cctx.events().removeListener(lsnr);
 
-                if (thread != null)
-                    thread.interrupt();
-
                 onDone(completed);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97b5818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0686376..49e89ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -104,7 +104,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+        if (cctx.affinity().affinityTopologyVersion().topologyVersion() != d.topologyVersion().topologyVersion())
             return;
 
         GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),

Reply via email to