Repository: ignite Updated Branches: refs/heads/master 4ee696fc6 -> 2b22933b8
IGNITE-8929 Do not disable WAL if node does not have MOVING partitions. Fixes #4372 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b22933b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b22933b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b22933b Branch: refs/heads/master Commit: 2b22933b89060f5247c866386a5bc7d44135ed8d Parents: 4ee696f Author: DmitriyGovorukhin <[email protected]> Authored: Wed Jul 18 16:16:58 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jul 18 16:16:58 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheGroupContext.java | 18 +- .../processors/cache/WalStateManager.java | 35 ++-- ...hangeDuringRebalanceOnNonNodeAssignTest.java | 169 +++++++++++++++++++ ...lWalModeChangeDuringRebalancingSelfTest.java | 38 ++++- .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 5 files changed, 237 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2b22933b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index d2b98f2..7009575 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -1059,18 +1059,28 @@ public class CacheGroupContext { * @param enabled Global WAL enabled flag. */ public void globalWalEnabled(boolean enabled) { - persistGlobalWalState(enabled); + if (globalWalEnabled != enabled) { + log.info("Global WAL state for group=" + cacheOrGroupName() + + " changed from " + globalWalEnabled + " to " + enabled); - this.globalWalEnabled = enabled; + persistGlobalWalState(enabled); + + globalWalEnabled = enabled; + } } /** * @param enabled Local WAL enabled flag. */ public void localWalEnabled(boolean enabled) { - persistLocalWalState(enabled); + if (localWalEnabled != enabled){ + log.info("Local WAL state for group=" + cacheOrGroupName() + + " changed from " + localWalEnabled + " to " + enabled); - this.localWalEnabled = enabled; + persistLocalWalState(enabled); + + localWalEnabled = enabled; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2b22933b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 52e3ecb..16b92ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; @@ -66,6 +65,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.GridTopic.TOPIC_WAL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** @@ -379,6 +379,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { continue; boolean hasOwning = false; + boolean hasMoving = false; int parts = 0; @@ -394,18 +395,23 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { break; } + + parts++; } - parts++; + if (locPart.state() == MOVING) + hasMoving = true; } - log.info("Prepare change WAL state, grp=" + grp.cacheOrGroupName() + - ", grpId=" + grp.groupId() + ", hasOwning=" + hasOwning + - ", WALState=" + grp.walEnabled() + ", parts=" + parts); + if (log.isDebugEnabled()) + log.debug("Prepare change WAL state, grp=" + grp.cacheOrGroupName() + + ", grpId=" + grp.groupId() + ", hasOwning=" + hasOwning + ", hasMoving=" + hasMoving + + ", WALState=" + grp.walEnabled() + ", parts=" + parts); - if (hasOwning && !grp.localWalEnabled()) + if (hasOwning && !grp.localWalEnabled()) { grpsToEnableWal.add(grp.groupId()); - else if (!hasOwning && grp.localWalEnabled()) { + } + else if (hasMoving && !hasOwning && grp.localWalEnabled()) { grpsToDisableWal.add(grp.groupId()); grpsWithWalDisabled.add(grp.groupId()); @@ -421,7 +427,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { try { if (hasNonEmptyOwning && !grpsToEnableWal.isEmpty()) - triggerCheckpoint(0).finishFuture().get(); + triggerCheckpoint("wal-local-state-change-" + topVer).finishFuture().get(); } catch (IgniteCheckedException ex) { throw new IgniteException(ex); @@ -465,7 +471,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { tmpDisabledWal = null; } - CheckpointFuture cpFut = triggerCheckpoint(0); + CheckpointFuture cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer); assert cpFut != null; @@ -618,7 +624,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { res = new WalStateResult(msg, false); else { // Initiate a checkpoint. - CheckpointFuture cpFut = triggerCheckpoint(msg.groupId()); + CheckpointFuture cpFut = triggerCheckpoint("wal-state-change-grp-" + msg.groupId()); if (cpFut != null) { try { @@ -1009,11 +1015,11 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { /** * Force checkpoint. * - * @param grpId Group ID. + * @param msg Message. * @return Checkpoint future or {@code null} if failed to get checkpointer. */ - @Nullable private CheckpointFuture triggerCheckpoint(int grpId) { - return cctx.database().forceCheckpoint("wal-state-change-grp-" + grpId); + @Nullable private CheckpointFuture triggerCheckpoint(String msg) { + return cctx.database().forceCheckpoint(msg); } /** @@ -1126,7 +1132,8 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { /** */ public TemporaryDisabledWal( Set<Integer> disabledGrps, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer + ) { this.disabledGrps = Collections.unmodifiableSet(disabledGrps); this.remainingGrps = new HashSet<>(disabledGrps); this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/2b22933b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest.java new file mode 100644 index 0000000..675ed31 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.lang.String.valueOf; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * + */ +public class LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest extends GridCommonAbstractTest { + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private final int NODES = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setConsistentId(name); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalPath(walPath(name)) + .setWalArchivePath(walArchivePath(name)) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setCacheConfiguration( + new CacheConfiguration(DEFAULT_CACHE_NAME) + .setAffinity( + new RendezvousAffinityFunction(false, 3)) + ); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + + System.setProperty(IGNITE_DISABLE_WAL_DURING_REBALANCING, "true"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + System.clearProperty(IGNITE_DISABLE_WAL_DURING_REBALANCING); + } + + /** + * @throws Exception If failed. + */ + public void test() throws Exception { + Ignite ig = startGrids(NODES); + + ig.cluster().active(true); + + int entries = 100_000; + + try (IgniteDataStreamer<Integer, Integer> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) { + st.allowOverwrite(true); + + for (int i = 0; i < entries; i++) + st.addData(i, -i); + } + + IgniteEx ig4 = startGrid(NODES); + + ig4.cluster().setBaselineTopology(ig4.context().discovery().topologyVersion()); + + IgniteWalIteratorFactory iteratorFactory = new IgniteWalIteratorFactory(log); + + String name = ig4.name(); + + try (WALIterator it = iteratorFactory.iterator(walPath(name), walArchivePath(name))) { + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + + WALRecord rec = tup.get2(); + + if (rec.type() == METASTORE_DATA_RECORD) { + MetastoreDataRecord metastoreDataRecord = (MetastoreDataRecord)rec; + + String key = metastoreDataRecord.key(); + + if (key.startsWith("grp-wal-") && + key.contains(valueOf(cacheId(DEFAULT_CACHE_NAME))) && + metastoreDataRecord.value() != null) + fail("WAL was disabled but should not."); + } + } + } + } + + /** + * + * @param nodeName Node name. + * @return Path to WAL work directory. + * @throws IgniteCheckedException If failed. + */ + private String walPath(String nodeName) throws IgniteCheckedException { + String workDir = U.defaultWorkDirectory(); + + return workDir + "/" + DFLT_STORE_DIR + "/" + nodeName + "/wal"; + } + + /** + * + * @param nodeName Node name. + * @return Path to WAL archive directory. + * @throws IgniteCheckedException If failed. + */ + private String walArchivePath(String nodeName) throws IgniteCheckedException { + String workDir = U.defaultWorkDirectory(); + + return workDir + "/" + DFLT_STORE_DIR + "/" + nodeName + "/walArchive"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2b22933b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java index c3aee2b..9ec27ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; @@ -50,10 +51,11 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + /** * */ @@ -184,7 +186,6 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr disableWalDuringRebalancing = true; } - /** * @return Count of entries to be processed within test. */ @@ -228,7 +229,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr final CheckpointHistory cpHist = ((GridCacheDatabaseSharedManager)newIgnite.context().cache().context().database()).checkpointHistory(); - GridTestUtils.waitForCondition(new GridAbsPredicate() { + waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { return !cpHist.checkpoints().isEmpty(); } @@ -238,7 +239,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr long newIgniteStartedTimestamp = System.currentTimeMillis(); - ignite.cluster().setBaselineTopology(4); + newIgnite.cluster().setBaselineTopology(4); + + awaitExchange(newIgnite); CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); @@ -295,7 +298,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr IgniteEx newIgnite = startGrid(3); - ignite.cluster().setBaselineTopology(ignite.cluster().nodes()); + newIgnite.cluster().setBaselineTopology(ignite.cluster().nodes()); + + awaitExchange(newIgnite); CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); @@ -367,7 +372,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr IgniteCache<Integer, Integer> cache0 = grid(nodeIdx).cache(REPL_CACHE); for (int k = 0; k < keyCnt; k++) - Assert.assertEquals("nodeIdx=" + nodeIdx + ", key=" + k, (Integer) (2 * k), cache0.get(k)); + Assert.assertEquals("nodeIdx=" + nodeIdx + ", key=" + k, (Integer)(2 * k), cache0.get(k)); } } @@ -408,6 +413,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr ignite.cluster().setBaselineTopology(ignite.cluster().nodes()); + // Await fully exchange complete. + awaitExchange(newIgnite); + for (Ignite g : G.allGrids()) g.cache(DEFAULT_CACHE_NAME).rebalance(); @@ -427,7 +435,7 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr awaitPartitionMapExchange(); - assertTrue(grpCtx.walEnabled()); + assertTrue(waitForCondition(grpCtx::walEnabled, 2_000)); } /** @@ -447,7 +455,10 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr IgniteEx newIgnite = startGrid(1); - ignite.cluster().setBaselineTopology(2); + newIgnite.cluster().setBaselineTopology(2); + + // Await fully exchange complete. + awaitExchange(newIgnite); CacheGroupContext grpCtx = newIgnite.cachex(DEFAULT_CACHE_NAME).context().group(); @@ -493,6 +504,9 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr ignite.cluster().setBaselineTopology(5); + // Await fully exchange complete. + awaitExchange((IgniteEx)ignite); + for (Ignite g : G.allGrids()) { CacheGroupContext grpCtx = ((IgniteEx)g).cachex(DEFAULT_CACHE_NAME).context().group(); @@ -512,6 +526,14 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr /** * + * @param ig Ignite. + */ + private void awaitExchange(IgniteEx ig) throws IgniteCheckedException { + ig.context().cache().context().exchange().lastTopologyFuture().get(); + } + + /** + * */ private static class TestFileIOFactory implements FileIOFactory { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2b22933b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 29276c47..066a12a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecovery import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest; import org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest; +import org.apache.ignite.internal.processors.cache.persistence.LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest; import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAbsentEvictionNodeOutOfBaselineTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.ClientAffinityAssignmentWithBaselineTest; @@ -117,6 +118,8 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(LocalWalModeChangeDuringRebalancingSelfTest.class); + suite.addTestSuite(LocalWacModeNoChangeDuringRebalanceOnNonNodeAssignTest.class); + suite.addTestSuite(IgniteWalFlushFsyncSelfTest.class); suite.addTestSuite(IgniteWalFlushFsyncWithDedicatedWorkerSelfTest.class);
