This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 28b15f1 IGNITE-13815 Remove ability to delete segments from the middle of WAL archive - Fixes #8545. 28b15f1 is described below commit 28b15f134c260436bd705acd646e611000dae180 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Fri Dec 11 15:28:47 2020 +0300 IGNITE-13815 Remove ability to delete segments from the middle of WAL archive - Fixes #8545. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../pagemem/wal/IgniteWriteAheadLogManager.java | 11 +- .../persistence/checkpoint/CheckpointHistory.java | 2 +- .../persistence/wal/FileWriteAheadLogManager.java | 345 ++++++++++----------- .../wal/aware/SegmentArchivedStorage.java | 17 +- .../cache/persistence/wal/aware/SegmentAware.java | 92 +++--- .../wal/aware/SegmentCompressStorage.java | 42 +-- .../wal/aware/SegmentCurrentStateStorage.java | 84 +++-- .../persistence/wal/aware/SegmentLockStorage.java | 39 ++- .../wal/aware/SegmentReservationStorage.java | 35 ++- .../persistence/wal/io/LockedReadFileInput.java | 27 +- .../ignite/internal/visor/misc/VisorWalTask.java | 2 +- .../db/IgnitePdsReserveWalSegmentsTest.java | 148 ++++++--- .../db/wal/IgniteWalIteratorSwitchSegmentTest.java | 171 +++++----- .../cache/persistence/pagemem/NoOpWALManager.java | 2 +- .../persistence/wal/aware/SegmentAwareTest.java | 73 ++++- 15 files changed, 602 insertions(+), 488 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index cb4fc30..f3d85c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -146,15 +146,14 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public void 
release(WALPointer start) throws IgniteCheckedException; /** - * Gives a hint to WAL manager to clear entries logged before the given pointer. Some entries before the - * the given pointer will be kept because there is a configurable WAL history size. Those entries may be used - * for partial partition rebalancing. + * Gives a hint to WAL manager to clear entries logged before the given pointer. + * If entries are needed for binary recovery, they will not be affected. + * Some entries may be reserved eg for historical rebalance and they also will not be affected. * - * @param low Pointer since which WAL will be truncated. If null, WAL will be truncated from the oldest segment. - * @param high Pointer for which it is safe to clear the log. + * @param high Upper border to which WAL segments will be deleted. * @return Number of deleted WAL segments. */ - public int truncate(WALPointer low, WALPointer high); + public int truncate(@Nullable WALPointer high); /** * Notifies {@code this} about latest checkpoint pointer. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java index 1112579..ed28d95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java @@ -378,7 +378,7 @@ public class CheckpointHistory { int deleted = 0; if (truncateWalOnCpFinish) - deleted += wal.truncate(null, firstCheckpointPointer()); + deleted += wal.truncate(firstCheckpointPointer()); chp.walFilesDeleted(deleted); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 34db1dee..7a24fa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -290,8 +290,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Failure processor */ private final FailureProcessor failureProcessor; - /** */ - private IgniteConfiguration igCfg; + /** Ignite configuration. */ + private final IgniteConfiguration igCfg; /** Persistence metrics tracker. */ private DataStorageMetricsImpl metrics; @@ -400,7 +400,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private final Map<Long, Long> segmentSize = new ConcurrentHashMap<>(); + /** Pointer to the last successful checkpoint until which WAL segments can be safely deleted. 
*/ + private volatile WALPointer lastCheckpointPtr = new WALPointer(0, 0, 0); + /** + * Constructor. + * * @param ctx Kernal context. */ public FileWriteAheadLogManager(final GridKernalContext ctx) { @@ -428,8 +433,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fileHandleManagerFactory = new FileHandleManagerFactory(dsCfg); maxSegCountWithoutCheckpoint = - (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE) - / dsCfg.getWalSegmentSize()); + (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE) + / dsCfg.getWalSegmentSize()); switchSegmentRecordOffset = isArchiverEnabled() ? new AtomicLongArray(dsCfg.getWalSegments()) : null; } @@ -973,7 +978,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl log, segmentAware, segmentRouter, - lockedSegmentFileInputFactory); + lockedSegmentFileInputFactory + ); try { iter.init(); // Make sure iterator is closed on any error. @@ -989,25 +995,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public boolean reserve(WALPointer start) { - assert start != null : "Invalid start pointer: " + start; + assert start != null; if (mode == WALMode.NONE) return false; - segmentAware.reserve(start.index()); + // Protection from deletion. + boolean reserved = segmentAware.reserve(start.index()); - if (!hasIndex(start.index())) { - segmentAware.release(start.index()); + // Segment presence check. 
+ if (reserved && !hasIndex(start.index())) { + segmentAware.reserve(start.index()); - return false; + reserved = false; } - return true; + return reserved; } /** {@inheritDoc} */ @Override public void release(WALPointer start) { - assert start != null : "Invalid start pointer: " + start; + assert start != null; if (mode == WALMode.NONE) return; @@ -1016,16 +1024,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param absIdx Absolulte index to check. - * @return {@code true} if has this index. + * Checking for the existence of an index. + * + * @param absIdx Segment index. + * @return {@code True} exists. */ private boolean hasIndex(long absIdx) { String segmentName = fileName(absIdx); - String zipSegmentName = segmentName + ZIP_SUFFIX; - boolean inArchive = new File(walArchiveDir, segmentName).exists() || - new File(walArchiveDir, zipSegmentName).exists(); + new File(walArchiveDir, segmentName + ZIP_SUFFIX).exists(); if (inArchive) return true; @@ -1039,30 +1047,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - @Override public int truncate(WALPointer low, WALPointer high) { + @Override public int truncate(@Nullable WALPointer high) { if (high == null) return 0; - // File pointer bound: older entries will be deleted from archive - - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); + FileDescriptor[] descs = walArchiveFiles(); int deleted = 0; for (FileDescriptor desc : descs) { - if (low != null && desc.idx < low.index()) - continue; - - // Do not delete reserved or locked segment and any segment after it. - if (segmentReservedOrLocked(desc.idx)) - return deleted; - long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex(); long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex(); - // We need to leave at least one archived segment to correctly determine the archive index. 
- if (desc.idx < high.index() && desc.idx < lastArchived) { + if (desc.idx >= lastCheckpointPtr.index() // We cannot delete segments needed for binary recovery. + || desc.idx >= lastArchived // We cannot delete last segment, it is needed at start of node and avoid gaps. + || !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment. + return deleted; + + if (desc.idx < high.index()) { if (!desc.file.delete()) { U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + desc.file.getAbsolutePath()); @@ -1099,8 +1102,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void notchLastCheckpointPtr(WALPointer ptr) { - if (compressor != null) - segmentAware.keepUncompressedIdxFrom(ptr.index()); + lastCheckpointPtr = ptr; } /** {@inheritDoc} */ @@ -1117,9 +1119,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (lastArchived == -1) return 0; - int res = (int)(lastArchived - lastTruncated); - - return res >= 0 ? res : 0; + return Math.max((int)(lastArchived - lastTruncated), 0); } /** {@inheritDoc} */ @@ -1369,9 +1369,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off + len, ser); - if (archiver0 != null) - segmentAware.curAbsWalIdx(absIdx); - else + segmentAware.curAbsWalIdx(absIdx); + + FileDescriptor[] walArchiveFiles = walArchiveFiles(); + + segmentAware.minReserveIndex(F.isEmpty(walArchiveFiles) ? -1 : walArchiveFiles[0].idx - 1); + + if (archiver0 == null) segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1); // Getting segment sizes. @@ -1494,17 +1498,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws StorageException If failed. */ private void checkOrPrepareFiles() throws StorageException { - // Clean temp files. 
- { - File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER); - - if (!F.isEmpty(tmpFiles)) { - for (File tmp : tmpFiles) { - if (!tmp.delete()) { - throw new StorageException("Failed to delete previously created temp file " + - "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); - } - } + Collection<File> tmpFiles = new HashSet<>(); + + for (File walDir : F.asList(walWorkDir, walArchiveDir)) { + tmpFiles.addAll(F.asList(walDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER))); + tmpFiles.addAll(F.asList(walDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER))); + } + + for (File tmpFile : tmpFiles) { + if (tmpFile.exists() && !tmpFile.delete()) { + throw new StorageException("Failed to delete previously created temp file " + + "(make sure Ignite process has enough rights): " + tmpFile.getAbsolutePath()); } } @@ -1605,6 +1609,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileArchiver archiver0 = archiver; if (archiver0 == null) { + segmentAware.curAbsWalIdx(curIdx + 1); segmentAware.setLastArchivedAbsoluteIndex(curIdx); return new File(walWorkDir, fileName(curIdx + 1)); @@ -1634,7 +1639,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Files from archive WAL directory. + * Files from {@link #walArchiveDir}. + * + * @return Raw or compressed WAL segments from archive. */ private FileDescriptor[] walArchiveFiles() { return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); @@ -1985,22 +1992,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param absIdx Segment absolute index. 
- * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need - * release segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> - */ - public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { - return segmentAware.checkCanReadArchiveOrReserveWorkSegment(absIdx); - } - - /** - * @param absIdx Segment absolute index. - */ - public void releaseWorkSegment(long absIdx) { - segmentAware.releaseWorkSegment(absIdx); - } - - /** * Moves WAL segment from work folder to archive folder. Temp file is used to do movement. * * @param absIdx Absolute index to archive. @@ -2081,18 +2072,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl checkFiles( 1, true, - new IgnitePredicate<Integer>() { - @Override public boolean apply(Integer integer) { - return !checkStop(); - } - }, - new CI1<Integer>() { - @Override public void apply(Integer idx) { - synchronized (FileArchiver.this) { - formatted = idx; + (IgnitePredicate<Integer>)integer -> !checkStop(), + (CI1<Integer>)idx -> { + synchronized (FileArchiver.this) { + formatted = idx; - FileArchiver.this.notifyAll(); - } + FileArchiver.this.notifyAll(); } } ); @@ -2131,15 +2116,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private void init() { - File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER); - - for (File f : toDel) { - if (isCancelled()) - return; - - f.delete(); - } - for (int i = 1; i < calculateThreadCount(); i++) { FileCompressorWorker worker = new FileCompressorWorker(i, log); @@ -2400,7 +2376,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (segmentReservedOrLocked(desc.idx)) return; - if (desc.idx < segmentAware.keepUncompressedIdxFrom() && duplicateIndices.contains(desc.idx)) { + if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) { if (desc.file.exists() && 
!desc.file.delete()) { U.warn(log, "Failed to remove obsolete WAL segment " + "(make sure the process has enough rights): " + desc.file.getAbsolutePath() + @@ -2416,13 +2392,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private class FileDecompressor extends GridWorker { /** Decompression futures. */ - private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>(); + private final Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>(); /** Segments queue. */ private final PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>(); /** Byte array for draining data. */ - private byte[] arr = new byte[BUF_SIZE]; + private final byte[] arr = new byte[BUF_SIZE]; /** * @param log Logger. @@ -2730,18 +2706,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final DataStorageConfiguration dsCfg; /** Optional start pointer. */ - @Nullable - private WALPointer start; + @Nullable private final WALPointer start; /** Optional end pointer. */ - @Nullable - private WALPointer end; + @Nullable private final WALPointer end; /** Manager of segment location. */ - private SegmentRouter segmentRouter; + private final SegmentRouter segmentRouter; /** Holder of actual information of latest manipulation on WAL segments. */ - private SegmentAware segmentAware; + private final SegmentAware segmentAware; /** * @param cctx Shared context. @@ -2756,10 +2730,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. * @param segmentAware Segment aware. * @param segmentRouter Segment router. - * @param segmentFileInputFactory + * @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files. 
*/ private RecordsIterator( - GridCacheSharedContext cctx, + GridCacheSharedContext<?, ?> cctx, File walArchiveDir, File walWorkDir, @Nullable WALPointer start, @@ -2774,13 +2748,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl SegmentRouter segmentRouter, SegmentFileInputFactory segmentFileInputFactory ) throws IgniteCheckedException { - super(log, + super( + log, cctx, serializerFactory, ioFactory, dsCfg.getWalRecordIteratorBufferSize(), segmentFileInputFactory ); + this.walArchiveDir = walArchiveDir; this.walWorkDir = walWorkDir; this.archiver = archiver; @@ -2890,57 +2866,70 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl curWalSegmIdx++; - boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); //lock during creation handle. + // Segment deletion protection. + if (!segmentAware.reserve(curWalSegmIdx)) + throw new IgniteCheckedException("Segment does not exist: " + curWalSegmIdx); - FileDescriptor fd = null; - ReadFileHandle nextHandle; try { - fd = segmentRouter.findSegment(curWalSegmIdx); + // Protection against transferring a segment to the archive by #archiver. + boolean readArchive = archiver != null && !segmentAware.lock(curWalSegmIdx); - if (log.isDebugEnabled()) - log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']'); + FileDescriptor fd = null; + ReadFileHandle nextHandle; + try { + fd = segmentRouter.findSegment(curWalSegmIdx); - nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null); - } - catch (FileNotFoundException e) { - if (readArchive) - throw new IgniteCheckedException("Missing WAL segment in the archive", e); - else { - // Log only when no segments were read. This will help us avoiding logging on the end of the WAL. 
- if (curRec == null && curWalSegment == null) { - File workDirFile = new File(walWorkDir, fileName(curWalSegmIdx % dsCfg.getWalSegments())); - File archiveDirFile = new File(walArchiveDir, fileName(curWalSegmIdx)); - - U.warn( - log, - "Next segment file is not found [" + - "curWalSegmIdx=" + curWalSegmIdx - + ", start=" + start - + ", end=" + end - + ", filePath=" + (fd == null ? "<empty>" : fd.file.getAbsolutePath()) - + ", walWorkDir=" + walWorkDir - + ", walWorkDirContent=" + listFileNames(walWorkDir) - + ", walArchiveDir=" + walArchiveDir - + ", walArchiveDirContent=" + listFileNames(walArchiveDir) - + ", workDirFile=" + workDirFile.getName() - + ", exists=" + workDirFile.exists() - + ", archiveDirFile=" + archiveDirFile.getName() - + ", exists=" + archiveDirFile.exists() - + "]", - e - ); + if (log.isDebugEnabled()) { + log.debug("Reading next file [absIdx=" + curWalSegmIdx + + ", file=" + fd.file.getAbsolutePath() + ']'); } - nextHandle = null; + nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null); } - } + catch (FileNotFoundException e) { + if (readArchive) + throw new IgniteCheckedException("Missing WAL segment in the archive: " + curWalSegment, e); + else { + // Log only when no segments were read. This will help us avoiding logging on the end of the WAL. + if (curRec == null && curWalSegment == null) { + File workDirFile = new File(walWorkDir, fileName(curWalSegmIdx % dsCfg.getWalSegments())); + File archiveDirFile = new File(walArchiveDir, fileName(curWalSegmIdx)); + + U.warn( + log, + "Next segment file is not found [" + + "curWalSegmIdx=" + curWalSegmIdx + + ", start=" + start + + ", end=" + end + + ", filePath=" + (fd == null ? 
"<empty>" : fd.file.getAbsolutePath()) + + ", walWorkDir=" + walWorkDir + + ", walWorkDirContent=" + listFileNames(walWorkDir) + + ", walArchiveDir=" + walArchiveDir + + ", walArchiveDirContent=" + listFileNames(walArchiveDir) + + ", workDirFile=" + workDirFile.getName() + + ", exists=" + workDirFile.exists() + + ", archiveDirFile=" + archiveDirFile.getName() + + ", exists=" + archiveDirFile.exists() + + "]", + e + ); + } - if (!readArchive) - releaseWorkSegment(curWalSegmIdx); + nextHandle = null; + } + } + finally { + if (archiver != null && !readArchive) + segmentAware.unlock(curWalSegmIdx); + } - curRec = null; + curRec = null; - return nextHandle; + return nextHandle; + } + finally { + segmentAware.release(curWalSegmIdx); + } } /** */ @@ -2955,63 +2944,47 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override protected IgniteCheckedException handleRecordException(Exception e, @Nullable WALPointer ptr) { - if (e instanceof IgniteCheckedException) - if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) - // This means that there is no explicit last sengment, so we iterate unil the very end. - if (end == null) { - long nextWalSegmentIdx = curWalSegmIdx + 1; - - if (!isArchiverEnabled()) - if (canIgnoreCrcError(nextWalSegmentIdx, nextWalSegmentIdx, e, ptr)) - return null; - + if (e instanceof IgniteCheckedException && X.hasCause(e, IgniteDataIntegrityViolationException.class)) { + // This means that there is no explicit last segment, so we iterate until the very end. + if (end == null) { + long nextWalSegmentIdx = curWalSegmIdx + 1; + + if (archiver == null) { + if (canIgnoreCrcError(nextWalSegmentIdx, nextWalSegmentIdx, e, ptr)) + return null; + } + else { // Check that we should not look this segment up in archive directory. // Basically the same check as in "advanceSegment" method. 
- if (isArchiverEnabled() && archiver != null) - if (!canReadArchiveOrReserveWork(nextWalSegmentIdx)) - try { - long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments(); - if (canIgnoreCrcError(workIdx, nextWalSegmentIdx, e, ptr)) - return null; - } - finally { - releaseWorkSegment(nextWalSegmentIdx); + // Segment deletion protection. + if (segmentAware.reserve(nextWalSegmentIdx)) { + try { + // Protection against transferring a segment to the archive by #archiver. + if (segmentAware.lock(nextWalSegmentIdx)) { + try { + long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments(); + + if (canIgnoreCrcError(workIdx, nextWalSegmentIdx, e, ptr)) + return null; + } + finally { + segmentAware.unlock(nextWalSegmentIdx); + } } + } + finally { + segmentAware.release(nextWalSegmentIdx); + } + } } + } + } return super.handleRecordException(e, ptr); } /** - * @param absIdx Absolute index to check. - * @return <ul><li> {@code True} if we can safely read the archive, </li> <li>{@code false} if the segment has - * not been archived yet. In this case the corresponding work segment is reserved (will not be deleted until - * release). Use {@link #releaseWorkSegment} for unlock </li></ul> - */ - private boolean canReadArchiveOrReserveWork(long absIdx) { - return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx); - } - - /** - * @param absIdx Absolute index to release. - */ - private void releaseWorkSegment(long absIdx) { - if (archiver != null) - archiver.releaseWorkSegment(absIdx); - } - - /** - * Check that archiver is enabled - */ - private boolean isArchiverEnabled() { - if (walArchiveDir != null && walWorkDir != null) - return !walArchiveDir.equals(walWorkDir); - - return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath())); - } - - /** * @param workIdx Work index. * @param walSegmentIdx Wal segment index. * @param e Exception. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java index 438b922..53b3b59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java @@ -43,22 +43,11 @@ class SegmentArchivedStorage extends SegmentObservable { /** * @param segmentLockStorage Protects WAL work segments from moving. */ - private SegmentArchivedStorage(SegmentLockStorage segmentLockStorage) { + SegmentArchivedStorage(SegmentLockStorage segmentLockStorage) { this.segmentLockStorage = segmentLockStorage; } /** - * @param segmentLockStorage Protects WAL work segments from moving. - */ - static SegmentArchivedStorage buildArchivedStorage(SegmentLockStorage segmentLockStorage) { - SegmentArchivedStorage archivedStorage = new SegmentArchivedStorage(segmentLockStorage); - - segmentLockStorage.addObserver(archivedStorage::onSegmentUnlocked); - - return archivedStorage; - } - - /** * @return Last archived segment absolute index. */ long lastArchivedAbsoluteIndex() { @@ -105,7 +94,7 @@ class SegmentArchivedStorage extends SegmentObservable { */ synchronized void markAsMovedToArchive(long toArchive) throws IgniteInterruptedCheckedException { try { - while (segmentLockStorage.locked(toArchive) && !interrupted) + while (!segmentLockStorage.minLockIndex(toArchive) && !interrupted) wait(); } catch (InterruptedException e) { @@ -145,7 +134,7 @@ class SegmentArchivedStorage extends SegmentObservable { /** * Callback for waking up waiters of this object when unlocked happened. 
*/ - private synchronized void onSegmentUnlocked(long segmentId) { + synchronized void onSegmentUnlocked(long segmentId) { notifyAll(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java index be60895..89523db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -19,10 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage; -import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCompressStorage.buildCompressStorage; -import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCurrentStateStorage.buildCurrentStateStorage; - /** * Holder of actual information of latest manipulation on WAL segments. */ @@ -34,7 +30,7 @@ public class SegmentAware { private final SegmentLockStorage segmentLockStorage = new SegmentLockStorage(); /** Manages last archived index, emulates archivation in no-archiver mode. */ - private final SegmentArchivedStorage segmentArchivedStorage = buildArchivedStorage(segmentLockStorage); + private final SegmentArchivedStorage segmentArchivedStorage; /** Storage of actual information about current index of compressed segments. */ private final SegmentCompressStorage segmentCompressStorage; @@ -43,12 +39,21 @@ public class SegmentAware { private final SegmentCurrentStateStorage segmentCurrStateStorage; /** + * Constructor. + * * @param walSegmentsCnt Total WAL segments count. 
* @param compactionEnabled Is wal compaction enabled. */ public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) { - segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage); - segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled); + segmentArchivedStorage = new SegmentArchivedStorage(segmentLockStorage); + + segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt); + segmentCompressStorage = new SegmentCompressStorage(compactionEnabled); + + segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived); + segmentArchivedStorage.addObserver(segmentCompressStorage::onSegmentArchived); + + segmentLockStorage.addObserver(segmentArchivedStorage::onSegmentUnlocked); } /** @@ -133,20 +138,6 @@ public class SegmentAware { } /** - * @param idx Minimum raw segment index that should be preserved from deletion. - */ - public void keepUncompressedIdxFrom(long idx) { - segmentCompressStorage.keepUncompressedIdxFrom(idx); - } - - /** - * @return Minimum raw segment index that should be preserved from deletion. - */ - public long keepUncompressedIdxFrom() { - return segmentCompressStorage.keepUncompressedIdxFrom(); - } - - /** * Update current WAL index. * * @param curAbsWalIdx New current WAL index. @@ -184,10 +175,14 @@ public class SegmentAware { } /** + * Segment reservation. It will be successful if segment is {@code >} than + * the {@link #minReserveIndex minimum}. + * * @param absIdx Index for reservation. + * @return {@code True} if the reservation was successful. */ - public void reserve(long absIdx) { - reservationStorage.reserve(absIdx); + public boolean reserve(long absIdx) { + return reservationStorage.reserve(absIdx); } /** @@ -208,9 +203,9 @@ public class SegmentAware { } /** - * Check if WAL segment locked (protected from move to archive) + * Check if WAL segment locked (protected from move to archive). * - * @param absIdx Index for check reservation. 
+ * @param absIdx Index for check locking. * @return {@code True} if index is locked. */ public boolean locked(long absIdx) { @@ -218,27 +213,20 @@ public class SegmentAware { } /** - * @param absIdx Segment absolute index. - * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need release - * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> - */ - public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { - return lastArchivedAbsoluteIndex() >= absIdx || segmentLockStorage.lockWorkSegment(absIdx); - } - - /** - * Visible for test. + * Segment lock. It will be successful if segment is {@code >} than + * the {@link #lastArchivedAbsoluteIndex last archived}. * - * @param absIdx Segment absolute index. segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> + * @param absIdx Index to lock. + * @return {@code True} if the lock was successful. */ - void lockWorkSegment(long absIdx) { - segmentLockStorage.lockWorkSegment(absIdx); + public boolean lock(long absIdx) { + return segmentLockStorage.lockWorkSegment(absIdx); } /** - * @param absIdx Segment absolute index. + * @param absIdx Index to unlock. */ - public void releaseWorkSegment(long absIdx) { + public void unlock(long absIdx) { segmentLockStorage.releaseWorkSegment(absIdx); } @@ -274,4 +262,28 @@ public class SegmentAware { segmentCurrStateStorage.forceInterrupt(); } + + /** + * Increasing minimum segment index after that can be reserved. + * Value will be updated if it is greater than the current one. + * If segment is already reserved, the update will fail. + * + * @param absIdx Absolute segment index. + * @return {@code True} if update is successful. + */ + public boolean minReserveIndex(long absIdx) { + return reservationStorage.minReserveIndex(absIdx); + } + + /** + * Increasing minimum segment index after that can be locked. + * Value will be updated if it is greater than the current one. 
+ * If segment is already locked, the update will fail. + * + * @param absIdx Absolute segment index. + * @return {@code True} if update is successful. + */ + public boolean minLockIndex(long absIdx) { + return segmentLockStorage.minLockIndex(absIdx); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java index 5d88e52..62fe69d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java @@ -30,9 +30,6 @@ public class SegmentCompressStorage { /** Flag of interrupt waiting on this object. */ private volatile boolean interrupted; - /** Manages last archived index, emulates archivation in no-archiver mode. */ - private final SegmentArchivedStorage segmentArchivedStorage; - /** If WAL compaction enabled. */ private final boolean compactionEnabled; @@ -51,33 +48,16 @@ public class SegmentCompressStorage { /** Compressed segment with maximal index. */ private long lastMaxCompressedIdx = -1L; - /** Min uncompressed index to keep. */ - private volatile long minUncompressedIdxToKeep = -1L; - /** - * @param segmentArchivedStorage Storage of last archived segment. + * Constructor. + * * @param compactionEnabled If WAL compaction enabled. */ - private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage, boolean compactionEnabled) { - this.segmentArchivedStorage = segmentArchivedStorage; - + SegmentCompressStorage(boolean compactionEnabled) { this.compactionEnabled = compactionEnabled; } /** - * @param segmentArchivedStorage Storage of last archived segment. - * @param compactionEnabled If WAL compaction enabled. 
- */ - static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage, - boolean compactionEnabled) { - SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled); - - segmentArchivedStorage.addObserver(storage::onSegmentArchived); - - return storage; - } - - /** * Callback after segment compression finish. * * @param compressedIdx Index of compressed segment. @@ -148,7 +128,7 @@ public class SegmentCompressStorage { /** * Callback for waking up compressor when new segment is archived. */ - private synchronized void onSegmentArchived(long lastAbsArchivedIdx) { + synchronized void onSegmentArchived(long lastAbsArchivedIdx) { while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) segmentsToCompress.add(++lastEnqueuedToCompressIdx); @@ -156,20 +136,6 @@ public class SegmentCompressStorage { } /** - * @param idx Minimum raw segment index that should be preserved from deletion. - */ - void keepUncompressedIdxFrom(long idx) { - minUncompressedIdxToKeep = idx; - } - - /** - * @return Minimum raw segment index that should be preserved from deletion. - */ - long keepUncompressedIdxFrom() { - return minUncompressedIdxToKeep; - } - - /** * Reset interrupted flag. */ public void reset() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java index 7339497..6672879 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java @@ -22,7 +22,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; /** * Storage of absolute current segment index. 
*/ -class SegmentCurrentStateStorage { +class SegmentCurrentStateStorage extends SegmentObservable { /** Flag of interrupt of waiting on this object. */ private volatile boolean interrupted; @@ -32,38 +32,22 @@ class SegmentCurrentStateStorage { /** Total WAL segments count. */ private final int walSegmentsCnt; - /** Manages last archived index, emulates archivation in no-archiver mode. */ - private final SegmentArchivedStorage segmentArchivedStorage; - /** * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. Incremented during rollover. * Also may be directly set if WAL is resuming logging after start. */ private volatile long curAbsWalIdx = -1; - /** - * @param walSegmentsCnt Total WAL segments count. - * @param segmentArchivedStorage Last archived segment storage. - */ - private SegmentCurrentStateStorage(int walSegmentsCnt, SegmentArchivedStorage segmentArchivedStorage) { - this.walSegmentsCnt = walSegmentsCnt; - this.segmentArchivedStorage = segmentArchivedStorage; - } + /** Last archived file absolute index. */ + private volatile long lastAbsArchivedIdx = -1; /** + * Constructor. + * * @param walSegmentsCnt Total WAL segments count. - * @param segmentArchivedStorage Last archived segment storage. */ - static SegmentCurrentStateStorage buildCurrentStateStorage( - int walSegmentsCnt, - SegmentArchivedStorage segmentArchivedStorage - ) { - - SegmentCurrentStateStorage currStorage = new SegmentCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage); - - segmentArchivedStorage.addObserver(currStorage::onSegmentArchived); - - return currStorage; + SegmentCurrentStateStorage(int walSegmentsCnt) { + this.walSegmentsCnt = walSegmentsCnt; } /** @@ -87,13 +71,11 @@ class SegmentCurrentStateStorage { * Waiting until archivation of next segment will be allowed. 
*/ synchronized long waitNextSegmentForArchivation() throws IgniteInterruptedCheckedException { - long lastArchivedSegment = segmentArchivedStorage.lastArchivedAbsoluteIndex(); - //We can archive segment if it less than current work segment so for archivate lastArchiveSegment + 1 // we should be ensure that currentWorkSegment = lastArchiveSegment + 2 - awaitSegment(lastArchivedSegment + 2); + awaitSegment(lastAbsArchivedIdx + 2); - return lastArchivedSegment + 1; + return lastAbsArchivedIdx + 1; } /** @@ -102,23 +84,31 @@ class SegmentCurrentStateStorage { * * @return Next absolute segment index. */ - synchronized long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException { - curAbsWalIdx++; + long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException { + long nextAbsIdx; - notifyAll(); + synchronized (this) { + curAbsWalIdx++; - try { - while (curAbsWalIdx - segmentArchivedStorage.lastArchivedAbsoluteIndex() > walSegmentsCnt && !forceInterrupted) - wait(); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); + notifyAll(); + + try { + while (curAbsWalIdx - lastAbsArchivedIdx > walSegmentsCnt && !forceInterrupted) + wait(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + + if (forceInterrupted) + throw new IgniteInterruptedCheckedException("Interrupt waiting of change archived idx"); + + nextAbsIdx = curAbsWalIdx; } - if (forceInterrupted) - throw new IgniteInterruptedCheckedException("Interrupt waiting of change archived idx"); + notifyObservers(nextAbsIdx); - return curAbsWalIdx; + return nextAbsIdx; } /** @@ -126,10 +116,14 @@ class SegmentCurrentStateStorage { * * @param curAbsWalIdx New current WAL index. 
*/ - synchronized void curAbsWalIdx(long curAbsWalIdx) { - this.curAbsWalIdx = curAbsWalIdx; + void curAbsWalIdx(long curAbsWalIdx) { + synchronized (this) { + this.curAbsWalIdx = curAbsWalIdx; - notifyAll(); + notifyAll(); + } + + notifyObservers(curAbsWalIdx); } /** @@ -160,8 +154,12 @@ class SegmentCurrentStateStorage { /** * Callback for waking up awaiting when new segment is archived. + * + * @param lastAbsArchivedIdx Last archived file absolute index. */ - private synchronized void onSegmentArchived(long lastAbsArchivedIdx) { + synchronized void onSegmentArchived(long lastAbsArchivedIdx) { + this.lastAbsArchivedIdx = lastAbsArchivedIdx; + notifyAll(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java index 6588769..a5a7948 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java @@ -29,7 +29,10 @@ public class SegmentLockStorage extends SegmentObservable { * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come from * {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. */ - private Map<Long, Integer> locked = new ConcurrentHashMap<>(); + private final Map<Long, Integer> locked = new ConcurrentHashMap<>(); + + /** Maximum segment index that can be locked. */ + private volatile long minLockIdx = -1; /** * Check if WAL segment locked (protected from move to archive) @@ -37,17 +40,22 @@ public class SegmentLockStorage extends SegmentObservable { * @param absIdx Index for check reservation. * @return {@code True} if index is locked. 
*/ - public boolean locked(long absIdx) { + boolean locked(long absIdx) { return locked.containsKey(absIdx); } /** - * @param absIdx Segment absolute index. - * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need release - * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> + * Segment lock. It will be successful if segment is {@code >} than the {@link #minLockIdx minimum}. + * + * @param absIdx Index to lock. + * @return {@code True} if the lock was successful. */ - boolean lockWorkSegment(long absIdx) { - locked.compute(absIdx, (idx, count) -> count == null ? 1 : count + 1); + synchronized boolean lockWorkSegment(long absIdx) { + if (absIdx > minLockIdx) { + locked.merge(absIdx, 1, Integer::sum); + + return true; + } return false; } @@ -64,4 +72,21 @@ public class SegmentLockStorage extends SegmentObservable { notifyObservers(absIdx); } + + /** + * Increasing minimum segment index that can be locked. + * Value will be updated if it is greater than the current one. + * If segment is already locked, the update will fail. + * + * @param absIdx Absolut segment index. + * @return {@code True} if update is successful. 
+ */ + synchronized boolean minLockIndex(long absIdx) { + if (locked(absIdx)) + return false; + + minLockIdx = Math.max(minLockIdx, absIdx); + + return true; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java index 50c2bbf..42eece7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java @@ -27,13 +27,25 @@ class SegmentReservationStorage { * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments which has * index >= reserved segment index. Guarded by {@code this}. */ - private NavigableMap<Long, Integer> reserved = new TreeMap<>(); + private final NavigableMap<Long, Integer> reserved = new TreeMap<>(); + + /** Maximum segment index that can be reserved. */ + private long minReserveIdx = -1; /** + * Segment reservation. It will be successful if segment is {@code >} than the {@link #minReserveIdx minimum}. + * * @param absIdx Index for reservation. + * @return {@code True} if the reservation was successful. */ - synchronized void reserve(long absIdx) { - reserved.merge(absIdx, 1, (a, b) -> a + b); + synchronized boolean reserve(long absIdx) { + if (absIdx > minReserveIdx) { + reserved.merge(absIdx, 1, Integer::sum); + + return true; + } + + return false; } /** @@ -59,4 +71,21 @@ class SegmentReservationStorage { else reserved.put(absIdx, cur - 1); } + + /** + * Increasing minimum segment index that can be reserved. + * Value will be updated if it is greater than the current one. + * If segment is already reserved, the update will fail. + * + * @param absIdx Absolut segment index. 
+ * @return {@code True} if update is successful. + */ + synchronized boolean minReserveIndex(long absIdx) { + if (reserved(absIdx)) + return false; + + minReserveIdx = Math.max(minReserveIdx, absIdx); + + return true; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java index 6bb4786..13a905f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.io; +import java.io.FileNotFoundException; import java.io.IOException; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; @@ -69,19 +70,29 @@ final class LockedReadFileInput extends SimpleFileInput { if (available >= requested) return; - boolean readArchive = segmentAware.checkCanReadArchiveOrReserveWorkSegment(segmentId); + // Segment deletion protection. + if (!segmentAware.reserve(segmentId)) + throw new FileNotFoundException("Segment does not exist: " + segmentId); + try { - if (readArchive && !isLastReadFromArchive) { - isLastReadFromArchive = true; + // Protection against transferring a segment to the archive by #archiver. 
+ boolean readArchive = !segmentAware.lock(segmentId); + try { + if (readArchive && !isLastReadFromArchive) { + isLastReadFromArchive = true; - refreshIO(); - } + refreshIO(); + } - super.ensure(requested); + super.ensure(requested); + } + finally { + if (!readArchive) + segmentAware.unlock(segmentId); + } } finally { - if (!readArchive) - segmentAware.releaseWorkSegment(segmentId); + segmentAware.release(segmentId); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java index 19b3e92..d535751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/misc/VisorWalTask.java @@ -237,7 +237,7 @@ public class VisorWalTask extends VisorMultiNodeTask<VisorWalTaskArg, VisorWalTa dbMgr.onWalTruncated(lowBoundForTruncate); - int num = wal.truncate(null, lowBoundForTruncate); + int num = wal.truncate(lowBoundForTruncate); if (walFiles != null) { sortWalFiles(walFiles); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java index 65dd776..e7c212f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import 
org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -28,11 +29,13 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Test correctness of truncating unused WAL segments. @@ -40,33 +43,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT @WithSystemProperty(key = IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, value = "2") public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setConsistentId(gridName); - - CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); - - ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); - - cfg.setCacheConfiguration(ccfg); - - DataStorageConfiguration dbCfg = new DataStorageConfiguration(); - - cfg.setDataStorageConfiguration(dbCfg); - - dbCfg.setWalSegmentSize(1024 * 1024) - .setMaxWalArchiveSize(Long.MAX_VALUE) - .setWalSegments(10) - .setWalMode(WALMode.LOG_ONLY) - .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setMaxSize(100 * 1024 * 1024) - .setPersistenceEnabled(true)); - - return cfg; - } - - /** {@inheritDoc} */ 
@Override protected void beforeTest() throws Exception { stopAllGrids(); @@ -80,6 +56,28 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { cleanPersistenceDir(); } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setConsistentId(igniteInstanceName) + .setCacheConfiguration( + new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + ).setDataStorageConfiguration( + new DataStorageConfiguration() + .setCheckpointFrequency(Long.MAX_VALUE) + .setWalMode(WALMode.LOG_ONLY) + .setMaxWalArchiveSize(Long.MAX_VALUE) + .setWalSegmentSize(1024 * 1024) + .setWalSegments(10) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(100 * 1024 * 1024) + .setPersistenceEnabled(true) + ) + ); + } + /** * Tests that range reserved method return correct number of reserved WAL segments. 
* @@ -87,18 +85,17 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { */ @Test public void testWalManagerRangeReservation() throws Exception { - IgniteEx ig0 = prepareGrid(4); + IgniteEx n = prepareGrid(2); - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig0.context().cache().context() - .database(); + IgniteWriteAheadLogManager wal = n.context().cache().context().wal(); - IgniteWriteAheadLogManager wal = ig0.context().cache().context().wal(); + assertNotNull(wal); - long resIdx = getReservedWalSegmentIndex(dbMgr); + long resIdx = getReservedWalSegmentIndex(wal); assertTrue("Expected that at least resIdx greater than 0, real is " + resIdx, resIdx > 0); - WALPointer lowPtr = dbMgr.checkpointHistory().firstCheckpointPointer(); + WALPointer lowPtr = lastCheckpointPointer(n); assertTrue("Expected that dbMbr returns valid resIdx", lowPtr.index() == resIdx); @@ -117,25 +114,24 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { */ @Test public void testWalDoesNotTruncatedWhenSegmentReserved() throws Exception { - IgniteEx ig0 = prepareGrid(4); + IgniteEx n = prepareGrid(2); - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig0.context().cache().context() - .database(); + IgniteWriteAheadLogManager wal = n.context().cache().context().wal(); - IgniteWriteAheadLogManager wal = ig0.context().cache().context().wal(); + assertNotNull(wal); - long resIdx = getReservedWalSegmentIndex(dbMgr); + long resIdx = getReservedWalSegmentIndex(wal); assertTrue("Expected that at least resIdx greater than 0, real is " + resIdx, resIdx > 0); - WALPointer lowPtr = dbMgr.checkpointHistory().firstCheckpointPointer(); + WALPointer lowPtr = lastCheckpointPointer(n); assertTrue("Expected that dbMbr returns valid resIdx", lowPtr.index() == resIdx); // Reserve previous WAL segment. 
wal.reserve(new WALPointer(resIdx - 1, 0, 0)); - int numDel = wal.truncate(null, lowPtr); + int numDel = wal.truncate(lowPtr); int expNumDel = (int)resIdx - 1; @@ -143,6 +139,48 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { } /** + * Checking that there will be no truncation of segments required for binary recovery. + * + * @throws Exception If failed. + */ + @Test + public void testNotTruncateSegmentsForBinaryRecovery() throws Exception { + IgniteEx n = prepareGrid(1); + + IgniteWriteAheadLogManager wal = n.context().cache().context().wal(); + + assertNotNull(wal); + + long resIdx = getReservedWalSegmentIndex(wal); + assertTrue(resIdx > 3); + + WALPointer lastCheckpointPtr = lastCheckpointPointer(n); + assertEquals(lastCheckpointPtr.index(), resIdx); + + wal.notchLastCheckpointPtr(new WALPointer(1, 0, 0)); + + if (compactionEnabled(n)) + assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 1, 10_000)); + + int truncated = wal.truncate(lastCheckpointPtr); + assertTrue("truncated: " + truncated, truncated >= 1); + + truncated = wal.truncate(lastCheckpointPtr); + assertEquals(0, truncated); + + wal.notchLastCheckpointPtr(new WALPointer(2, 0, 0)); + + if (compactionEnabled(n)) + assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 2, 10_000)); + + truncated = wal.truncate(lastCheckpointPtr); + assertTrue("truncated: " + truncated, truncated >= 1); + + truncated = wal.truncate(lastCheckpointPtr); + assertEquals(0, truncated); + } + + /** * Starts grid and populates test data. * * @param cnt Grid count. 
@@ -152,7 +190,8 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { private IgniteEx prepareGrid(int cnt) throws Exception { IgniteEx ig0 = startGrids(cnt); - ig0.cluster().active(true); + ig0.cluster().state(ClusterState.ACTIVE); + awaitPartitionMapExchange(); IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME); @@ -167,11 +206,32 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest { } /** - * Get index of reserved WAL segment by checkpointer. + * Get index of reserved WAL segment by checkpoint. * * @param dbMgr Database shared manager. */ - private long getReservedWalSegmentIndex(GridCacheDatabaseSharedManager dbMgr) { - return dbMgr.checkpointHistory().firstCheckpointPointer().index(); + private long getReservedWalSegmentIndex(IgniteWriteAheadLogManager dbMgr) { + return ((WALPointer)GridTestUtils.getFieldValueHierarchy(dbMgr, "lastCheckpointPtr")).index(); + } + + /** + * Getting WAL pointer last checkpoint. + * + * @param n Node. + * @return WAL pointer last checkpoint. + */ + private WALPointer lastCheckpointPointer(IgniteEx n) { + return ((GridCacheDatabaseSharedManager)n.context().cache().context().database()) + .checkpointHistory().lastCheckpoint().checkpointMark(); + } + + /** + * Checking that wal compaction enabled. + * + * @param n Node. + * @return {@code True} if enabled. 
+ */ + private boolean compactionEnabled(IgniteEx n) { + return n.configuration().getDataStorageConfiguration().isWalCompactionEnabled(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java index f9b029e..8d65954 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; import java.nio.channels.Channel; -import java.nio.file.Paths; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -75,30 +74,30 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { /** Segment file size. */ private static final int SEGMENT_SIZE = 1024 * 1024; + /** Node dir. */ + private static final String NODE_DIR = "NODE"; + /** WAL segment file sub directory. */ - private static final String WORK_SUB_DIR = "/NODE/wal"; + private static final String WORK_SUB_DIR = String.join(File.separator, "", NODE_DIR, "wal"); /** WAL archive segment file sub directory. */ - private static final String ARCHIVE_SUB_DIR = "/NODE/walArchive"; + private static final String ARCHIVE_SUB_DIR = String.join(File.separator, "", NODE_DIR, "walArchive"); /** Serializer versions for check. 
*/ - private int[] checkSerializerVers = new int[] { - 1, - 2 - }; + private final int[] checkSerializerVers = new int[] {1, 2}; /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - U.delete(Paths.get(U.defaultWorkDirectory())); + deleteNodeDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); - U.delete(Paths.get(U.defaultWorkDirectory())); + deleteNodeDir(); } /** @@ -114,6 +113,40 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { } /** + * Test for check invariant, size of SWITCH_SEGMENT_RECORD should be 1 byte. + * + * @throws Exception If some thing failed. + */ + @Test + public void testInvariantSwitchSegment() throws Exception { + for (int serVer : checkSerializerVers) { + try { + checkInvariantSwitchSegment(serVer); + } + finally { + deleteNodeDir(); + } + } + } + + /** + * Test for check switch segment from work dir to archive dir during iteration. + * + * @throws Exception If some thing failed. + */ + @Test + public void testSwitchReadingSegmentFromWorkToArchive() throws Exception { + for (int serVer : checkSerializerVers) { + try { + checkSwitchReadingSegmentDuringIteration(serVer); + } + finally { + deleteNodeDir(); + } + } + } + + /** * @param serVer WAL serializer version. * @throws Exception If some thing failed. */ @@ -162,40 +195,6 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { } /** - * Test for check invariant, size of SWITCH_SEGMENT_RECORD should be 1 byte. - * - * @throws Exception If some thing failed. - */ - @Test - public void testInvariantSwitchSegment() throws Exception { - for (int serVer : checkSerializerVers) { - try { - checkInvariantSwitchSegment(serVer); - } - finally { - U.delete(Paths.get(U.defaultWorkDirectory())); - } - } - } - - /** - * Test for check switch segment from work dir to archive dir during iteration. - * - * @throws Exception If some thing failed. 
- */ - @Test - public void testSwitchReadingSegmentFromWorkToArchive() throws Exception { - for (int serVer : checkSerializerVers) { - try { - checkSwitchReadingSegmentDuringIteration(serVer); - } - finally { - U.delete(Paths.get(U.defaultWorkDirectory())); - } - } - } - - /** * @param serVer WAL serializer version. * @throws Exception If some thing failed. */ @@ -270,8 +269,10 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { walMgr.flush(null, true); + SegmentAware segmentAware = GridTestUtils.getFieldValue(walMgr, "segmentAware"); + // Await archiver move segment to WAL archive. - Thread.sleep(5000); + waitForCondition(() -> segmentAware.lastArchivedAbsoluteIndex() == 0, 5_000); // If switchSegmentRecordSize more that 1, it mean that invariant is broke. // Filling tail some garbage. Simulate tail garbage on rotate segment in WAL work directory. @@ -300,7 +301,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { seg0.close(); } - int expectedRecords = recordsToWrite; + int expRecords = recordsToWrite; int actualRecords = 0; // Check that switch segment works as expected and all record is reachable. @@ -315,7 +316,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { } } - Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords); + Assert.assertEquals("Not all records read during iteration.", expRecords, actualRecords); } /** @@ -340,75 +341,72 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { SegmentAware segmentAware = GridTestUtils.getFieldValue(walMgr, "segmentAware"); - //guard from archivation before iterator would be created. - segmentAware.checkCanReadArchiveOrReserveWorkSegment(0); + // Guard from archiving before iterator would be created. 
+ assertTrue(segmentAware.lock(0)); for (int i = 0; i < recordsToWrite; i++) walMgr.log(new MetastoreDataRecord(rec.key(), rec.value())); walMgr.flush(null, true); - int expectedRecords = recordsToWrite; AtomicInteger actualRecords = new AtomicInteger(0); AtomicReference<String> startedSegmentPath = new AtomicReference<>(); AtomicReference<String> finishedSegmentPath = new AtomicReference<>(); - CountDownLatch startedIteratorLatch = new CountDownLatch(1); + CountDownLatch startedIterLatch = new CountDownLatch(1); CountDownLatch finishedArchivedLatch = new CountDownLatch(1); - IgniteInternalFuture<Object> future = GridTestUtils.runAsync( - () -> { - // Check that switch segment works as expected and all record is reachable. - try (WALIterator it = walMgr.replay(null)) { - Object handle = getFieldValueHierarchy(it, "currWalSegment"); - FileInput in = getFieldValueHierarchy(handle, "in"); - Object delegate = getFieldValueHierarchy(in.io(), "delegate"); - Channel ch = getFieldValueHierarchy(delegate, "ch"); - String path = getFieldValueHierarchy(ch, "path"); + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> { + // Check that switch segment works as expected and all record is reachable. 
+ try (WALIterator it = walMgr.replay(null)) { + Object handle = getFieldValueHierarchy(it, "currWalSegment"); + FileInput in = getFieldValueHierarchy(handle, "in"); + Object delegate = getFieldValueHierarchy(in.io(), "delegate"); + Channel ch = getFieldValueHierarchy(delegate, "ch"); + String path = getFieldValueHierarchy(ch, "path"); - startedSegmentPath.set(path); + startedSegmentPath.set(path); - startedIteratorLatch.countDown(); + startedIterLatch.countDown(); - while (it.hasNext()) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); - WALRecord rec0 = tup.get2(); + WALRecord rec0 = tup.get2(); - if (rec0.type() == METASTORE_DATA_RECORD) - actualRecords.incrementAndGet(); + if (rec0.type() == METASTORE_DATA_RECORD) + actualRecords.incrementAndGet(); - finishedArchivedLatch.await(); - } - - in = getFieldValueHierarchy(handle, "in"); - delegate = getFieldValueHierarchy(in.io(), "delegate"); - ch = getFieldValueHierarchy(delegate, "ch"); - path = getFieldValueHierarchy(ch, "path"); - - finishedSegmentPath.set(path); + finishedArchivedLatch.await(); } - return null; + in = getFieldValueHierarchy(handle, "in"); + delegate = getFieldValueHierarchy(in.io(), "delegate"); + ch = getFieldValueHierarchy(delegate, "ch"); + path = getFieldValueHierarchy(ch, "path"); + + finishedSegmentPath.set(path); } - ); - startedIteratorLatch.await(); + return null; + }); + + startedIterLatch.await(); - segmentAware.releaseWorkSegment(0); + segmentAware.unlock(0); waitForCondition(() -> segmentAware.lastArchivedAbsoluteIndex() == 0, 5000); finishedArchivedLatch.countDown(); - future.get(); + fut.get(); //should started iteration from work directory but finish from archive directory. 
- assertEquals(workDir + WORK_SUB_DIR + "/0000000000000000.wal", startedSegmentPath.get()); - assertEquals(workDir + ARCHIVE_SUB_DIR + "/0000000000000000.wal", finishedSegmentPath.get()); + assertEquals(workDir + WORK_SUB_DIR + File.separator + "0000000000000000.wal", startedSegmentPath.get()); + assertEquals(workDir + ARCHIVE_SUB_DIR + File.separator + "0000000000000000.wal", finishedSegmentPath.get()); - Assert.assertEquals("Not all records read during iteration.", expectedRecords, actualRecords.get()); + Assert.assertEquals("Not all records read during iteration.", recordsToWrite, actualRecords.get()); } /*** @@ -493,4 +491,11 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest { return new T2<>(walMgr, recordSerializer); } + + /** + * Delete node dir. + */ + private void deleteNodeDir() throws Exception { + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), NODE_DIR, false)); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 2e78ad0..10634cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -95,7 +95,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ - @Override public int truncate(WALPointer low, WALPointer high) { + @Override public int truncate(@Nullable WALPointer high) { return 0; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java index 0bd9fcb..60663ef 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -65,9 +65,8 @@ public class SegmentAwareTest { int i = iterationCnt; while (i-- > 0) { - aware.lockWorkSegment(segmentToHandle); - - aware.releaseWorkSegment(segmentToHandle); + if (aware.lock(segmentToHandle)) + aware.unlock(segmentToHandle); } }); @@ -388,12 +387,12 @@ public class SegmentAwareTest { //given: thread which awaited segment. SegmentAware aware = new SegmentAware(10, false); - aware.checkCanReadArchiveOrReserveWorkSegment(5); + assertTrue(aware.lock(5)); IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5)); //when: release exact expected work segment. - aware.releaseWorkSegment(5); + aware.unlock(5); //then: waiting should finish immediately. future.get(20); @@ -406,7 +405,8 @@ public class SegmentAwareTest { public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. SegmentAware aware = new SegmentAware(10, false); - aware.checkCanReadArchiveOrReserveWorkSegment(5); + + assertTrue(aware.lock(5)); IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5)); @@ -521,6 +521,10 @@ public class SegmentAwareTest { //given: thread which awaited segment. SegmentAware aware = new SegmentAware(10, false); + // Set limits. + aware.curAbsWalIdx(10); + aware.minReserveIndex(0); + //when: reserve one segment twice and one segment once. aware.reserve(5); aware.reserve(5); @@ -586,8 +590,8 @@ public class SegmentAwareTest { SegmentAware aware = new SegmentAware(10, false); //when: lock one segment twice. 
- aware.checkCanReadArchiveOrReserveWorkSegment(5); - aware.checkCanReadArchiveOrReserveWorkSegment(5); + assertTrue(aware.lock(5)); + assertTrue(aware.lock(5)); //then: exact one segment should locked. assertTrue(aware.locked(5)); @@ -595,7 +599,7 @@ public class SegmentAwareTest { assertFalse(aware.locked(4)); //when: release segment once. - aware.releaseWorkSegment(5); + aware.unlock(5); //then: nothing to change, segment still locked. assertTrue(aware.locked(5)); @@ -603,7 +607,7 @@ public class SegmentAwareTest { assertFalse(aware.locked(4)); //when: release segment. - aware.releaseWorkSegment(5); + aware.unlock(5); //then: all segments should be unlocked. assertFalse(aware.locked(5)); @@ -619,10 +623,9 @@ public class SegmentAwareTest { //given: thread which awaited segment. SegmentAware aware = new SegmentAware(10, false); - aware.checkCanReadArchiveOrReserveWorkSegment(5); + assertTrue(aware.lock(5)); try { - - aware.releaseWorkSegment(7); + aware.unlock(7); } catch (AssertionError e) { return; @@ -632,6 +635,50 @@ public class SegmentAwareTest { } /** + * Check that the reservation border is working correctly. + */ + @Test + public void testReservationBorder() { + SegmentAware aware = new SegmentAware(10, false); + + assertTrue(aware.reserve(0)); + assertTrue(aware.reserve(1)); + + assertFalse(aware.minReserveIndex(0)); + assertFalse(aware.minReserveIndex(1)); + + aware.release(0); + + assertTrue(aware.minReserveIndex(0)); + assertFalse(aware.minReserveIndex(1)); + + assertFalse(aware.reserve(0)); + assertTrue(aware.reserve(1)); + } + + /** + * Check that the lock border is working correctly. 
+ */ + @Test + public void testLockBorder() { + SegmentAware aware = new SegmentAware(10, false); + + assertTrue(aware.lock(0)); + assertTrue(aware.lock(1)); + + assertFalse(aware.minLockIndex(0)); + assertFalse(aware.minLockIndex(1)); + + aware.unlock(0); + + assertTrue(aware.minLockIndex(0)); + assertFalse(aware.minLockIndex(1)); + + assertFalse(aware.lock(0)); + assertTrue(aware.lock(1)); + } + + /** * Assert that future is still not finished. * * @param future Future to check.