Repository: ignite Updated Branches: refs/heads/ignite-1093-2 d74cdbcc7 -> 205b85c96
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/205b85c9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/205b85c9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/205b85c9 Branch: refs/heads/ignite-1093-2 Commit: 205b85c9668de5e92f5d26116cea99d026102b28 Parents: d74cdbc Author: Anton Vinogradov <[email protected]> Authored: Fri Sep 11 15:47:40 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Sep 11 15:47:40 2015 +0300 ---------------------------------------------------------------------- .../GridCacheRebalancingSyncSelfTest.java | 153 ++++++++++--------- 1 file changed, 78 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/205b85c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 4be4852..c299a99 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -43,13 +43,25 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - private static int TEST_SIZE = 1_000_000; + private static int TEST_SIZE = 100_000; - /** cache name. */ - protected static String CACHE_NAME_DHT = "cache"; + /** partitioned cache name. */ + protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP"; - /** cache 2 name. */ - protected static String CACHE_2_NAME_DHT = "cache2"; + /** partitioned cache 2 name. */ + protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2"; + + /** replicated cache name. */ + protected static String CACHE_NAME_DHT_REPLICATED = "cacheR"; + + /** replicated cache 2 name. */ + protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2"; + + /** */ + private volatile boolean concurrentStartFinished = false; + + /** */ + private volatile boolean concurrentStartFinished2 = false; /** {@inheritDoc} */ @Override protected long getTestTimeout() { @@ -66,73 +78,80 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { if (getTestGridName(10).equals(gridName)) iCfg.setClientMode(true); - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>(); + + cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED); + cachePCfg.setCacheMode(CacheMode.PARTITIONED); + cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cachePCfg.setBackups(1); - cacheCfg.setName(CACHE_NAME_DHT); - cacheCfg.setCacheMode(CacheMode.PARTITIONED); - //cacheCfg.setRebalanceBatchSize(1024); - //cacheCfg.setRebalanceBatchesCount(1); - cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); - cacheCfg.setBackups(1); + CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(); - CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>(); + cachePCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2); + cachePCfg2.setCacheMode(CacheMode.PARTITIONED); + cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cachePCfg2.setBackups(1); - cacheCfg2.setName(CACHE_2_NAME_DHT); - cacheCfg2.setCacheMode(CacheMode.PARTITIONED); - //cacheCfg2.setRebalanceBatchSize(1024); - //cacheCfg2.setRebalanceBatchesCount(1); - cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); - cacheCfg2.setBackups(1); + CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>(); + + cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED); + cacheRCfg.setCacheMode(CacheMode.REPLICATED); + cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheRCfg.setBackups(1); + + CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>(); + + cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2); + cacheRCfg2.setCacheMode(CacheMode.REPLICATED); + cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheRCfg2.setBackups(1); iCfg.setRebalanceThreadPoolSize(4); - iCfg.setCacheConfiguration(cacheCfg, cacheCfg2); + iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2); return iCfg; } + protected void generateData(Ignite ignite) { + generateData(ignite, CACHE_NAME_DHT_PARTITIONED); + generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2); + generateData(ignite, CACHE_NAME_DHT_REPLICATED); + generateData(ignite, CACHE_NAME_DHT_REPLICATED_2); + } + /** * @param ignite Ignite. */ - protected void generateData(Ignite ignite) { - try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) { + protected void generateData(Ignite ignite, String name) { + try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(name)) { for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Prepared " + i / 1_000_000 + "m entries."); + if (i % (TEST_SIZE / 10) == 0) + log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); - stmr.addData(i, i); + stmr.addData(i, i + name.hashCode()); } stmr.flush(); } - try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Prepared " + i / 1_000_000 + "m entries."); - - stmr.addData(i, i + 3); - } + } - stmr.flush(); - } + protected void checkData(Ignite ignite) throws IgniteCheckedException { + checkData(ignite, CACHE_NAME_DHT_PARTITIONED); + checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2); + checkData(ignite, CACHE_NAME_DHT_REPLICATED); + checkData(ignite, CACHE_NAME_DHT_REPLICATED_2); } /** * @param ignite Ignite. * @throws IgniteCheckedException */ - protected void checkData(Ignite ignite) throws IgniteCheckedException { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Checked " + i / 1_000_000 + "m entries."); - - assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : - "key " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")"; - } + protected void checkData(Ignite ignite, String name) throws IgniteCheckedException { for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Checked " + i / 1_000_000 + "m entries."); + if (i % (TEST_SIZE / 10) == 0) + log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); - assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) : - "key " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")"; + assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) : + "value " + i + name.hashCode() + " does not match (" + ignite.cache(name).get(i) + ")"; } } @@ -198,49 +217,27 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long start = System.currentTimeMillis(); - new Thread(){ + new Thread() { @Override public void run() { try { startGrid(1); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }.start(); - - U.sleep(500); - - new Thread(){ - @Override public void run() { - try { startGrid(2); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }.start();// Should cancel current rebalancing. - U.sleep(500); - - new Thread(){ - @Override public void run() { - try { - startGrid(3); + concurrentStartFinished = true; } catch (Exception e) { e.printStackTrace(); } } - }.start();// Should cancel current rebalancing. - - U.sleep(500); + }.start(); - new Thread(){ + new Thread() { @Override public void run() { try { + startGrid(3); startGrid(4); + + concurrentStartFinished2 = true; } catch (Exception e) { e.printStackTrace(); @@ -248,6 +245,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { } }.start();// Should cancel current rebalancing. + while (!concurrentStartFinished || !concurrentStartFinished2) { + U.sleep(10); + } + //wait until cache rebalanced in async mode waitForRebalancing(1, 5); waitForRebalancing(2, 5); @@ -315,5 +316,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { stopGrid(0); checkData(grid(1)); + + stopAllGrids(); } } \ No newline at end of file
