Repository: ignite Updated Branches: refs/heads/ignite-1093-3 8333e2535 -> 7316e4a44
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7316e4a4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7316e4a4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7316e4a4 Branch: refs/heads/ignite-1093-3 Commit: 7316e4a44b422cd00df7661a8086252b4c877cf4 Parents: 8333e25 Author: Anton Vinogradov <[email protected]> Authored: Tue Nov 3 12:59:27 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Nov 3 12:59:27 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 72 +++++++++++--------- .../configuration/IgniteConfiguration.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 11 +++ .../processors/cache/GridCachePreloader.java | 8 ++- .../dht/preloader/GridDhtPartitionDemander.java | 8 +-- .../dht/preloader/GridDhtPartitionSupplier.java | 5 +- .../GridCacheRebalancingSyncSelfTest.java | 4 +- 7 files changed, 67 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 1e1f437..76929b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -69,13 +69,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { private static final long serialVersionUID = 0L; /** Default size of rebalance thread pool. */ + @Deprecated public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2; /** Default rebalance timeout (ms).*/ public static final long DFLT_REBALANCE_TIMEOUT = 10000; - /** Default rebalance batches count. */ - public static final long DFLT_REBALANCE_BATCHES_COUNT = 2; + /** Default rebalance batches prefetch count. */ + public static final long DFLT_REBALANCE_BATCHES_PREFETCH_COUNT = 2; /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */ public static final long DFLT_REBALANCE_THROTTLE = 0; @@ -177,6 +178,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { private String name; /** Rebalance thread pool size. */ + @Deprecated private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE; /** Rebalance timeout. */ @@ -257,12 +259,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Rebalance batch size. */ private int rebalanceBatchSize = DFLT_REBALANCE_BATCH_SIZE; + /** Rebalance batches prefetch count. */ + private long rebalanceBatchesPrefetchCount = DFLT_REBALANCE_BATCHES_PREFETCH_COUNT; + /** Off-heap memory size. */ private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY; - /** Rebalance batches count. */ - private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT; - /** */ private boolean swapEnabled = DFLT_SWAP_ENABLED; @@ -405,7 +407,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { rebalanceDelay = cc.getRebalanceDelay(); rebalanceOrder = cc.getRebalanceOrder(); rebalancePoolSize = cc.getRebalanceThreadPoolSize(); - rebalanceBatchesCount = cc.getRebalanceBatchesCount(); + rebalanceBatchesPrefetchCount = cc.getRebalanceBatchesPrefetchCount(); rebalanceTimeout = cc.getRebalanceTimeout(); rebalanceThrottle = cc.getRebalanceThrottle(); readFromBackup = cc.isReadFromBackup(); @@ -1089,7 +1091,36 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * @return {@code this} for chaining. */ public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) { - this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize); + this.rebalanceBatchSize = rebalanceBatchSize; + + return this; + } + + /** + * To gain better rebalancing performance supplier node can provide more than one batch at rebalancing start and + * provide one new to each next demand request. + * + * Gets number of batches generated by supply node at rebalancing start. + * Minimum is 1. + * + * @return batches count + */ + public long getRebalanceBatchesPrefetchCount() { + return rebalanceBatchesPrefetchCount; + } + + /** + * To gain better rebalancing performance supplier node can provide more than one batch at rebalancing start and + * provide one new to each next demand request. + * + * Sets number of batches generated by supply node at rebalancing start. + * Minimum is 1. + * + * @param rebalanceBatchesCnt batches count. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setRebalanceBatchesPrefetchCount(long rebalanceBatchesCnt) { + this.rebalanceBatchesPrefetchCount = rebalanceBatchesCnt; return this; } @@ -1781,33 +1812,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** - * To gain better rebalancing performance supplier node can provide more than one batch at start and provide - * one new to each next demand request. - * - * Gets number of batches generated by supply node at rebalancing start. - * - * @return batches count - */ - public long getRebalanceBatchesCount() { - return rebalanceBatchesCount; - } - - /** - * To gain better rebalancing performance supplier node can provide more than one batch at start and provide - * one new to each next demand request. - * - * Sets number of batches generated by supply node at rebalancing start. - * - * @param rebalanceBatchesCnt batches count. - * @return {@code this} for chaining. - */ - public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt) { - this.rebalanceBatchesCount = rebalanceBatchesCnt; - - return this; - } - - /** * Gets cache store session listener factories. * * @return Cache store session listener factories. http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 02b1066..7cf3a65 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -1343,7 +1343,7 @@ public class IgniteConfiguration { * @return count. */ public int getRebalanceThreadPoolSize() { - return Math.max(1, rebalanceThreadPoolSize); + return rebalanceThreadPoolSize; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 7691434..94ffb40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2144,6 +2144,17 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize()) throw new IgniteCheckedException("Rebalance thread pool size exceed or equals System thread pool size. " + "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start."); + + if (cfg.getRebalanceThreadPoolSize() < 1) + throw new IgniteCheckedException("Rebalance thread pool size minimal allowed value is 1. " + + "Change IgniteConfiguration.rebalanceThreadPoolSize property before next start."); + + for (CacheConfiguration ccfg : cfg.getCacheConfiguration()){ + if (ccfg.getRebalanceBatchesPrefetchCount() < 1) + throw new IgniteCheckedException("Rebalance batches prefetch count minimal allowed value is 1. " + + "Change CacheConfiguration.rebalanceBatchesPrefetchCount property before next start. " + + "[cache="+ccfg.getName()+"]"); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index cda392c..bab3b32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -123,7 +123,13 @@ public interface GridCachePreloader { public IgniteInternalFuture<?> syncFuture(); /** - * @return Future which will complete when preloading is finished on current topology. + * @return Future which will complete when preloading finishes on current topology. + * + * Future result is {@code true} in case rebalancing successfully finished at current topology. + * Future result is {@code false} in case rebalancing cancelled or finished with missed partitions and will be + * restarted at current or pending topology. + * + * Note that topology change creates new futures and finishes previous. */ public IgniteInternalFuture<Boolean> rebalanceFuture(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index f2c1dc2..b131679 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -110,6 +110,10 @@ public class GridDhtPartitionDemander { @Deprecated//Backward compatibility. To be removed in future. private final ReadWriteLock demandLock; + /** DemandWorker index. */ + @Deprecated//Backward compatibility. To be removed in future. + private final AtomicInteger dmIdx = new AtomicInteger(); + /** * @param cctx Cctx. * @param demandLock Demand lock. @@ -1063,10 +1067,6 @@ public class GridDhtPartitionDemander { } } - /** DemandWorker index. */ - @Deprecated//Backward compatibility. To be removed in future. - private final AtomicInteger dmIdx = new AtomicInteger(); - /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 494560f..e7e1dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -212,6 +212,9 @@ class GridDhtPartitionSupplier { ClusterNode node = cctx.discovery().node(id); + if (node == null) + return; //Context will be cleaned at topology change. + try { SupplyContext sctx; @@ -233,7 +236,7 @@ class GridDhtPartitionSupplier { boolean newReq = true; - long maxBatchesCnt = cctx.config().getRebalanceBatchesCount(); + long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount(); if (sctx != null) { phase = sctx.phase; http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index c866a1d..6a7f701 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -84,7 +84,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC); cachePCfg.setBackups(1); cachePCfg.setRebalanceBatchSize(1); - cachePCfg.setRebalanceBatchesCount(1); + cachePCfg.setRebalanceBatchesPrefetchCount(1); cachePCfg.setRebalanceOrder(2); CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(); @@ -102,7 +102,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cacheRCfg.setCacheMode(CacheMode.REPLICATED); cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); cacheRCfg.setRebalanceBatchSize(1); - cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE); + cacheRCfg.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE); ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem fail fix for Integer.MAX_VALUE. CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
