Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 4baf249db -> 9dd41d5f1


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9dd41d5f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9dd41d5f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9dd41d5f

Branch: refs/heads/ignite-1093-2
Commit: 9dd41d5f18a54a66fb59dd520a855023d3e452de
Parents: 4baf249
Author: Anton Vinogradov <[email protected]>
Authored: Sun Oct 11 00:46:12 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Sun Oct 11 00:46:12 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 65 ++++++++++++----
 .../processors/cache/GridCachePreloader.java    |  5 +-
 .../cache/GridCachePreloaderAdapter.java        |  6 +-
 .../processors/cache/GridCacheProcessor.java    | 50 ------------
 .../distributed/dht/GridDhtLocalPartition.java  | 12 +--
 .../distributed/dht/GridDhtPartitionState.java  |  2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  4 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 81 ++++++--------------
 .../dht/preloader/GridDhtPreloader.java         |  7 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 12 ++-
 .../config/benchmark-rebalancing.properties     |  2 -
 11 files changed, 109 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2dbbe3c..8057d18 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,11 +21,17 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -74,6 +80,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteThreadFactory;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -134,6 +141,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
+    /** */
+    private ExecutorService rebalancingOrderedExecutorService;
+
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address 
race conditions when coordinator
@@ -249,6 +259,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
+        rebalancingOrderedExecutorService = Executors.newSingleThreadExecutor(
+                new IgniteThreadFactory(cctx.gridName(), 
"rebalancing-assigns"));
+
         exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, 
EVT_NODE_LEFT, EVT_NODE_FAILED,
@@ -453,6 +466,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void stop0(boolean cancel) {
         super.stop0(cancel);
 
+        rebalancingOrderedExecutorService.shutdownNow();
+
         // Do not allow any activity in exchange manager after stop.
         busyLock.writeLock().lock();
 
@@ -1263,27 +1278,21 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     }
 
                     if (assignsMap != null) {
+                        NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
+
                         //Marshaller cache first.
                         int mId = CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME);
 
-                        GridDhtPreloaderAssignments mA = assignsMap.get(mId);
-
-                        assert mA != null;
+                        orderMap.put(-2, new ArrayList<Integer>(1));
 
-                        GridCacheContext<K, V> mCacheCtx = 
cctx.cacheContext(mId);
-
-                        mCacheCtx.preloader().addAssignments(mA, forcePreload);
+                        orderMap.get(-2).add(mId);
 
                         //Utility cache second.
                         int uId = 
CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME);
 
-                        GridDhtPreloaderAssignments uA = assignsMap.get(uId);
-
-                        assert uA != null;
-
-                        GridCacheContext<K, V> uCacheCtx = 
cctx.cacheContext(uId);
+                        orderMap.put(-1, new ArrayList<Integer>(1));
 
