IGNITE-10386 Add mode when WAL won't be disabled during rebalancing caused by BLT change
Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7f758bfc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7f758bfc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7f758bfc Branch: refs/heads/ignite-10044 Commit: 7f758bfcb7038b9ef45786bd0148ed4bd9e8c9cf Parents: ea872cc Author: Andrey Kuznetsov <[email protected]> Authored: Tue Dec 4 19:46:45 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Tue Dec 4 19:46:45 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 7 ++ .../processors/cache/WalStateManager.java | 7 +- .../GridDhtPartitionsExchangeFuture.java | 2 +- ...lWalModeChangeDuringRebalancingSelfTest.java | 102 ++++++++++++++++++- 4 files changed, 114 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7f758bfc/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index d70162a..9979ee1 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -260,6 +260,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT"; /** + * System property to enable pending transaction tracker. + * Affects impact of {@link IgniteSystemProperties#IGNITE_DISABLE_WAL_DURING_REBALANCING} property: + * if this property is set, WAL anyway won't be disabled during rebalancing triggered by baseline topology change. 
+ */ + public static final String IGNITE_PENDING_TX_TRACKER_ENABLED = "IGNITE_PENDING_TX_TRACKER_ENABLED"; + + /** * System property to override multicast group taken from configuration. * Used for testing purposes. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7f758bfc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index ed62cad..0bcd07da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -387,9 +387,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { * in OWNING state if such feature is enabled. * * @param topVer Topology version. + * @param changedBaseline The exchange is caused by Baseline Topology change. 
*/ - public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer) { - if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, false)) + public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, boolean changedBaseline) { + if (changedBaseline + && IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED) + || !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, false)) return; Set<Integer> grpsToEnableWal = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7f758bfc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 1eba8b4..ffc55a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2102,7 +2102,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (changedAffinity()) - cctx.walState().changeLocalStatesOnExchangeDone(res); + cctx.walState().changeLocalStatesOnExchangeDone(res, changedBaseline()); } final Throwable err0 = err; http://git-wip-us.apache.org/repos/asf/ignite/blob/7f758bfc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java index 56426f3..04af1cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.file.OpenOption; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -51,6 +52,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; @@ -64,6 +66,12 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr private static boolean disableWalDuringRebalancing = true; /** */ + private static boolean enablePendingTxTracker = false; + + /** */ + private static int dfltCacheBackupCnt = 0; + + /** */ private static final AtomicReference<CountDownLatch> supplyMessageLatch = new AtomicReference<>(); /** */ @@ -92,7 +100,8 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr cfg.setCacheConfiguration( new CacheConfiguration(DEFAULT_CACHE_NAME) // Test checks internal state before and after rebalance, so it is configured to be triggered manually - 
.setRebalanceDelay(-1), + .setRebalanceDelay(-1) + .setBackups(dfltCacheBackupCnt), new CacheConfiguration(REPL_CACHE) .setRebalanceDelay(-1) @@ -147,6 +156,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, Boolean.toString(disableWalDuringRebalancing)); + System.setProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED, + Boolean.toString(enablePendingTxTracker)); + return cfg; } @@ -184,6 +196,17 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr cleanPersistenceDir(); disableWalDuringRebalancing = true; + enablePendingTxTracker = false; + dfltCacheBackupCnt = 0; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING); + + System.clearProperty(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED); } /** @@ -286,6 +309,60 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr /** * @throws Exception If failed. 
*/ + public void testWalDisabledDuringRebalancingWithPendingTxTracker() throws Exception { + enablePendingTxTracker = true; + dfltCacheBackupCnt = 2; + + Ignite ignite = startGrids(3); + + ignite.cluster().active(true); + + ignite.cluster().setBaselineTopology(3); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + stopGrid(2); + + awaitExchange((IgniteEx)ignite); + + doLoad(cache, 4, 10_000); + + IgniteEx newIgnite = startGrid(2); + + awaitExchange(newIgnite); + + CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); + + assertFalse(grpCtx.walEnabled()); + + long rebalanceStartedTs = System.currentTimeMillis(); + + for (Ignite g : G.allGrids()) + g.cache(DEFAULT_CACHE_NAME).rebalance(); + + awaitPartitionMapExchange(); + + assertTrue(grpCtx.walEnabled()); + + long rebalanceFinishedTs = System.currentTimeMillis(); + + CheckpointHistory cpHist = + ((GridCacheDatabaseSharedManager)newIgnite.context().cache().context().database()).checkpointHistory(); + + assertNotNull(cpHist); + + // Ensure there was a checkpoint on WAL re-activation. + assertEquals( + 1, + cpHist.checkpoints() + .stream() + .filter(ts -> rebalanceStartedTs <= ts && ts <= rebalanceFinishedTs) + .count()); + } + + /** + * @throws Exception If failed. + */ public void testLocalAndGlobalWalStateInterdependence() throws Exception { Ignite ignite = startGrids(3); @@ -532,6 +609,29 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr } /** + * Put random values to cache in multiple threads until time interval given expires. + * + * @param cache Cache to modify. + * @param threadCnt Number of threads to be used. + * @param duration Time interval in milliseconds. + * @throws Exception When something goes wrong. 
+ */ + private void doLoad(IgniteCache<Integer, Integer> cache, int threadCnt, long duration) throws Exception { + GridTestUtils.runMultiThreaded(() -> { + long stopTs = U.currentTimeMillis() + duration; + + int keysCnt = getKeysCount(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + do { + cache.put(rnd.nextInt(keysCnt), rnd.nextInt()); + } + while (U.currentTimeMillis() < stopTs); + }, threadCnt, "load-cache"); + } + + /** * */ private static class TestFileIOFactory implements FileIOFactory {
