Repository: ignite Updated Branches: refs/heads/ignite-1093-2 205b85c96 -> 09b683268
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09b68326 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09b68326 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09b68326 Branch: refs/heads/ignite-1093-2 Commit: 09b68326891d8e6032dbed43b041214cc407cc25 Parents: 205b85c Author: Anton Vinogradov <[email protected]> Authored: Mon Sep 14 12:04:14 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Sep 14 12:04:14 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 3 ++ .../configuration/IgniteConfiguration.java | 9 ++-- .../GridCachePartitionExchangeManager.java | 6 +-- .../dht/preloader/GridDhtPartitionDemander.java | 2 +- .../GridCacheRebalancingAsyncSelfTest.java | 44 -------------------- .../GridCacheRebalancingSyncSelfTest.java | 39 +++++++++++++++++ .../ignite-rebalancing-multicast-config.xml | 4 +- 7 files changed, 51 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 54e5e43..12e6a06 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -1824,6 +1824,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * To gain better rebalancing performance, the supplier node can provide more than one batch at start and provide + * one new batch for each subsequent demand request. 
+ * * Sets number of batches generated by supply node at rebalancing start. * * @param rebalanceBatchesCnt batches count. http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index cc0e275..6668fc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -1336,20 +1336,17 @@ public class IgniteConfiguration { return this; } - /** - * Gets count of available rebalancing threads. - * Half will be used for supplying and half for demanding of partitions. + * Gets the maximum number of threads that can be used for rebalancing. * Minimum is 1. * @return count. */ public int getRebalanceThreadPoolSize(){ - return rebalanceThreadPoolSize; + return Math.max(1, rebalanceThreadPoolSize); } /** - * Sets count of available rebalancing threads. - * Half will be used for supplying and half for demanding of partitions. + * Sets the maximum number of threads that can be used for rebalancing. * Minimum is 1. * @param size Size. * @return {@code this} for chaining. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bbab008..e3e2d53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -317,7 +317,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!cctx.kernalContext().clientNode()) { - for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); cnt++) { + for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { final int idx = cnt; cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { @@ -333,7 +333,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.cacheContext(m.cacheId).preloader().handleDemandMessage( id, (GridDhtPartitionDemandMessage)m); else - log.error("Unsupported message type " + m.getClass().getName()); + log.error("Unsupported message type: " + m.getClass().getName()); } finally { leaveBusy(); @@ -437,7 +437,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (AffinityReadyFuture f : readyFuts.values()) f.onDone(err); - for (int cnt = 0; cnt < Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); cnt++) { + for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { cctx.io().removeOrderedHandler(rebalanceTopic(cnt)); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index bf2f36a..b902fed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -401,7 +401,7 @@ public class GridDhtPartitionDemander { fut.append(node.id(), remainings); - int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize()); + int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java index 3a0c9d8..f1ae72e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java @@ -17,11 +17,9 @@ package 
org.apache.ignite.internal.processors.cache.distributed.rebalancing; -import org.apache.ignite.Ignite; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; /** * @@ -35,48 +33,6 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); } - iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); - - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); - - if (getTestGridName(20).equals(gridName)) - spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi(); - return iCfg; } - - public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi { - public void fail() { - simulateNodeFailure(); - } - } - - private volatile FailableTcpDiscoverySpi spi; - - /** - * @throws Exception - */ - public void testNodeFailedAtRebalancing() throws Exception { - Ignite ignite = startGrid(0); - - generateData(ignite); - - log.info("Preloading started."); - - startGrid(1); - - waitForRebalancing(1, 2); - - startGrid(20); - - waitForRebalancing(20, 3); - - spi.fail(); - - waitForRebalancing(0, 4); - waitForRebalancing(1, 4); - - stopAllGrids(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index c299a99..bae2c7d 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -63,6 +63,14 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean concurrentStartFinished2 = false; + private volatile FailableTcpDiscoverySpi spi; + + public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi { + public void fail() { + simulateNodeFailure(); + } + } + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return Long.MAX_VALUE; @@ -72,6 +80,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(gridName); + iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); + + if (getTestGridName(20).equals(gridName)) + spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi(); + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); @@ -319,4 +332,30 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopAllGrids(); } + + /** + * @throws Exception + */ + public void testNodeFailedAtRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + startGrid(1); + + waitForRebalancing(1, 2); + + startGrid(20); + + waitForRebalancing(20, 3); + + spi.fail(); + + waitForRebalancing(0, 4); + waitForRebalancing(1, 4); + + stopAllGrids(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/09b68326/modules/yardstick/config/ignite-rebalancing-multicast-config.xml ---------------------------------------------------------------------- diff --git 
a/modules/yardstick/config/ignite-rebalancing-multicast-config.xml b/modules/yardstick/config/ignite-rebalancing-multicast-config.xml index e16c351..ec8f4b0 100644 --- a/modules/yardstick/config/ignite-rebalancing-multicast-config.xml +++ b/modules/yardstick/config/ignite-rebalancing-multicast-config.xml @@ -159,9 +159,9 @@ </list> </property> - <property name="failureDetectionTimeout" value="1000"/> + <property name="failureDetectionTimeout" value="200"/> - <property name="metricsLogFrequency" value="200"/> + <property name="metricsLogFrequency" value="5000"/> <property name="warmupClosure" ref="warmupClosure"/>
