Ignite-1093 "Rebalancing with default parameters is very slow" fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/242d988a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/242d988a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/242d988a Branch: refs/heads/ignite-perftest Commit: 242d988a077878cca93f7bec85af3438a94e44d7 Parents: 7573003 Author: Anton Vinogradov <a...@apache.org> Authored: Thu Nov 5 16:15:34 2015 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Thu Nov 5 20:29:29 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 54 +- .../configuration/IgniteConfiguration.java | 32 +- .../apache/ignite/internal/IgniteKernal.java | 21 + .../org/apache/ignite/internal/IgnitionEx.java | 3 + .../communication/GridIoMessageFactory.java | 10 +- .../processors/cache/GridCacheIoManager.java | 19 +- .../processors/cache/GridCacheMapEntry.java | 38 +- .../GridCachePartitionExchangeManager.java | 164 ++- .../processors/cache/GridCachePreloader.java | 55 +- .../cache/GridCachePreloaderAdapter.java | 40 +- .../processors/cache/GridCacheProcessor.java | 55 +- .../distributed/dht/GridDhtCacheEntry.java | 11 +- .../distributed/dht/GridDhtLocalPartition.java | 63 +- .../dht/GridDhtPartitionsReservation.java | 2 +- .../GridDhtPartitionDemandMessage.java | 14 +- .../preloader/GridDhtPartitionDemandPool.java | 1192 --------------- .../dht/preloader/GridDhtPartitionDemander.java | 1389 ++++++++++++++++++ .../dht/preloader/GridDhtPartitionSupplier.java | 1034 +++++++++++++ .../GridDhtPartitionSupplyMessageV2.java | 380 +++++ .../preloader/GridDhtPartitionSupplyPool.java | 555 ------- .../GridDhtPartitionsExchangeFuture.java | 2 + .../dht/preloader/GridDhtPreloader.java | 282 +++- .../datastructures/DataStructuresProcessor.java | 3 + .../processors/task/GridTaskWorker.java | 4 +- .../ignite/internal/util/lang/GridTuple4.java | 2 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../IgniteCacheP2pUnmarshallingErrorTest.java | 2 +- .../dht/GridCacheTxNodeFailureSelfTest.java | 21 +- .../GridCacheRebalancingAsyncSelfTest.java | 68 + .../GridCacheRebalancingSyncSelfTest.java | 506 +++++++ ...eRebalancingUnmarshallingFailedSelfTest.java | 147 ++ .../GridCacheReplicatedPreloadSelfTest.java | 22 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 17 - .../spi/discovery/tcp/TestTcpDiscoverySpi.java | 46 + .../testframework/junits/GridAbstractTest.java | 3 +- .../junits/common/GridCommonAbstractTest.java | 21 +- .../testsuites/IgniteCacheTestSuite3.java | 4 + .../tcp/GridOrderedMessageCancelSelfTest.java | 18 +- 38 files changed, 4309 insertions(+), 1992 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 374743f..b7276c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -69,11 +69,15 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { private static final long serialVersionUID = 0L; /** Default size of rebalance thread pool. */ + @Deprecated public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2; /** Default rebalance timeout (ms).*/ public static final long DFLT_REBALANCE_TIMEOUT = 10000; + /** Default rebalance batches prefetch count. */ + public static final long DFLT_REBALANCE_BATCHES_PREFETCH_COUNT = 2; + /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */ public static final long DFLT_REBALANCE_THROTTLE = 0; @@ -174,6 +178,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { private String name; /** Rebalance thread pool size. */ + @Deprecated private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE; /** Rebalance timeout. */ @@ -254,6 +259,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Rebalance batch size. */ private int rebalanceBatchSize = DFLT_REBALANCE_BATCH_SIZE; + /** Rebalance batches prefetch count. */ + private long rebalanceBatchesPrefetchCount = DFLT_REBALANCE_BATCHES_PREFETCH_COUNT; + /** Off-heap memory size. */ private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY; @@ -394,9 +402,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { name = cc.getName(); nearCfg = cc.getNearConfiguration(); nodeFilter = cc.getNodeFilter(); - rebalanceMode = cc.getRebalanceMode(); + rebalanceBatchesPrefetchCount = cc.getRebalanceBatchesPrefetchCount(); rebalanceBatchSize = cc.getRebalanceBatchSize(); rebalanceDelay = cc.getRebalanceDelay(); + rebalanceMode = cc.getRebalanceMode(); rebalanceOrder = cc.getRebalanceOrder(); rebalancePoolSize = cc.getRebalanceThreadPoolSize(); rebalanceTimeout = cc.getRebalanceTimeout(); @@ -1036,10 +1045,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * {@link CacheRebalanceMode#SYNC SYNC} or {@link CacheRebalanceMode#ASYNC ASYNC} rebalance modes only. * <p/> * If cache rebalance order is positive, rebalancing for this cache will be started only when rebalancing for - * all caches with smaller rebalance order (except caches with rebalance order {@code 0}) will be completed. + * all caches with smaller rebalance order will be completed. * <p/> * Note that cache with order {@code 0} does not participate in ordering. This means that cache with - * rebalance order {@code 1} will never wait for any other caches. All caches with order {@code 0} will + * rebalance order {@code 0} will never wait for any other caches. All caches with order {@code 0} will * be rebalanced right away concurrently with each other and ordered rebalance processes. * <p/> * If not set, cache order is 0, i.e. rebalancing is not ordered. @@ -1088,6 +1097,35 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * To gain better rebalancing performance supplier node can provide more than one batch at rebalancing start and + * provide one new to each next demand request. + * + * Gets number of batches generated by supply node at rebalancing start. + * Minimum is 1. + * + * @return batches count + */ + public long getRebalanceBatchesPrefetchCount() { + return rebalanceBatchesPrefetchCount; + } + + /** + * To gain better rebalancing performance supplier node can provide more than one batch at rebalancing start and + * provide one new to each next demand request. + * + * Sets number of batches generated by supply node at rebalancing start. + * Minimum is 1. + * + * @param rebalanceBatchesCnt batches count. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setRebalanceBatchesPrefetchCount(long rebalanceBatchesCnt) { + this.rebalanceBatchesPrefetchCount = rebalanceBatchesCnt; + + return this; + } + + /** * Flag indicating whether Ignite should use swap storage by default. By default * swap is disabled which is defined via {@link #DFLT_SWAP_ENABLED} constant. * @@ -1273,24 +1311,22 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** - * Gets size of rebalancing thread pool. Note that size serves as a hint and implementation - * may create more threads for rebalancing than specified here (but never less threads). - * <p> - * Default value is {@link #DFLT_REBALANCE_THREAD_POOL_SIZE}. + * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead. * * @return Size of rebalancing thread pool. */ + @Deprecated public int getRebalanceThreadPoolSize() { return rebalancePoolSize; } /** - * Sets size of rebalancing thread pool. Note that size serves as a hint and implementation may create more threads - * for rebalancing than specified here (but never less threads). + * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead. * * @param rebalancePoolSize Size of rebalancing thread pool. * @return {@code this} for chaining. */ + @Deprecated public CacheConfiguration<K, V> setRebalanceThreadPoolSize(int rebalancePoolSize) { this.rebalancePoolSize = rebalancePoolSize; http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index ecae356..9298c6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -149,6 +149,9 @@ public class IgniteConfiguration { /** Default keep alive time for public thread pool. */ public static final long DFLT_PUBLIC_KEEP_ALIVE_TIME = 0; + /** Default limit of threads used for rebalance. */ + public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 1; + /** Default max queue capacity of public thread pool. */ public static final int DFLT_PUBLIC_THREADPOOL_QUEUE_CAP = Integer.MAX_VALUE; @@ -354,6 +357,9 @@ public class IgniteConfiguration { /** Client mode flag. */ private Boolean clientMode; + /** Rebalance thread pool size. */ + private int rebalanceThreadPoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE; + /** Transactions configuration. */ private TransactionConfiguration txCfg = new TransactionConfiguration(); @@ -500,6 +506,7 @@ public class IgniteConfiguration { p2pPoolSize = cfg.getPeerClassLoadingThreadPoolSize(); pluginCfgs = cfg.getPluginConfigurations(); pubPoolSize = cfg.getPublicThreadPoolSize(); + rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize(); segChkFreq = cfg.getSegmentCheckFrequency(); segPlc = cfg.getSegmentationPolicy(); segResolveAttempts = cfg.getSegmentationResolveAttempts(); @@ -1331,6 +1338,29 @@ public class IgniteConfiguration { } /** + * Gets Max count of threads can be used at rebalancing. + * Minimum is 1. + * @return count. + */ + public int getRebalanceThreadPoolSize() { + return rebalanceThreadPoolSize; + } + + /** + * Sets Max count of threads can be used at rebalancing. + * + * Default is {@code 1} which has minimal impact on the operation of the grid. + * + * @param size Size. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setRebalanceThreadPoolSize(int size) { + this.rebalanceThreadPoolSize = size; + + return this; + } + + /** * Returns a collection of life-cycle beans. These beans will be automatically * notified of grid life-cycle events. Use life-cycle beans whenever you * want to perform certain logic before and after grid startup and stopping @@ -2383,4 +2413,4 @@ public class IgniteConfiguration { @Override public String toString() { return S.toString(IgniteConfiguration.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8603055..dd129da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -735,6 +735,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ackEnvironmentVariables(); ackCacheConfiguration(); ackP2pConfiguration(); + ackRebalanceConfiguration(); // Run background network diagnostics. GridDiagnostic.runBackgroundCheck(gridName, execSvc, log); @@ -2147,6 +2148,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * */ + private void ackRebalanceConfiguration() throws IgniteCheckedException { + if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize()) + throw new IgniteCheckedException("Rebalance thread pool size exceed or equals System thread pool size. " + + "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start."); + + if (cfg.getRebalanceThreadPoolSize() < 1) + throw new IgniteCheckedException("Rebalance thread pool size minimal allowed value is 1. " + + "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start."); + + for (CacheConfiguration ccfg : cfg.getCacheConfiguration()){ + if (ccfg.getRebalanceBatchesPrefetchCount() < 1) + throw new IgniteCheckedException("Rebalance batches prefetch count minimal allowed value is 1. " + + "Change CacheConfiguration.rebalanceBatchesPrefetchCount property before next start. " + + "[cache="+ccfg.getName()+"]"); + } + } + + /** + * + */ private void ackCacheConfiguration() { CacheConfiguration[] cacheCfgs = cfg.getCacheConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 02b28c5..7d2b2dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -2035,6 +2035,7 @@ public class IgnitionEx { cache.setAffinity(new RendezvousAffinityFunction(false, 20)); cache.setNodeFilter(CacheConfiguration.ALL_NODES); cache.setStartSize(300); + cache.setRebalanceOrder(-2);//Prior to other system caches. return cache; } @@ -2055,6 +2056,7 @@ public class IgnitionEx { cache.setWriteSynchronizationMode(FULL_SYNC); cache.setAffinity(new RendezvousAffinityFunction(false, 100)); cache.setNodeFilter(CacheConfiguration.ALL_NODES); + cache.setRebalanceOrder(-1);//Prior to user caches. return cache; } @@ -2075,6 +2077,7 @@ public class IgnitionEx { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); + ccfg.setRebalanceOrder(-1);//Prior to user caches. if (cfg.getCacheMode() == PARTITIONED) ccfg.setBackups(cfg.getBackups()); http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 079015c..ae8c753 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; @@ -684,7 +685,12 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..112] - this + case 114: + msg = new GridDhtPartitionSupplyMessageV2(); + + break; + + // [-3..114] - this // [120..123] - DR // [-4..-22] - SQL default: @@ -722,4 +728,4 @@ public class GridIoMessageFactory implements MessageFactory { CUSTOM.put(type, c); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 082f330..2334780 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; @@ -269,7 +268,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { unmarshall(nodeId, cacheMsg); if (cacheMsg.classError() != null) - processFailedMessage(nodeId, cacheMsg); + processFailedMessage(nodeId, cacheMsg, c); else processMessage(nodeId, cacheMsg, c); } @@ -313,7 +312,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param msg Message. * @throws IgniteCheckedException If failed. */ - private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException { + private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) + throws IgniteCheckedException { GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); switch (msg.directType()) { @@ -412,9 +412,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; case 45: { - GridDhtPartitionSupplyMessage req = (GridDhtPartitionSupplyMessage)msg; - - U.error(log, "Supply message cannot be unmarshalled.", req.classError()); + processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander. } break; @@ -517,6 +515,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 114: { + processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander. + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]"); @@ -528,8 +532,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param msg Message. * @param c Closure. */ - private void processMessage(UUID nodeId, GridCacheMessage msg, - IgniteBiInClosure<UUID, GridCacheMessage> c) { + private void processMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) { try { // We will not end up with storing a bunch of new UUIDs // in each cache entry, since node ID is stored in NIO session http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2111594..ca0995a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -456,7 +456,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) { assert !hasValueUnlocked() : this; - obsolete = markObsolete0(obsoleteVer, false); + obsolete = markObsolete0(obsoleteVer, false, null); assert obsolete : this; } @@ -1364,7 +1364,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { // If entry is still removed. if (newVer == ver) { - if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true))) { + if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) { if (log.isDebugEnabled()) log.debug("Entry could not be marked obsolete (it is still used): " + this); } @@ -2481,7 +2481,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { if ((!hasReaders() || readers)) { // markObsolete will clear the value. - if (!(marked = markObsolete0(ver, true))) { + if (!(marked = markObsolete0(ver, true, null))) { if (log.isDebugEnabled()) log.debug("Entry could not be marked obsolete (it is still used): " + this); @@ -2539,7 +2539,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean obsolete; synchronized (this) { - obsolete = markObsolete0(ver, true); + obsolete = markObsolete0(ver, true, null); } if (obsolete) @@ -2572,7 +2572,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else - obsolete = markObsolete0(ver, true); + obsolete = markObsolete0(ver, true, null); } } } @@ -2600,7 +2600,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (!this.ver.equals(ver)) return false; - marked = markObsolete0(ver, true); + marked = markObsolete0(ver, true, null); } if (marked) @@ -2623,9 +2623,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * * @param ver Version. * @param clear {@code True} to clear. + * @param extras Predefined extras. * @return {@code True} if entry is obsolete, {@code false} if entry is still used by other threads or nodes. */ - protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) { + protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, GridCacheObsoleteEntryExtras extras) { assert Thread.holdsLock(this); if (evictionDisabled()) { @@ -2646,7 +2647,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (mvcc == null || mvcc.isEmpty(ver)) { obsoleteVer = ver; - obsoleteVersionExtras(obsoleteVer); + obsoleteVersionExtras(obsoleteVer, extras); if (clear) value(null); @@ -2989,7 +2990,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { if (checkExpired()) { - rmv = markObsolete0(cctx.versions().next(this.ver), true); + rmv = markObsolete0(cctx.versions().next(this.ver), true, null); return null; } @@ -3465,7 +3466,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else { - if (markObsolete0(obsoleteVer, true)) + if (markObsolete0(obsoleteVer, true, null)) obsolete = true; // Success, will return "true". } } @@ -3793,7 +3794,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject prev = saveOldValueUnlocked(false); - if (!hasReaders() && markObsolete0(obsoleteVer, false)) { + if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) { if (swap) { if (!isStartVersion()) { try { @@ -3847,7 +3848,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject prevVal = saveValueForIndexUnlocked(); - if (!hasReaders() && markObsolete0(obsoleteVer, false)) { + if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) { if (swap) { if (!isStartVersion()) { try { @@ -3923,7 +3924,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheBatchSwapEntry ret = null; try { - if (!hasReaders() && markObsolete0(obsoleteVer, false)) { + if (!hasReaders() && markObsolete0(obsoleteVer, false, null)) { if (!isStartVersion() && hasValueUnlocked()) { if (cctx.offheapTiered() && hasOffHeapPointer()) { if (cctx.swap().offheapEvictionEnabled()) @@ -3982,7 +3983,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return false; if (checkExpired()) { - rmv = markObsolete0(cctx.versions().next(this.ver), true); + rmv = markObsolete0(cctx.versions().next(this.ver), true, null); return false; } @@ -4095,9 +4096,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param obsoleteVer Obsolete version. */ - protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer) { - extras = (extras != null) ? extras.obsoleteVersion(obsoleteVer) : obsoleteVer != null ? - new GridCacheObsoleteEntryExtras(obsoleteVer) : null; + protected void obsoleteVersionExtras(@Nullable GridCacheVersion obsoleteVer, GridCacheObsoleteEntryExtras ext) { + extras = (extras != null) ? + extras.obsoleteVersion(obsoleteVer) : + obsoleteVer != null ? + (ext != null) ? ext : new GridCacheObsoleteEntryExtras(obsoleteVer) : + null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c8ee6e3..479a0b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -23,10 +23,12 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Queue; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,9 +51,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@ -65,8 +69,10 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; @@ -75,6 +81,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedDeque8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE; @@ -85,6 +92,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT; @@ -132,6 +140,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private GridFutureAdapter<?> reconnectExchangeFut; + /** */ + private final Queue<Callable<Boolean>> rebalanceQ = new ConcurrentLinkedDeque8<>(); + /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -309,6 +320,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchWorker.futQ.addFirst(fut); + if (!cctx.kernalContext().clientNode()) { + for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { + final int idx = cnt; + + cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { + @Override public void apply(final UUID id, final GridCacheMessage m) { + if (!enterBusy()) + return; + + try { + if (m instanceof GridDhtPartitionSupplyMessageV2) + cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage( + idx, id, (GridDhtPartitionSupplyMessageV2)m); + else if (m instanceof GridDhtPartitionDemandMessage) + cctx.cacheContext(m.cacheId).preloader().handleDemandMessage( + idx, id, (GridDhtPartitionDemandMessage)m); + else + log.error("Unsupported message type: " + m.getClass().getName()); + } + finally { + leaveBusy(); + } + } + }); + } + } + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); if (reconnect) { @@ -368,6 +406,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + /** + * @param idx Index. + * @return Topic for index. + */ + public static Object rebalanceTopic(int idx) { + return TOPIC_CACHE.topic("Rebalance", idx); + } + /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { cctx.gridEvents().removeLocalEventListener(discoLsnr); @@ -392,6 +438,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (AffinityReadyFuture f : readyFuts.values()) f.onDone(stopErr); + if (!cctx.kernalContext().clientNode()) { + for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) + cctx.io().removeOrderedHandler(rebalanceTopic(cnt)); + } + U.cancel(exchWorker); if (log.isDebugEnabled()) @@ -1015,6 +1066,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } dumpPendingObjects(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + cacheCtx.preloader().dumpDebugInfo(); + } } /** @@ -1127,9 +1182,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean startEvtFired = false; + int cnt = 0; + + IgniteInternalFuture asyncStartFut = null; + while (!isCancelled()) { GridDhtPartitionsExchangeFuture exchFut = null; + cnt++; + try { boolean preloadFinished = true; @@ -1244,12 +1305,111 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (assignsMap != null) { + int size = assignsMap.size(); + + rebalanceQ.clear(); + + NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); + for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { int cacheId = e.getKey(); GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - cacheCtx.preloader().addAssignments(e.getValue(), forcePreload); + int order = cacheCtx.config().getRebalanceOrder(); + + if (orderMap.get(order) == null) + orderMap.put(order, new ArrayList<Integer>(size)); + + orderMap.get(order).add(cacheId); + } + + Callable<Boolean> marshR = null; + List<Callable<Boolean>> orderedRs = new ArrayList<>(size); + + //Ordered rebalance scheduling. + for (Integer order : orderMap.keySet()) { + for (Integer cacheId : orderMap.get(order)) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + List<String> waitList = new ArrayList<>(size - 1); + + for (List<Integer> cIds : orderMap.headMap(order).values()) { + for (Integer cId : cIds) { + waitList.add(cctx.cacheContext(cId).name()); + } + } + + Callable<Boolean> r = cacheCtx.preloader().addAssignments( + assignsMap.get(cacheId), forcePreload, waitList, cnt); + + if (r != null) { + U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + + ", waitList=" + waitList.toString() + "]"); + + if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME)) + marshR = r; + else + orderedRs.add(r); + } + } + } + + if (asyncStartFut != null) + asyncStartFut.get(); // Wait for thread stop. + + rebalanceQ.addAll(orderedRs); + + if (marshR != null || !rebalanceQ.isEmpty()) { + if (futQ.isEmpty()) { + U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); + + if (marshR != null) + try { + marshR.call(); //Marshaller cache rebalancing launches in sync way. + } + catch (Exception ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send initial demand request to node"); + + continue; + } + + final GridFutureAdapter fut = new GridFutureAdapter(); + + asyncStartFut = fut; + + cctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + try { + while (true) { + Callable<Boolean> r = rebalanceQ.poll(); + + if (r == null) + return false; + + if (!r.call()) + return false; + } + } + catch (Exception ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send initial demand request to node"); + + return false; + } + finally { + fut.onDone(); + } + } + }, /*system pool*/ true); + } + else { + U.log(log, "Obsolete exchange, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); + } + } + else { + U.log(log, "Nothing scheduled, skipping rebalancing [top=" + exchFut.topologyVersion() + "]"); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 1edaef2..1658a89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -18,9 +18,14 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.lang.IgnitePredicate; @@ -90,8 +95,11 @@ public interface GridCachePreloader { * * @param assignments Assignments to add. * @param forcePreload Force preload flag. + * @param caches Rebalancing of these caches will be finished before this started. + * @param cnt Counter. */ - public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload); + public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection<String> caches, int cnt); /** * @param p Preload predicate. @@ -115,6 +123,17 @@ public interface GridCachePreloader { public IgniteInternalFuture<?> syncFuture(); /** + * @return Future which will complete when preloading finishes on current topology. + * + * Future result is {@code true} in case rebalancing successfully finished at current topology. + * Future result is {@code false} in case rebalancing cancelled or finished with missed partitions and will be + * restarted at current or pending topology. + * + * Note that topology change creates new futures and finishes previous. + */ + public IgniteInternalFuture<Boolean> rebalanceFuture(); + + /** * Requests that preloader sends the request for the key. * * @param keys Keys to request. @@ -134,7 +153,39 @@ public interface GridCachePreloader { public void unwindUndeploys(); /** + * Handles Supply message. + * + * @param idx Index. + * @param id Node Id. + * @param s Supply message. + */ + public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s); + + /** + * Handles Demand message. + * + * @param idx Index. + * @param id Node Id. + * @param d Demand message. + */ + public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d); + + /** + * Evicts partition asynchronously. + * + * @param part Partition. + */ + public void evictPartitionAsync(GridDhtLocalPartition part); + + /** + * Handles new topology. + * + * @param topVer Topology version. + */ + public void onTopologyChanged(AffinityTopologyVersion topVer); + + /** * Dumps debug information. */ public void dumpDebugInfo(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 4ec6749..9c0e9f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -18,11 +18,16 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -36,7 +41,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** Cache context. */ protected final GridCacheContext<?, ?> cctx; - /** Logger.*/ + /** Logger. */ protected final IgniteLogger log; /** Affinity. */ @@ -113,12 +118,28 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { + return new GridFinishedFuture<>(true); + } + + /** {@inheritDoc} */ @Override public void unwindUndeploys() { cctx.deploy().unwind(cctx); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + @Override public void handleSupplyMessage(int idx, UUID id, GridDhtPartitionSupplyMessageV2 s) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, + AffinityTopologyVersion topVer) { return new GridFinishedFuture<>(); } @@ -143,7 +164,18 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { + @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection<String> caches, int cnt) { + return null; + } + + /** {@inheritDoc} */ + @Override public void evictPartitionAsync(GridDhtLocalPartition part) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onTopologyChanged(AffinityTopologyVersion topVer) { // No-op. } @@ -151,4 +183,4 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { @Override public void dumpDebugInfo() { // No-op. } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b2bb6ff..f54f63e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -31,9 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -98,7 +96,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -160,12 +157,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Map of proxies. */ private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; - /** Map of preload finish futures grouped by preload order. */ - private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts; - - /** Maximum detected rebalance order. */ - private int maxRebalanceOrder; - /** Caches stop sequence. */ private final Deque<String> stopSeq; @@ -207,8 +198,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches = new ConcurrentHashMap<>(); jCacheProxies = new ConcurrentHashMap<>(); - preloadFuts = new TreeMap<>(); - stopSeq = new LinkedList<>(); } @@ -378,10 +367,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache [cacheName=" + U.maskName(cc.getName()) + ']'); - if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) { - assertParameter(cc.getRebalanceThreadPoolSize() > 0, "rebalanceThreadPoolSize > 0"); + if (cc.getRebalanceMode() != CacheRebalanceMode.NONE) assertParameter(cc.getRebalanceBatchSize() > 0, "rebalanceBatchSize > 0"); - } if (cc.getCacheMode() == PARTITIONED || cc.getCacheMode() == REPLICATED) { if (cc.getAtomicityMode() == ATOMIC && cc.getWriteSynchronizationMode() == FULL_ASYNC) @@ -591,8 +578,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { "Deployment mode for cache is not CONTINUOUS or SHARED."); } - maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); - ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, new CustomEventListener<DynamicCacheChangeBatch>() { @Override public void onCustomEvent(ClusterNode snd, @@ -820,31 +805,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) mgr.onKernalStart(false); - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); - - if (maxRebalanceOrder > 0) { - CacheConfiguration cfg = cache.configuration(); - - int order = cfg.getRebalanceOrder(); - - if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) { - GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order); - - if (fut == null) { - fut = new GridCompoundFuture<>(); - - preloadFuts.put(order, fut); - } - - fut.add(cache.preloader().syncFuture()); - } - } - } - - for (IgniteInternalFuture<?> fut : preloadFuts.values()) - ((GridCompoundFuture<Object, Object>)fut).markInitialized(); - for (GridCacheAdapter<?, ?> cache : caches.values()) onKernalStart(cache); @@ -2791,19 +2751,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Gets preload finish future for preload-ordered cache with given order. I.e. will get compound preload future - * with maximum order less than {@code order}. - * - * @param order Cache order. - * @return Compound preload future or {@code null} if order is minimal order found. - */ - @Nullable public IgniteInternalFuture<?> orderedPreloadFuture(int order) { - Map.Entry<Integer, IgniteInternalFuture<?>> entry = preloadFuts.lowerEntry(order); - - return entry == null ? null : entry.getValue(); - } - - /** * @param spaceName Space name. * @param keyBytes Key bytes. * @param valBytes Value bytes. http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 1b2d834..392ad6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; +import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPlainRunnable; @@ -554,7 +555,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @return {@code True} if entry was not being used, passed the filter and could be removed. * @throws IgniteCheckedException If failed to remove from swap. */ - public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException { + public boolean clearInternal( + GridCacheVersion ver, + boolean swap, + GridCacheObsoleteEntryExtras extras + ) throws IgniteCheckedException { boolean rmv = false; try { @@ -563,7 +568,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { // Call markObsolete0 to avoid recursive calls to clear if // we are clearing dht local partition (onMarkedObsolete should not be called). - if (!markObsolete0(ver, false)) { + if (!markObsolete0(ver, false, extras)) { if (log.isDebugEnabled()) log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this); @@ -819,4 +824,4 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { return S.toString(ReaderId.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 749d06a..1516ee4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -17,6 +17,17 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicStampedReference; +import java.util.concurrent.locks.ReentrantLock; +import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; @@ -27,10 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCircularBuffer; -import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -38,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -46,18 +56,6 @@ import org.jetbrains.annotations.NotNull; import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; -import javax.cache.CacheException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicStampedReference; -import java.util.concurrent.locks.ReentrantLock; - import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; @@ -286,7 +284,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } // Attempt to evict. - tryEvict(true); + tryEvict(); } /** @@ -411,7 +409,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, // Decrement reservations. if (state.compareAndSet(s, s, reservations, --reservations)) { - tryEvict(true); + tryEvict(); break; } @@ -477,10 +475,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** * @param updateSeq Update sequence. - * @return Future for evict attempt. */ - IgniteInternalFuture<Boolean> tryEvictAsync(boolean updateSeq) { + void tryEvictAsync(boolean updateSeq) { if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && + state.getReference() == RENTING && state.getStamp() == 0 && state.compareAndSet(RENTING, EVICTED, 0, 0)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); @@ -497,15 +495,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); clearDeferredDeletes(); - - return new GridFinishedFuture<>(true); } - - return cctx.closures().callLocalSafe(new GPC<Boolean>() { - @Override public Boolean call() { - return tryEvict(true); - } - }, /*system pool*/ true); + else + cctx.preloader().evictPartitionAsync(this); } /** @@ -521,12 +513,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** - * @param updateSeq Update sequence. - * @return {@code True} if entry has been transitioned to state EVICTED. + * */ - boolean tryEvict(boolean updateSeq) { + public void tryEvict() { if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved()) - return false; + return; // Attempt to evict partition entries from cache. clearAll(); @@ -545,14 +536,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, rent.onDone(); - ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); + ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, true); clearDeferredDeletes(); - - return true; } - - return false; } /** @@ -592,7 +579,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ void onUnlock() { - tryEvict(true); + tryEvict(); } /** @@ -640,6 +627,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, it = F.concat(it, unswapIt); } + GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); + try { while (it.hasNext()) { GridDhtCacheEntry cached = null; @@ -647,7 +636,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, try { cached = it.next(); - if (cached.clearInternal(clearVer, swap)) { + if (cached.clearInternal(clearVer, swap, extras)) { map.remove(cached.key(), cached); if (!cached.isInternal()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java index 756326e..d12247e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java @@ -239,7 +239,7 @@ public class GridDhtPartitionsReservation implements GridReservable { } /** - * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}. + * Must be checked in {@link GridDhtLocalPartition#tryEvict()}. * If returns {@code true} this reservation object becomes invalid and partitions * can be evicted or at least cleared. * Also this means that after returning {@code true} here method {@link #reserve()} can not http://git-wip-us.apache.org/repos/asf/ignite/blob/242d988a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 7609d98..53c3d90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -68,8 +68,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { * @param topVer Topology version. */ GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) { - assert updateSeq > 0; - this.cacheId = cacheId; this.updateSeq = updateSeq; this.topVer = topVer; @@ -116,6 +114,13 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { } /** + * @param updateSeq Update sequence. + */ + void updateSequence(long updateSeq) { + this.updateSeq = updateSeq; + } + + /** * @return Update sequence. */ long updateSequence() { @@ -325,7 +330,8 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super", - super.toString()); + return S.toString(GridDhtPartitionDemandMessage.class, this, + "partCnt", parts != null ? parts.size() : 0, + "super", super.toString()); } }