5578

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/354b8682
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/354b8682
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/354b8682

Branch: refs/heads/ignite-5578
Commit: 354b8682d97b04c642abd377afd6591727ebb12b
Parents: de050c7
Author: sboikov <[email protected]>
Authored: Wed Aug 2 23:28:30 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Aug 2 23:28:30 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 45 ++++++++++++++++++--
 .../GridDhtPartitionsExchangeFuture.java        |  6 ++-
 2 files changed, 46 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/354b8682/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c8235ac..e4508ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -641,6 +641,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
+        exchWorker.onKernalStop();
+
         cctx.gridEvents().removeDiscoveryEventListener(discoLsnr);
 
         cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class);
@@ -1767,11 +1769,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         this.exchMergeTestWaitVer = exchMergeTestWaitVer;
     }
 
-    public void mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg)
+    /**
+     * @param curFut Current exchange future.
+     * @param msg Message.
+     * @return {@code True} if node is stopping.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, GridDhtPartitionsFullMessage msg)
         throws IgniteInterruptedCheckedException {
         AffinityTopologyVersion resVer = msg.resultTopologyVersion();
 
-        exchWorker.waitForExchangeFuture(resVer);
+        if (exchWorker.waitForExchangeFuture(resVer))
+            return true;
 
         for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
             if (task instanceof GridDhtPartitionsExchangeFuture) {
@@ -1811,6 +1820,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion()
             + ", expVer=" + resVer + ']';
+
+        return false;
     }
 
     /**
@@ -1944,6 +1955,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         /** */
         private boolean crd;
 
+        /** */
+        private boolean stop;
+
         /**
          * Constructor.
          */
@@ -1989,13 +2003,36 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 log.debug("Added exchange future to exchange worker: " + exchFut);
         }
 
-        private void waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
+        /**
+         *
+         */
+        private void onKernalStop() {
+            synchronized (this) {
+                stop = true;
+
+                notifyAll();
+            }
+        }
+
+        /**
+         * @param resVer Version to wait for.
+         * @return {@code True} if node is stopping.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private boolean waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException {
             synchronized (this) {
-                while (lastFutVer.compareTo(resVer) < 0)
+                while (!stop && lastFutVer.compareTo(resVer) < 0)
                     U.wait(this);
+
+                return stop;
             }
         }
 
+        /**
+         * @param resVer Exchange result version.
+         * @param exchFut Exchange future.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
         private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut)
             throws IgniteInterruptedCheckedException {
             if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/354b8682/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6f77f96..5191557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2645,7 +2645,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                     resTopVer = msg.resultTopologyVersion();
 
-                    cctx.exchange().mergeExchanges(this, msg);
+                    if (cctx.exchange().mergeExchanges(this, msg)) {
+                        assert cctx.kernalContext().isStopping();
+
+                        return; // Node is stopping, no need to further process exchange.
+                    }
 
                     assert resTopVer.equals(exchCtx.events().topologyVersion()) :  "Unexpected result version [" +
                         "msgVer=" + resTopVer +

Reply via email to