Repository: ignite Updated Branches: refs/heads/ignite-1093-2 994372d40 -> b53a525d3
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d5201cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d5201cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d5201cb Branch: refs/heads/ignite-1093-2 Commit: 4d5201cbc70036d9b62a19b436336f9b940f04ca Parents: 994372d Author: Anton Vinogradov <[email protected]> Authored: Tue Oct 6 15:34:07 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Oct 6 15:34:07 2015 +0300 ---------------------------------------------------------------------- .../GridCacheRebalancingSyncSelfTest.java | 86 ++++++++++++++------ 1 file changed, 62 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4d5201cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 526f3f0..79966e2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -65,6 +65,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean concurrentStartFinished2; + /** */ + private volatile boolean concurrentStartFinished3; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(gridName); @@ -115,23 +118,25 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite. */ - protected void generateData(Ignite ignite, int from) { - generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from); - generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from); - generateData(ignite, CACHE_NAME_DHT_REPLICATED, from); - generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from); + protected void generateData(Ignite ignite, int from, int iter) { + generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); + generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter); + generateData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter); + generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter); } /** * @param ignite Ignite. */ - protected void generateData(Ignite ignite, String name, int from) { + protected void generateData(Ignite ignite, String name, int from, int iter) { try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(name)) { + stmr.allowOverwrite(true); + for (int i = from; i < from + TEST_SIZE; i++) { if (i % (TEST_SIZE / 10) == 0) log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); - stmr.addData(i, i + name.hashCode()); + stmr.addData(i, i + name.hashCode() + iter); } stmr.flush(); @@ -142,11 +147,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @param ignite Ignite. * @throws IgniteCheckedException Exception. */ - protected void checkData(Ignite ignite, int from) throws IgniteCheckedException { - checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from); - checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from); - checkData(ignite, CACHE_NAME_DHT_REPLICATED, from); - checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from); + protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException { + checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter); + checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter); + checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter); + checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter); } /** @@ -154,13 +159,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @param name Cache name. * @throws IgniteCheckedException Exception. */ - protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException { + protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException { for (int i = from; i < from + TEST_SIZE; i++) { if (i % (TEST_SIZE / 10) == 0) log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); - assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) : - i + " value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")"; + assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) : + i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")"; } } @@ -170,7 +175,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { public void testSimpleRebalancing() throws Exception { Ignite ignite = startGrid(0); - generateData(ignite, 0); + generateData(ignite, 0, 0); log.info("Preloading started."); @@ -196,7 +201,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long spend = (System.currentTimeMillis() - start) / 1000; - checkData(grid(1), 0); + checkData(grid(1), 0, 0); log.info("Spend " + spend + " seconds to rebalance entries."); @@ -254,6 +259,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { testComplexRebalancing(); U.sleep(5000); + + System.gc(); + } } @@ -265,9 +273,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { * @throws Exception */ public void testComplexRebalancing() throws Exception { - Ignite ignite = startGrid(0); + final Ignite ignite = startGrid(0); - generateData(ignite, 0); + generateData(ignite, 0, 0); log.info("Preloading started."); @@ -275,6 +283,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { concurrentStartFinished = false; concurrentStartFinished2 = false; + concurrentStartFinished3 = false; Thread t1 = new Thread() { @Override public void run() { @@ -286,6 +295,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { U.sleep(10); } + awaitPartitionMapExchange(); + //New cache should start rebalancing. CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(); @@ -295,6 +306,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { grid(0).getOrCreateCache(cacheRCfg); + while (!concurrentStartFinished3) { + U.sleep(10); + } + concurrentStartFinished = true; } catch (Exception e) { @@ -317,18 +332,39 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { } }; + Thread t3 = new Thread() { + @Override public void run() { + generateData(ignite, 0, 1); + + concurrentStartFinished3 = true; + } + }; + t1.start(); t2.start();// Should cancel t1 rebalancing. + t3.start(); t1.join(); t2.join(); + t3.join(); waitForRebalancing(1, 5, 1); waitForRebalancing(2, 5, 1); waitForRebalancing(3, 5, 1); waitForRebalancing(4, 5, 1); - checkData(grid(4), 0); + checkData(grid(4), 0, 1); + + final Ignite ignite3 = grid(3); + + Thread t4 = new Thread() { + @Override public void run() { + generateData(ignite3, 0, 2); + + } + }; + + t4.start(); stopGrid(0); @@ -348,13 +384,15 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(3, 8); waitForRebalancing(4, 8); + t4.join(); + stopGrid(3); waitForRebalancing(4, 9); long spend = (System.currentTimeMillis() - start) / 1000; - checkData(grid(4), 0); + checkData(grid(4), 0, 2); log.info("Spend " + spend + " seconds to rebalance entries."); @@ -373,7 +411,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map); - generateData(ignite, 0); + generateData(ignite, 0, 0); startGrid(1); @@ -381,7 +419,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopGrid(0); - checkData(grid(1), 0); + checkData(grid(1), 0, 0); stopAllGrids(); } @@ -392,7 +430,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { public void testNodeFailedAtRebalancing() throws Exception { Ignite ignite = startGrid(0); - generateData(ignite, 0); + generateData(ignite, 0, 0); log.info("Preloading started.");
