IGNITE-8827 Disable WAL during apply updates on recovery Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8adff242 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8adff242 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8adff242 Branch: refs/heads/ignite-8446 Commit: 8adff242dccddc4893514785fcc3cd543930d1ad Parents: 532dc79 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Fri Jul 6 19:09:57 2018 +0300 Committer: Andrey Gura <ag...@apache.org> Committed: Fri Jul 6 19:09:57 2018 +0300 ---------------------------------------------------------------------- .../pagemem/store/IgnitePageStoreManager.java | 5 + .../processors/cache/WalStateManager.java | 225 +++++++++++++++- .../GridCacheDatabaseSharedManager.java | 105 ++++---- .../IgniteCacheDatabaseSharedManager.java | 7 + .../persistence/file/FilePageStoreManager.java | 37 ++- .../persistence/metastorage/MetaStorage.java | 8 +- .../persistence/tree/util/PageHandler.java | 2 + .../wal/FileWriteAheadLogManager.java | 20 +- .../wal/FsyncModeFileWriteAheadLogManager.java | 15 +- .../IgniteNodeStoppedDuringDisableWALTest.java | 261 +++++++++++++++++++ .../pagemem/NoOpPageStoreManager.java | 5 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 12 files changed, 613 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 7dba8ae..5475bef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -228,4 +228,9 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh * @param cacheConfiguration Cache configuration of cache which should be cleanup. */ public void cleanupPersistentSpace(CacheConfiguration cacheConfiguration) throws IgniteCheckedException; + + /** + * Cleanup persistent space for all caches. + */ + public void cleanupPersistentSpace() throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 4a14730..6b17985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -17,6 +17,16 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -28,9 +38,15 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -42,22 +58,12 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.thread.OomExceptionHandler; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.OomExceptionHandler; import org.jetbrains.annotations.Nullable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - import static org.apache.ignite.internal.GridTopic.TOPIC_WAL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; @@ -113,6 +119,9 @@ 
public class WalStateManager extends GridCacheSharedManagerAdapter { /** Holder for groups with temporary disabled WAL. */ private volatile TemporaryDisabledWal tmpDisabledWal; + /** */ + private volatile WALDisableContext walDisableContext; + /** * Constructor. * @@ -155,6 +164,15 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { @Override protected void start0() throws IgniteCheckedException { if (srv) cctx.kernalContext().io().addMessageListener(TOPIC_WAL, ioLsnr); + + walDisableContext = new WALDisableContext( + cctx.cache().context().database(), + cctx.pageStore(), + log + ); + + cctx.kernalContext().internalSubscriptionProcessor().registerMetastorageListener(walDisableContext); + } /** {@inheritDoc} */ @@ -1018,6 +1036,40 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { } /** + * Checks WAL disabled for cache group. + * + * @param grpId Group id. + * @return {@code True} if WAL disable for group. {@code False} If not. + */ + public boolean isDisabled(int grpId) { + CacheGroupContext ctx = cctx.cache().cacheGroup(grpId); + + return ctx != null && !ctx.walEnabled(); + } + + /** + * @return WAL disable context. + */ + public WALDisableContext walDisableContext(){ + return walDisableContext; + } + + /** + * None record will be logged in closure call. + * + * @param cls Closure to execute out of WAL scope. + * @throws IgniteCheckedException If operation failed. + */ + public void runWithOutWAL(IgniteRunnable cls) throws IgniteCheckedException { + WALDisableContext ctx = walDisableContext; + + if (ctx == null) + throw new IgniteCheckedException("Disable WAL context is not initialized."); + + ctx.execute(cls); + } + + /** * WAL state change worker. 
*/ private class WalStateChangeWorker extends GridWorker { @@ -1071,4 +1123,153 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { this.topVer = topVer; } } + + /** + * + */ + public static class WALDisableContext implements MetastorageLifecycleListener{ + /** */ + public static final String WAL_DISABLED = "wal-disabled"; + + /** */ + private final IgniteLogger log; + + /** */ + private final IgniteCacheDatabaseSharedManager dbMgr; + + /** */ + private volatile ReadWriteMetastorage metaStorage; + + /** */ + private final IgnitePageStoreManager pageStoreMgr; + + /** */ + private volatile boolean resetWalFlag; + + /** */ + private volatile boolean disableWal; + + /** + * @param dbMgr Database manager. + * @param pageStoreMgr Page store manager. + * @param log + * + */ + public WALDisableContext( + IgniteCacheDatabaseSharedManager dbMgr, + IgnitePageStoreManager pageStoreMgr, + @Nullable IgniteLogger log + ) { + this.dbMgr = dbMgr; + this.pageStoreMgr = pageStoreMgr; + this.log = log; + } + + /** + * @param cls Closure to execute with disabled WAL. + * @throws IgniteCheckedException If execution failed. + */ + public void execute(IgniteRunnable cls) throws IgniteCheckedException { + if (cls == null) + throw new IgniteCheckedException("Task to execute is not specified."); + + if (metaStorage == null) + throw new IgniteCheckedException("Meta storage is not ready."); + + writeMetaStoreDisableWALFlag(); + + dbMgr.waitForCheckpoint("Checkpoint before apply updates on recovery."); + + disableWAL(true); + + try { + cls.run(); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + finally { + disableWAL(false); + + dbMgr.waitForCheckpoint("Checkpoint after apply updates on recovery."); + + removeMetaStoreDisableWALFlag(); + } + } + + /** + * @throws IgniteCheckedException If write meta store flag failed. 
+ */ + protected void writeMetaStoreDisableWALFlag() throws IgniteCheckedException { + dbMgr.checkpointReadLock(); + + try { + metaStorage.write(WAL_DISABLED, Boolean.TRUE); + } + finally { + dbMgr.checkpointReadUnlock(); + } + } + + /** + * @throws IgniteCheckedException If remove meta store flag failed. + */ + protected void removeMetaStoreDisableWALFlag() throws IgniteCheckedException { + dbMgr.checkpointReadLock(); + + try { + metaStorage.remove(WAL_DISABLED); + } + finally { + dbMgr.checkpointReadUnlock(); + } + } + + /** + * @param disable Flag wal disable. + */ + protected void disableWAL(boolean disable) throws IgniteCheckedException { + dbMgr.checkpointReadLock(); + + try { + disableWal = disable; + + if (log != null) + log.info("WAL logging " + (disable ? "disabled" : "enabled")); + } + finally { + dbMgr.checkpointReadUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onReadyForRead(ReadOnlyMetastorage ms) throws IgniteCheckedException { + Boolean disabled = (Boolean)ms.read(WAL_DISABLED); + + // Node crash when WAL was disabled. + if (disabled != null && disabled){ + resetWalFlag = true; + + pageStoreMgr.cleanupPersistentSpace(); + + dbMgr.cleanupTempCheckpointDirectory(); + + dbMgr.cleanupCheckpointDirectory(); + } + } + + /** {@inheritDoc} */ + @Override public void onReadyForReadWrite(ReadWriteMetastorage ms) throws IgniteCheckedException { + // On new node start WAL always enabled. Remove flag from metastore. 
+ if (resetWalFlag) + ms.remove(WAL_DISABLED); + + metaStorage = ms; + } + + /** {@inheritDoc} */ + public boolean check() { + return disableWal; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 55394ae..6389942 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -215,7 +215,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private static final boolean ASSERTION_ENABLED = GridCacheDatabaseSharedManager.class.desiredAssertionStatus(); /** Checkpoint file name pattern. */ - private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin"); + public static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin"); /** Checkpoint file temporary suffix. This is needed to safe writing checkpoint markers through temporary file and renaming. */ public static final String FILE_TMP_SUFFIX = ".tmp"; @@ -349,7 +349,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** File I/O factory for writing checkpoint markers. */ private final FileIOFactory ioFactory; - /** * @param ctx Kernal context. */ @@ -492,7 +491,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Cleanup checkpoint directory from all temporary files {@link #FILE_TMP_SUFFIX}. 
*/ - private void cleanupTempCheckpointDirectory() throws IgniteCheckedException { + public void cleanupTempCheckpointDirectory() throws IgniteCheckedException { try { try (DirectoryStream<Path> files = Files.newDirectoryStream( cpDir.toPath(), @@ -825,8 +824,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cctx.pageStore().initializeForMetastorage(); - metaStorage = new MetaStorage(cctx, dataRegionMap.get(METASTORE_DATA_REGION_NAME), - (DataRegionMetricsImpl)memMetricsMap.get(METASTORE_DATA_REGION_NAME)); + metaStorage = new MetaStorage( + cctx, + dataRegionMap.get(METASTORE_DATA_REGION_NAME), + (DataRegionMetricsImpl)memMetricsMap.get(METASTORE_DATA_REGION_NAME) + ); WALPointer restore = restoreMemory(status); @@ -837,7 +839,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // First, bring memory to the last consistent checkpoint state if needed. // This method should return a pointer to the last valid record in the WAL. 
- cctx.wal().resumeLogging(restore); WALPointer ptr = cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); @@ -2129,62 +2130,70 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan IgnitePredicate<DataEntry> entryPredicate, Map<T2<Integer, Integer>, T2<Integer, Long>> partStates ) throws IgniteCheckedException { - if (it != null) { - while (it.hasNextX()) { - IgniteBiTuple<WALPointer, WALRecord> next = it.nextX(); + cctx.walState().runWithOutWAL(() -> { + if (it != null) { + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> next = it.next(); - WALRecord rec = next.get2(); + WALRecord rec = next.get2(); - if (!recPredicate.apply(next)) - break; + if (!recPredicate.apply(next)) + break; - switch (rec.type()) { - case DATA_RECORD: - checkpointReadLock(); + switch (rec.type()) { + case DATA_RECORD: + checkpointReadLock(); - try { - DataRecord dataRec = (DataRecord)rec; + try { + DataRecord dataRec = (DataRecord)rec; - for (DataEntry dataEntry : dataRec.writeEntries()) { - if (entryPredicate.apply(dataEntry)) { - checkpointReadLock(); + for (DataEntry dataEntry : dataRec.writeEntries()) { + if (entryPredicate.apply(dataEntry)) { + checkpointReadLock(); - try { - int cacheId = dataEntry.cacheId(); + try { + int cacheId = dataEntry.cacheId(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - if (cacheCtx != null) - applyUpdate(cacheCtx, dataEntry); - else if (log != null) - log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates."); - } - finally { - checkpointReadUnlock(); + if (cacheCtx != null) + applyUpdate(cacheCtx, dataEntry); + else if (log != null) + log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates."); + } + finally { + checkpointReadUnlock(); + } } } } - } - finally { - checkpointReadUnlock(); - } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + 
checkpointReadUnlock(); + } - break; + break; - default: - // Skip other records. + default: + // Skip other records. + } } } - } - checkpointReadLock(); + checkpointReadLock(); - try { - restorePartitionStates(partStates, null); - } - finally { - checkpointReadUnlock(); - } + try { + restorePartitionStates(partStates, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + checkpointReadUnlock(); + } + }); } /** @@ -2316,8 +2325,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param onlyForGroups If not {@code null} restore states only for specified cache groups. * @throws IgniteCheckedException If failed to restore partition states. */ - private void restorePartitionStates(Map<T2<Integer, Integer>, T2<Integer, Long>> partStates, - @Nullable Set<Integer> onlyForGroups) throws IgniteCheckedException { + private void restorePartitionStates( + Map<T2<Integer, Integer>, T2<Integer, Long>> partStates, + @Nullable Set<Integer> onlyForGroups + ) throws IgniteCheckedException { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal() || !grp.affinityNode()) { // Local cache has no partitions and its states. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 4e59ad1..92de54a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -743,6 +743,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * No-op for non-persistent storage. + */ + public void cleanupTempCheckpointDirectory() throws IgniteCheckedException{ + // No-op. 
+ } + + /** * */ @Nullable public IgniteInternalFuture wakeupForCheckpoint(String reason) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index afc8281..c8a64c8 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -65,6 +65,9 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.nio.file.Files.delete; +import static java.nio.file.Files.newDirectoryStream; + /** * File page store manager. */ @@ -93,6 +96,9 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** */ public static final String DFLT_STORE_DIR = "db"; + /** */ + public static final String META_STORAGE_NAME = "metastorage"; + /** Marshaller. 
*/ private static final Marshaller marshaller = new JdkMarshaller(); @@ -167,21 +173,41 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - public void cleanupPersistentSpace(CacheConfiguration cacheConfiguration) throws IgniteCheckedException { + @Override public void cleanupPersistentSpace(CacheConfiguration cacheConfiguration) throws IgniteCheckedException { try { File cacheWorkDir = cacheWorkDir(cacheConfiguration); if(!cacheWorkDir.exists()) return; - try (DirectoryStream<Path> files = Files.newDirectoryStream(cacheWorkDir.toPath(), + try (DirectoryStream<Path> files = newDirectoryStream(cacheWorkDir.toPath(), new DirectoryStream.Filter<Path>() { @Override public boolean accept(Path entry) throws IOException { return entry.toFile().getName().endsWith(FILE_SUFFIX); } })) { for (Path path : files) - Files.delete(path); + delete(path); + } + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to cleanup persistent directory: ", e); + } + } + + /** {@inheritDoc} */ + @Override public void cleanupPersistentSpace() throws IgniteCheckedException { + try { + try (DirectoryStream<Path> files = newDirectoryStream( + storeWorkDir.toPath(), entry -> { + String name = entry.toFile().getName(); + + return !name.equals(META_STORAGE_NAME) && + (name.startsWith(CACHE_DIR_PREFIX) || name.startsWith(CACHE_GRP_DIR_PREFIX)); + } + )) { + for (Path path : files) + U.delete(path); } } catch (IOException e) { @@ -268,8 +294,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen if (!idxCacheStores.containsKey(grpId)) { CacheStoreHolder holder = initDir( - new File(storeWorkDir, - "metastorage"), + new File(storeWorkDir, META_STORAGE_NAME), grpId, 1, delta -> {/* No-op */} ); @@ -835,7 +860,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } }; - try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(cacheGrpDir.toPath(), 
cacheCfgFileFilter)) { + try (DirectoryStream<Path> dirStream = newDirectoryStream(cacheGrpDir.toPath(), cacheCfgFileFilter)) { for(Path path: dirStream) Files.deleteIfExists(path); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java index e2e26e8..14bd450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java @@ -115,8 +115,12 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R private final Marshaller marshaller = new JdkMarshaller(); /** */ - public MetaStorage(GridCacheSharedContext cctx, DataRegion dataRegion, DataRegionMetricsImpl regionMetrics, - boolean readOnly) { + public MetaStorage( + GridCacheSharedContext cctx, + DataRegion dataRegion, + DataRegionMetricsImpl regionMetrics, + boolean readOnly + ) { wal = cctx.wal(); this.dataRegion = dataRegion; this.regionMetrics = regionMetrics; http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java index 8f854cf..a52038a 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageSupport; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 457a59c..3a8cd15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -68,7 +68,6 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.events.EventType; import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.failure.FailureContext; import 
org.apache.ignite.failure.FailureType; @@ -84,9 +83,9 @@ import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -175,7 +174,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl public static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); /** */ - private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); + public static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() { @@ -316,6 +315,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Current log segment handle */ private volatile FileWriteHandle currHnd; + /** */ + private volatile WALDisableContext walDisableContext; + /** * Positive (non-0) value indicates WAL can be archived even if not complete<br> * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br> @@ -448,6 +450,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl new IgniteThread(decompressor).start(); } + walDisableContext 
= cctx.walState().walDisableContext(); + if (mode != WALMode.NONE) { if (log.isInfoEnabled()) log.info("Started write-ahead log manager [mode=" + mode + ']'); @@ -727,8 +731,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWriteHandle currWrHandle = currentHandle(); + WALDisableContext isDisable = walDisableContext; + // Logging was not resumed yet. - if (currWrHandle == null) + if (currWrHandle == null || (isDisable != null && isDisable.check())) return null; // Need to calculate record size first. @@ -994,9 +1000,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public boolean disabled(int grpId) { - CacheGroupContext ctx = cctx.cache().cacheGroup(grpId); - - return ctx != null && !ctx.walEnabled(); + return cctx.walState().isDisabled(grpId); } /** @@ -1355,7 +1359,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - public void cleanupWalDirectories() throws IgniteCheckedException { + @Override public void cleanupWalDirectories() throws IgniteCheckedException { try { try (DirectoryStream<Path> files = Files.newDirectoryStream(walWorkDir.toPath())) { for (Path path : files) http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index a75dd31..b11b829 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -78,9 +78,9 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -268,6 +268,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** Current log segment handle */ private volatile FileWriteHandle currentHnd; + /** */ + private volatile WALDisableContext walDisableContext; + /** * Positive (non-0) value indicates WAL can be archived even if not complete<br> * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br> @@ -383,6 +386,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda decompressor = new FileDecompressor(log); } + walDisableContext = cctx.walState().walDisableContext(); + if (mode != WALMode.NONE) { if (log.isInfoEnabled()) log.info("Started write-ahead log manager [mode=" + mode + ']'); @@ -637,8 +642,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileWriteHandle currWrHandle = currentHandle(); + WALDisableContext isDisable = walDisableContext; + // Logging was not resumed yet. 
- if (currWrHandle == null) + if (currWrHandle == null || (isDisable != null && isDisable.check())) return null; // Need to calculate record size first. @@ -891,9 +898,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** {@inheritDoc} */ @Override public boolean disabled(int grpId) { - CacheGroupContext ctx = cctx.cache().cacheGroup(grpId); - - return ctx != null && !ctx.walEnabled(); + return cctx.walState().isDisabled(grpId); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java new file mode 100644 index 0000000..80198e8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteNodeStoppedDuringDisableWALTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.HashMap; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +import static java.nio.file.FileVisitResult.CONTINUE; +import static java.nio.file.Files.walkFileTree; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CP_FILE_NAME_PATTERN; +import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.META_STORAGE_NAME; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_TEMP_NAME_PATTERN; +import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; + +/** + * For every {@link NodeStopPoint}: installs an overridden {@link WALDisableContext} that stops the node at that point, + * restarts the node and, when clean up is expected, fails if temporary, checkpoint, partition or index files + * are found under the persistent store root (the metastorage folder and WAL files are skipped). + */ +public class IgniteNodeStoppedDuringDisableWALTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every test node. */ + public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setAutoActivationEnabled(false); + + cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** + * Runs the scenario once for each {@link NodeStopPoint}; all grids are stopped and the persistence directory + * is cleaned after each run. + * + * @throws Exception If failed. + */ + public void test() throws Exception { + for (NodeStopPoint nodeStopPoint : NodeStopPoint.values()) { + testStopNodeWithDisableWAL(nodeStopPoint); + + stopAllGrids(); + + cleanPersistenceDir(); + } + } + + /** + * Starts grid 0 with a {@link WALDisableContext} that calls {@link #failNode} at the given point, activates the + * cluster, streams data, runs {@code applyUpdatesOnRecovery} over the replayed WAL, then starts grid 0 again + * and inspects the files under the persistent store root. + * + * @param nodeStopPoint Stop point. + * @throws Exception If failed. 
+ */ + private void testStopNodeWithDisableWAL(NodeStopPoint nodeStopPoint) throws Exception { + log.info("Start test crash " + nodeStopPoint); + + IgniteEx ig0 = startGrid(0); + + GridCacheSharedContext<Object, Object> sharedContext = ig0.context().cache().context(); + + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)sharedContext.database(); + IgniteWriteAheadLogManager WALmgr = sharedContext.wal(); + + WALDisableContext walDisableContext = new WALDisableContext(dbMgr, sharedContext.pageStore(), log) { + @Override protected void writeMetaStoreDisableWALFlag() throws IgniteCheckedException { + if (nodeStopPoint == NodeStopPoint.BEFORE_WRITE_KEY_TO_META_STORE) + failNode(nodeStopPoint); + + super.writeMetaStoreDisableWALFlag(); + + if (nodeStopPoint == NodeStopPoint.AFTER_WRITE_KEY_TO_META_STORE) + failNode(nodeStopPoint); + } + + @Override protected void removeMetaStoreDisableWALFlag() throws IgniteCheckedException { + if (nodeStopPoint == NodeStopPoint.AFTER_CHECKPOINT_AFTER_ENABLE_WAL) + failNode(nodeStopPoint); + + super.removeMetaStoreDisableWALFlag(); + + if (nodeStopPoint == NodeStopPoint.AFTER_REMOVE_KEY_TO_META_STORE) + failNode(nodeStopPoint); + } + + @Override protected void disableWAL(boolean disable) throws IgniteCheckedException { + if (disable) { + if (nodeStopPoint == NodeStopPoint.AFTER_CHECKPOINT_BEFORE_DISABLE_WAL) + failNode(nodeStopPoint); + + super.disableWAL(disable); + + if (nodeStopPoint == NodeStopPoint.AFTER_DISABLE_WAL) + failNode(nodeStopPoint); + + } + else { + super.disableWAL(disable); + + if (nodeStopPoint == NodeStopPoint.AFTER_ENABLE_WAL) + failNode(nodeStopPoint); + } + } + }; + + setFieldValue(sharedContext.walState(), "walDisableContext", walDisableContext); + + setFieldValue(WALmgr, "walDisableContext", walDisableContext); + + ig0.context().internalSubscriptionProcessor().registerMetastorageListener(walDisableContext); + + ig0.cluster().active(true); + + try (IgniteDataStreamer<Integer, Integer> st = 
ig0.dataStreamer(DEFAULT_CACHE_NAME)) { + st.allowOverwrite(true); + + for (int i = 0; i < 10_000; i++) + st.addData(i, -i); + } + + boolean fail = false; + + try (WALIterator it = sharedContext.wal().replay(null)) { + dbMgr.applyUpdatesOnRecovery(it, (tup) -> true, (entry) -> true, new HashMap<>()); + } + catch (IgniteCheckedException e) { + if (nodeStopPoint.needCleanUp) + fail = true; + } + + Assert.assertEquals(nodeStopPoint.needCleanUp, fail); + + Ignite ig1 = startGrid(0); + + String msg = nodeStopPoint.toString(); + + if (nodeStopPoint.needCleanUp) { + PdsFoldersResolver foldersResolver = ((IgniteEx)ig1).context().pdsFolderResolver(); + + File root = foldersResolver.resolveFolders().persistentStoreRootPath(); + + walkFileTree(root.toPath(), new SimpleFileVisitor<Path>() { + @Override public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException { + String name = path.toFile().getName(); + + String filePath = path.toString(); + + if (path.toFile().getParentFile().getName().equals(META_STORAGE_NAME)) + return CONTINUE; + + if (WAL_NAME_PATTERN.matcher(name).matches() || WAL_TEMP_NAME_PATTERN.matcher(name).matches()) + return CONTINUE; + + boolean failed = false; + + if (name.endsWith(FILE_TMP_SUFFIX)) + failed = true; + + if (CP_FILE_NAME_PATTERN.matcher(name).matches()) + failed = true; + + if (name.startsWith(PART_FILE_PREFIX)) + failed = true; + + if (name.startsWith(INDEX_FILE_NAME)) + failed = true; + + if (failed) + fail(msg + " " + filePath); + + return CONTINUE; + } + }); + } + } + + /** + * Stops grid 0 and then throws an exception whose message is the stop point name. + * + * @param nodeStopPoint Stop point. + * @throws IgniteCheckedException Always throws exception. + */ + private void failNode(NodeStopPoint nodeStopPoint) throws IgniteCheckedException { + stopGrid(0, true); + + throw new IgniteCheckedException(nodeStopPoint.toString()); + } + + /** + * Crash point: identifies the hook position in the overridden {@link WALDisableContext} where the node is stopped. 
+ */ + private enum NodeStopPoint { + BEFORE_WRITE_KEY_TO_META_STORE(false), + AFTER_WRITE_KEY_TO_META_STORE(true), + AFTER_CHECKPOINT_BEFORE_DISABLE_WAL(true), + AFTER_DISABLE_WAL(true), + AFTER_ENABLE_WAL(true), + AFTER_CHECKPOINT_AFTER_ENABLE_WAL(true), + AFTER_REMOVE_KEY_TO_META_STORE(false); + + /** Clean up flag: when set, the test expects the WAL replay step to throw and checks the store files after restart. */ + private final boolean needCleanUp; + + NodeStopPoint(boolean up) { + needCleanUp = up; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java index ba236af..c61b3c0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpPageStoreManager.java @@ -221,4 +221,9 @@ public class NoOpPageStoreManager implements IgnitePageStoreManager { @Override public void cleanupPersistentSpace(CacheConfiguration cacheConfiguration) throws IgniteCheckedException { // No-op. } + + /** {@inheritDoc} */ + @Override public void cleanupPersistentSpace() throws IgniteCheckedException { + // No-op. 
+ } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8adff242/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index aac759b..24255f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWhole import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsUnusedWalSegmentsTest; import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteNodeStoppedDuringDisableWALTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundWithMmapBufferSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushFsyncSelfTest; @@ -180,5 +181,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalIteratorSwitchSegmentTest.class); suite.addTestSuite(IgniteWalIteratorExceptionDuringReadTest.class); + + suite.addTestSuite(IgniteNodeStoppedDuringDisableWALTest.class); } }