Repository: ignite Updated Branches: refs/heads/master d17ac69c4 -> ebd669e4c
IGNITE-8474 Fixed WalStateNodeLeaveExchangeTask preventing exchange merge - Fixes #3990. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebd669e4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebd669e4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebd669e4 Branch: refs/heads/master Commit: ebd669e4c53cfd66708ff18dd59071e4aace38ae Parents: d17ac69 Author: Sergey Chugunov <sergey.chugu...@gmail.com> Authored: Thu May 17 13:10:01 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu May 17 14:23:15 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 115 ++++++++++++------- .../cache/WalStateNodeLeaveExchangeTask.java | 2 +- .../distributed/CacheExchangeMergeTest.java | 25 +++- .../ignite/testframework/GridTestUtils.java | 11 +- 4 files changed, 108 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 28d5d20..c3a0add 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -217,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** For tests only. */ private volatile AffinityTopologyVersion exchMergeTestWaitVer; + /** For tests only. */ + private volatile List mergedEvtsForTest; + /** Distributed latch manager. */ private ExchangeLatchManager latchMgr; @@ -1879,9 +1882,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * For testing only. * * @param exchMergeTestWaitVer Version to wait for. + * @param mergedEvtsForTest List to collect discovery events with merged exchanges. */ - public void mergeExchangesTestWaitVersion(AffinityTopologyVersion exchMergeTestWaitVer) { + public void mergeExchangesTestWaitVersion( + AffinityTopologyVersion exchMergeTestWaitVer, + @Nullable List mergedEvtsForTest + ) { this.exchMergeTestWaitVer = exchMergeTestWaitVer; + this.mergedEvtsForTest = mergedEvtsForTest; } /** @@ -1968,46 +1976,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer; - if (exchMergeTestWaitVer != null) { - if (log.isInfoEnabled()) { - log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() + - ", waitVer=" + exchMergeTestWaitVer + ']'); - } - - long end = U.currentTimeMillis() + 10_000; - - while (U.currentTimeMillis() < end) { - boolean found = false; - - for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { - if (task instanceof GridDhtPartitionsExchangeFuture) { - GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; - - if (exchMergeTestWaitVer.equals(fut.initialVersion())) { - if (log.isInfoEnabled()) - log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer); - - found = true; - - break; - } - } - } - - if (found) - break; - else { - try { - U.sleep(100); - } - catch (IgniteInterruptedCheckedException e) { - break; - } - } - } - - this.exchMergeTestWaitVer = null; - } + if (exchMergeTestWaitVer != null) + waitForTestVersion(exchMergeTestWaitVer, curFut); synchronized (curFut.mutex()) { int awaited = 0; @@ -2048,6 +2018,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']'); } + addDiscoEvtForTest(fut.firstEvent()); + curFut.context().events().addEvent(fut.initialVersion(), fut.firstEvent(), fut.firstEventCache()); @@ -2071,6 +2043,67 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + + /** + * For testing purposes. Stores discovery events with merged exchanges to enable examining them later. + * + * @param discoEvt Discovery event. + */ + private void addDiscoEvtForTest(DiscoveryEvent discoEvt) { + List mergedEvtsForTest = this.mergedEvtsForTest; + + if (mergedEvtsForTest != null) + mergedEvtsForTest.add(discoEvt); + } + + /** + * For testing purposes. Method allows to wait for an exchange future of specific version + * to appear in exchange worker queue. + * + * @param exchMergeTestWaitVer Topology Version to wait for. + * @param curFut Current Exchange Future. + */ + private void waitForTestVersion(AffinityTopologyVersion exchMergeTestWaitVer, GridDhtPartitionsExchangeFuture curFut) { + if (log.isInfoEnabled()) { + log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() + + ", waitVer=" + exchMergeTestWaitVer + ']'); + } + + long end = U.currentTimeMillis() + 10_000; + + while (U.currentTimeMillis() < end) { + boolean found = false; + + for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + if (task instanceof GridDhtPartitionsExchangeFuture) { + GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; + + if (exchMergeTestWaitVer.equals(fut.initialVersion())) { + if (log.isInfoEnabled()) + log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer); + + found = true; + + break; + } + } + } + + if (found) + break; + else { + try { + U.sleep(100); + } + catch (IgniteInterruptedCheckedException e) { + break; + } + } + } + + this.exchMergeTestWaitVer = null; + } + /** * Exchange future thread. All exchanges happen only by one thread and next * exchange will not start until previous one completes. http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java index 3ac12fc..77dfc34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java @@ -47,7 +47,7 @@ public class WalStateNodeLeaveExchangeTask implements CachePartitionExchangeWork /** {@inheritDoc} */ @Override public boolean skipForExchangeMerge() { - return false; + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 6c714b1..53a75d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -775,17 +776,37 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void mergeServersFail1(boolean waitRebalance) throws Exception { - final Ignite srv0 = startGrids(4); + final Ignite srv0 = startGrids(5); if (waitRebalance) awaitPartitionMapExchange(); - mergeExchangeWaitVersion(srv0, 6); + final List<DiscoveryEvent> mergedEvts = new ArrayList<>(); + + mergeExchangeWaitVersion(srv0, 8, mergedEvts); + + UUID grid3Id = grid(3).localNode().id(); + UUID grid2Id = grid(2).localNode().id(); + stopGrid(getTestIgniteInstanceName(4), true, false); stopGrid(getTestIgniteInstanceName(3), true, false); stopGrid(getTestIgniteInstanceName(2), true, false); checkCaches(); + + awaitPartitionMapExchange(); + + assertTrue("Unexpected number of merged disco events: " + mergedEvts.size(), mergedEvts.size() == 2); + + for (DiscoveryEvent discoEvt : mergedEvts) { + ClusterNode evtNode = discoEvt.eventNode(); + + assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode != null); + + assertTrue("Unexpected eventNode ID: " + + evtNode.id() + " while expecting " + grid2Id + " or " + grid3Id, + evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id)); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index e6c6657..9390d6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -1949,6 +1949,15 @@ public final class GridTestUtils { */ public static void mergeExchangeWaitVersion(Ignite node, long topVer) { ((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion( - new AffinityTopologyVersion(topVer, 0)); + new AffinityTopologyVersion(topVer, 0), null); + } + + /** + * @param node Node. + * @param topVer Ready exchange version to wait for before trying to merge exchanges. + */ + public static void mergeExchangeWaitVersion(Ignite node, long topVer, List mergedEvts) { + ((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion( + new AffinityTopologyVersion(topVer, 0), mergedEvts); } }