Repository: ignite Updated Branches: refs/heads/ignite-1093-2 252d4e72b -> dca0198c5
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dca0198c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dca0198c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dca0198c Branch: refs/heads/ignite-1093-2 Commit: dca0198c57a687a9ebe9851b277a9fe5b51e9de2 Parents: 252d4e7 Author: Anton Vinogradov <[email protected]> Authored: Mon Oct 12 20:46:15 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Oct 12 20:46:15 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 38 ++++---- .../GridCachePartitionExchangeManager.java | 52 +++++++---- .../processors/cache/GridCachePreloader.java | 2 +- .../cache/GridCachePreloaderAdapter.java | 2 +- .../distributed/dht/GridDhtCacheEntry.java | 5 +- .../distributed/dht/GridDhtLocalPartition.java | 94 +++++++++++++------- .../dht/preloader/GridDhtPartitionDemander.java | 10 +-- .../dht/preloader/GridDhtPreloader.java | 2 +- .../GridCacheRebalancingSyncSelfTest.java | 2 +- .../config/benchmark-rebalancing.properties | 2 +- 10 files changed, 131 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4bf0aa1..4e92ed4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -453,7 +453,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) { assert !hasValueUnlocked() : this; - obsolete = markObsolete0(obsoleteVer, false); + obsolete = markObsolete0(obsoleteVer, false, null); assert obsolete : this; } @@ -1303,7 +1303,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { // If entry is still removed. if (newVer == ver) { - if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true))) { + if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) { if (log.isDebugEnabled()) log.debug("Entry could not be marked obsolete (it is still used): " + this); } @@ -2420,7 +2420,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { if ((!hasReaders() || readers)) { // markObsolete will clear the value. - if (!(marked = markObsolete0(ver, true))) { + if (!(marked = markObsolete0(ver, true, null))) { if (log.isDebugEnabled()) log.debug("Entry could not be marked obsolete (it is still used): " + this); @@ -2478,7 +2478,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean obsolete; synchronized (this) { - obsolete = markObsolete0(ver, true); + obsolete = markObsolete0(ver, true, null); } if (obsolete) @@ -2511,7 +2511,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else - obsolete = markObsolete0(ver, true); + obsolete = markObsolete0(ver, true, null); } } } @@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (!this.ver.equals(ver)) return false; - marked = markObsolete0(ver, true); + marked = markObsolete0(ver, true, null); } if (marked) @@ -2555,9 +2555,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * * @param ver Version. * @param clear {@code True} to clear. + * @param extras Predefined extras. * @return {@code True} if entry is obsolete, {@code false} if entry is still used by other threads or nodes. */ - protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) { + protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, GridCacheObsoleteEntryExtras extras) { assert Thread.holdsLock(this); GridCacheVersion obsoleteVer = obsoleteVersionExtras(); @@ -2572,7 +2573,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (mvcc == null || mvcc.isEmpty(ver)) { obsoleteVer = ver; - obsoleteVersionExtras(obsoleteVer); + obsoleteVersionExtras(obsoleteVer, extras); if (clear) value(null); @@ -2896,7 +2897,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { if (checkExpired()) { - rmv = markObsolete0(cctx.versions().next(this.ver), true); + rmv = markObsolete0(cctx.versions().next(this.ver), true, null); return null; } @@ -3366,7 +3367,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else { - if (markObsolete0(obsoleteVer, true)) + if (markObsolete0(obsoleteVer, true, null)) obsolete = true; // Success, will return "true". } } @@ -3688,7 +3689,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject prev = saveOldValueUnlocked(false); - if (!hasReaders() && markObsolete0(obsoleteVer, false)) { + if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) { if (swap) { if (!isStartVersion()) { try { @@ -3736,7 +3737,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject prevVal = saveValueForIndexUnlocked(); - if (!hasReaders() && markObsolete0(obsoleteVer, false)) { + if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) { if (swap) { if (!isStartVersion()) { try { @@ -3812,7 +3813,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheBatchSwapEntry ret = null; try { - if (!hasReaders() && markObsolete0(obsoleteVer, false)) { + if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) { if (!isStartVersion() && hasValueUnlocked()) { if (cctx.offheapTiered() && hasOffHeapPointer()) { if (cctx.swap().offheapEvictionEnabled()) @@ -3871,7 +3872,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return false; if (checkExpired()) { - rmv = markObsolete0(cctx.versions().next(this.ver), true); + rmv = markObsolete0(cctx.versions().next(this.ver), true, null); return false; } @@ -3984,9 +3985,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param obsoleteVer Obsolete version. */ - protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer) { - extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : obsoleteVer != null ? - new GridCacheObsoleteEntryExtras(obsoleteVer) : null; + protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) { + extras = (extras != null) ? + extras.obsoleteVersion(obsoleteVer) : + obsoleteVer != null ? + (ext != null) ? ext : new GridCacheObsoleteEntryExtras(obsoleteVer) : + null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 7a210ce..ada4c84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -25,13 +25,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Queue; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -74,15 +72,16 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; -import org.apache.ignite.thread.IgniteThreadFactory; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedDeque8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE; @@ -142,7 +141,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private GridFutureAdapter<?> reconnectExchangeFut; /** */ - private ExecutorService rebalancingOrderedExecutorService; + private final Queue<Runnable> rebalancingQueue = new ConcurrentLinkedDeque8<>(); + + /** */ + private final AtomicReference<Integer> rebalancingQueueOwning = new AtomicReference<>(0); /** * Partition map futures. @@ -259,9 +261,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); - rebalancingOrderedExecutorService = Executors.newSingleThreadExecutor( - new IgniteThreadFactory(cctx.gridName(), "rebalancing-assigns")); - exchWorker = new ExchangeWorker(); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, @@ -466,8 +465,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void stop0(boolean cancel) { super.stop0(cancel); - rebalancingOrderedExecutorService.shutdownNow(); - // Do not allow any activity in exchange manager after stop. busyLock.writeLock().lock(); @@ -1329,14 +1326,39 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - Callable c = cacheCtx.preloader().addAssignments( - assignsMap.get(cacheId), forcePreload, waitList, cnt); + Runnable r = cacheCtx.preloader().addAssignments( + assignsMap.get(cacheId), forcePreload, waitList, cnt); - if (c != null) { + if (r != null) { U.log(log, "Rebalancing scheduled: [cache=" + cacheCtx.name() + - " , waitList=" + waitList.toString() + "]"); + " , waitList=" + waitList.toString() + "]"); + + rebalancingQueue.add(r); - rebalancingOrderedExecutorService.submit(c); + if (rebalancingQueueOwning.get() == 0) { + cacheCtx.closures().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + while (true) { + if (!rebalancingQueueOwning.compareAndSet(0, 1)) + return false; + + try { + Runnable rn = rebalancingQueue.poll(); + + if (rn == null) + return false; + + rn.run(); + } + finally { + boolean res = rebalancingQueueOwning.compareAndSet(1, 0); + + assert res; + } + } + } + }, /*system pool*/ true); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 46a09f0..b2c2a2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -97,7 +97,7 @@ public interface GridCachePreloader { * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. */ - public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches, int cnt) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 1266a3b..6ffb2bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -158,7 +158,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches, int cnt) throws IgniteCheckedException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index be2f3d3..b2279ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; +import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPlainRunnable; @@ -539,7 +540,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @return {@code True} if entry was not being used, passed the filter and could be removed. * @throws IgniteCheckedException If failed to remove from swap. */ - public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException { + public boolean clearInternal(GridCacheVersion ver, boolean swap, GridCacheObsoleteEntryExtras extras) throws IgniteCheckedException { boolean rmv = false; try { @@ -548,7 +549,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { // Call markObsolete0 to avoid recursive calls to clear if // we are clearing dht local partition (onMarkedObsolete should not be called). - if (!markObsolete0(ver, false)) { + if (!markObsolete0(ver, false, extras)) { if (log.isDebugEnabled()) log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 6e6ccfd..15eede0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -38,10 +39,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCircularBuffer; -import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.LongAdder8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; @@ -78,6 +80,12 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** Logger. */ private static volatile IgniteLogger log; + /** */ + private static final Queue<GridDhtLocalPartition> partitionsToEvict = new ConcurrentLinkedDeque8<>(); + + /** */ + private static final AtomicReference<Integer> partitionsEvictionOwning = new AtomicReference<>(0); + /** Partition ID. */ private final int id; @@ -284,7 +292,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } // Attempt to evict. - tryEvict(true); + tryEvict(); } /** @@ -409,7 +417,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, // Decrement reservations. if (state.compareAndSet(s, s, reservations, --reservations)) { - tryEvict(true); + tryEvict(); break; } @@ -477,7 +485,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @param updateSeq Update sequence. * @return Future for evict attempt. */ - IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) { + void tryEvictAsync(boolean updateSeq) { if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && state.compareAndSet(RENTING, EVICTED, 0, 0)) { if (log.isDebugEnabled()) @@ -495,15 +503,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); clearDeferredDeletes(); - - return new GridFinishedFuture<>(true); } - - return cctx.closures().callLocalSafe(new GPC<Boolean>() { - @Override public Boolean call() { - return tryEvict(true); - } - }, /*system pool*/ true); + else { + tryEvict(); + } } /** @@ -519,38 +522,61 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** - * @param updateSeq Update sequence. * @return {@code True} if entry has been transitioned to state EVICTED. */ - boolean tryEvict(boolean updateSeq) { + void tryEvict() { if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved()) - return false; + return; - // Attempt to evict partition entries from cache. - clearAll(); + partitionsToEvict.add(this); - if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { - if (log.isDebugEnabled()) - log.debug("Evicted partition: " + this); + if (partitionsEvictionOwning.get() == 0) { + cctx.closures().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + while (true) { + if (!partitionsEvictionOwning.compareAndSet(0, 1)) + return false; - if (!GridQueryProcessor.isEnabled(cctx.config())) - clearSwap(); + try { + GridDhtLocalPartition part = partitionsToEvict.poll(); - if (cctx.isDrEnabled()) - cctx.dr().partitionEvicted(id); + if (part == null) { + return false; + } - cctx.dataStructures().onPartitionEvicted(id); + if (part.state.getReference() != EVICTED) { + // Attempt to evict partition entries from cache. + part.clearAll(); - rent.onDone(); + if (part.map.isEmpty() && part.state.compareAndSet(RENTING, EVICTED, 0, 0)) { + if (log.isDebugEnabled()) + log.debug("Evicted partition: " + this); - ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); + if (!GridQueryProcessor.isEnabled(part.cctx.config())) + part.clearSwap(); - clearDeferredDeletes(); + if (part.cctx.isDrEnabled()) + part.cctx.dr().partitionEvicted(id); - return true; - } + part.cctx.dataStructures().onPartitionEvicted(id); - return false; + part.rent.onDone(); + + ((GridDhtPreloader)part.cctx.preloader()).onPartitionEvicted(part, true); + + part.clearDeferredDeletes(); + } + } + } + finally { + boolean res = partitionsEvictionOwning.compareAndSet(1, 0); + + assert res; + } + } + } + }, /*system pool*/ true); + } } /** @@ -590,7 +616,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ void onUnlock() { - tryEvict(true); + tryEvict(); } /** @@ -630,6 +656,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, it = F.concat(it, unswapIt); } + GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); + try { while (it.hasNext()) { GridDhtCacheEntry cached = null; @@ -637,7 +665,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, try { cached = it.next(); - if (cached.clearInternal(clearVer, swap)) { + if (cached.clearInternal(clearVer, swap, extras)) { map.remove(cached.key(), cached); if (!cached.isInternal()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 9b079ec..e68209e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -248,7 +248,7 @@ public class GridDhtPartitionDemander { * @param cnt Counter. * @throws IgniteCheckedException Exception */ - Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches, + Runnable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches, int cnt) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -293,19 +293,17 @@ public class GridDhtPartitionDemander { return null; } - return new Callable<Boolean>() { + return new Runnable() { @Override - public Boolean call() { + public void run() { for (String c : caches) { waitForCacheRebalancing(c, fut); if (fut.isDone()) - return false; + return; } requestPartitions(fut, assigns); - - return true; } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 6107daa..01109c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -400,7 +400,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, Collection<String> caches, int cnt) throws IgniteCheckedException { return demander.addAssignments(assignments, forcePreload, caches, cnt); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 897a820..85bf8d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -106,6 +106,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); cacheRCfg.setRebalanceBatchSize(1); cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE); + ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem fix for Integer.MAX_VALUE. CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>(); @@ -117,7 +118,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2); iCfg.setRebalanceThreadPoolSize(2); - ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem fix. return iCfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/dca0198c/modules/yardstick/config/benchmark-rebalancing.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark-rebalancing.properties b/modules/yardstick/config/benchmark-rebalancing.properties index 796e49f..4d0e328 100644 --- a/modules/yardstick/config/benchmark-rebalancing.properties +++ b/modules/yardstick/config/benchmark-rebalancing.properties @@ -39,7 +39,7 @@ JVM_OPTS=${JVM_OPTS}" \ -XX:MaxTenuringThreshold=0 \ -XX:SurvivorRatio=1024 \ -XX:+UseCMSInitiatingOccupancyOnly \ - -XX:CMSInitiatingOccupancyFraction=40 \ + -XX:CMSInitiatingOccupancyFraction=60 \ -XX:MaxGCPauseMillis=100 \ "
