Repository: ignite Updated Branches: refs/heads/master ae7b9900e -> f07bdaa07
IGNITE-9601 Write rollover WAL record as the last record in current segment - Fixes #4762. Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f07bdaa0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f07bdaa0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f07bdaa0 Branch: refs/heads/master Commit: f07bdaa07b15465e7bc532630c5c3fe0ee74d23c Parents: ae7b990 Author: Andrey Kuznetsov <[email protected]> Authored: Fri Oct 12 21:45:53 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Fri Oct 12 21:45:53 2018 +0300 ---------------------------------------------------------------------- .../pagemem/wal/IgniteWriteAheadLogManager.java | 19 +- .../pagemem/wal/record/RolloverType.java | 38 ++ .../pagemem/wal/record/SnapshotRecord.java | 5 - .../internal/pagemem/wal/record/WALRecord.java | 7 - .../wal/FileWriteAheadLogManager.java | 76 +++- .../wal/FsyncModeFileWriteAheadLogManager.java | 51 ++- .../db/wal/WalRolloverRecordLoggingTest.java | 8 +- .../db/wal/WalRolloverTypesTest.java | 366 +++++++++++++++++++ .../persistence/pagemem/NoOpWALManager.java | 9 +- .../StandaloneWalRecordsIteratorTest.java | 3 +- .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 11 files changed, 536 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index 4ffa347..68428d0 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.pagemem.wal; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; import org.apache.ignite.internal.processors.cache.persistence.StorageException; @@ -52,7 +53,7 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni /** * Appends the given log entry to the write-ahead log. * - * @param entry entry to log. + * @param entry Entry to log. * @return WALPointer that may be passed to {@link #flush(WALPointer, boolean)} method to make sure the record is * written to the log. * @throws IgniteCheckedException If failed to construct log entry. @@ -61,6 +62,22 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public WALPointer log(WALRecord entry) throws IgniteCheckedException, StorageException; /** + * Appends the given log entry to the write-ahead log. If entry logging leads to rollover, caller can specify + * whether to write the entry to the current segment or to the next one. + * + * @param entry Entry to log. + * @param rolloverType Rollover type. + * @return WALPointer that may be passed to {@link #flush(WALPointer, boolean)} method to make sure the record is + * written to the log. + * @throws IgniteCheckedException If failed to construct log entry. + * @throws StorageException If IO error occurred while writing log entry. 
+ * + * @see RolloverType + */ + public WALPointer log(WALRecord entry, RolloverType rolloverType) + throws IgniteCheckedException, StorageException; + + /** * Makes sure that all log entries written to the log up until the specified pointer are actually written * to the underlying storage. * http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RolloverType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RolloverType.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RolloverType.java new file mode 100644 index 0000000..1d99de1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/RolloverType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +/** + * Defines WAL logging type with regard to segment rollover. + */ +public enum RolloverType { + /** Record being logged is not a rollover record. 
*/ + NONE, + + /** + * Record being logged is a rollover record and it should get to the current segment whenever possible. + * If current segment is full, then the record gets to the next segment. Anyway, logging implementation should + * guarantee segment rollover afterwards. + */ + CURRENT_SEGMENT, + + /** + * Record being logged is a rollover record and it should become the first record in the next segment. + */ + NEXT_SEGMENT; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java index c6b6329..caa1494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java @@ -51,11 +51,6 @@ public class SnapshotRecord extends WALRecord { return full; } - /** {@inheritDoc} */ - @Override public boolean rollOver() { - return true; - } - /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 667f8d9..e22adc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -291,13 +291,6 @@ public abstract class WALRecord { } /** - * @return Need wal rollOver. 
- */ - public boolean rollOver(){ - return false; - } - - /** * @return Entry type. */ public abstract RecordType type(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index b4ce0f1..5c6502b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -82,6 +83,7 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; +import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -95,15 +97,15 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; -import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; -import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; @@ -705,7 +707,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() + ", oldVer=" + currHnd.serializer.version() + ']'); - rollOver(currHnd); + rollOver(currHnd, null); } currHnd.resume = false; @@ -787,7 +789,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { handle.buf.close(); - rollOver(handle); + rollOver(handle, null); } catch (IgniteCheckedException e) { U.error(log, "Unable to perform segment rollover: " + 
e.getMessage(), e); @@ -797,8 +799,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - @SuppressWarnings("TooBroadScope") - @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException, StorageException { + @Override public WALPointer log(WALRecord rec) throws IgniteCheckedException { + return log(rec, RolloverType.NONE); + } + + /** {@inheritDoc} */ + @Override public WALPointer log(WALRecord rec, RolloverType rolloverType) throws IgniteCheckedException { if (serializer == null || mode == WALMode.NONE) return null; @@ -814,21 +820,32 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl rec.size(serializer.size(rec)); while (true) { - if (rec.rollOver()) { - assert cctx.database().checkpointLockIsHeldByThread(); + WALPointer ptr; - long idx = currWrHandle.getSegmentId(); + if (rolloverType == RolloverType.NONE) + ptr = currWrHandle.addRecord(rec); + else { + assert cctx.database().checkpointLockIsHeldByThread(); - currWrHandle.buf.close(); + if (rolloverType == RolloverType.NEXT_SEGMENT) { + WALPointer pos = rec.position(); - currWrHandle = rollOver(currWrHandle); + do { + // This will change rec.position() unless concurrent rollover happened. 
+ currWrHandle = closeBufAndRollover(currWrHandle, rec, rolloverType); + } + while (Objects.equals(pos, rec.position())); - if (log != null && log.isInfoEnabled()) - log.info("Rollover segment [" + idx + " to " + currWrHandle.getSegmentId() + "], recordType=" + rec.type()); + ptr = rec.position(); + } + else if (rolloverType == RolloverType.CURRENT_SEGMENT) { + if ((ptr = currWrHandle.addRecord(rec)) != null) + currWrHandle = closeBufAndRollover(currWrHandle, rec, rolloverType); + } + else + throw new IgniteCheckedException("Unknown rollover type: " + rolloverType); } - WALPointer ptr = currWrHandle.addRecord(rec); - if (ptr != null) { metrics.onWalRecordLogged(); @@ -840,7 +857,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return ptr; } else - currWrHandle = rollOver(currWrHandle); + currWrHandle = rollOver(currWrHandle, null); checkNode(); @@ -849,6 +866,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } + /** */ + private FileWriteHandle closeBufAndRollover( + FileWriteHandle currWriteHandle, + WALRecord rec, + RolloverType rolloverType + ) throws IgniteCheckedException { + long idx = currWriteHandle.getSegmentId(); + + currWriteHandle.buf.close(); + + FileWriteHandle res = rollOver(currWriteHandle, rolloverType == RolloverType.NEXT_SEGMENT ? rec : null); + + if (log != null && log.isInfoEnabled()) + log.info("Rollover segment [" + idx + " to " + res.getSegmentId() + "], recordType=" + rec.type()); + + return res; + } + /** {@inheritDoc} */ @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException { if (serializer == null || mode == WALMode.NONE) @@ -1179,9 +1214,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param cur Handle that failed to fit the given entry. + * @param rec Optional record to be added right after header. * @return Handle that will fit the entry. 
*/ - private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException { + private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) throws IgniteCheckedException { FileWriteHandle hnd = currentHandle(); if (hnd != cur) @@ -1195,6 +1231,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl next.writeHeader(); + if (rec != null) { + WALPointer ptr = next.addRecord(rec); + + assert ptr != null; + } + if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index fc1461a..7bfd10b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -41,6 +41,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -77,6 +78,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; +import 
org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -547,7 +549,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + ", oldVer=" + currentHnd.serializer.version() + ']'); - rollOver(currentHnd); + rollOver(currentHnd, null); } if (mode == WALMode.BACKGROUND) { @@ -679,7 +681,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda final FileWriteHandle handle = currentHandle(); try { - rollOver(handle); + rollOver(handle, null); } catch (IgniteCheckedException e) { U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); @@ -689,8 +691,12 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** {@inheritDoc} */ - @SuppressWarnings("TooBroadScope") - @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException { + @Override public WALPointer log(WALRecord record) throws IgniteCheckedException { + return log(record, RolloverType.NONE); + } + + /** {@inheritDoc} */ + @Override public WALPointer log(WALRecord record, RolloverType rolloverType) throws IgniteCheckedException { if (serializer == null || mode == WALMode.NONE) return null; @@ -706,13 +712,31 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda record.size(serializer.size(record)); while (true) { - if (record.rollOver()){ + WALPointer ptr; + + if (rolloverType == RolloverType.NONE) + ptr = currWrHandle.addRecord(record); + else { assert cctx.database().checkpointLockIsHeldByThread(); - currWrHandle = rollOver(currWrHandle); - } + if (rolloverType == RolloverType.NEXT_SEGMENT) { + WALPointer pos = record.position(); - 
WALPointer ptr = currWrHandle.addRecord(record); + do { + // This will change record.position() unless concurrent rollover happened. + currWrHandle = rollOver(currWrHandle, record); + } + while (Objects.equals(pos, record.position())); + + ptr = record.position(); + } + else if (rolloverType == RolloverType.CURRENT_SEGMENT) { + if ((ptr = currWrHandle.addRecord(record)) != null) + currWrHandle = rollOver(currWrHandle, null); + } + else + throw new IgniteCheckedException("Unknown rollover type: " + rolloverType); + } if (ptr != null) { metrics.onWalRecordLogged(); @@ -725,7 +749,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda return ptr; } else - currWrHandle = rollOver(currWrHandle); + currWrHandle = rollOver(currWrHandle, null); checkNode(); @@ -1115,9 +1139,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param cur Handle that failed to fit the given entry. + * @param rec Optional record to be added to the beginning of the segment. * @return Handle that will fit the entry. 
*/ - private FileWriteHandle rollOver(FileWriteHandle cur) throws IgniteCheckedException { + private FileWriteHandle rollOver(FileWriteHandle cur, @Nullable WALRecord rec) throws IgniteCheckedException { FileWriteHandle hnd = currentHandle(); if (hnd != cur) @@ -1129,6 +1154,12 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileWriteHandle next = initNextWriteHandle(cur.getSegmentId()); + if (rec != null) { + WALPointer ptr = next.addRecord(rec); + + assert ptr != null; + } + if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java index 395b03a..df6f57d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverRecordLoggingTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.util.typedef.internal.U; 
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -51,11 +52,6 @@ public abstract class WalRolloverRecordLoggingTest extends GridCommonAbstractTes private RolloverRecord() { super(null); } - - /** {@inheritDoc} */ - @Override public boolean rollOver() { - return true; - } } /** {@inheritDoc} */ @@ -142,7 +138,7 @@ public abstract class WalRolloverRecordLoggingTest extends GridCommonAbstractTes dbMgr.checkpointReadLock(); try { - walMgr.log(rec); + walMgr.log(rec, RolloverType.NEXT_SEGMENT); } finally { dbMgr.checkpointReadUnlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverTypesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverTypesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverTypesTest.java new file mode 100644 index 0000000..122ecb6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRolloverTypesTest.java @@ -0,0 +1,366 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.checkpoint.noop.NoopCheckpointSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH; +import static org.apache.ignite.configuration.WALMode.FSYNC; +import static org.apache.ignite.configuration.WALMode.LOG_ONLY; +import static org.apache.ignite.internal.pagemem.wal.record.RolloverType.CURRENT_SEGMENT; +import static 
org.apache.ignite.internal.pagemem.wal.record.RolloverType.NEXT_SEGMENT; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; + +/** + * + */ +public class WalRolloverTypesTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private WALMode walMode; + + /** */ + private boolean disableWALArchiving; + + /** */ + private static class AdHocWALRecord extends CheckpointRecord { + /** */ + private AdHocWALRecord() { + super(null); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(20 * 1024 * 1024)) + .setWalMode(walMode) + .setWalArchivePath(disableWALArchiving ? 
DFLT_WAL_PATH : DFLT_WAL_ARCHIVE_PATH) + .setWalSegmentSize(4 * 1024 * 1024)) + .setCheckpointSpi(new NoopCheckpointSpi()) + ; + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + public void testCurrentSegmentTypeLogOnlyModeArchiveOn() throws Exception { + checkCurrentSegmentType(LOG_ONLY, false); + } + + /** */ + public void testCurrentSegmentTypeLogOnlyModeArchiveOff() throws Exception { + checkCurrentSegmentType(LOG_ONLY, true); + } + + /** */ + public void testCurrentSegmentTypeLogFsyncModeArchiveOn() throws Exception { + checkCurrentSegmentType(FSYNC, false); + } + + /** */ + public void testCurrentSegmentTypeLogFsyncModeArchiveOff() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9776"); + + checkCurrentSegmentType(FSYNC, true); + } + + /** */ + public void testNextSegmentTypeLogOnlyModeArchiveOn() throws Exception { + checkNextSegmentType(LOG_ONLY, false); + } + + /** */ + public void testNextSegmentTypeLogOnlyModeArchiveOff() throws Exception { + checkNextSegmentType(LOG_ONLY, true); + } + + /** */ + public void testNextSegmentTypeFsyncModeArchiveOn() throws Exception { + checkNextSegmentType(FSYNC, false); + } + + /** */ + public void testNextSegmentTypeFsyncModeArchiveOff() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9776"); + + checkNextSegmentType(FSYNC, true); + } + + /** */ + private void checkCurrentSegmentType(WALMode mode, boolean disableArch) throws Exception { + walMode = mode; + disableWALArchiving = disableArch; + + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + ig.context().cache().context().database().checkpointReadLock(); + + try { + WALPointer ptr = 
walMgr.log(new AdHocWALRecord(), CURRENT_SEGMENT); + + assertEquals(0, ((FileWALPointer)ptr).index()); + } + finally { + ig.context().cache().context().database().checkpointReadUnlock(); + } + } + + /** Sets {@code walMode} and {@code disableWALArchiving}, starts grid 0, logs one record with {@code NEXT_SEGMENT} and expects pointer index 1. */ + private void checkNextSegmentType(WALMode mode, boolean disableArch) throws Exception { + walMode = mode; + disableWALArchiving = disableArch; + + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + ig.context().cache().context().database().checkpointReadLock(); + + try { + WALPointer ptr = walMgr.log(new AdHocWALRecord(), NEXT_SEGMENT); + + assertEquals(1, ((FileWALPointer)ptr).index()); + } + finally { + ig.context().cache().context().database().checkpointReadUnlock(); + } + } + + /** Under-load {@code NEXT_SEGMENT} check: {@code LOG_ONLY} mode, {@code disableArch = false}. */ + public void testNextSegmentTypeWithCacheActivityLogOnlyModeArchiveOn() throws Exception { + checkNextSegmentTypeWithCacheActivity(LOG_ONLY, false); + } + + /** Under-load {@code NEXT_SEGMENT} check: {@code LOG_ONLY} mode, {@code disableArch = true}. */ + public void testNextSegmentTypeWithCacheActivityLogOnlyModeArchiveOff() throws Exception { + checkNextSegmentTypeWithCacheActivity(LOG_ONLY, true); + } + + /** Under-load {@code NEXT_SEGMENT} check: {@code FSYNC} mode, {@code disableArch = false}. */ + public void testNextSegmentTypeWithCacheActivityFsyncModeArchiveOn() throws Exception { + checkNextSegmentTypeWithCacheActivity(FSYNC, false); + } + + /** Under-load {@code NEXT_SEGMENT} check: {@code FSYNC} mode, {@code disableArch = true}; calls {@code fail} with the IGNITE-9776 link before the check. */ + public void testNextSegmentTypeWithCacheActivityFsyncModeArchiveOff() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9776"); + + checkNextSegmentTypeWithCacheActivity(FSYNC, true); + } + + /** + * Under load, ensures a record logged with {@code NEXT_SEGMENT} gets a greater segment index than the record logged just before it and a file offset equal to {@code HEADER_RECORD_SIZE}; an {@code IgniteCheckedException} raised inside an iteration is only logged. 
+ */ + private void checkNextSegmentTypeWithCacheActivity(WALMode mode, boolean disableArch) throws Exception { + walMode = mode; + disableWALArchiving = disableArch; + + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + final long testDuration = 30_000; + + long startTime = U.currentTimeMillis(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync( + () -> { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() - startTime < testDuration) + cache.put(random.nextInt(100), random.nextInt(100_000)); + }, + 8, "cache-put-thread"); + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + IgniteCacheDatabaseSharedManager dbMgr = ig.context().cache().context().database(); + + AdHocWALRecord markerRecord = new AdHocWALRecord(); + + WALPointer ptr0; + WALPointer ptr1; + + do { + try { + U.sleep(1000); + + ptr0 = walMgr.log(markerRecord); + + dbMgr.checkpointReadLock(); + + try { + ptr1 = walMgr.log(markerRecord, NEXT_SEGMENT); + } + finally { + dbMgr.checkpointReadUnlock(); + } + + assertTrue(ptr0 instanceof FileWALPointer); + assertTrue(ptr1 instanceof FileWALPointer); + + assertTrue(((FileWALPointer)ptr0).index() < ((FileWALPointer)ptr1).index()); + + assertEquals(HEADER_RECORD_SIZE, ((FileWALPointer)ptr1).fileOffset()); + } + catch (IgniteCheckedException e) { + log.error(e.getMessage(), e); + } + } + while (U.currentTimeMillis() - startTime < testDuration); + + fut.get(); + } + + /** Under-load {@code CURRENT_SEGMENT} check: {@code LOG_ONLY} mode, {@code disableArch = false}. */ + public void testCurrentSegmentTypeWithCacheActivityLogOnlyModeArchiveOn() throws Exception { + checkCurrentSegmentTypeWithCacheActivity(LOG_ONLY, false); + } + + /** Under-load {@code CURRENT_SEGMENT} check: {@code LOG_ONLY} mode, {@code disableArch = true}. */ + public void testCurrentSegmentTypeWithCacheActivityLogOnlyModeArchiveOff() throws Exception { + checkCurrentSegmentTypeWithCacheActivity(LOG_ONLY, true); + } + + /** Under-load {@code CURRENT_SEGMENT} check: {@code FSYNC} mode, {@code disableArch = false}. */ + public void 
testCurrentSegmentTypeWithCacheActivityFsyncModeArchiveOn() throws Exception { + checkCurrentSegmentTypeWithCacheActivity(FSYNC, false); + } + + /** Under-load {@code CURRENT_SEGMENT} check: {@code FSYNC} mode, {@code disableArch = true}; calls {@code fail} with the IGNITE-9776 link before the check. */ + public void testCurrentSegmentTypeWithCacheActivityFsyncModeArchiveOff() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9776"); + + checkCurrentSegmentTypeWithCacheActivity(FSYNC, true); + } + + /** + * Under load, ensures a record logged with {@code CURRENT_SEGMENT} gets a smaller segment index than the record logged right after it; an {@code IgniteCheckedException} raised inside an iteration is only logged. + */ + private void checkCurrentSegmentTypeWithCacheActivity(WALMode mode, boolean disableArch) throws Exception { + walMode = mode; + disableWALArchiving = disableArch; + + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + final long testDuration = 30_000; + + long startTime = U.currentTimeMillis(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync( + () -> { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() - startTime < testDuration) + cache.put(random.nextInt(100), random.nextInt(100_000)); + }, + 8, "cache-put-thread"); + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + IgniteCacheDatabaseSharedManager dbMgr = ig.context().cache().context().database(); + + AdHocWALRecord markerRecord = new AdHocWALRecord(); + + WALPointer ptr0; + WALPointer ptr1; + + do { + try { + U.sleep(1000); + + dbMgr.checkpointReadLock(); + + try { + ptr0 = walMgr.log(markerRecord, CURRENT_SEGMENT); + } + finally { + dbMgr.checkpointReadUnlock(); + } + + ptr1 = walMgr.log(markerRecord); + + assertTrue(ptr0 instanceof FileWALPointer); + assertTrue(ptr1 instanceof FileWALPointer); + + assertTrue(((FileWALPointer)ptr0).index() < ((FileWALPointer)ptr1).index()); + } + catch (IgniteCheckedException e) { + log.error(e.getMessage(), e); + } + } + while (U.currentTimeMillis() - startTime < testDuration); + + fut.get(); + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index df89419..0beeed7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -20,12 +20,12 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.lang.IgniteFuture; /** @@ -58,6 +58,11 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ + @Override public WALPointer log(WALRecord entry, RolloverType rollOverType) { + return null; + } + + /** {@inheritDoc} */ @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException { } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java index cf660c8..4d2bdcf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -108,7 +109,7 @@ public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest { sharedMgr.checkpointReadLock(); try { - walMgr.log(new SnapshotRecord(i, false)); + walMgr.log(new SnapshotRecord(i, false), RolloverType.NEXT_SEGMENT); } finally { sharedMgr.checkpointReadUnlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f07bdaa0/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 7631834..bf83bf5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalS import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveLogOnlyTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverTypesTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteFsyncReplayWalIteratorInvalidCrcTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteReplayWalIteratorInvalidCrcTest; @@ -186,5 +187,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteRebalanceScheduleResendPartitionsTest.class); suite.addTestSuite(IgniteWALTailIsReachedDuringIterationOverArchiveTest.class); + + suite.addTestSuite(WalRolloverTypesTest.class); } }
