Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 252d4e72b -> dca0198c5


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dca0198c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dca0198c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dca0198c

Branch: refs/heads/ignite-1093-2
Commit: dca0198c57a687a9ebe9851b277a9fe5b51e9de2
Parents: 252d4e7
Author: Anton Vinogradov <[email protected]>
Authored: Mon Oct 12 20:46:15 2015 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Mon Oct 12 20:46:15 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 38 ++++----
 .../GridCachePartitionExchangeManager.java      | 52 +++++++----
 .../processors/cache/GridCachePreloader.java    |  2 +-
 .../cache/GridCachePreloaderAdapter.java        |  2 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  5 +-
 .../distributed/dht/GridDhtLocalPartition.java  | 94 +++++++++++++-------
 .../dht/preloader/GridDhtPartitionDemander.java | 10 +--
 .../dht/preloader/GridDhtPreloader.java         |  2 +-
 .../GridCacheRebalancingSyncSelfTest.java       |  2 +-
 .../config/benchmark-rebalancing.properties     |  2 +-
 10 files changed, 131 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..4e92ed4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -453,7 +453,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (cctx.swap().offheapSwapEvict(key, entry, partition(), 
evictVer)) {
                 assert !hasValueUnlocked() : this;
 
-                obsolete = markObsolete0(obsoleteVer, false);
+                obsolete = markObsolete0(obsoleteVer, false, null);
 
                 assert obsolete : this;
             }
