Ignite-1093

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a34a408b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a34a408b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a34a408b

Branch: refs/heads/ignite-1093-3
Commit: a34a408bf4736aca2879207037ba3bfe5d87de82
Parents: 1334e77
Author: Anton Vinogradov <a...@apache.org>
Authored: Tue Oct 27 12:00:13 2015 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Tue Oct 27 12:00:13 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |   50 +-
 .../configuration/IgniteConfiguration.java      |   30 +-
 .../apache/ignite/internal/IgniteKernal.java    |   10 +
 .../org/apache/ignite/internal/IgnitionEx.java  |    3 +
 .../communication/GridIoMessageFactory.java     |   10 +-
 .../processors/cache/GridCacheIoManager.java    |    9 +
 .../processors/cache/GridCacheMapEntry.java     |   38 +-
 .../GridCachePartitionExchangeManager.java      |  157 ++-
 .../processors/cache/GridCachePreloader.java    |   43 +-
 .../cache/GridCachePreloaderAdapter.java        |   35 +-
 .../processors/cache/GridCacheProcessor.java    |   54 +-
 .../distributed/dht/GridDhtCacheEntry.java      |    5 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   60 +-
 .../GridDhtPartitionDemandMessage.java          |    9 +-
 .../preloader/GridDhtPartitionDemandPool.java   | 1192 ----------------
 .../dht/preloader/GridDhtPartitionDemander.java | 1310 ++++++++++++++++++
 .../dht/preloader/GridDhtPartitionSupplier.java |  999 +++++++++++++
 .../GridDhtPartitionSupplyMessageV2.java        |  404 ++++++
 .../preloader/GridDhtPartitionSupplyPool.java   |  555 --------
 .../dht/preloader/GridDhtPreloader.java         |  269 +++-
 .../datastructures/DataStructuresProcessor.java |    3 +
 .../processors/task/GridTaskWorker.java         |    4 +-
 .../ignite/internal/util/lang/GridTuple4.java   |    2 +-
 .../ignite/internal/util/nio/GridNioServer.java |    2 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    2 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |    3 +
 .../GridCacheRebalancingAsyncSelfTest.java      |   63 +
 .../GridCacheRebalancingSyncSelfTest.java       |  472 +++++++
 .../GridCacheReplicatedPreloadSelfTest.java     |   22 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |   17 -
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java  |   46 +
 .../testframework/junits/GridAbstractTest.java  |    3 +-
 .../junits/common/GridCommonAbstractTest.java   |    5 +-
 .../testsuites/IgniteCacheTestSuite3.java       |    5 +
 .../tcp/GridOrderedMessageCancelSelfTest.java   |   18 +-
 35 files changed, 3946 insertions(+), 1963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 6ac2b64..4012792 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -73,6 +73,9 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
     /** Default rebalance timeout (ms).*/
     public static final long DFLT_REBALANCE_TIMEOUT = 10000;
 
+    /** Default rebalance batches count. */
+    public static final long DFLT_REBALANCE_BATCHES_COUNT = 2;
+
     /** Time in milliseconds to wait between rebalance messages to avoid 
overloading CPU. */
     public static final long DFLT_REBALANCE_THROTTLE = 0;
 
@@ -256,6 +259,9 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
     /** Off-heap memory size. */
     private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
 
+    /** Rebalance batches count. */
+    private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
+
     /** */
     private boolean swapEnabled = DFLT_SWAP_ENABLED;
 
@@ -396,6 +402,7 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
         rebalanceDelay = cc.getRebalanceDelay();
         rebalanceOrder = cc.getRebalanceOrder();
         rebalancePoolSize = cc.getRebalanceThreadPoolSize();
+        rebalanceBatchesCount = cc.getRebalanceBatchesCount();
         rebalanceTimeout = cc.getRebalanceTimeout();
         rebalanceThrottle = cc.getRebalanceThrottle();
         readFromBackup = cc.isReadFromBackup();
@@ -1033,10 +1040,10 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
      * {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC 
ASYNC} rebalance modes only.
      * <p/>
      * If cache rebalance order is positive, rebalancing for this cache will 
be started only when rebalancing for
-     * all caches with smaller rebalance order (except caches with rebalance 
order {@code 0}) will be completed.
+     * all caches with smaller rebalance order will be completed.
      * <p/>
      * Note that cache with order {@code 0} does not participate in ordering. 
This means that cache with
-     * rebalance order {@code 1} will never wait for any other caches. All 
caches with order {@code 0} will
+     * rebalance order {@code 0} will never wait for any other caches. All 
caches with order {@code 0} will
      * be rebalanced right away concurrently with each other and ordered 
rebalance processes.
      * <p/>
      * If not set, cache order is 0, i.e. rebalancing is not ordered.
@@ -1079,7 +1086,7 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
      * @return {@code this} for chaining.
      */
     public CacheConfiguration<K, V> setRebalanceBatchSize(int 
rebalanceBatchSize) {
-        this.rebalanceBatchSize = rebalanceBatchSize;
+        this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
 
         return this;
     }
@@ -1269,11 +1276,9 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
         return this;
     }
 
+    @Deprecated
     /**
-     * Gets size of rebalancing thread pool. Note that size serves as a hint 
and implementation
-     * may create more threads for rebalancing than specified here (but never 
less threads).
-     * <p>
-     * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}.
+     * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead.
      *
      * @return Size of rebalancing thread pool.
      */
@@ -1281,9 +1286,9 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
         return rebalancePoolSize;
     }
 
+    @Deprecated
     /**
-     * Sets size of rebalancing thread pool. Note that size serves as a hint 
and implementation may create more threads
-     * for rebalancing than specified here (but never less threads).
+     * Use {@link IgniteConfiguration#setRebalanceThreadPoolSize(int)} instead.
      *
      * @param rebalancePoolSize Size of rebalancing thread pool.
      * @return {@code this} for chaining.
@@ -1773,6 +1778,33 @@ public class CacheConfiguration<K, V> extends 
MutableConfiguration<K, V> {
     }
 
     /**
+     * To gain better rebalancing performance supplier node can provide more 
than one batch at start and provide
+     * one new batch for each subsequent demand request.
+     *
+     * Gets number of batches generated by supply node at rebalancing start.
+     *
+     * @return batches count
+     */
+    public long getRebalanceBatchesCount() {
+        return rebalanceBatchesCount;
+    }
+
+    /**
+     * To gain better rebalancing performance supplier node can provide more 
than one batch at start and provide
+     * one new batch for each subsequent demand request.
+     *
+     * Sets number of batches generated by supply node at rebalancing start.
+     *
+     * @param rebalanceBatchesCnt batches count.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setRebalanceBatchesCount(long 
rebalanceBatchesCnt) {
+        this.rebalanceBatchesCount = rebalanceBatchesCnt;
+
+        return this;
+    }
+
+    /**
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ecae356..26145e3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -149,6 +149,9 @@ public class IgniteConfiguration {
     /** Default keep alive time for public thread pool. */
     public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0;
 
+    /** Default limit of threads used at rebalance. */
+    public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
+
     /** Default max queue capacity of public thread pool. */
     public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = 
Integer.MAX_VALUE;
 
@@ -354,6 +357,9 @@ public class IgniteConfiguration {
     /** Client mode flag. */
     private Boolean clientMode;
 
+    /** Rebalance thread pool size. */
+    private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
+
     /** Transactions configuration. */
     private TransactionConfiguration txCfg = new TransactionConfiguration();
 
@@ -518,6 +524,7 @@ public class IgniteConfiguration {
         utilityCachePoolSize = cfg.getUtilityCacheThreadPoolSize();
         waitForSegOnStart = cfg.isWaitForSegmentOnStart();
         warmupClos = cfg.getWarmupClosure();
+        rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
     }
 
     /**
@@ -1331,6 +1338,27 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Gets the maximum number of threads that can be used for rebalancing.
+     * Minimum is 1.
+     * @return count.
+     */
+    public int getRebalanceThreadPoolSize(){
+        return Math.max(1, rebalanceThreadPoolSize);
+    }
+
+    /**
+     * Sets the maximum number of threads that can be used for rebalancing.
+     * Minimum is 1.
+     * @param size Size.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setRebalanceThreadPoolSize(int size){
+        this.rebalanceThreadPoolSize = size;
+
+        return this;
+    }
+
+    /**
      * Returns a collection of life-cycle beans. These beans will be 
automatically
      * notified of grid life-cycle events. Use life-cycle beans whenever you
      * want to perform certain logic before and after grid startup and stopping
@@ -2383,4 +2411,4 @@ public class IgniteConfiguration {
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..da8cf3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -733,6 +733,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ackEnvironmentVariables();
         ackCacheConfiguration();
         ackP2pConfiguration();
+        ackRebalanceConfiguration();
 
         // Run background network diagnostics.
         GridDiagnostic.runBackgroundCheck(gridName, execSvc, log);
@@ -2135,6 +2136,15 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
     /**
      *
      */
+    private void ackRebalanceConfiguration() throws IgniteCheckedException {
+        if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize())
+            throw new IgniteCheckedException("Rebalance thread pool size 
exceed or equals System thread pool size. " +
+                "Change IgniteConfiguration.rebalanceThreadPoolSize property 
before next start.");
+    }
+
+    /**
+     *
+     */
     private void ackCacheConfiguration() {
         CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 02b28c5..7d2b2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -2035,6 +2035,7 @@ public class IgnitionEx {
             cache.setAffinity(new RendezvousAffinityFunction(false, 20));
             cache.setNodeFilter(CacheConfiguration.ALL_NODES);
             cache.setStartSize(300);
+            cache.setRebalanceOrder(-2);//Prior to other system caches.
 
             return cache;
         }
@@ -2055,6 +2056,7 @@ public class IgnitionEx {
             cache.setWriteSynchronizationMode(FULL_SYNC);
             cache.setAffinity(new RendezvousAffinityFunction(false, 100));
             cache.setNodeFilter(CacheConfiguration.ALL_NODES);
+            cache.setRebalanceOrder(-1);//Prior to user caches.
 
             return cache;
         }
@@ -2075,6 +2077,7 @@ public class IgnitionEx {
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setCacheMode(cfg.getCacheMode());
             ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
+            ccfg.setRebalanceOrder(-1);//Prior to user caches.
 
             if (cfg.getCacheMode() == PARTITIONED)
                 ccfg.setBackups(cfg.getBackups());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 079015c..ae8c753 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
@@ -684,7 +685,12 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
-            // [-3..112] - this
+            case 114:
+                msg = new GridDhtPartitionSupplyMessageV2();
+
+                break;
+
+            // [-3..114] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:
@@ -722,4 +728,4 @@ public class GridIoMessageFactory implements MessageFactory 
{
 
         CUSTOM.put(type, c);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 476a96c..1fe55d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -483,6 +484,14 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             break;
 
+            case 114: {
+                GridDhtPartitionSupplyMessageV2 req = 
(GridDhtPartitionSupplyMessageV2)msg;
+
+                U.error(log, "Supply message v2 cannot be unmarshalled.", 
req.classError());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to 
node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..4e92ed4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -453,7 +453,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (cctx.swap().offheapSwapEvict(key, entry, partition(), 
evictVer)) {
                 assert !hasValueUnlocked() : this;
 
-                obsolete = markObsolete0(obsoleteVer, false);
+                obsolete = markObsolete0(obsoleteVer, false, null);
 
                 assert obsolete : this;
             }
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             synchronized (this) {
                 // If entry is still removed.
                 if (newVer == ver) {
-                    if (obsoleteVer == null || !(marked = 
markObsolete0(obsoleteVer, true))) {
+                    if (obsoleteVer == null || !(marked = 
markObsolete0(obsoleteVer, true, null))) {
                         if (log.isDebugEnabled())
                             log.debug("Entry could not be marked obsolete (it 
is still used): " + this);
                     }
@@ -2420,7 +2420,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 try {
                     if ((!hasReaders() || readers)) {
                         // markObsolete will clear the value.
-                        if (!(marked = markObsolete0(ver, true))) {
+                        if (!(marked = markObsolete0(ver, true, null))) {
                             if (log.isDebugEnabled())
                                 log.debug("Entry could not be marked obsolete 
(it is still used): " + this);
 
@@ -2478,7 +2478,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         boolean obsolete;
 
         synchronized (this) {
-            obsolete = markObsolete0(ver, true);
+            obsolete = markObsolete0(ver, true, null);
         }
 
         if (obsolete)
@@ -2511,7 +2511,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         }
                     }
                     else
-                        obsolete = markObsolete0(ver, true);
+                        obsolete = markObsolete0(ver, true, null);
                 }
             }
         }
@@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (!this.ver.equals(ver))
                 return false;
 
-            marked = markObsolete0(ver, true);
+            marked = markObsolete0(ver, true, null);
         }
 
         if (marked)
@@ -2555,9 +2555,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      *
      * @param ver Version.
      * @param clear {@code True} to clear.
+     * @param extras Predefined extras.
      * @return {@code True} if entry is obsolete, {@code false} if entry is 
still used by other threads or nodes.
      */
-    protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) 
{
+    protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, 
GridCacheObsoleteEntryExtras extras) {
         assert Thread.holdsLock(this);
 
         GridCacheVersion obsoleteVer = obsoleteVersionExtras();
@@ -2572,7 +2573,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (mvcc == null || mvcc.isEmpty(ver)) {
                 obsoleteVer = ver;
 
-                obsoleteVersionExtras(obsoleteVer);
+                obsoleteVersionExtras(obsoleteVer, extras);
 
                 if (clear)
                     value(null);
@@ -2896,7 +2897,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 synchronized (this) {
                     if (checkExpired()) {
-                        rmv = markObsolete0(cctx.versions().next(this.ver), 
true);
+                        rmv = markObsolete0(cctx.versions().next(this.ver), 
true, null);
 
                         return null;
                     }
@@ -3366,7 +3367,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             }
                         }
                         else {
-                            if (markObsolete0(obsoleteVer, true))
+                            if (markObsolete0(obsoleteVer, true, null))
                                 obsolete = true; // Success, will return 
"true".
                         }
                     }
@@ -3688,7 +3689,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                     CacheObject prev = saveOldValueUnlocked(false);
 
-                    if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+                    if (!hasReaders() && markObsolete0(obsoleteVer, false, 
null)) {
                         if (swap) {
                             if (!isStartVersion()) {
                                 try {
@@ -3736,7 +3737,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                         CacheObject prevVal = saveValueForIndexUnlocked();
 
-                        if (!hasReaders() && markObsolete0(obsoleteVer, 
false)) {
+                        if (!hasReaders() && markObsolete0(obsoleteVer, false, 
null)) {
                             if (swap) {
                                 if (!isStartVersion()) {
                                     try {
@@ -3812,7 +3813,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         GridCacheBatchSwapEntry ret = null;
 
         try {
-            if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+            if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
                 if (!isStartVersion() && hasValueUnlocked()) {
                     if (cctx.offheapTiered() && hasOffHeapPointer()) {
                         if (cctx.swap().offheapEvictionEnabled())
@@ -3871,7 +3872,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     return false;
 
                 if (checkExpired()) {
-                    rmv = markObsolete0(cctx.versions().next(this.ver), true);
+                    rmv = markObsolete0(cctx.versions().next(this.ver), true, 
null);
 
                     return false;
                 }
@@ -3984,9 +3985,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     /**
      * @param obsoleteVer Obsolete version.
      */
-    protected void obsoleteVersionExtras(@Nullable GridCacheVersion 
obsoleteVer) {
-        extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : 
obsoleteVer != null ?
-            new GridCacheObsoleteEntryExtras(obsoleteVer) : null;
+    protected void obsoleteVersionExtras(@Nullable GridCacheVersion 
obsoleteVer, GridCacheObsoleteEntryExtras ext) {
+        extras = (extras != null) ?
+            extras.obsoleteVersion(obsoleteVer) :
+            obsoleteVer != null ?
+                (ext != null) ? ext : new 
GridCacheObsoleteEntryExtras(obsoleteVer) :
+                null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index adc2174..6793f9f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,12 +21,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Queue;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,9 +52,11 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -65,8 +70,10 @@ import 
org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -75,6 +82,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
@@ -85,6 +93,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
@@ -132,6 +141,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
+    /** */
+    private final Queue<Callable<Boolean>> rebalancingQueue = new 
ConcurrentLinkedDeque8<>();
+
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address 
race conditions when coordinator
@@ -309,6 +321,34 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         exchWorker.futQ.addFirst(fut);
 
+        if (!cctx.kernalContext().clientNode()) {
+
+            for (int cnt = 0; cnt < 
cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, 
GridCacheMessage>() {
+                    @Override public void apply(final UUID id, final 
GridCacheMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            if (m instanceof GridDhtPartitionSupplyMessageV2)
+                                
cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
+                                    idx, id, 
(GridDhtPartitionSupplyMessageV2)m);
+                            else if (m instanceof 
GridDhtPartitionDemandMessage)
+                                
cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
+                                    idx, id, (GridDhtPartitionDemandMessage)m);
+                            else
+                                log.error("Unsupported message type: " + 
m.getClass().getName());
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
         new IgniteThread(cctx.gridName(), "exchange-worker", 
exchWorker).start();
 
         if (reconnect) {
@@ -368,6 +408,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         }
     }
 
+    /**
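+     * @param idx Rebalance topic index (one topic is registered per index below the configured rebalance thread pool size).
+     * @return Topic used for rebalance messages with the given index.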
+     */
+    public static Object rebalanceTopic(int idx) {
+        return TOPIC_CACHE.topic("Rebalance", idx);
+    }
+
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridEvents().removeLocalEventListener(discoLsnr);
@@ -392,6 +440,11 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(stopErr);
 
+        if (!cctx.kernalContext().clientNode())
+            for (int cnt = 0; cnt < 
cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
+                cctx.io().removeOrderedHandler(rebalanceTopic(cnt));
+            }
+
         U.cancel(exchWorker);
 
         if (log.isDebugEnabled())
@@ -1103,9 +1156,15 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
             boolean startEvtFired = false;
 
+            int cnt = 0;
+
+            IgniteInternalFuture asyncStartFut = null;
+
             while (!isCancelled()) {
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
+                cnt++;
+
                 try {
                     boolean preloadFinished = true;
 
@@ -1220,12 +1279,106 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     }
 
                     if (assignsMap != null) {
+                        rebalancingQueue.clear();
+
+                        NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
+
                         for (Map.Entry<Integer, GridDhtPreloaderAssignments> e 
: assignsMap.entrySet()) {
                             int cacheId = e.getKey();
 
                             GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
 
-                            cacheCtx.preloader().addAssignments(e.getValue(), 
forcePreload);
+                            int order = cacheCtx.config().getRebalanceOrder();
+
+                            if (orderMap.get(order) == null)
+                                orderMap.put(order, new LinkedList<Integer>());
+
+                            orderMap.get(order).add(cacheId);
+                        }
+
+                        Callable<Boolean> marsR = null;
+
+                        //Ordered rebalance scheduling.
+                        for (Integer order : orderMap.keySet()) {
+                            for (Integer cacheId : orderMap.get(order)) {
+                                GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+
+                                List<String> waitList = new LinkedList<>();
+
+                                for (List<Integer> cIds : 
orderMap.headMap(order).values()) {
+                                    for (Integer cId : cIds) {
+                                        
waitList.add(cctx.cacheContext(cId).name());
+                                    }
+                                }
+
+                                Callable<Boolean> r = 
cacheCtx.preloader().addAssignments(
+                                    assignsMap.get(cacheId), forcePreload, 
waitList, cnt);
+
+                                if (r != null) {
+                                    U.log(log, "Cache rebalancing scheduled: 
[cache=" + cacheCtx.name() +
+                                        ", waitList=" + waitList.toString() + 
"]");
+
+                                    if (cacheId == 
CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
+                                        marsR = r;
+                                    else
+                                        rebalancingQueue.add(r);
+                                }
+                            }
+                        }
+
+                        if (asyncStartFut != null)
+                            asyncStartFut.get(); // Wait for thread stop.
+
+                        if (marsR != null || !rebalancingQueue.isEmpty()) {
+                            if (futQ.isEmpty()) {
+                                U.log(log, "Starting caches rebalancing [top=" 
+ exchFut.topologyVersion() + "]");
+
+                                if (marsR != null)
+                                    try {
+                                        marsR.call();//Marshaller cache 
rebalancing launches in sync way.
+                                    }
+                                    catch (Exception ex) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send initial 
demand request to node");
+
+                                        continue;
+                                    }
+
+                                final GridFutureAdapter fut = new 
GridFutureAdapter();
+
+                                asyncStartFut = fut;
+
+                                
cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
+                                    @Override public Boolean call() {
+                                        try {
+                                            while (true) {
+                                                Callable<Boolean> r = 
rebalancingQueue.poll();
+
+                                                if (r == null)
+                                                    return false;
+
+                                                if (!r.call())
+                                                    return false;
+                                            }
+                                        }
+                                        catch (Exception ex) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Failed to send 
initial demand request to node");
+
+                                            return false;
+                                        }
+                                        finally {
+                                            fut.onDone();
+                                        }
+                                    }
+                                }, /*system pool*/ true);
+                            }
+                            else {
+                                U.log(log, "Obsolete exchange, skipping 
rebalancing [top=" + exchFut.topologyVersion() + "]");
+                            }
+                        }
+                        else {
+                            U.log(log, "Nothing scheduled, skipping 
rebalancing [top=" + exchFut.topologyVersion() + "]");
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 755958e..79861a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -18,9 +18,14 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -90,8 +95,11 @@ public interface GridCachePreloader {
      *
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
+     * @param caches Names of caches whose rebalancing must be finished before rebalancing of this cache is started.
+     * @param cnt Counter.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, 
boolean forcePreload);
+    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments 
assignments, boolean forcePreload,
+        Collection<String> caches, int cnt);
 
     /**
      * @param p Preload predicate.
@@ -115,6 +123,11 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<?> syncFuture();
 
     /**
+     * @return Future which will complete when preloading is finished on 
current topology.
+     */
+    public IgniteInternalFuture<Boolean> rebalanceFuture();
+
+    /**
      * Requests that preloader sends the request for the key.
      *
      * @param keys Keys to request.
@@ -132,4 +145,30 @@ public interface GridCachePreloader {
      * Unwinds undeploys.
      */
     public void unwindUndeploys();
-}
\ No newline at end of file
+
+
+    /**
+     * Handles Supply message.
+     *
+     * @param idx Index.
+     * @param id Node Id.
+     * @param s Supply message.
+     */
+    public void handleSupplyMessage(int idx, UUID id, final 
GridDhtPartitionSupplyMessageV2 s);
+
+    /**
+     * Handles Demand message.
+     *
+     * @param idx Index.
+     * @param id Node Id.
+     * @param d Demand message.
+     */
+    public void handleDemandMessage(int idx, UUID id, 
GridDhtPartitionDemandMessage d);
+
+    /**
+     * Evicts partition asynchronously.
+     *
+     * @param part Partition.
+     */
+    public void evictPartitionAsync(GridDhtLocalPartition part);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 5405449..b784383 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -18,11 +18,16 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -36,7 +41,7 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     /** Cache context. */
     protected final GridCacheContext<?, ?> cctx;
 
-    /** Logger.*/
+    /** Logger. */
     protected final IgniteLogger log;
 
     /** Affinity. */
@@ -113,12 +118,28 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return new GridFinishedFuture<>(true);
+    }
+
+    /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
         cctx.deploy().unwind(cctx);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Object> 
request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+    @Override public void handleSupplyMessage(int idx, UUID id, 
GridDhtPartitionSupplyMessageV2 s) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void handleDemandMessage(int idx, UUID id, 
GridDhtPartitionDemandMessage d) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Object> 
request(Collection<KeyCacheObject> keys,
+        AffinityTopologyVersion topVer) {
         return new GridFinishedFuture<>();
     }
 
@@ -143,7 +164,13 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments 
assignments, boolean forcePreload) {
+    @Override public Callable<Boolean> 
addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        Collection<String> caches, int cnt) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         // No-op.
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 722e570..c2acd99 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -31,9 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -99,7 +97,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -162,12 +159,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** Map of proxies. */
     private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
 
-    /** Map of preload finish futures grouped by preload order. */
-    private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts;
-
-    /** Maximum detected rebalance order. */
-    private int maxRebalanceOrder;
-
     /** Caches stop sequence. */
     private final Deque<String> stopSeq;
 
@@ -209,7 +200,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         caches = new ConcurrentHashMap<>();
         jCacheProxies = new ConcurrentHashMap<>();
-        preloadFuts = new TreeMap<>();
 
         stopSeq = new LinkedList<>();
     }
@@ -392,10 +382,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             U.warn(log, "AffinityFunction configuration parameter will be 
ignored for local cache [cacheName=" +
                 U.maskName(cc.getName()) + ']');
 
-        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) {
-            assertParameter(cc.getRebalanceThreadPoolSize() > 0, 
"rebalanceThreadPoolSize > 0");
+        if (cc.getRebalanceMode() != CacheRebalanceMode.NONE)
             assertParameter(cc.getRebalanceBatchSize() > 0, 
"rebalanceBatchSize > 0");
-        }
 
         if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == 
REPLICATED) {
             if (cc.getAtomicityMode() == ATOMIC && 
cc.getWriteSynchronizationMode() == FULL_ASYNC)
@@ -614,8 +602,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                     "Deployment mode for cache is not CONTINUOUS or SHARED.");
         }
 
-        maxRebalanceOrder = 
validatePreloadOrder(ctx.config().getCacheConfiguration());
-
         ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
             new CustomEventListener<DynamicCacheChangeBatch>() {
                 @Override public void onCustomEvent(ClusterNode snd,
@@ -846,31 +832,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
             mgr.onKernalStart(false);
 
-        for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
-            GridCacheAdapter cache = e.getValue();
-
-            if (maxRebalanceOrder > 0) {
-                CacheConfiguration cfg = cache.configuration();
-
-                int order = cfg.getRebalanceOrder();
-
-                if (order > 0 && order != maxRebalanceOrder && 
cfg.getCacheMode() != LOCAL) {
-                    GridCompoundFuture fut = 
(GridCompoundFuture)preloadFuts.get(order);
-
-                    if (fut == null) {
-                        fut = new GridCompoundFuture<>();
-
-                        preloadFuts.put(order, fut);
-                    }
-
-                    fut.add(cache.preloader().syncFuture());
-                }
-            }
-        }
-
-        for (IgniteInternalFuture<?> fut : preloadFuts.values())
-            ((GridCompoundFuture<Object, Object>)fut).markInitialized();
-
         for (GridCacheAdapter<?, ?> cache : caches.values())
             onKernalStart(cache);
 
@@ -2779,19 +2740,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Gets preload finish future for preload-ordered cache with given order. 
I.e. will get compound preload future
-     * with maximum order less than {@code order}.
-     *
-     * @param order Cache order.
-     * @return Compound preload future or {@code null} if order is minimal 
order found.
-     */
-    @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) {
-        Map.Entry<Integer, IgniteInternalFuture<?>> entry = 
preloadFuts.lowerEntry(order);
-
-        return entry == null ? null : entry.getValue();
-    }
-
-    /**
      * @param spaceName Space name.
      * @param keyBytes Key bytes.
      * @param valBytes Value bytes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..b2279ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -539,7 +540,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
      * @return {@code True} if entry was not being used, passed the filter and 
could be removed.
      * @throws IgniteCheckedException If failed to remove from swap.
      */
-    public boolean clearInternal(GridCacheVersion ver, boolean swap) throws 
IgniteCheckedException {
+    public boolean clearInternal(GridCacheVersion ver, boolean swap, 
GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
         boolean rmv = false;
 
         try {
@@ -548,7 +549,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
 
                 // Call markObsolete0 to avoid recursive calls to clear if
                 // we are clearing dht local partition (onMarkedObsolete 
should not be called).
-                if (!markObsolete0(ver, false)) {
+                if (!markObsolete0(ver, false, extras)) {
                     if (log.isDebugEnabled())
                         log.debug("Entry could not be marked obsolete (it is 
still used or has readers): " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4f124e6..b3c13a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,6 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -27,10 +38,10 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCircularBuffer;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -38,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -46,18 +56,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -286,7 +284,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
         }
 
         // Attempt to evict.
-        tryEvict(true);
+        cctx.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -411,7 +409,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
 
             // Decrement reservations.
             if (state.compareAndSet(s, s, reservations, --reservations)) {
-                tryEvict(true);
+                cctx.preloader().evictPartitionAsync(this);
 
                 break;
             }
@@ -479,7 +477,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      * @param updateSeq Update sequence.
      * @return Future for evict attempt.
      */
-    IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+    void tryEvictAsync(boolean updateSeq) {
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
             state.compareAndSet(RENTING, EVICTED, 0, 0)) {
             if (log.isDebugEnabled())
@@ -497,15 +495,10 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, 
updateSeq);
 
             clearDeferredDeletes();
-
-            return new GridFinishedFuture<>(true);
         }
-
-        return cctx.closures().callLocalSafe(new GPC<Boolean>() {
-            @Override public Boolean call() {
-                return tryEvict(true);
-            }
-        }, /*system pool*/ true);
+        else {
+            cctx.preloader().evictPartitionAsync(this);
+        }
     }
 
     /**
@@ -521,12 +514,11 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     }
 
     /**
-     * @param updateSeq Update sequence.
      * @return {@code True} if entry has been transitioned to state EVICTED.
      */
-    boolean tryEvict(boolean updateSeq) {
+    public void tryEvict() {
         if (state.getReference() != RENTING || state.getStamp() != 0 || 
groupReserved())
-            return false;
+            return;
 
         // Attempt to evict partition entries from cache.
         clearAll();
@@ -545,14 +537,10 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
 
             rent.onDone();
 
-            ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, 
updateSeq);
+            ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, 
true);
 
             clearDeferredDeletes();
-
-            return true;
         }
-
-        return false;
     }
 
     /**
@@ -592,7 +580,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      *
      */
     void onUnlock() {
-        tryEvict(true);
+        cctx.preloader().evictPartitionAsync(this);
     }
 
     /**
@@ -632,6 +620,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
                 it = F.concat(it, unswapIt);
         }
 
+        GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
+
         try {
             while (it.hasNext()) {
                 GridDhtCacheEntry cached = null;
@@ -639,7 +629,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
                 try {
                     cached = it.next();
 
-                    if (cached.clearInternal(clearVer, swap)) {
+                    if (cached.clearInternal(clearVer, swap, extras)) {
                         map.remove(cached.key(), cached);
 
                         if (!cached.isInternal()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 848ad87..885b0dd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -116,6 +116,13 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
     }
 
     /**
+     * @param updateSeq Update sequence.
+     */
+    void updateSequence(long updateSeq) {
+        this.updateSeq = updateSeq;
+    }
+
+    /**
      * @return Update sequence.
      */
     long updateSequence() {
@@ -320,7 +327,7 @@ public class GridDhtPartitionDemandMessage extends 
GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtPartitionDemandMessage.class, this, 
"partCnt", parts.size(), "super",
+        return S.toString(GridDhtPartitionDemandMessage.class, this, 
"partCnt", parts != null ? parts.size() : 0, "super",
             super.toString());
     }
 }

Reply via email to