Repository: ignite Updated Branches: refs/heads/ignite-1093-2 18952f146 -> 049918089
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04991808 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04991808 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04991808 Branch: refs/heads/ignite-1093-2 Commit: 04991808972fb23e48b685552483b9eea94a72e2 Parents: 18952f1 Author: Anton Vinogradov <[email protected]> Authored: Mon Sep 7 16:30:21 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Sep 7 16:30:21 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 14 +++- .../GridCacheRebalancingAsyncSelfTest.java | 19 ++--- .../GridCacheRebalancingSyncSelfTest.java | 85 +++++++++++++++----- 3 files changed, 87 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/04991808/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 4d575d0..b260501 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -258,7 +258,17 @@ public class GridDhtPartitionDemander { log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); try { - IgniteInternalFuture fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture(); + IgniteInternalFuture fut; + do { + fut = cctx.kernalContext().cache().marshallerCache().preloader().syncFuture(); + } + while (!((SyncFuture)fut).isInited() || ((SyncFuture)fut).topologyVersion().topologyVersion() < cSF.topologyVersion().topologyVersion()); + + if (((SyncFuture)fut).topologyVersion().topologyVersion() > cSF.topologyVersion().topologyVersion()) { + cSF.onCancel(); + + return; + } if (!topologyChanged(topVer)) fut.get(); @@ -363,7 +373,7 @@ public class GridDhtPartitionDemander { AffinityTopologyVersion topVer = fut.topologyVersion(); for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { - if (topologyChanged(topVer)) { + if (topologyChanged(topVer) || Thread.interrupted()) { fut.onCancel(); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/04991808/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java index a17fc7a..3a0c9d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java @@ -17,10 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; /** * @@ -30,13 +31,9 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(gridName); - CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0]; - - cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); - - cacheCfg = iCfg.getCacheConfiguration()[1]; - - cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) { + cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + } iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); http://git-wip-us.apache.org/repos/asf/ignite/blob/04991808/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index bd1bf28..4be4852 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -17,19 +17,24 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** * @@ -193,11 +198,55 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long start = System.currentTimeMillis(); - //will be started simultaneously in case of ASYNC mode - startGrid(1); - startGrid(2); - startGrid(3); - startGrid(4); + new Thread(){ + @Override public void run() { + try { + startGrid(1); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + + U.sleep(500); + + new Thread(){ + @Override public void run() { + try { + startGrid(2); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }.start();// Should cancel current rebalancing. + + U.sleep(500); + + new Thread(){ + @Override public void run() { + try { + startGrid(3); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }.start();// Should cancel current rebalancing. + + U.sleep(500); + + new Thread(){ + @Override public void run() { + try { + startGrid(4); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }.start();// Should cancel current rebalancing. //wait until cache rebalanced in async mode waitForRebalancing(1, 5);
