IGNITE-8544 Use exchange result topology version for local wal state management. - Fixes #4039.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe38f3e3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe38f3e3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe38f3e3 Branch: refs/heads/ignite-5789-1 Commit: fe38f3e389ed2cf9ccdf4146a7960a3375ee4d72 Parents: 028df98 Author: Pavel Kovalenko <[email protected]> Authored: Mon May 21 22:33:50 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon May 21 22:33:50 2018 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 2 +- ...lWalModeChangeDuringRebalancingSelfTest.java | 66 ++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fe38f3e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1b79b76..c62b067 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1723,7 +1723,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false); } - cctx.walState().changeLocalStatesOnExchangeDone(exchId.topologyVersion()); + cctx.walState().changeLocalStatesOnExchangeDone(res); } if (super.onDone(res, err)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/fe38f3e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java index 07653f2..ca46a75 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java @@ -24,10 +24,12 @@ import java.nio.MappedByteBuffer; import java.nio.file.OpenOption; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import com.sun.org.apache.regexp.internal.RE; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -49,6 +51,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; /** * @@ -63,6 +66,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr /** */ private static final AtomicReference<CountDownLatch> fileIOLatch = new AtomicReference<>(); + /** Replicated cache name. */ + private static final String REPL_CACHE = "cache"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -83,7 +89,11 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr cfg.setCacheConfiguration( new CacheConfiguration(DEFAULT_CACHE_NAME) // Test checks internal state before and after rebalance, so it is configured to be triggered manually + .setRebalanceDelay(-1), + + new CacheConfiguration(REPL_CACHE) .setRebalanceDelay(-1) + .setCacheMode(CacheMode.REPLICATED) ); cfg.setCommunicationSpi(new TcpCommunicationSpi() { @@ -295,6 +305,62 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr } /** + * Test that local WAL mode changing works well with exchanges merge. + * + * @throws Exception If failed. + */ + public void testWithExchangesMerge() throws Exception { + final int nodeCnt = 5; + final int keyCnt = 10_000; + + Ignite ignite = startGrids(nodeCnt); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache(REPL_CACHE); + + for (int k = 0; k < keyCnt; k++) + cache.put(k, k); + + stopGrid(2); + stopGrid(3); + stopGrid(4); + + // Rewrite data to trigger further rebalance. + for (int k = 0; k < keyCnt; k++) + cache.put(k, k * 2); + + // Start several grids in parallel to trigger exchanges merge. + startGridsMultiThreaded(2, 3); + + for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) { + CacheGroupContext grpCtx = grid(nodeIdx).cachex(REPL_CACHE).context().group(); + + assertFalse(grpCtx.walEnabled()); + } + + // Invoke rebalance manually. + for (Ignite g : G.allGrids()) + g.cache(REPL_CACHE).rebalance(); + + awaitPartitionMapExchange(); + + for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) { + CacheGroupContext grpCtx = grid(nodeIdx).cachex(REPL_CACHE).context().group(); + + assertTrue(grpCtx.walEnabled()); + } + + // Check no data loss. + for (int nodeIdx = 2; nodeIdx < nodeCnt; nodeIdx++) { + IgniteCache<Integer, Integer> cache0 = grid(nodeIdx).cache(REPL_CACHE); + + for (int k = 0; k < keyCnt; k++) + Assert.assertEquals("nodeIdx=" + nodeIdx + ", key=" + k, (Integer) (2 * k), cache0.get(k)); + } + } + + /** * @throws Exception If failed. */ public void testParallelExchangeDuringRebalance() throws Exception {
