Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 07d683fd3 -> 7fe5f119b


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fe5f119
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fe5f119
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fe5f119

Branch: refs/heads/ignite-1093-2
Commit: 7fe5f119b2163e6910f64e44c68a95a7150d810b
Parents: 07d683f
Author: Anton Vinogradov <[email protected]>
Authored: Mon Sep 14 17:24:30 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Mon Sep 14 17:24:30 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 29 ++++++-
 .../dht/preloader/GridDhtPartitionDemander.java | 79 +++++++++++---------
 2 files changed, 70 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7fe5f119/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e3e2d53..d71f158 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1271,12 +1271,37 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     }
 
                     if (assignsMap != null) {
+                        //Marshaller cache first.
+                        int mId = CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME);
+
+                        GridDhtPreloaderAssignments mA = assignsMap.get(mId);
+
+                        assert mA != null;
+
+                        GridCacheContext<K, V> mCacheCtx = 
cctx.cacheContext(mId);
+
+                        mCacheCtx.preloader().addAssignments(mA, forcePreload);
+
+                        //Utility cache second.
+                        int uId = 
CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME);
+
+                        GridDhtPreloaderAssignments uA = assignsMap.get(uId);
+
+                        assert uA != null;
+
+                        GridCacheContext<K, V> uCacheCtx = 
cctx.cacheContext(uId);
+
+                        uCacheCtx.preloader().addAssignments(uA, forcePreload);
+
+                        //Others.
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e 
: assignsMap.entrySet()) {
                             int cacheId = e.getKey();
 
-                            GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+                            if (cacheId != uId && cacheId != mId) {
+                                GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
 
-                            cacheCtx.preloader().addAssignments(e.getValue(), 
forcePreload);
+                                
cacheCtx.preloader().addAssignments(e.getValue(), forcePreload);
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fe5f119/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b902fed..b03fa67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -208,6 +209,41 @@ public class GridDhtPartitionDemander {
     }
 
     /**
+     * Waits on the preloader sync future of the cache with the given name before this cache
+     * proceeds. If the topology has changed since {@code fut}'s assignments were made, or the
+     * wait is interrupted or fails, {@code fut} is cancelled instead.
+     *
+     * @param name Name of the cache whose rebalancing sync future is awaited.
+     * @param fut Sync future of this cache; cancelled whenever the wait is not completed.
+     */
+    private void waitForCacheRebalancing(String name, SyncFuture fut) {
+        if (log.isDebugEnabled())
+            log.debug("Waiting for " + name + " cache rebalancing [cacheName=" + cctx.name() + ']');
+
+        try {
+            SyncFuture wFut = (SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture();
+
+            if (!topologyChanged(fut.assigns.topologyVersion()))
+                wFut.get();
+            else {
+                // Assignments are stale; do not wait for the other cache.
+                fut.onCancel();
+
+                return;
+            }
+        }
+        catch (IgniteInterruptedCheckedException ignored) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to wait for " + name + " cache rebalancing future (grid is stopping): " +
+                    "[cacheName=" + cctx.name() + ']');
+
+            // Cancel regardless of the log level: this used to run only when debug logging was
+            // enabled, leaving the future uncancelled otherwise.
+            fut.onCancel();
+        }
+        catch (IgniteCheckedException e) {
+            fut.onCancel();
+
+            throw new Error("Ordered rebalancing future should never fail: " + e.getMessage(), e);
+        }
+    }
+
+    /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      * @throws IgniteCheckedException
@@ -240,6 +276,10 @@ public class GridDhtPartitionDemander {
             if (assigns.isEmpty()) {
                 fut.checkIsDone();
 
+                if (fut.assigns.topologyVersion().topologyVersion() > 1)// 
First node.
+                    U.log(log, "Rebalancing is not required [cache=" + 
cctx.name() +
+                        ", topology=" + fut.assigns.topologyVersion() + "]");
+
                 return;
             }
 
@@ -254,43 +294,10 @@ public class GridDhtPartitionDemander {
             IgniteThread thread = new IgniteThread(cctx.gridName(), 
"demand-thread-" + cctx.cache().name(), new Runnable() {
                 @Override public void run() {
                     if (!CU.isMarshallerCache(cctx.name())) {
-                        if (log.isDebugEnabled())
-                            log.debug("Waiting for marshaller cache preload 
[cacheName=" + cctx.name() + ']');
-
-                        try {
-                            IgniteInternalFuture mFut;
-                            do {
-                                mFut = 
cctx.kernalContext().cache().marshallerCache().preloader().syncFuture();
-                            }
-                            while (!((SyncFuture)mFut).isInited() || 
((SyncFuture)mFut).topologyVersion().topologyVersion() < 
curFut.topologyVersion().topologyVersion());
-
-                            if 
(((SyncFuture)mFut).topologyVersion().topologyVersion() > 
curFut.topologyVersion().topologyVersion()) {
-                                curFut.onCancel();
-
-                                return;
-                            }
-
-                            if (!topologyChanged(topVer))
-                                mFut.get();
-                            else {
-                                curFut.onCancel();
-
-                                return;
-                            }
-                        }
-                        catch (IgniteInterruptedCheckedException ignored) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Failed to wait for marshaller cache 
preload future (grid is stopping): " +
-                                    "[cacheName=" + cctx.name() + ']');
-                                curFut.onCancel();
-
-                                return;
-                            }
-                        }
-                        catch (IgniteCheckedException e) {
-                            curFut.onCancel();
+                        
waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, curFut);
 
-                            throw new Error("Ordered preload future should 
never fail: " + e.getMessage(), e);
+                        if (!CU.isUtilityCache(cctx.name())) {
+                            
waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, curFut);
                         }
                     }
 

Reply via email to