This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 1888416 IGNITE-13831 Move WAL archive cleanup from checkpoint to rollover - Fixes #8563. 1888416 is described below commit 1888416f563f67f3b786609a27951736d2948a5c Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Wed Dec 23 16:43:35 2020 +0300 IGNITE-13831 Move WAL archive cleanup from checkpoint to rollover - Fixes #8563. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../pagemem/wal/IgniteWriteAheadLogManager.java | 5 - .../GridCacheDatabaseSharedManager.java | 7 +- .../cache/persistence/checkpoint/Checkpoint.java | 19 +- .../persistence/checkpoint/CheckpointHistory.java | 106 +---- .../persistence/checkpoint/CheckpointManager.java | 7 +- .../checkpoint/CheckpointMarkersStorage.java | 18 +- .../cache/persistence/checkpoint/Checkpointer.java | 16 +- .../persistence/wal/FileWriteAheadLogManager.java | 446 ++++++++++++++------- .../wal/aware/SegmentArchiveSizeStorage.java | 104 +++++ .../wal/aware/SegmentArchivedStorage.java | 17 - .../cache/persistence/wal/aware/SegmentAware.java | 98 ++++- .../wal/aware/SegmentCompressStorage.java | 2 +- .../persistence/wal/aware/SegmentLockStorage.java | 2 +- .../persistence/wal/aware/SegmentObservable.java | 2 +- .../wal/aware/SegmentReservationStorage.java | 67 +++- .../wal/aware/SegmentTruncateStorage.java | 151 +++++++ .../apache/ignite/internal/util/IgniteUtils.java | 20 + .../db/wal/WalDeletionArchiveAbstractTest.java | 120 ++++-- .../cache/persistence/pagemem/NoOpWALManager.java | 5 - .../persistence/wal/aware/SegmentAwareTest.java | 95 +++++ .../ignite/internal/util/IgniteUtilsSelfTest.java | 32 ++ 21 files changed, 989 insertions(+), 350 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index f3d85c5..a4ed599 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -186,11 +186,6 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public long lastCompactedSegment(); /** - * @return Max allowed index of archived segment to delete or -1 if it does not exist. - */ - public long maxArchivedSegmentToDelete(); - - /** * Checks if WAL segment is under lock or reserved * * @param ptr Pointer to check. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 06e00f9..98ee778 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2744,11 +2744,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * Wal truncate callBack. + * Wal truncate callback. * - * @param highBound WALPointer. + * @param highBound Upper bound. + * @throws IgniteCheckedException If failed. */ - public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException { + public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException { checkpointManager.removeCheckpointsUntil(highBound); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java index e6a378b..bc842dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpoint.java @@ -21,7 +21,6 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue; import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -34,26 +33,25 @@ class Checkpoint { /** Checkpoint pages. */ final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages; - /** */ + /** Checkpoint progress status. */ final CheckpointProgressImpl progress; - /** Number of deleted WAL files. */ - int walFilesDeleted; - /** WAL segments fully covered by this checkpoint. */ IgniteBiTuple<Long, Long> walSegsCoveredRange; - /** */ + /** Number of dirty pages. */ final int pagesSize; /** + * Constructor. + * * @param cpEntry Checkpoint entry. * @param cpPages Pages to write to the page store. * @param progress Checkpoint progress status. */ Checkpoint( @Nullable CheckpointEntry cpEntry, - @NotNull GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages, + GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages, CheckpointProgressImpl progress ) { this.cpEntry = cpEntry; @@ -71,13 +69,6 @@ class Checkpoint { } /** - * @param walFilesDeleted Wal files deleted. - */ - public void walFilesDeleted(int walFilesDeleted) { - this.walFilesDeleted = walFilesDeleted; - } - - /** * @param walSegsCoveredRange WAL segments fully covered by this checkpoint. */ public void walSegsCoveredRange(final IgniteBiTuple<Long, Long> walSegsCoveredRange) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java index 869f0da..87be2eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java @@ -87,22 +87,26 @@ public class CheckpointHistory { * Constructor. * * @param dsCfg Data storage configuration. - * @param wal Write ahead log. + * @param logFun Function for getting a logger. + * @param wal WAL manager. * @param inapplicable Checkpoint inapplicable filter. */ CheckpointHistory( DataStorageConfiguration dsCfg, - Function<Class<?>, IgniteLogger> logger, + Function<Class<?>, IgniteLogger> logFun, IgniteWriteAheadLogManager wal, IgniteThrowableBiPredicate<Long, Integer> inapplicable ) { - this.log = logger.apply(getClass()); + this.log = logFun.apply(getClass()); this.wal = wal; this.checkpointInapplicable = inapplicable; isWalTruncationEnabled = dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE; - maxCpHistMemSize = IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE); + maxCpHistMemSize = IgniteSystemProperties.getInteger( + IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, + DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE + ); reservationDisabled = dsCfg.getWalMode() == WALMode.NONE; } @@ -228,7 +232,7 @@ public class CheckpointHistory { addCpGroupStatesToEarliestCpMap(entry, states); } catch (IgniteCheckedException ex) { - U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex); + U.warn(log, "Failed to process checkpoint: " + entry, ex); earliestCp.clear(); } @@ -314,6 +318,7 @@ public class CheckpointHistory { /** * Clears checkpoint history after WAL truncation. * + * @param highBound Upper bound. * @return List of checkpoint entries removed from history. */ public List<CheckpointEntry> onWalTruncated(WALPointer highBound) { @@ -391,100 +396,13 @@ public class CheckpointHistory { /** * Logs and clears checkpoint history after checkpoint finish. * + * @param chp Finished checkpoint. * @return List of checkpoints removed from history. */ public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp) { chp.walSegsCoveredRange(calculateWalSegmentsCovered()); - int removeCount = isWalTruncationEnabled - ? checkpointCountUntilDeleteByArchiveSize() - : (histMap.size() - maxCpHistMemSize); - - if (removeCount <= 0) - return Collections.emptyList(); - - List<CheckpointEntry> deletedCheckpoints = removeCheckpoints(removeCount); - - if (isWalTruncationEnabled) { - int deleted = wal.truncate(firstCheckpointPointer()); - - chp.walFilesDeleted(deleted); - } - - return deletedCheckpoints; - } - - /** - * @param first One of pointers to choose the newest. - * @param second One of pointers to choose the newest. - * @return The newest pointer from input ones. - */ - private WALPointer newerPointer(WALPointer first, WALPointer second) { - if (first == null) - return second; - - if (second == null) - return first; - - return first.index() > second.index() ? first : second; - } - - /** - * Calculate mark until delete by maximum checkpoint history memory size. - * - * @return Checkpoint mark until which checkpoints can be deleted(not including this pointer). - */ - private WALPointer checkpointMarkUntilDeleteByMemorySize() { - if (histMap.size() <= maxCpHistMemSize) - return null; - - int calculatedCpHistSize = maxCpHistMemSize; - - for (Map.Entry<Long, CheckpointEntry> entry : histMap.entrySet()) { - if (histMap.size() <= calculatedCpHistSize++) - return entry.getValue().checkpointMark(); - } - - return lastCheckpoint().checkpointMark(); - } - - /** - * Calculate count of checkpoints to delete by maximum allowed archive size. - * - * @return Checkpoint count to be deleted. - */ - private int checkpointCountUntilDeleteByArchiveSize() { - long absFileIdxToDel = wal.maxArchivedSegmentToDelete(); - - if (absFileIdxToDel < 0) - return 0; - - long fileUntilDel = absFileIdxToDel + 1; - - long checkpointFileIdx = absFileIdx(lastCheckpoint()); - - int countToRemove = 0; - - for (CheckpointEntry cpEntry : histMap.values()) { - long currFileIdx = absFileIdx(cpEntry); - - if (checkpointFileIdx <= currFileIdx || fileUntilDel <= currFileIdx) - return countToRemove; - - countToRemove++; - } - - return histMap.size() - 1; - } - - /** - * Retrieve absolute file index by checkpoint entry. - * - * @param pointer checkpoint entry for which need to calculate absolute file index. - * @return absolute file index for given checkpoint entry. - */ - private long absFileIdx(CheckpointEntry pointer) { - return pointer.checkpointMark().index(); + return removeCheckpoints(isWalTruncationEnabled ? 0 : histMap.size() - maxCpHistMemSize); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java index 7fe7002..072d3bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java @@ -300,11 +300,12 @@ public class CheckpointManager { } /** - * Wal truncate callBack. + * Wal truncate callback. * - * @param highBound WALPointer. + * @param highBound Upper bound. + * @throws IgniteCheckedException If failed. */ - public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException { + public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException { checkpointMarkersStorage.removeCheckpointsUntil(highBound); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java index b421a33..d601d53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointMarkersStorage.java @@ -141,24 +141,28 @@ public class CheckpointMarkersStorage { } /** - * Wal truncate callBack. + * Wal truncate callback. * - * @param highBound WALPointer. + * @param highBound Upper bound. + * @throws IgniteCheckedException If failed. */ - public void removeCheckpointsUntil(WALPointer highBound) throws IgniteCheckedException { - List<CheckpointEntry> removedFromHistory = history().onWalTruncated(highBound); + public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException { + List<CheckpointEntry> rmvFromHist = history().onWalTruncated(highBound); - for (CheckpointEntry cp : removedFromHistory) + for (CheckpointEntry cp : rmvFromHist) removeCheckpointFiles(cp); } /** * Logs and clears checkpoint history after checkpoint finish. + * + * @param chp Finished checkpoint. + * @throws IgniteCheckedException If failed. */ public void onCheckpointFinished(Checkpoint chp) throws IgniteCheckedException { - List<CheckpointEntry> removedFromHistory = history().onCheckpointFinished(chp); + List<CheckpointEntry> rmvFromHist = history().onCheckpointFinished(chp); - for (CheckpointEntry cp : removedFromHistory) + for (CheckpointEntry cp : rmvFromHist) removeCheckpointFiles(cp); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index 6747e58..63440b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -456,16 +456,12 @@ public class Checkpointer extends GridWorker { if (chp.hasDelta() || destroyedPartitionsCnt > 0) { if (log.isInfoEnabled()) { - String walSegsCoveredMsg = chp.walSegsCoveredRange == null ? "" : prepareWalSegsCoveredMsg(chp.walSegsCoveredRange); - log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " + - "walSegmentsCleared=%d, walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " + - "total=%dms]", + "walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, total=%dms]", chp.cpEntry != null ? chp.cpEntry.checkpointId() : "", chp.pagesSize, chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "", - chp.walFilesDeleted, - walSegsCoveredMsg, + walRangeStr(chp.walSegsCoveredRange), tracker.markDuration(), tracker.pagesWriteDuration(), tracker.fsyncDuration(), @@ -591,9 +587,15 @@ public class Checkpointer extends GridWorker { } /** + * Creates a string of a range WAL segments. + * + * @param walRange Range of WAL segments. * @return The message about how many WAL segments was between previous checkpoint and current one. */ - private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) { + private String walRangeStr(@Nullable IgniteBiTuple<Long, Long> walRange) { + if (walRange == null) + return ""; + String res; long startIdx = walRange.get1(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index f22aed9..4b97487 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; @@ -257,7 +256,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private final long maxSegCountWithoutCheckpoint; - /** Size of wal archive since which removing of old archive should be started */ + /** Size of wal archive since which removing of old archive should be started. */ private final long allowedThresholdWalArchiveSize; /** */ @@ -319,6 +318,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Decompressor. */ @Nullable private FileDecompressor decompressor; + /** + * Cleaner of segments from WAL archive when the maximum size is reached. + * Will not work if WAL archive size is {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}. + */ + @Nullable private FileCleaner cleaner; + /** Current log segment handle. */ private volatile FileWriteHandle currHnd; @@ -494,8 +499,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (isArchiverEnabled()) archiver = new FileArchiver(segmentAware, log); - else - archiver = null; + + if (!walArchiveUnlimited()) + cleaner = new FileCleaner(log); segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg); @@ -534,11 +540,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * + * Running workers of WAL archive. */ - private void startArchiverAndCompressor() { + private void startArchiveWorkers() { segmentAware.reset(); + segmentAware.resetWalArchiveSizes(); + segmentAware.addCurrentWalArchiveSize(totalSize(walArchiveFiles())); + if (isArchiverEnabled()) { assert archiver != null : "FileArchiver should be initialized."; @@ -554,6 +563,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl decompressor.restart(); } + + if (!walArchiveUnlimited()) { + assert cleaner != null : "FileCleaner should be initialized."; + + cleaner.restart(); + } } /** @@ -654,6 +669,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (decompressor != null) decompressor.shutdown(); + + if (cleaner != null) + cleaner.shutdown(); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to gracefully shutdown WAL components, thread was interrupted.", e); @@ -703,7 +721,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert currHnd == null; - startArchiverAndCompressor(); + startArchiveWorkers(); assert (isArchiverEnabled() && archiver != null) || (!isArchiverEnabled() && archiver == null) : "Trying to restore FileWriteHandle on deactivated write ahead log manager"; @@ -1051,6 +1069,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl || !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment. return deleted; + long len = desc.file.length(); + if (!desc.file.delete()) { U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + desc.file.getAbsolutePath()); @@ -1059,6 +1079,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl deleted++; segmentSize.remove(desc.idx()); + segmentAware.addCurrentWalArchiveSize(-len); } // Bump up the oldest archive segment index. @@ -1087,6 +1108,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void notchLastCheckpointPtr(WALPointer ptr) { lastCheckpointPtr = ptr; + + segmentAware.lastCheckpointIdx(ptr.index()); } /** {@inheritDoc} */ @@ -1261,24 +1284,38 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl switchSegmentRecordOffset.set(idx, hnd.getSwitchSegmentRecordOffset()); } + if (archiver == null) + segmentAware.addReservedWalArchiveSize(maxWalSegmentSize); + FileWriteHandle next; try { - next = initNextWriteHandle(cur); - } - catch (IgniteCheckedException e) { - //Allow to avoid forever waiting in other threads. - cur.signalNextAvailable(); + try { + next = initNextWriteHandle(cur); + } + catch (IgniteCheckedException e) { + //Allow to avoid forever waiting in other threads. + cur.signalNextAvailable(); - throw e; - } + throw e; + } - if (rec != null) { - WALPointer ptr = next.addRecord(rec); + if (rec != null) { + WALPointer ptr = next.addRecord(rec); - assert ptr != null; + assert ptr != null; + } + + segmentSize.put(next.getSegmentId(), maxWalSegmentSize); + + if (archiver == null) + segmentAware.addCurrentWalArchiveSize(maxWalSegmentSize); + } + finally { + if (archiver == null) + segmentAware.addReservedWalArchiveSize(-maxWalSegmentSize); } - if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) + if (next.getSegmentId() - lastCheckpointPtr.index() >= maxSegCountWithoutCheckpoint) cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); boolean updated = updateCurrentHandle(next, hnd); @@ -1298,15 +1335,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Give last checkpoint file idx. - */ - private long lashCheckpointFileIdx() { - WALPointer lastCheckpointMark = cctx.database().lastCheckpointMarkWalPointer(); - - return lastCheckpointMark == null ? 0 : lastCheckpointMark.index(); - } - - /** * @param lastReadPtr Last read WAL file pointer. * @return Initialized file write handle. * @throws StorageException If failed to initialize WAL write handle. @@ -1358,6 +1386,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileDescriptor[] walArchiveFiles = walArchiveFiles(); segmentAware.minReserveIndex(F.isEmpty(walArchiveFiles) ? -1 : walArchiveFiles[0].idx - 1); + segmentAware.lastTruncatedArchiveIdx(F.isEmpty(walArchiveFiles) ? -1 : walArchiveFiles[0].idx - 1); if (archiver0 == null) segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1); @@ -1627,37 +1656,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @return Raw or compressed WAL segments from archive. */ - private FileDescriptor[] walArchiveFiles() { + public FileDescriptor[] walArchiveFiles() { return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); } - /** {@inheritDoc} */ - @Override public long maxArchivedSegmentToDelete() { - //When maxWalArchiveSize==-1 deleting files is not permitted. - if (dsCfg.getMaxWalArchiveSize() == DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE) - return -1; - - FileDescriptor[] archivedFiles = walArchiveFiles(); - - Long totalArchiveSize = Stream.of(archivedFiles) - .map(desc -> desc.file().length()) - .reduce(0L, Long::sum); - - if (archivedFiles.length == 0 || totalArchiveSize < allowedThresholdWalArchiveSize) - return -1; - - long sizeOfOldestArchivedFiles = 0; - - for (FileDescriptor desc : archivedFiles) { - sizeOfOldestArchivedFiles += desc.file().length(); - - if (totalArchiveSize - sizeOfOldestArchivedFiles < allowedThresholdWalArchiveSize) - return desc.getIdx(); - } - - return archivedFiles[archivedFiles.length - 1].getIdx(); - } - /** * @return Sorted WAL files descriptors. */ @@ -1997,24 +1999,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']'); } - try { - Files.deleteIfExists(dstTmpFile.toPath()); - - boolean copied = false; - - long offs = switchSegmentRecordOffset.get((int)segIdx); - - if (offs > 0) { - switchSegmentRecordOffset.set((int)segIdx, 0); + assert switchSegmentRecordOffset != null; - if (offs < origFile.length()) { - GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs); + long offs = switchSegmentRecordOffset.getAndSet((int)segIdx, 0); + long origLen = origFile.length(); - copied = true; - } - } + long reservedSize = offs > 0 && offs < origLen ? offs : origLen; + segmentAware.addReservedWalArchiveSize(reservedSize); - if (!copied) + try { + if (offs > 0 && offs < origLen) + GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs); + else Files.copy(origFile.toPath(), dstTmpFile.toPath()); Files.move(dstTmpFile.toPath(), dstFile.toPath()); @@ -2026,12 +2022,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } segmentSize.put(absIdx, dstFile.length()); + segmentAware.addCurrentWalArchiveSize(dstFile.length()); } catch (IOException e) { + deleteArchiveFiles(dstFile, dstTmpFile); + throw new StorageException("Failed to archive WAL segment [" + "srcFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); } + finally { + segmentAware.addReservedWalArchiveSize(-reservedSize); + } if (log.isInfoEnabled()) { log.info("Copied file [src=" + origFile.getAbsolutePath() + @@ -2217,8 +2219,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if ((segIdx = tryReserveNextSegmentOrWait()) == -1) continue; - deleteObsoleteRawSegments(); - String segmentFileName = fileName(segIdx); File tmpZip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX + TMP_SUFFIX); @@ -2227,33 +2227,48 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File raw = new File(walArchiveDir, segmentFileName); - if (!Files.exists(raw.toPath())) - throw new IgniteCheckedException("WAL archive segment is missing: " + raw); + long reservedSize = raw.length(); + segmentAware.addReservedWalArchiveSize(reservedSize); - compressSegmentToFile(segIdx, raw, tmpZip); + try { + deleteObsoleteRawSegments(); - Files.move(tmpZip.toPath(), zip.toPath()); + if (!Files.exists(raw.toPath())) + throw new IgniteCheckedException("WAL archive segment is missing: " + raw); - try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { - f0.force(); + compressSegmentToFile(segIdx, raw, tmpZip); + + Files.move(tmpZip.toPath(), zip.toPath()); + + try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { + f0.force(); + } + + segmentSize.put(segIdx, zip.length()); + segmentAware.addCurrentWalArchiveSize(zip.length()); + + segmentAware.onSegmentCompressed(segIdx); + + if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) + evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile())); } + catch (IgniteCheckedException | IOException e) { + deleteArchiveFiles(zip, tmpZip); - segmentAware.onSegmentCompressed(segIdx); + lastCompressionError = e; - if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) - evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile())); + U.error(log, "Compression of WAL segment [idx=" + segIdx + + "] was skipped due to unexpected error", lastCompressionError); + + segmentAware.onSegmentCompressed(segIdx); + } + finally { + segmentAware.addReservedWalArchiveSize(-reservedSize); + } } catch (IgniteInterruptedCheckedException ignore) { Thread.currentThread().interrupt(); } - catch (IgniteCheckedException | IOException e) { - lastCompressionError = e; - - U.error(log, "Compression of WAL segment [idx=" + segIdx + - "] was skipped due to unexpected error", lastCompressionError); - - segmentAware.onSegmentCompressed(segIdx); - } finally { if (segIdx != -1L) release(new WALPointer(segIdx, 0, 0)); @@ -2313,8 +2328,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl zos.write(heapBuf.array()); } - - segmentSize.put(idx, zip.length()); } /** @@ -2342,7 +2355,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves. */ private void deleteObsoleteRawSegments() { - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); + FileDescriptor[] descs = walArchiveFiles(); Set<Long> indices = new HashSet<>(); Set<Long> duplicateIndices = new HashSet<>(); @@ -2360,13 +2373,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (segmentReservedOrLocked(desc.idx)) return; - if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) { - if (desc.file.exists() && !desc.file.delete()) { - U.warn(log, "Failed to remove obsolete WAL segment " + - "(make sure the process has enough rights): " + desc.file.getAbsolutePath() + - ", exists: " + desc.file.exists()); - } - } + if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) + segmentAware.addCurrentWalArchiveSize(-deleteArchiveFiles(desc.file)); } } } @@ -2400,24 +2408,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl while (!isCancelled()) { long segmentToDecompress = -1L; + blockingSectionBegin(); + try { - blockingSectionBegin(); + segmentToDecompress = segmentsQueue.take(); + } + finally { + blockingSectionEnd(); + } - try { - segmentToDecompress = segmentsQueue.take(); - } - finally { - blockingSectionEnd(); - } + if (isCancelled()) + break; - if (isCancelled()) - break; + if (segmentToDecompress == -1) + continue; + + String segmentFileName = fileName(segmentToDecompress); + + File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX); + File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX); + File unzip = new File(walArchiveDir, segmentFileName); + + long reservedSize = U.uncompressedSize(zip); + segmentAware.addReservedWalArchiveSize(reservedSize); - String segmentFileName = fileName(segmentToDecompress); + IgniteCheckedException ex = null; - File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX); - File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX); - File unzip = new File(walArchiveDir, segmentFileName); + try { + if (unzip.exists()) + throw new FileAlreadyExistsException(unzip.getAbsolutePath()); try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); FileIO io = ioFactory.create(unzipTmp)) { @@ -2427,32 +2446,30 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl updateHeartbeat(); } - try { - Files.move(unzipTmp.toPath(), unzip.toPath()); - } - catch (FileAlreadyExistsException e) { + Files.move(unzipTmp.toPath(), unzip.toPath()); + + segmentAware.addCurrentWalArchiveSize(unzip.length()); + } + catch (IOException e) { + deleteArchiveFiles(unzipTmp); + + if (e instanceof FileAlreadyExistsException) { U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + "[tmp=" + unzipTmp + ", raw=" + unzip + "]", e); - - if (!unzipTmp.delete()) - U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + "]"); } - - updateHeartbeat(); - - synchronized (this) { - decompressionFutures.remove(segmentToDecompress).onDone(); + else if (!isCancelled) { + ex = new IgniteCheckedException("Error during WAL segment decompression [segmentIdx=" + + segmentToDecompress + "]", e); } } - catch (IOException ex) { - if (!isCancelled && segmentToDecompress != -1L) { - IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + - "decompression [segmentIdx=" + segmentToDecompress + "]", ex); + finally { + segmentAware.addReservedWalArchiveSize(-reservedSize); + } - synchronized (this) { - decompressionFutures.remove(segmentToDecompress).onDone(e); - } - } + updateHeartbeat(); + + synchronized (this) { + decompressionFutures.remove(segmentToDecompress).onDone(ex); } } } @@ -3062,8 +3079,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else res = CURR_HND_UPD.compareAndSet(this, c, n); - segmentSize.put(n.getSegmentId(), maxWalSegmentSize); - return res; } @@ -3077,4 +3092,167 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return name != null && (WAL_NAME_PATTERN.matcher(name).matches() || WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(name).matches()); } + + /** + * Getting last truncated segment. + * + * @return Absolut segment index. + */ + public long lastTruncatedSegment() { + return segmentAware.lastTruncatedArchiveIdx(); + } + + /** + * Total size of the segments in bytes. + * + * @return Size in bytes. + */ + public long totalSize(FileDescriptor... fileDescriptors) { + long len = 0; + + for (FileDescriptor descriptor : fileDescriptors) + len += descriptor.file.length(); + + return len; + } + + /** + * Check if WAL archive is unlimited. + * + * @return {@code True} if unlimited. + */ + private boolean walArchiveUnlimited() { + return dsCfg.getMaxWalArchiveSize() == DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE; + } + + /** + * Removing files from {@link #walArchiveDir}. + * + * @param files Files from {@link #walArchiveDir}. + * @return Total deleted size in bytes. + */ + private long deleteArchiveFiles(File... files) { + long size = 0; + + for (File file : files) { + if (file.exists()) { + long len = file.length(); + + if (file.delete()) + size += len; + else if (file.exists()) { + U.warn(log, "Unable to delete file from WAL archive" + + " (make sure the process has enough rights): " + file.getAbsolutePath()); + } + } + } + + return size; + } + + /** + * Worker for an asynchronous WAL archive cleanup that starts when the maximum size is exceeded. + * {@link SegmentAware#awaitExceedMaxArchiveSize} is used to determine if the maximum is exceeded. + */ + private class FileCleaner extends GridWorker { + /** + * Constructor. + * + * @param log Logger. + */ + public FileCleaner(IgniteLogger log) { + super(cctx.igniteInstanceName(), "wal-file-cleaner%" + cctx.igniteInstanceName(), log); + + assert !walArchiveUnlimited(); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + Throwable err = null; + + try { + while (!isCancelled()) { + segmentAware.awaitExceedMaxArchiveSize(allowedThresholdWalArchiveSize); + segmentAware.awaitAvailableTruncateArchive(); + + FileDescriptor[] walArchiveFiles = walArchiveFiles(); + + FileDescriptor high = null; + + long size = 0; + + for (FileDescriptor fileDesc : walArchiveFiles) { + if (fileDesc.idx >= lastCheckpointPtr.index() || segmentAware.reserved(fileDesc.idx)) + break; + else { + high = fileDesc; + + // Ensure that there will be exactly removed at least one segment. + if ((size += fileDesc.file.length()) > allowedThresholdWalArchiveSize) + break; + } + } + + if (high != null) { + WALPointer highPtr = new WALPointer(high.idx + 1, 0, 0); + + if (log.isInfoEnabled()) { + log.info("Starting to clean WAL archive [highIdx=" + highPtr.index() + + ", currSize=" + U.humanReadableByteCount(totalSize(walArchiveFiles)) + + ", maxSize=" + U.humanReadableByteCount(dsCfg.getMaxWalArchiveSize()) + ']'); + } + + ((GridCacheDatabaseSharedManager)cctx.database()).onWalTruncated(highPtr); + + int truncated = truncate(highPtr); + + if (log.isInfoEnabled()) { + log.info("Finish clean WAL archive [cleanCnt=" + truncated + + ", currSize=" + U.humanReadableByteCount(totalSize(walArchiveFiles())) + + ", maxSize=" + U.humanReadableByteCount(dsCfg.getMaxWalArchiveSize()) + ']'); + } + } + } + } + catch (IgniteInterruptedCheckedException e) { + Thread.currentThread().interrupt(); + + isCancelled = true; + } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !isCancelled()) + err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); + + if (err instanceof OutOfMemoryError) + failureProcessor.process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } + } + + /** + * Shutdown worker. + * + * @throws IgniteInterruptedCheckedException If failed to wait for worker shutdown. + */ + private void shutdown() throws IgniteInterruptedCheckedException { + isCancelled = true; + + U.join(this); + } + + /** + * Restart worker in IgniteThread. + */ + public void restart() { + assert runner() == null : "FileCleaner is still running"; + + isCancelled = false; + + new IgniteThread(this).start(); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java new file mode 100644 index 0000000..76d6022 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +/** + * Storage WAL archive size. + * Allows to track the exceeding of the maximum archive size. + */ +class SegmentArchiveSizeStorage { + /** Current WAL archive size in bytes. */ + private long curr; + + /** Reserved WAL archive size in bytes. */ + private long reserved; + + /** Flag of interrupt waiting on this object. */ + private volatile boolean interrupted; + + /** + * Adding current WAL archive size in bytes. + * + * @param size Size in bytes. + */ + synchronized void addCurrentSize(long size) { + curr += size; + + if (size > 0) + notifyAll(); + } + + /** + * Adding reserved WAL archive size in bytes. + * Defines a hint to determine if the maximum size is exceeded before a new segment is archived. + * + * @param size Size in bytes. + */ + synchronized void addReservedSize(long size) { + reserved += size; + + if (size > 0) + notifyAll(); + } + + /** + * Reset the current and reserved WAL archive sizes. + */ + synchronized void resetSizes() { + curr = 0; + reserved = 0; + } + + /** + * Waiting for exceeding the maximum WAL archive size. + * To track size of WAL archive, need to use {@link #addCurrentSize} and {@link #addReservedSize}. + * + * @param max Maximum WAL archive size in bytes. + * @throws IgniteInterruptedCheckedException If it was interrupted. + */ + synchronized void awaitExceedMaxSize(long max) throws IgniteInterruptedCheckedException { + try { + while (max - (curr + reserved) > 0 && !interrupted) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + if (interrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting of exceed max archive size"); + } + + /** + * Interrupt waiting on this object. + */ + synchronized void interrupt() { + interrupted = true; + + notifyAll(); + } + + /** + * Reset interrupted flag. + */ + void reset() { + interrupted = false; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java index 53b3b59..12a1e5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java @@ -37,9 +37,6 @@ class SegmentArchivedStorage extends SegmentObservable { */ private volatile long lastAbsArchivedIdx = -1; - /** Latest truncated segment. */ - private volatile long lastTruncatedArchiveIdx = -1; - /** * @param segmentLockStorage Protects WAL work segments from moving. */ @@ -137,18 +134,4 @@ class SegmentArchivedStorage extends SegmentObservable { synchronized void onSegmentUnlocked(long segmentId) { notifyAll(); } - - /** - * @param lastTruncatedArchiveIdx Last truncated segment. - */ - void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) { - this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx; - } - - /** - * @return Last truncated segment. - */ - long lastTruncatedArchiveIdx() { - return lastTruncatedArchiveIdx; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java index 8d676c9..48b16b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -39,6 +39,12 @@ public class SegmentAware { /** Storage of absolute current segment index. */ private final SegmentCurrentStateStorage segmentCurrStateStorage; + /** Storage of archive size. */ + private final SegmentArchiveSizeStorage archiveSizeStorage; + + /** Storage of truncated segments. */ + private final SegmentTruncateStorage truncateStorage; + /** * Constructor. * @@ -52,10 +58,16 @@ public class SegmentAware { segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt); segmentCompressStorage = new SegmentCompressStorage(log, compactionEnabled); + archiveSizeStorage = new SegmentArchiveSizeStorage(); + truncateStorage = new SegmentTruncateStorage(); + segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived); segmentArchivedStorage.addObserver(segmentCompressStorage::onSegmentArchived); + segmentArchivedStorage.addObserver(truncateStorage::lastArchivedIdx); segmentLockStorage.addObserver(segmentArchivedStorage::onSegmentUnlocked); + + reservationStorage.addObserver(truncateStorage::minReservedIdx); } /** @@ -149,17 +161,21 @@ public class SegmentAware { } /** - * @param lastTruncatedArchiveIdx Last truncated segment; + * Update last truncated segment. + * + * @param absIdx Absolut segment index. */ - public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) { - segmentArchivedStorage.lastTruncatedArchiveIdx(lastTruncatedArchiveIdx); + public void lastTruncatedArchiveIdx(long absIdx) { + truncateStorage.lastTruncatedIdx(absIdx); } /** - * @return Last truncated segment. + * Getting last truncated segment. + * + * @return Absolut segment index. */ public long lastTruncatedArchiveIdx() { - return segmentArchivedStorage.lastTruncatedArchiveIdx(); + return truncateStorage.lastTruncatedIdx(); } /** @@ -241,6 +257,10 @@ public class SegmentAware { segmentCompressStorage.reset(); segmentCurrStateStorage.reset(); + + archiveSizeStorage.reset(); + + truncateStorage.reset(); } /** @@ -252,6 +272,10 @@ public class SegmentAware { segmentCompressStorage.interrupt(); segmentCurrStateStorage.interrupt(); + + archiveSizeStorage.interrupt(); + + truncateStorage.interrupt(); } /** @@ -263,6 +287,10 @@ public class SegmentAware { segmentCompressStorage.interrupt(); segmentCurrStateStorage.forceInterrupt(); + + archiveSizeStorage.interrupt(); + + truncateStorage.interrupt(); } /** @@ -288,4 +316,64 @@ public class SegmentAware { public boolean minLockIndex(long absIdx) { return segmentLockStorage.minLockIndex(absIdx); } + + /** + * Adding current WAL archive size in bytes. + * + * @param size Size in bytes. + */ + public void addCurrentWalArchiveSize(long size) { + archiveSizeStorage.addCurrentSize(size); + } + + /** + * Adding reserved WAL archive size in bytes. + * Defines a hint to determine if the maximum size is exceeded before a new segment is archived. + * + * @param size Size in bytes. + */ + public void addReservedWalArchiveSize(long size) { + archiveSizeStorage.addReservedSize(size); + } + + /** + * Reset the current and reserved WAL archive sizes. + */ + public void resetWalArchiveSizes() { + archiveSizeStorage.resetSizes(); + } + + /** + * Waiting for exceeding the maximum WAL archive size. To track size of WAL archive, + * need to use {@link #addCurrentWalArchiveSize} and {@link #addReservedWalArchiveSize}. + * + * @param max Maximum WAL archive size in bytes. + * @throws IgniteInterruptedCheckedException If it was interrupted. + */ + public void awaitExceedMaxArchiveSize(long max) throws IgniteInterruptedCheckedException { + archiveSizeStorage.awaitExceedMaxSize(max); + } + + /** + * Update segment of last completed checkpoint. + * Required for binary recovery. + * + * @param absIdx Absolut segment index. + */ + public void lastCheckpointIdx(long absIdx) { + truncateStorage.lastCheckpointIdx(absIdx); + } + + /** + * Waiting for segment truncation to be available. To get the number of segments available for truncation, use + * {@link #lastTruncatedArchiveIdx}, {@link #lastCheckpointIdx}, {@link #reserve} and + * {@link #lastArchivedAbsoluteIndex} (to restart the node correctly) and is calculated as + * {@code lastTruncatedArchiveIdx} - {@code min(lastCheckpointIdx, reserve, lastArchivedAbsoluteIndex)}. + * + * @return Number of segments available to truncate. + * @throws IgniteInterruptedCheckedException If it was interrupted. + */ + public long awaitAvailableTruncateArchive() throws IgniteInterruptedCheckedException { + return truncateStorage.awaitAvailableTruncate(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java index 5fca1ec..f71dd79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; /** * Storage of actual information about current index of compressed segments. */ -public class SegmentCompressStorage { +class SegmentCompressStorage { /** Logger. */ private final IgniteLogger log; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java index a5a7948..189559ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea /** * Lock on segment protects from archiving segment. */ -public class SegmentLockStorage extends SegmentObservable { +class SegmentLockStorage extends SegmentObservable { /** * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from * {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java index 3e91504..f6cd9b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java @@ -24,7 +24,7 @@ import java.util.function.Consumer; /** * Implementation of observer-observable pattern. For handling specific changes of segment. */ -public abstract class SegmentObservable { +abstract class SegmentObservable { /** Observers for handle changes of archived index. */ private final Queue<Consumer<Long>> observers = new ConcurrentLinkedQueue<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java index 42eece7..453cc17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java @@ -16,13 +16,17 @@ */ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; +import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.TreeMap; +import java.util.function.Consumer; +import org.jetbrains.annotations.Nullable; /** * Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */ -class SegmentReservationStorage { +class SegmentReservationStorage extends SegmentObservable { /** * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments which has * index >= reserved segment index. Guarded by {@code this}. @@ -38,14 +42,22 @@ class SegmentReservationStorage { * @param absIdx Index for reservation. * @return {@code True} if the reservation was successful. */ - synchronized boolean reserve(long absIdx) { - if (absIdx > minReserveIdx) { - reserved.merge(absIdx, 1, Integer::sum); + boolean reserve(long absIdx) { + boolean res = false; + Long minReservedIdx = null; - return true; + synchronized (this) { + if (absIdx > minReserveIdx) { + minReservedIdx = trackingMinReservedIdx(reserved -> reserved.merge(absIdx, 1, Integer::sum)); + + res = true; + } } - return false; + if (minReservedIdx != null) + notifyObservers(minReservedIdx); + + return res; } /** @@ -61,15 +73,24 @@ class SegmentReservationStorage { /** * @param absIdx Reserved index. */ - synchronized void release(long absIdx) { - Integer cur = reserved.get(absIdx); + void release(long absIdx) { + Long minReservedIdx; - assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx; + synchronized (this) { + minReservedIdx = trackingMinReservedIdx(reserved -> { + Integer cur = reserved.get(absIdx); - if (cur == 1) - reserved.remove(absIdx); - else - reserved.put(absIdx, cur - 1); + assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx; + + if (cur == 1) + reserved.remove(absIdx); + else + reserved.put(absIdx, cur - 1); + }); + } + + if (minReservedIdx != null) + notifyObservers(minReservedIdx); } /** @@ -88,4 +109,24 @@ class SegmentReservationStorage { return true; } + + /** + * Updating {@link #reserved} with tracking changes of minimum reserved segment. + * + * @param updateFun {@link #reserved} update function. + * @return New minimum reserved segment, {@code null} if there are no changes, + * {@code -1} if there are no reserved segments. + */ + @Nullable private synchronized Long trackingMinReservedIdx(Consumer<NavigableMap<Long, Integer>> updateFun) { + Map.Entry<Long, Integer> oldMinE = reserved.firstEntry(); + + updateFun.accept(reserved); + + Map.Entry<Long, Integer> newMinE = reserved.firstEntry(); + + Long oldMin = oldMinE == null ? null : oldMinE.getKey(); + Long newMin = newMinE == null ? null : newMinE.getKey(); + + return Objects.equals(oldMin, newMin) ? null : newMin == null ? -1 : newMin; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentTruncateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentTruncateStorage.java new file mode 100644 index 0000000..a65ef8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentTruncateStorage.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.aware; + +import org.apache.ignite.internal.IgniteInterruptedCheckedException; + +/** + * Store the last truncated segment and allows to get the number of segments available for truncation. + * We cannot truncate the segments required for {@link #lastCpIdx binary recovery}, {@link #minReservedIdx reserved} + * and {@link #lastArchivedIdx last archived} (to restart the node correctly). Thus, we need to take account of these + * conditions in the calculation of the number of segments available for truncation. + */ +class SegmentTruncateStorage { + /** Flag of interrupt waiting on this object. */ + private volatile boolean interrupted; + + /** Latest truncated segment. */ + private long lastTruncatedIdx = -1; + + /** Minimum reserved segment. */ + private long minReservedIdx = -1; + + /** Segment of last completed checkpoint. */ + private long lastCpIdx = -1; + + /** Last archived segment. */ + private long lastArchivedIdx = -1; + + /** + * Update last truncated segment. + * + * @param absIdx Absolut segment index. + */ + synchronized void lastTruncatedIdx(long absIdx) { + lastTruncatedIdx = absIdx; + + notifyAll(); + } + + /** + * Update minimum reserved segment. + * Protected from deletion. + * + * @param absIdx Absolut segment index. + */ + synchronized void minReservedIdx(long absIdx) { + minReservedIdx = absIdx; + + notifyAll(); + } + + /** + * Update segment of last completed checkpoint. + * Required for binary recovery. + * + * @param absIdx Absolut segment index. + */ + synchronized void lastCheckpointIdx(long absIdx) { + lastCpIdx = absIdx; + + notifyAll(); + } + + /** + * Update last archived segment. + * Needed to restart the node correctly. + * + * @param absIdx Absolut segment index. + */ + synchronized void lastArchivedIdx(long absIdx) { + lastArchivedIdx = absIdx; + + notifyAll(); + } + + /** + * Getting last truncated segment. + * + * @return Absolut segment index. + */ + synchronized long lastTruncatedIdx() { + return lastTruncatedIdx; + } + + /** + * Waiting for segment truncation to be available. Use {@link #lastTruncatedIdx}, {@link #lastCpIdx}, + * {@link #minReservedIdx} and {@link #lastArchivedIdx} to determine the number of segments to truncate. + * + * @return Number of segments available to truncate. + * @throws IgniteInterruptedCheckedException If it was interrupted. + */ + synchronized long awaitAvailableTruncate() throws IgniteInterruptedCheckedException { + try { + while (availableTruncateCnt() == 0 && !interrupted) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + if (interrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting for truncation availability"); + + return availableTruncateCnt(); + } + + /** + * Interrupt waiting on this object. + */ + synchronized void interrupt() { + interrupted = true; + + notifyAll(); + } + + /** + * Resets interrupted flag. + */ + void reset() { + interrupted = false; + } + + /** + * Calculation the number of segments that can be truncated. + * + * @return Number of segments. + */ + private synchronized long availableTruncateCnt() { + long highIdx = minReservedIdx == -1 ? lastCpIdx : Math.min(minReservedIdx, lastCpIdx); + + // Protection against deleting the last segment from WAL archive for correct restart the node. + highIdx = lastArchivedIdx == -1 ? highIdx : Math.min(lastArchivedIdx, highIdx); + + return Math.max(0, highIdx == -1 ? 0 : highIdx - (lastTruncatedIdx + 1)); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 3cf5c38..5f5aafe 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -12028,4 +12028,24 @@ public abstract class IgniteUtils { return sb.toString(); } + + /** + * Getting the total size of uncompressed data in zip. + * + * @param zip Zip file. + * @return Total uncompressed size. + * @throws IOException If failed. + */ + public static long uncompressedSize(File zip) throws IOException { + try (ZipFile zipFile = new ZipFile(zip)) { + long size = 0; + + Enumeration<? extends ZipEntry> entries = zipFile.entries(); + + while (entries.hasMoreElements()) + size += entries.nextElement().getSize(); + + return size; + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java index 9f3eaaf..e66abe3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalDeletionArchiveAbstractTest.java @@ -17,29 +17,33 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; +import java.io.File; import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory; import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * @@ -66,14 +70,14 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT Ignite ignite = startGrid(configuration); - ignite.active(true); + ignite.cluster().state(ClusterState.ACTIVE); return ignite; } /** */ - private CacheConfiguration<Integer, Integer> cacheConfiguration() { - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + private CacheConfiguration<Integer, Object> cacheConfiguration() { + CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); return ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); } @@ -135,37 +139,36 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT public void testCorrectDeletedArchivedWalFiles() throws Exception { //given: configured grid with setted max wal archive size long maxWalArchiveSize = 2 * 1024 * 1024; - Ignite ignite = startGrid(dbCfg -> { - dbCfg.setMaxWalArchiveSize(maxWalArchiveSize); - }); + Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(maxWalArchiveSize)); GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite); - long allowedThresholdWalArchiveSize = maxWalArchiveSize / 2; + CheckpointHistory hist = dbMgr.checkpointHistory(); + assertNotNull(hist); - IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration()); + IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration()); //when: put to cache more than 2 MB - for (int i = 0; i < 500; i++) + for (int i = 0; i < 500; i++) { + if (i % 100 == 0) + forceCheckpoint(); + cache.put(i, i); + } - forceCheckpoint(); + //then: total archive size less than of maxWalArchiveSize(by current logic) + FileWriteAheadLogManager wal = wal(ignite); - //then: total archive size less than half of maxWalArchiveSize(by current logic) - IgniteWriteAheadLogManager wal = wal(ignite); + assertTrue(waitForCondition(() -> wal.lastTruncatedSegment() >= 0, 10_000)); - FileDescriptor[] files = (FileDescriptor[])U.findNonPublicMethod(wal.getClass(), "walArchiveFiles").invoke(wal); + FileDescriptor[] files = wal.walArchiveFiles(); - Long totalSize = Stream.of(files) - .map(desc -> desc.file().length()) - .reduce(0L, Long::sum); + long totalSize = wal.totalSize(files); assertTrue(files.length >= 1); - assertTrue(totalSize <= allowedThresholdWalArchiveSize); + assertTrue(totalSize < maxWalArchiveSize); assertFalse(Stream.of(files).anyMatch(desc -> desc.file().getName().endsWith("00001.wal"))); - CheckpointHistory hist = dbMgr.checkpointHistory(); - assertTrue(!hist.checkpoints().isEmpty()); } @@ -175,13 +178,11 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT @Test public void testCheckpointStarted_WhenWalHasTooBigSizeWithoutCheckpoint() throws Exception { //given: configured grid with max wal archive size = 1MB, wal segment size = 512KB - Ignite ignite = startGrid(dbCfg -> { - dbCfg.setMaxWalArchiveSize(1 * 1024 * 1024);// 1 Mbytes - }); + Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(1024 * 1024)); GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite); - IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration()); + IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration()); for (int i = 0; i < 500; i++) cache.put(i, i); @@ -195,20 +196,63 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT } /** - * Correct delete checkpoint history from memory depends on IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE. WAL files - * doesn't delete because deleting was disabled. + * Test for check deprecated removing checkpoint by deprecated walHistorySize parameter + * + * @deprecated Test old removing process depends on WalHistorySize. + */ + @Test + public void testCheckpointHistoryRemovingByTruncate() throws Exception { + Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(2 * 1024 * 1024)); + + GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite); + + IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration()); + + CheckpointHistory hist = dbMgr.checkpointHistory(); + assertNotNull(hist); + + int startHistSize = hist.checkpoints().size(); + + int checkpointCnt = 10; + + for (int i = 0; i < checkpointCnt; i++) { + cache.put(i, i); + //and: wait for checkpoint finished + forceCheckpoint(); + // Check that the history is growing. + assertEquals(startHistSize + (i + 1), hist.checkpoints().size()); + } + + // Ensure rollover and wal archive cleaning. + for (int i = 0; i < 6; i++) + cache.put(i, new byte[ignite.configuration().getDataStorageConfiguration().getWalSegmentSize() / 2]); + + FileWriteAheadLogManager wal = wal(ignite); + assertTrue(waitForCondition(() -> wal.lastTruncatedSegment() >= 0, 10_000)); + + assertTrue(hist.checkpoints().size() < checkpointCnt + startHistSize); + + File[] cpFiles = dbMgr.checkpointDirectory().listFiles(); + + assertTrue(cpFiles.length <= (checkpointCnt * 2 + 1));// starts & ends + node_start + } + + /** + * Correct delete checkpoint history from memory depends on IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE. + * WAL files doesn't delete because deleting was disabled. */ @Test @WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2") public void testCorrectDeletedCheckpointHistoryButKeepWalFiles() throws Exception { //given: configured grid with disabled WAL removing. - Ignite ignite = startGrid(dbCfg -> { - dbCfg.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE); - }); + Ignite ignite = startGrid(dbCfg -> dbCfg.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE)); GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite); - IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration()); + CheckpointHistory hist = dbMgr.checkpointHistory(); + assertNotNull(hist); + + IgniteCache<Integer, Object> cache = ignite.getOrCreateCache(cacheConfiguration()); //when: put to cache for (int i = 0; i < 500; i++) { @@ -221,16 +265,12 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT forceCheckpoint(); //then: WAL files was not deleted but some of checkpoint history was deleted. - IgniteWriteAheadLogManager wal = wal(ignite); - - FileDescriptor[] files = (FileDescriptor[])U.findNonPublicMethod(wal.getClass(), "walArchiveFiles").invoke(wal); + FileWriteAheadLogManager wal = wal(ignite); + assertNull(getFieldValueHierarchy(wal, "cleaner")); - boolean hasFirstSegment = Stream.of(files) - .anyMatch(desc -> desc.file().getName().endsWith("0001.wal")); + FileDescriptor[] files = wal.walArchiveFiles(); - assertTrue(hasFirstSegment); - - CheckpointHistory hist = dbMgr.checkpointHistory(); + assertTrue(Stream.of(files).anyMatch(desc -> desc.file().getName().endsWith("0001.wal"))); assertTrue(hist.checkpoints().size() == 2); } @@ -245,7 +285,7 @@ public abstract class WalDeletionArchiveAbstractTest extends GridCommonAbstractT /** * Extract IgniteWriteAheadLogManager. */ - private IgniteWriteAheadLogManager wal(Ignite ignite) { - return ((IgniteEx)ignite).context().cache().context().wal(); + private FileWriteAheadLogManager wal(Ignite ignite) { + return (FileWriteAheadLogManager)((IgniteEx)ignite).context().cache().context().wal(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 10634cc..035f289 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -185,11 +185,6 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ - @Override public long maxArchivedSegmentToDelete() { - return -1; - } - - /** {@inheritDoc} */ @Override public long segmentSize(long idx) { return -1; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java index 5037ca3..18291c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -680,6 +680,101 @@ public class SegmentAwareTest { } /** + * Checking the correctness of WAL archive size. + * + * @throws Exception If failed. + */ + @Test + public void testWalArchiveSize() throws Exception { + SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + + IgniteInternalFuture<?> fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10)); + + aware.addCurrentWalArchiveSize(4); + assertFutureIsNotFinish(fut); + + aware.addReservedWalArchiveSize(4); + assertFutureIsNotFinish(fut); + + aware.addCurrentWalArchiveSize(4); + fut.get(20); + + aware.resetWalArchiveSizes(); + + fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10)); + + aware.addCurrentWalArchiveSize(4); + assertFutureIsNotFinish(fut); + + aware.addReservedWalArchiveSize(4); + assertFutureIsNotFinish(fut); + + aware.addReservedWalArchiveSize(4); + fut.get(20); + + aware.resetWalArchiveSizes(); + + fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10)); + + aware.interrupt(); + assertTrue(fut.get(20) instanceof IgniteInterruptedCheckedException); + + aware.reset(); + + fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10)); + + aware.forceInterrupt(); + assertTrue(fut.get(20) instanceof IgniteInterruptedCheckedException); + } + + /** + * Checking the correctness of truncate logic. + * + * @throws Exception If failed. + */ + @Test + public void testTruncate() throws Exception { + SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + + IgniteInternalFuture<?> fut = awaitThread(aware::awaitAvailableTruncateArchive); + + aware.lastCheckpointIdx(5); + + fut.get(20); + assertEquals(5, aware.awaitAvailableTruncateArchive()); + + aware.reserve(4); + assertEquals(4, aware.awaitAvailableTruncateArchive()); + + aware.setLastArchivedAbsoluteIndex(3); + assertEquals(3, aware.awaitAvailableTruncateArchive()); + + aware.lastTruncatedArchiveIdx(0); + assertEquals(2, aware.awaitAvailableTruncateArchive()); + assertEquals(0, aware.lastTruncatedArchiveIdx()); + + aware.reserve(0); + fut = awaitThread(aware::awaitAvailableTruncateArchive); + + aware.release(0); + + fut.get(20); + assertEquals(2, aware.awaitAvailableTruncateArchive()); + + aware.setLastArchivedAbsoluteIndex(4); + assertEquals(3, aware.awaitAvailableTruncateArchive()); + + aware.release(4); + assertEquals(3, aware.awaitAvailableTruncateArchive()); + + aware.lastCheckpointIdx(6); + assertEquals(3, aware.awaitAvailableTruncateArchive()); + + aware.setLastArchivedAbsoluteIndex(6); + assertEquals(5, aware.awaitAvailableTruncateArchive()); + } + + /** * Assert that future is still not finished. * * @param future Future to check. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java index 2d38439..3f1cf07 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java @@ -25,6 +25,7 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.ObjectInputStream; @@ -39,6 +40,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URL; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.IntStream; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -90,6 +93,7 @@ import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.joining; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.apache.ignite.testframework.GridTestUtils.readResource; import static org.junit.Assert.assertArrayEquals; @@ -1424,6 +1428,34 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { } /** + * Test to verify the {@link U#uncompressedSize}. + * + * @throws Exception If failed. + */ + @Test + public void testUncompressedSize() throws Exception { + File zipFile = new File(System.getProperty("java.io.tmpdir"), "test.zip"); + + try { + assertThrows(log, () -> U.uncompressedSize(zipFile), IOException.class, null); + + byte[] raw = IntStream.range(0, 10).mapToObj(i -> zipFile.getAbsolutePath() + i) + .collect(joining()).getBytes(StandardCharsets.UTF_8); + + try (FileOutputStream fos = new FileOutputStream(zipFile)) { + fos.write(U.zip(raw)); + + fos.flush(); + } + + assertEquals(raw.length, U.uncompressedSize(zipFile)); + } + finally { + assertTrue(U.delete(zipFile)); + } + } + + /** * Reading lines from a resource file and passing them to consumer. * If read string is {@code "null"}, it is converted to {@code null}. *