1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb313c4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb313c4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb313c4 Branch: refs/heads/ignite-1093-2 Commit: eeb313c44980ac96e96890d823602716f815e098 Parents: b7e9179 Author: Anton Vinogradov <[email protected]> Authored: Fri Sep 25 14:01:41 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Sep 25 14:01:41 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionSupplier.java | 56 ++++++++++---------- .../dht/preloader/GridDhtPreloader.java | 15 ++++-- .../GridCacheRebalancingSyncSelfTest.java | 50 +++++++++-------- 3 files changed, 69 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb313c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index a4bd134..e23a50b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -45,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -69,8 +71,9 @@ class GridDhtPartitionSupplier { /** Preload predicate. */ private IgnitePredicate<GridCacheEntryInfo> preloadPred; - /** Supply context map. */ - private final ConcurrentHashMap8<T4, SupplyContext> scMap = new ConcurrentHashMap8<>(); + /** Supply context map. T4: nodeId, idx, topologyVersion, updateSequence. */ + private final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> scMap = + new ConcurrentHashMap8<>(); /** Rebalancing listener. */ private GridLocalEventListener lsnr; @@ -97,7 +100,27 @@ class GridDhtPartitionSupplier { lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { if (evt instanceof DiscoveryEvent) { - clearContexts(scMap, log, cctx); + for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) { + T4<UUID, Integer, AffinityTopologyVersion, Long> t = entry.getKey(); + + SupplyContext sc = entry.getValue(); + + if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) + clearContext(scMap, t, sc, log); + } + } + else if (evt instanceof CacheRebalancingEvent) { + CacheRebalancingEvent e = (CacheRebalancingEvent)evt; + + if (cctx.name().equals(e.cacheName())) { + UUID id = e.discoveryNode().id(); + + for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) { + if (id.equals(entry.getKey().get1())) + clearContext(scMap, entry.getKey(), entry.getValue(), log); + } + + } } else { assert false; @@ -105,9 +128,7 @@ class GridDhtPartitionSupplier { } }; - //todo: rebalance stopped. - - cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED); startOldListeners(); } @@ -122,25 +143,6 @@ class GridDhtPartitionSupplier { } /** - * Clear contexts. - * - * @param map Context map. - * @param log Logger. - * @param cctx Context. - */ - private static void clearContexts( - ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) { - for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) { - T4 t = entry.getKey(); - - SupplyContext sc = entry.getValue(); - - if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) - clearContext(map, t, sc, log); - } - } - - /** * Clear context. * * @param map Context map. @@ -150,8 +152,8 @@ class GridDhtPartitionSupplier { * @return true in case context was removed. */ private static boolean clearContext( - final ConcurrentHashMap8<T4, SupplyContext> map, - final T4 t, + final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> map, + final T4<UUID, Integer, AffinityTopologyVersion, Long> t, final SupplyContext sc, final IgniteLogger log) { final Iterator it = sc.entryIt; http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb313c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 30b5505..ec9b8e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -27,7 +27,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; @@ -60,13 +59,13 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** @@ -376,6 +375,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) { demandLock.readLock().lock(); + try { demander.handleSupplyMessage(idx, id, s); } @@ -391,7 +391,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException { - demander.addAssignments(assignments, forcePreload); + demandLock.writeLock().lock(); + + try { + demander.addAssignments(assignments, forcePreload); + } + finally { + demandLock.writeLock().unlock(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb313c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index 42c1857..be8e24b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -169,7 +169,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { protected void checkData(Ignite ignite, String name, int from) throws IgniteCheckedException { for (int i = from; i < from + TEST_SIZE; i++) { if (i % (TEST_SIZE / 10) == 0) - log.info("Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); + log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ")."); assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode()) : i + " value " + (i + name.hashCode()) + " does not match (" + ignite.cache(name).get(i) + ")"; @@ -252,12 +252,27 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { break; } - else - fut.get(); + else if (!fut.get()) { + finished = false; + + log.warning("Rebalancing finished with missed partitions."); + } } } } + private void test() throws Exception { + while (true) { + testComplexRebalancing(); + + U.sleep(5000); + } + } + + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + /** * @throws Exception */ @@ -270,7 +285,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long start = System.currentTimeMillis(); - new Thread() { + Thread t1 = new Thread() { @Override public void run() { try { startGrid(1); @@ -295,9 +310,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { e.printStackTrace(); } } - }.start(); + }; - new Thread() { + Thread t2 = new Thread() { @Override public void run() { try { startGrid(3); @@ -309,50 +324,43 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { e.printStackTrace(); } } - }.start();// Should cancel current rebalancing. + }; - while (!concurrentStartFinished || !concurrentStartFinished2) { - U.sleep(10); - } + t1.start(); + t2.start();// Should cancel t1 rebalancing. - //wait until cache rebalanced in async mode + t1.join(); + t2.join(); waitForRebalancing(1, 5, 1); waitForRebalancing(2, 5, 1); waitForRebalancing(3, 5, 1); waitForRebalancing(4, 5, 1); - //cache rebalanced in async node + checkData(grid(4), 0); stopGrid(0); - //wait until cache rebalanced waitForRebalancing(1, 6); waitForRebalancing(2, 6); waitForRebalancing(3, 6); waitForRebalancing(4, 6); - //cache rebalanced - stopGrid(1); - //wait until cache rebalanced waitForRebalancing(2, 7); waitForRebalancing(3, 7); waitForRebalancing(4, 7); - //cache rebalanced - stopGrid(2); - //wait until cache rebalanced waitForRebalancing(3, 8); waitForRebalancing(4, 8); - //cache rebalanced - stopGrid(3); + waitForRebalancing(4, 9); + long spend = (System.currentTimeMillis() - start) / 1000; checkData(grid(4), 0);