@@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             synchronized (this) {
                 // If entry is still removed.
                 if (newVer == ver) {
-                    if (obsoleteVer == null || !(marked = 
markObsolete0(obsoleteVer, true))) {
+                    if (obsoleteVer == null || !(marked = 
markObsolete0(obsoleteVer, true, null))) {
                         if (log.isDebugEnabled())
                             log.debug("Entry could not be marked obsolete (it 
is still used): " + this);
                     }
@@ -2420,7 +2420,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 try {
                     if ((!hasReaders() || readers)) {
                         // markObsolete will clear the value.
-                        if (!(marked = markObsolete0(ver, true))) {
+                        if (!(marked = markObsolete0(ver, true, null))) {
                             if (log.isDebugEnabled())
                                 log.debug("Entry could not be marked obsolete 
(it is still used): " + this);
 
@@ -2478,7 +2478,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         boolean obsolete;
 
         synchronized (this) {
-            obsolete = markObsolete0(ver, true);
+            obsolete = markObsolete0(ver, true, null);
         }
 
         if (obsolete)
@@ -2511,7 +2511,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         }
                     }
                     else
-                        obsolete = markObsolete0(ver, true);
+                        obsolete = markObsolete0(ver, true, null);
                 }
             }
         }
@@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (!this.ver.equals(ver))
                 return false;
 
-            marked = markObsolete0(ver, true);
+            marked = markObsolete0(ver, true, null);
         }
 
         if (marked)
@@ -2555,9 +2555,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      *
      * @param ver Version.
      * @param clear {@code True} to clear.
+     * @param extras Predefined extras.
      * @return {@code True} if entry is obsolete, {@code false} if entry is 
still used by other threads or nodes.
      */
-    protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) 
{
+    protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, 
GridCacheObsoleteEntryExtras extras) {
         assert Thread.holdsLock(this);
 
         GridCacheVersion obsoleteVer = obsoleteVersionExtras();
@@ -2572,7 +2573,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (mvcc == null || mvcc.isEmpty(ver)) {
                 obsoleteVer = ver;
 
-                obsoleteVersionExtras(obsoleteVer);
+                obsoleteVersionExtras(obsoleteVer, extras);
 
                 if (clear)
                     value(null);
@@ -2896,7 +2897,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 synchronized (this) {
                     if (checkExpired()) {
-                        rmv = markObsolete0(cctx.versions().next(this.ver), 
true);
+                        rmv = markObsolete0(cctx.versions().next(this.ver), 
true, null);
 
                         return null;
                     }
@@ -3366,7 +3367,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             }
                         }
                         else {
-                            if (markObsolete0(obsoleteVer, true))
+                            if (markObsolete0(obsoleteVer, true, null))
                                 obsolete = true; // Success, will return 
"true".
                         }
                     }
@@ -3688,7 +3689,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                     CacheObject prev = saveOldValueUnlocked(false);
 
-                    if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+                    if (!hasReaders() && markObsolete0(obsoleteVer, false, 
null)) {
                         if (swap) {
                             if (!isStartVersion()) {
                                 try {
@@ -3736,7 +3737,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                         CacheObject prevVal = saveValueForIndexUnlocked();
 
-                        if (!hasReaders() && markObsolete0(obsoleteVer, 
false)) {
+                        if (!hasReaders() && markObsolete0(obsoleteVer, false, 
null)) {
                             if (swap) {
                                 if (!isStartVersion()) {
                                     try {
@@ -3812,7 +3813,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         GridCacheBatchSwapEntry ret = null;
 
         try {
-            if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
+            if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) {
                 if (!isStartVersion() && hasValueUnlocked()) {
                     if (cctx.offheapTiered() && hasOffHeapPointer()) {
                         if (cctx.swap().offheapEvictionEnabled())
@@ -3871,7 +3872,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     return false;
 
                 if (checkExpired()) {
-                    rmv = markObsolete0(cctx.versions().next(this.ver), true);
+                    rmv = markObsolete0(cctx.versions().next(this.ver), true, 
null);
 
                     return false;
                 }
@@ -3984,9 +3985,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     /**
      * @param obsoleteVer Obsolete version.
      */
-    protected void obsoleteVersionExtras(@Nullable GridCacheVersion 
obsoleteVer) {
-        extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : 
obsoleteVer != null ?
-            new GridCacheObsoleteEntryExtras(obsoleteVer) : null;
+    protected void obsoleteVersionExtras(@Nullable GridCacheVersion 
obsoleteVer, GridCacheObsoleteEntryExtras ext) {
+        extras = (extras != null) ?
+            extras.obsoleteVersion(obsoleteVer) :
+            obsoleteVer != null ?
+                (ext != null) ? ext : new 
GridCacheObsoleteEntryExtras(obsoleteVer) :
+                null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7a210ce..ada4c84 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -25,13 +25,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Queue;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -74,15 +72,16 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
-import org.apache.ignite.thread.IgniteThreadFactory;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
@@ -142,7 +141,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     private GridFutureAdapter<?> reconnectExchangeFut;
 
     /** */
-    private ExecutorService rebalancingOrderedExecutorService;
+    private final Queue<Runnable> rebalancingQueue = new 
ConcurrentLinkedDeque8<>();
+
+    /** */
+    private final AtomicReference<Integer> rebalancingQueueOwning = new 
AtomicReference<>(0);
 
     /**
      * Partition map futures.
@@ -259,9 +261,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        rebalancingOrderedExecutorService = Executors.newSingleThreadExecutor(
-                new IgniteThreadFactory(cctx.gridName(), 
"rebalancing-assigns"));
-
         exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, 
EVT_NODE_LEFT, EVT_NODE_FAILED,
@@ -466,8 +465,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void stop0(boolean cancel) {
         super.stop0(cancel);
 
-        rebalancingOrderedExecutorService.shutdownNow();
-
         // Do not allow any activity in exchange manager after stop.
         busyLock.writeLock().lock();
 
@@ -1329,14 +1326,39 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                     }
                                 }
 
-                                Callable c = 
cacheCtx.preloader().addAssignments(
-                                        assignsMap.get(cacheId), forcePreload, 
waitList, cnt);
+                                Runnable r = 
cacheCtx.preloader().addAssignments(
+                                    assignsMap.get(cacheId), forcePreload, 
waitList, cnt);
 
-                                if (c != null) {
+                                if (r != null) {
                                     U.log(log, "Rebalancing scheduled: 
[cache=" + cacheCtx.name() +
-                                            " , waitList=" + 
waitList.toString() + "]");
+                                        " , waitList=" + waitList.toString() + 
"]");
+
+                                    rebalancingQueue.add(r);
 
-                                    
rebalancingOrderedExecutorService.submit(c);
+                                    if (rebalancingQueueOwning.get() == 0) {
+                                        cacheCtx.closures().callLocalSafe(new 
GPC<Boolean>() {
+                                            @Override public Boolean call() {
+                                                while (true) {
+                                                    if 
(!rebalancingQueueOwning.compareAndSet(0, 1))
+                                                        return false;
+
+                                                    try {
+                                                        Runnable rn = 
rebalancingQueue.poll();
+
+                                                        if (rn == null)
+                                                            return false;
+
+                                                        rn.run();
+                                                    }
+                                                    finally {
+                                                        boolean res = 
rebalancingQueueOwning.compareAndSet(1, 0);
+
+                                                        assert res;
+                                                    }
+                                                }
+                                            }
+                                        }, /*system pool*/ true);
+                                    }
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 46a09f0..b2c2a2b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -97,7 +97,7 @@ public interface GridCachePreloader {
      * @param caches Rebalancing of these caches will be finished before this 
started.
      * @param cnt Counter.
      */
-    public Callable addAssignments(GridDhtPreloaderAssignments assignments, 
boolean forcePreload,
+    public Runnable addAssignments(GridDhtPreloaderAssignments assignments, 
boolean forcePreload,
         Collection<String> caches, int cnt) throws IgniteCheckedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 1266a3b..6ffb2bf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -158,7 +158,7 @@ public class GridCachePreloaderAdapter implements 
GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public Callable addAssignments(GridDhtPreloaderAssignments 
assignments, boolean forcePreload,
+    @Override public Runnable addAssignments(GridDhtPreloaderAssignments 
assignments, boolean forcePreload,
         Collection<String> caches, int cnt) throws IgniteCheckedException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..b2279ed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
+import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -539,7 +540,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
      * @return {@code True} if entry was not being used, passed the filter and 
could be removed.
      * @throws IgniteCheckedException If failed to remove from swap.
      */
-    public boolean clearInternal(GridCacheVersion ver, boolean swap) throws 
IgniteCheckedException {
+    public boolean clearInternal(GridCacheVersion ver, boolean swap, 
GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException {
         boolean rmv = false;
 
         try {
@@ -548,7 +549,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
 
                 // Call markObsolete0 to avoid recursive calls to clear if
                 // we are clearing dht local partition (onMarkedObsolete 
should not be called).
-                if (!markObsolete0(ver, false)) {
+                if (!markObsolete0(ver, false, extras)) {
                     if (log.isDebugEnabled())
                         log.debug("Entry could not be marked obsolete (it is 
still used or has readers): " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 6e6ccfd..15eede0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -38,10 +39,10 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCircularBuffer;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -55,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.LongAdder8;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -78,6 +80,12 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     /** Logger. */
     private static volatile IgniteLogger log;
 
+    /** */
+    private static final Queue<GridDhtLocalPartition> partitionsToEvict = new 
ConcurrentLinkedDeque8<>();
+
+    /** */
+    private static final AtomicReference<Integer> partitionsEvictionOwning = 
new AtomicReference<>(0);
+
     /** Partition ID. */
     private final int id;
 
@@ -284,7 +292,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
         }
 
         // Attempt to evict.
-        tryEvict(true);
+        tryEvict();
     }
 
     /**
@@ -409,7 +417,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
 
             // Decrement reservations.
             if (state.compareAndSet(s, s, reservations, --reservations)) {
-                tryEvict(true);
+                tryEvict();
 
                 break;
             }
@@ -477,7 +485,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      * @param updateSeq Update sequence.
      * @return Future for evict attempt.
      */
-    IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) {
+    void tryEvictAsync(boolean updateSeq) {
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
             state.compareAndSet(RENTING, EVICTED, 0, 0)) {
             if (log.isDebugEnabled())
@@ -495,15 +503,10 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, 
updateSeq);
 
             clearDeferredDeletes();
-
-            return new GridFinishedFuture<>(true);
         }
-
-        return cctx.closures().callLocalSafe(new GPC<Boolean>() {
-            @Override public Boolean call() {
-                return tryEvict(true);
-            }
-        }, /*system pool*/ true);
+        else {
+            tryEvict();
+        }
     }
 
     /**
@@ -519,38 +522,61 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
     }
 
     /**
-     * @param updateSeq Update sequence.
      * @return {@code True} if entry has been transitioned to state EVICTED.
      */
-    boolean tryEvict(boolean updateSeq) {
+    void tryEvict() {
         if (state.getReference() != RENTING || state.getStamp() != 0 || 
groupReserved())
-            return false;
+            return;
 
-        // Attempt to evict partition entries from cache.
-        clearAll();
+        partitionsToEvict.add(this);
 
-        if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
-            if (log.isDebugEnabled())
-                log.debug("Evicted partition: " + this);
+        if (partitionsEvictionOwning.get() == 0) {
+            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+                @Override public Boolean call() {
+                    while (true) {
+                        if (!partitionsEvictionOwning.compareAndSet(0, 1))
+                            return false;
 
-            if (!GridQueryProcessor.isEnabled(cctx.config()))
-                clearSwap();
+                        try {
+                            GridDhtLocalPartition part = 
partitionsToEvict.poll();
 
-            if (cctx.isDrEnabled())
-                cctx.dr().partitionEvicted(id);
+                            if (part == null) {
+                                return false;
+                            }
 
-            cctx.dataStructures().onPartitionEvicted(id);
+                            if (part.state.getReference() != EVICTED) {
+                                // Attempt to evict partition entries from 
cache.
+                                part.clearAll();
 
-            rent.onDone();
+                                if (part.map.isEmpty() && 
part.state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Evicted partition: " + 
this);
 
-            ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, 
updateSeq);
+                                    if 
(!GridQueryProcessor.isEnabled(part.cctx.config()))
+                                        part.clearSwap();
 
-            clearDeferredDeletes();
+                                    if (part.cctx.isDrEnabled())
+                                        part.cctx.dr().partitionEvicted(id);
 
-            return true;
-        }
+                                    
part.cctx.dataStructures().onPartitionEvicted(id);
 
-        return false;
+                                    part.rent.onDone();
+
+                                    
((GridDhtPreloader)part.cctx.preloader()).onPartitionEvicted(part, true);
+
+                                    part.clearDeferredDeletes();
+                                }
+                            }
+                        }
+                        finally {
+                            boolean res = 
partitionsEvictionOwning.compareAndSet(1, 0);
+
+                            assert res;
+                        }
+                    }
+                }
+            }, /*system pool*/ true);
+        }
     }
 
     /**
@@ -590,7 +616,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
      *
      */
     void onUnlock() {
-        tryEvict(true);
+        tryEvict();
     }
 
     /**
@@ -630,6 +656,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
                 it = F.concat(it, unswapIt);
         }
 
+        GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
+
         try {
             while (it.hasNext()) {
                 GridDhtCacheEntry cached = null;
@@ -637,7 +665,7 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>,
                 try {
                     cached = it.next();
 
-                    if (cached.clearInternal(clearVer, swap)) {
+                    if (cached.clearInternal(clearVer, swap, extras)) {
                         map.remove(cached.key(), cached);
 
                         if (!cached.isInternal()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 9b079ec..e68209e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -248,7 +248,7 @@ public class GridDhtPartitionDemander {
      * @param cnt Counter.
      * @throws IgniteCheckedException Exception
      */
-    Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean 
force, final Collection<String> caches,
+    Runnable addAssignments(final GridDhtPreloaderAssignments assigns, boolean 
force, final Collection<String> caches,
         int cnt) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
@@ -293,19 +293,17 @@ public class GridDhtPartitionDemander {
                 return null;
             }
 
-            return new Callable<Boolean>() {
+            return new Runnable() {
                 @Override
-                public Boolean call() {
+                public void run() {
                     for (String c : caches) {
                         waitForCacheRebalancing(c, fut);
 
                         if (fut.isDone())
-                            return false;
+                            return;
                     }
 
                     requestPartitions(fut, assigns);
-
-                    return true;
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 6107daa..01109c1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -400,7 +400,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Callable addAssignments(GridDhtPreloaderAssignments 
assignments,
+    @Override public Runnable addAssignments(GridDhtPreloaderAssignments 
assignments,
         boolean forcePreload, Collection<String> caches, int cnt) throws 
IgniteCheckedException {
         return demander.addAssignments(assignments, forcePreload, caches, cnt);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 897a820..85bf8d6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -106,6 +106,7 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheRCfg.setRebalanceBatchSize(1);
         cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
+        
((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem
 fix for Integer.MAX_VALUE.
 
         CacheConfiguration<Integer, Integer> cacheRCfg2 = new 
CacheConfiguration<>();
 
@@ -117,7 +118,6 @@ public class GridCacheRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
         iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, 
cacheRCfg2);
 
         iCfg.setRebalanceThreadPoolSize(2);
-        
((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem
 fix.
 
         return iCfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/yardstick/config/benchmark-rebalancing.properties
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/benchmark-rebalancing.properties 
b/modules/yardstick/config/benchmark-rebalancing.properties
index 796e49f..4d0e328 100644
--- a/modules/yardstick/config/benchmark-rebalancing.properties
+++ b/modules/yardstick/config/benchmark-rebalancing.properties
@@ -39,7 +39,7 @@ JVM_OPTS=${JVM_OPTS}" \
   -XX:MaxTenuringThreshold=0 \
   -XX:SurvivorRatio=1024 \
   -XX:+UseCMSInitiatingOccupancyOnly \
-  -XX:CMSInitiatingOccupancyFraction=40 \
+  -XX:CMSInitiatingOccupancyFraction=60 \
   -XX:MaxGCPauseMillis=100 \
 "
 

Reply via email to