1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d51f61c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d51f61c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d51f61c Branch: refs/heads/ignite-1093-2 Commit: 7d51f61c29a96475e6662484a6dae474b3f8f609 Parents: ff0e2e1 Author: Anton Vinogradov <[email protected]> Authored: Fri Sep 18 12:18:43 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Sep 18 12:18:43 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 2 +- .../dht/preloader/GridDhtPartitionDemander.java | 1 + .../dht/preloader/GridDhtPartitionSupplier.java | 2 + .../GridCacheRebalancingSyncSelfTest.java | 48 +++++++++++--------- 4 files changed, 30 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index e77b540..92d9ab1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -1084,7 +1084,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * @return {@code this} for chaining. */ public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) { - this.rebalanceBatchSize = rebalanceBatchSize; + this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize); return this; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index fbe57dc..3c5a2f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -409,6 +409,7 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt)); + try { if (!topologyChanged(topVer)) cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index fb9f796..ee01158 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -509,6 +509,8 @@ class GridDhtPartitionSupplier { } } + scMap.remove(scId); + reply(node, d, s); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7d51f61c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 3366381..39f5d4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -95,6 +95,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cachePCfg.setCacheMode(CacheMode.PARTITIONED); cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC); cachePCfg.setBackups(1); + cachePCfg.setRebalanceBatchSize(1); + cachePCfg.setRebalanceBatchesCount(1); CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(); @@ -108,6 +110,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED); cacheRCfg.setCacheMode(CacheMode.REPLICATED); cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cachePCfg.setRebalanceBatchSize(1); + cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE); CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>(); @@ -123,19 +127,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite. */ - protected void generateData(Ignite ignite) { - generateData(ignite, CACHE_NAME_DHT_PARTITIONED); - generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2); - generateData(ignite, CACHE_NAME_DHT_REPLICATED); - generateData(ignite, CACHE_NAME_DHT_REPLICATED_2); + protected void generateData(Ignite ignite, int from) { + generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from); + generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from); + generateData(ignite, CACHE_NAME_DHT_REPLICATED, from); + generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from); } /** * @param ignite Ignite. */ - protected void generateData(Ignite ignite, String name) { + protected void generateData(Ignite ignite, String name, int from) { try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(name)) { - for (int i = 0; i < TEST_SIZE; i++) { + for (int i = from; i < from + TEST_SIZE; i++) { if (i % (TEST_SIZE / 10) == 0) log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); @@ -150,11 +154,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @param ignite Ignite. * @throws IgniteCheckedException Exception. */ - protected void checkData(Ignite ignite) throws IgniteCheckedException { - checkData(ignite, CACHE_NAME_DHT_PARTITIONED); - checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2); - checkData(ignite, CACHE_NAME_DHT_REPLICATED); - checkData(ignite, CACHE_NAME_DHT_REPLICATED_2); + protected void checkData(Ignite ignite, int from) throws IgniteCheckedException { + checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from); + checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from); + checkData(ignite, CACHE_NAME_DHT_REPLICATED, from); + checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from); } /** @@ -162,13 +166,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @param name Cache name. * @throws IgniteCheckedException Exception. */ - protected void checkData(Ignite ignite, String name) throws IgniteCheckedException { - for (int i = 0; i < TEST_SIZE; i++) { + protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException { + for (int i = from; i < from + TEST_SIZE; i++) { if (i % (TEST_SIZE / 10) == 0) log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) : - "value " + i + name.hashCode() + " does not match (" + ignite.cache(name).get(i) + ")"; + "value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")"; } } @@ -178,7 +182,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { public void testSimpleRebalancing() throws Exception { Ignite ignite = startGrid(0); - generateData(ignite); + generateData(ignite, 0); log.info("Preloading started."); @@ -204,7 +208,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long spend = (System.currentTimeMillis() - start) / 1000; - checkData(grid(1)); + checkData(grid(1), 0); log.info("Spend " + spend + " seconds to rebalance entries."); @@ -260,7 +264,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { public void testComplexRebalancing() throws Exception { Ignite ignite = startGrid(0); - generateData(ignite); + generateData(ignite, 0); log.info("Preloading started."); @@ -351,7 +355,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long spend = (System.currentTimeMillis() - start) / 1000; - checkData(grid(4)); + checkData(grid(4), 0); log.info("Spend " + spend + " seconds to rebalance entries."); @@ -370,7 +374,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map); - generateData(ignite); + generateData(ignite, 0); startGrid(1); @@ -378,7 +382,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopGrid(0); - checkData(grid(1)); + checkData(grid(1), 0); stopAllGrids(); } @@ -389,7 +393,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { public void testNodeFailedAtRebalancing() throws Exception { Ignite ignite = startGrid(0); - generateData(ignite); + generateData(ignite, 0); log.info("Preloading started.");