-                        uCacheCtx.preloader().addAssignments(uA, forcePreload);
+                        orderMap.get(-1).add(uId);
 
                         //Others.
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e 
: assignsMap.entrySet()) {
@@ -1292,7 +1301,37 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             if (cacheId != uId && cacheId != mId) {
                                 GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
 
-                                
cacheCtx.preloader().addAssignments(e.getValue(), forcePreload);
+                                int order = 
cacheCtx.config().getRebalanceOrder();
+
+                                if (orderMap.get(order) == null)
+                                    orderMap.put(order, new 
LinkedList<Integer>());
+
+                                orderMap.get(order).add(cacheId);
+                            }
+                        }
+
+                        //Ordered rebalance scheduling.
+                        for (Integer order : orderMap.keySet()) {
+                            for (Integer cacheId : orderMap.get(order)) {
+                                GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+
+                                List<String> waitList = new LinkedList<>();
+
+                                for (List<Integer> cIds : 
orderMap.headMap(order).values()) {
+                                    for (Integer cId : cIds) {
+                                        
waitList.add(cctx.cacheContext(cId).name());
+                                    }
+                                }
+
+                                Callable c = 
cacheCtx.preloader().addAssignments(
+                                        assignsMap.get(cacheId), forcePreload, 
waitList);
+
+                                if (c != null) {
+                                    U.log(log, "Rebalancing scheduled: 
[cache=" + cacheCtx.name() +
+                                            " , waitList=" + 
waitList.toString() + "]");
+
+                                    
rebalancingOrderedExecutorService.submit(c);
+                                }
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 622d01e..878f985 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -93,8 +94,10 @@ public interface GridCachePreloader {
      *
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
+     * @param caches Rebalancing of these caches will be finished before this 
started.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, 
boolean forcePreload) throws IgniteCheckedException;
+    public Callable addAssignments(GridDhtPreloaderAssignments assignments, 
boolean forcePreload, Collection<String> caches)
+            throws IgniteCheckedException;
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index c5f503f..0aae0dc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -156,7 +157,8 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments 
assignments, boolean forcePreload) throws IgniteCheckedException {
-        // No-op.
+    @Override public Callable addAssignments(GridDhtPreloaderAssignments 
assignments, boolean forcePreload, Collection<String> caches)
+            throws IgniteCheckedException {
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 99b9269..9b2b558 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -31,9 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -99,7 +97,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -162,12 +159,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** Map of proxies. */
     private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
 
-    /** Map of preload finish futures grouped by preload order. */
-    private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
-
-    /** Maximum detected rebalance order. */
-    private int maxRebalanceOrder;
-
     /** Caches stop sequence. */
     private final Deque<String> stopSeq;
 
@@ -209,7 +200,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         caches = new ConcurrentHashMap<>();
         jCacheProxies = new ConcurrentHashMap<>();
-        preloadFuts = new TreeMap<>();
 
         stopSeq = new LinkedList<>();
     }
@@ -612,8 +602,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     "Deployment mode for cache is not CONTINUOUS or SHARED.");
         }
 
-        maxRebalanceOrder = 
validatePreloadOrder(ctx.config().getCacheConfiguration());
-
         ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
             new CustomEventListener<DynamicCacheChangeBatch>() {
                 @Override public void onCustomEvent(ClusterNode snd,
@@ -844,31 +832,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
             mgr.onKernalStart(false);
 
-        for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
-            GridCacheAdapter cache = e.getValue();
-
-            if (maxRebalanceOrder > 0) {
-                CacheConfiguration cfg = cache.configuration();
-
-                int order = cfg.getRebalanceOrder();
-
-                if (order > 0 && order != maxRebalanceOrder && 
cfg.getCacheMode() != LOCAL) {
-                    GridCompoundFuture fut = 
(GridCompoundFuture)preloadFuts.get(order);
-
-                    if (fut == null) {
-                        fut = new GridCompoundFuture<>();
-
-                        preloadFuts.put(order, fut);
-                    }
-
-                    fut.add(cache.preloader().syncFuture());
-                }
-            }
-        }
-
-        for (IgniteInternalFuture<?> fut : preloadFuts.values())
-            ((GridCompoundFuture<Object, Object>)fut).markInitialized();
-
         for (GridCacheAdapter<?, ?> cache : caches.values())
             onKernalStart(cache);
 
@@ -2770,19 +2733,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Gets preload finish future for preload-ordered cache with given order. 
I.e. will get compound preload future
-     * with maximum order less than {@code order}.
-     *
-     * @param order Cache order.
-     * @return Compound preload future or {@code null} if order is minimal 
order found.
-     */
-    @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) {
-        Map.Entry<Integer, IgniteInternalFuture<?>> entry = 
preloadFuts.lowerEntry(order);
-
-        return entry == null ? null : entry.getValue();
-    }
-
-    /**
      * @param spaceName Space name.
      * @param keyBytes Key bytes.
      * @param valBytes Value bytes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 9fea4f9..3a5ef5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -154,7 +154,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      * @return {@code false} If such reservation already added.
      */
     public boolean addReservation(GridDhtPartitionsReservation r) {
-        assert state.getReference() != EVICTED : "we can reserve only active 
partitions";
+        assert state.getReference() != EVICTED && state.getReference() != 
EVICTING :
+            "we can reserve only active partitions";
         assert state.getStamp() != 0 : "partition must be already reserved 
before adding group reservation";
 
         return reservations.addIfAbsent(r);
@@ -260,7 +261,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     void onAdded(GridDhtCacheEntry entry) {
         GridDhtPartitionState state = state();
 
-        if (state == EVICTED)
+        if (state == EVICTED || state == EVICTING)
             throw new GridDhtInvalidPartitionException(id, "Adding entry to 
invalid partition [part=" + id + ']');
 
         map.put(entry.key(), entry);
@@ -389,7 +390,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
 
             GridDhtPartitionState s = state.getReference();
 
-            if (s == EVICTED)
+            if (s == EVICTED || s == EVICTING)
                 return false;
 
             if (state.compareAndSet(s, s, reservations, reservations + 1))
@@ -568,9 +569,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
 
             return true;
         }
-        else {
-            assert false : "expected EVICTING state";
-        }
+
+        assert false : "expected EVICTING state";
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
index 4f6d59d..9849c0a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
@@ -53,6 +53,6 @@ public enum GridDhtPartitionState {
      * @return {@code True} if state is active or owning.
      */
     public boolean active() {
-        return this != EVICTED;
+        return this != EVICTED && this != EVICTING;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 761bbb0..09ce270 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1212,7 +1212,9 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
                     GridDhtPartitionState state = 
entry.context().topology().partitionState(n.id(),
                         entry.cached().partition());
 
-                    if (state != GridDhtPartitionState.OWNING && state != 
GridDhtPartitionState.EVICTED) {
+                    if (state != GridDhtPartitionState.OWNING &&
+                            state != GridDhtPartitionState.EVICTING &&
+                            state != GridDhtPartitionState.EVICTED) {
                         CacheObject procVal = 
entry.entryProcessorCalculatedValue();
 
                         entry.op(procVal == null ? DELETE : UPDATE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 4e274f3..e585eca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -53,7 +53,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -66,7 +65,6 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -224,8 +222,10 @@ public class GridDhtPartitionDemander {
         try {
             SyncFuture wFut = 
(SyncFuture)cctx.kernalContext().cache().internalCache(name).preloader().syncFuture();
 
-            if (!topologyChanged(fut))
-                wFut.get();
+            if (!topologyChanged(fut)) {
+                if (!wFut.get())
+                    fut.cancel();
+            }
             else {
                 fut.cancel();
             }
@@ -247,10 +247,11 @@ public class GridDhtPartitionDemander {
     /**
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
+     * @param caches Rebalancing of these caches will be finished before this 
started.
      * @throws IgniteCheckedException Exception
      */
-
-    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean 
force) throws IgniteCheckedException {
+    Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean 
force, final Collection<String> caches)
+        throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -261,9 +262,9 @@ public class GridDhtPartitionDemander {
 
             final SyncFuture oldFut = syncFut;
 
-            final SyncFuture fut = new SyncFuture(assigns, cctx, log, 
oldFut.isDummy(), ++updateSeq);
+            final SyncFuture fut = new SyncFuture(assigns, cctx, log, 
oldFut.isInitial(), ++updateSeq);
 
-            if (!oldFut.isDummy())
+            if (!oldFut.isInitial())
                 oldFut.cancel();
             else
                 fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@@ -279,72 +280,36 @@ public class GridDhtPartitionDemander {
 
                 fut.cancel();
 
-                return;
+                return null;
             }
 
             if (assigns.isEmpty()) {
                 fut.doneIfEmpty();
 
-                return;
+                return null;
             }
 
             if (topologyChanged(fut)) {
                 fut.cancel();
 
-                return;
+                return null;
             }
 
-            cctx.closures().callLocalSafe(new Callable<Boolean>() {
-                @Override public Boolean call() {
-                    if (!CU.isMarshallerCache(cctx.name())) {
-                        
waitForCacheRebalancing(GridCacheUtils.MARSH_CACHE_NAME, fut);
-
-                        if (!CU.isUtilityCache(cctx.name())) {
-                            
waitForCacheRebalancing(GridCacheUtils.UTILITY_CACHE_NAME, fut);
-                        }
-                    }
-
-                    int rebalanceOrder = cctx.config().getRebalanceOrder();
-
-                    if (rebalanceOrder > 0) {
-                        IgniteInternalFuture<?> oFut = 
cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder);
-
-                        try {
-                            if (oFut != null) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Waiting for dependant caches 
rebalance [cacheName=" + cctx.name() +
-                                        ", rebalanceOrder=" + rebalanceOrder + 
']');
-
-                                if (!topologyChanged(fut))
-                                    oFut.get();
-                                else {
-                                    fut.cancel();
-
-                                    return false;
-                                }
-                            }
-                        }
-                        catch (IgniteInterruptedCheckedException ignored) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Failed to wait for ordered 
rebalance future (grid is stopping): " +
-                                    "[cacheName=" + cctx.name() + ", 
rebalanceOrder=" + rebalanceOrder + ']');
-                                fut.cancel();
-
-                                return false;
-                            }
-                        }
-                        catch (IgniteCheckedException e) {
-                            fut.cancel();
+            return new Callable<Boolean>() {
+                @Override
+                public Boolean call() {
+                    for (String c : caches) {
+                        waitForCacheRebalancing(c, fut);
 
-                            throw new Error("Ordered rebalance future should 
never fail: " + e.getMessage(), e);
-                        }
+                        if (fut.isDone())
+                            return false;
                     }
 
                     requestPartitions(fut, assigns);
 
                     return true;
                 }
-            });
+            };
         }
         else if (delay > 0) {
             GridTimeoutObject obj = lastTimeoutObj.get();
@@ -370,6 +335,8 @@ public class GridDhtPartitionDemander {
 
             cctx.time().addTimeoutObject(obj);
         }
+
+        return null;
     }
 
     /**
@@ -843,9 +810,9 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * @return Is dummy (created at demander creation).
+         * @return Is initial (created at demander creation).
          */
-        private boolean isDummy() {
+        private boolean isInitial() {
             return topVer == null;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 06f4522..fbc74ff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -396,9 +397,9 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments 
assignments,
-        boolean forcePreload) throws IgniteCheckedException {
-        demander.addAssignments(assignments, forcePreload);
+    @Override public Callable addAssignments(GridDhtPreloaderAssignments 
assignments,
+        boolean forcePreload, Collection<String> caches) throws 
IgniteCheckedException {
+        return demander.addAssignments(assignments, forcePreload, caches);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 028a8fc..b6d11c5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -88,6 +88,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         cachePCfg.setBackups(1);
         cachePCfg.setRebalanceBatchSize(1);
         cachePCfg.setRebalanceBatchesCount(1);
+        cachePCfg.setRebalanceOrder(2);
 
         CacheConfiguration<Integer, Integer> cachePCfg2 = new 
CacheConfiguration<>();
 
@@ -95,6 +96,8 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         cachePCfg2.setCacheMode(CacheMode.PARTITIONED);
         cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
         cachePCfg2.setBackups(1);
+        cachePCfg2.setRebalanceOrder(2);
+      //  cachePCfg2.setRebalanceDelay(5000);
 
         CacheConfiguration<Integer, Integer> cacheRCfg = new 
CacheConfiguration<>();
 
@@ -103,13 +106,13 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheRCfg.setRebalanceBatchSize(1);
         cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
-        cacheRCfg.setRebalanceDelay(5000);
 
         CacheConfiguration<Integer, Integer> cacheRCfg2 = new 
CacheConfiguration<>();
 
         cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2);
         cacheRCfg2.setCacheMode(CacheMode.REPLICATED);
         cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheRCfg2.setRebalanceOrder(4);
 
         iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, 
cacheRCfg2);
 
@@ -356,6 +359,12 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
                         U.sleep(10);
                     }
 
+                    waitForRebalancing(0, 5, 0);
+                    waitForRebalancing(1, 5, 0);
+                    waitForRebalancing(2, 5, 0);
+                    waitForRebalancing(3, 5, 0);
+                    waitForRebalancing(4, 5, 0);
+
                     awaitPartitionMapExchange();
 
                     //New cache should start rebalancing.
@@ -409,6 +418,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         t2.join();
         t3.join();
 
+        waitForRebalancing(0, 5, 1);
         waitForRebalancing(1, 5, 1);
         waitForRebalancing(2, 5, 1);
         waitForRebalancing(3, 5, 1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd41d5f/modules/yardstick/config/benchmark-rebalancing.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-rebalancing.properties 
b/modules/yardstick/config/benchmark-rebalancing.properties
index 604c7ac..796e49f 100644
--- a/modules/yardstick/config/benchmark-rebalancing.properties
+++ b/modules/yardstick/config/benchmark-rebalancing.properties
@@ -70,8 +70,6 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + 
`echo ${DRIVER_HOSTS}
 # Run configuration.
 CONFIGS="\
 -cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn 
${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn 
IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 20000000 -cn 
rebalance2,\
--cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn 
${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn 
IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2_1 -cl -r 20000000 -cn 
rebalance2,\
--cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn 
${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn 
IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2_2 -cl -r 20000000 -cn 
rebalance2,\
 "
 #-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn 
${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn 
IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet1 -cl -r 20000000 -cn 
rebalance1,\
 #-cfg ${SCRIPT_DIR}/../config/ignite-rebalancing-multicast-config.xml -nn 
${nodesNum} -b 1 -w 0 -d 140 -t 64 -sm PRIMARY_SYNC -dn 
IgniteRebalancePutGetBenchmark -sn IgniteNode -ds PutGet2 -cl -r 20000000 -cn 
rebalance2,\

Reply via email to