This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b2c4aff IGNITE-14952 Cancelling WAL segments reservation when max WAL archive size is reached - Fixes 9198 b2c4aff is described below commit b2c4affb8275b62f71ff65f83807b2da1bf9998b Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Jul 13 17:38:23 2021 +0300 IGNITE-14952 Cancelling WAL segments reservation when max WAL archive size is reached - Fixes 9198 Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../pagemem/wal/IgniteWriteAheadLogManager.java | 4 + .../persistence/wal/FileWriteAheadLogManager.java | 66 +++-- .../wal/aware/SegmentArchiveSizeStorage.java | 161 +++++++++-- .../cache/persistence/wal/aware/SegmentAware.java | 48 ++-- .../wal/aware/SegmentReservationStorage.java | 37 ++- .../persistence/wal/aware/SegmentAwareTest.java | 317 ++++++++++++++++++--- 6 files changed, 517 insertions(+), 116 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index eafd228..044a79a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.pagemem.wal; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.pagemem.wal.record.RolloverType; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; @@ -133,7 +134,10 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni /** * Invoke this method to reserve WAL history since provided pointer and prevent it's 
deletion. * + * NOTE: If the {@link DataStorageConfiguration#getMaxWalArchiveSize()} is exceeded, the segment will be released. + * * @param start WAL pointer. + * @return {@code True} if the reservation was successful. */ public boolean reserve(WALPointer start); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 673accd..33ca3c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -501,7 +501,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl }); } - segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled(), log); + segmentAware = new SegmentAware( + log, + dsCfg.getWalSegments(), + dsCfg.isWalCompactionEnabled(), + minWalArchiveSize, + maxWalArchiveSize + ); // We have to initialize compressor before archiver in order to setup already compressed segments. // Otherwise, FileArchiver initialization will trigger redundant work for FileCompressor. 
@@ -568,7 +574,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentAware.reset(); segmentAware.resetWalArchiveSizes(); - segmentAware.addCurrentWalArchiveSize(totalSize(walArchiveFiles())); + + for (FileDescriptor descriptor : walArchiveFiles()) + segmentAware.addSize(descriptor.idx, descriptor.file.length()); if (isArchiverEnabled()) { assert archiver != null : "FileArchiver should be initialized."; @@ -1101,8 +1109,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else { deleted++; - segmentSize.remove(desc.idx()); - segmentAware.addCurrentWalArchiveSize(-len); + long idx = desc.idx(); + + segmentSize.remove(idx); + segmentAware.addSize(idx, -len); } // Bump up the oldest archive segment index. @@ -1307,8 +1317,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl switchSegmentRecordOffset.set(idx, hnd.getSwitchSegmentRecordOffset()); } + long idx = cur.getSegmentId() + 1; + long currSize = 0; + long reservedSize = maxWalSegmentSize; + if (archiver == null) - segmentAware.addReservedWalArchiveSize(maxWalSegmentSize); + segmentAware.addSize(idx, reservedSize); FileWriteHandle next; try { @@ -1328,14 +1342,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert ptr != null; } - segmentSize.put(next.getSegmentId(), maxWalSegmentSize); - - if (archiver == null) - segmentAware.addCurrentWalArchiveSize(maxWalSegmentSize); + currSize = reservedSize; + segmentSize.put(idx, currSize); } finally { if (archiver == null) - segmentAware.addReservedWalArchiveSize(-maxWalSegmentSize); + segmentAware.addSize(idx, currSize - reservedSize); } if (next.getSegmentId() - lastCheckpointPtr.index() >= maxSegCountWithoutCheckpoint) @@ -2025,8 +2037,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long offs = switchSegmentRecordOffset.getAndSet((int)segIdx, 0); long origLen = origFile.length(); + long 
currSize = 0; long reservedSize = offs > 0 && offs < origLen ? offs : origLen; - segmentAware.addReservedWalArchiveSize(reservedSize); + + segmentAware.addSize(absIdx, reservedSize); try { if (offs > 0 && offs < origLen) @@ -2045,8 +2059,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } - segmentSize.put(absIdx, dstFile.length()); - segmentAware.addCurrentWalArchiveSize(dstFile.length()); + currSize = dstFile.length(); + segmentSize.put(absIdx, currSize); } catch (IOException e) { deleteArchiveFiles(dstFile, dstTmpFile); @@ -2056,7 +2070,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); } finally { - segmentAware.addReservedWalArchiveSize(-reservedSize); + segmentAware.addSize(absIdx, currSize - reservedSize); } if (log.isInfoEnabled()) { @@ -2252,8 +2266,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File raw = new File(walArchiveDir, segmentFileName); + long currSize = 0; long reservedSize = raw.length(); - segmentAware.addReservedWalArchiveSize(reservedSize); + + segmentAware.addSize(segIdx, reservedSize); try { deleteObsoleteRawSegments(); @@ -2269,12 +2285,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl f0.force(); } - long zipLen = zip.length(); + currSize = zip.length(); + segmentSize.put(segIdx, currSize); - segmentSize.put(segIdx, zipLen); - segmentAware.addCurrentWalArchiveSize(zipLen); - - metrics.onWalSegmentCompressed(zipLen); + metrics.onWalSegmentCompressed(currSize); segmentAware.onSegmentCompressed(segIdx); @@ -2292,7 +2306,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentAware.onSegmentCompressed(segIdx); } finally { - segmentAware.addReservedWalArchiveSize(-reservedSize); + segmentAware.addSize(segIdx, currSize - reservedSize); } } catch (IgniteInterruptedCheckedException ignore) { @@ -2403,7 
+2417,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return; if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) - segmentAware.addCurrentWalArchiveSize(-deleteArchiveFiles(desc.file)); + segmentAware.addSize(desc.idx, -deleteArchiveFiles(desc.file)); } } } @@ -2458,8 +2472,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX); File unzip = new File(walArchiveDir, segmentFileName); + long currSize = 0; long reservedSize = U.uncompressedSize(zip); - segmentAware.addReservedWalArchiveSize(reservedSize); + + segmentAware.addSize(segmentToDecompress, reservedSize); IgniteCheckedException ex = null; @@ -2477,7 +2493,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Files.move(unzipTmp.toPath(), unzip.toPath()); - segmentAware.addCurrentWalArchiveSize(unzip.length()); + currSize = unzip.length(); } catch (IOException e) { deleteArchiveFiles(unzipTmp); @@ -2492,7 +2508,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } finally { - segmentAware.addReservedWalArchiveSize(-reservedSize); + segmentAware.addSize(segmentToDecompress, currSize - reservedSize); } updateHeartbeat(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java index 76d6022..82d1b15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchiveSizeStorage.java @@ -17,65 +17,154 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; +import 
java.util.Map; +import java.util.TreeMap; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE; /** * Storage WAL archive size. * Allows to track the exceeding of the maximum archive size. */ class SegmentArchiveSizeStorage { - /** Current WAL archive size in bytes. */ - private long curr; + /** Logger. */ + private final IgniteLogger log; + + /** Current WAL archive size in bytes. Guarded by {@code this}. */ + private long walArchiveSize; + + /** Flag of interrupt waiting on this object. Guarded by {@code this}. */ + private boolean interrupted; + + /** Minimum size of the WAL archive in bytes. */ + private final long minWalArchiveSize; + + /** Maximum size of the WAL archive in bytes. */ + private final long maxWalArchiveSize; - /** Reserved WAL archive size in bytes. */ - private long reserved; + /** WAL archive size unlimited. */ + private final boolean walArchiveUnlimited; - /** Flag of interrupt waiting on this object. */ - private volatile boolean interrupted; + /** + * Segment sizes. Mapping: segment idx -> size in bytes. Guarded by {@code this}. + * {@code null} if {@link #walArchiveUnlimited} == {@code true}. + */ + @Nullable private final Map<Long, Long> segmentSizes; + + /** + * Segment reservations storage. + * {@code null} if {@link #walArchiveUnlimited} == {@code true}. + */ + @Nullable private final SegmentReservationStorage reservationStorage; /** - * Adding current WAL archive size in bytes. + * Constructor. * - * @param size Size in bytes. + * @param minWalArchiveSize Minimum size of the WAL archive in bytes. 
+ * @param maxWalArchiveSize Maximum size of the WAL archive in bytes + * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}. + * @param reservationStorage Segment reservations storage. */ - synchronized void addCurrentSize(long size) { - curr += size; + public SegmentArchiveSizeStorage( + IgniteLogger log, + long minWalArchiveSize, + long maxWalArchiveSize, + SegmentReservationStorage reservationStorage + ) { + this.log = log; - if (size > 0) - notifyAll(); + this.minWalArchiveSize = minWalArchiveSize; + this.maxWalArchiveSize = maxWalArchiveSize; + + if (maxWalArchiveSize != UNLIMITED_WAL_ARCHIVE) { + walArchiveUnlimited = false; + + segmentSizes = new TreeMap<>(); + this.reservationStorage = reservationStorage; + } + else { + walArchiveUnlimited = true; + + segmentSizes = null; + this.reservationStorage = null; + } } /** - * Adding reserved WAL archive size in bytes. - * Defines a hint to determine if the maximum size is exceeded before a new segment is archived. + * Adds or updates information about size of a WAL segment in archive. * - * @param size Size in bytes. + * @param idx Absolute segment index. + * @param sizeChange Segment size change in bytes. Could be positive (if segment is added to the archive) + * or negative (e.g. when it is removed from the archive). */ - synchronized void addReservedSize(long size) { - reserved += size; + void changeSize(long idx, long sizeChange) { + long releaseIdx = -1; + int releaseCnt = 0; + + synchronized (this) { + walArchiveSize += sizeChange; + + if (!walArchiveUnlimited) { + segmentSizes.compute(idx, (i, size) -> { + long res = (size == null ? 0 : size) + sizeChange; + + return res == 0 ? 
null : res; + }); + } + + if (sizeChange > 0) { + if (!walArchiveUnlimited && walArchiveSize >= maxWalArchiveSize) { + long size = 0; - if (size > 0) - notifyAll(); + for (Map.Entry<Long, Long> e : segmentSizes.entrySet()) { + releaseIdx = e.getKey(); + releaseCnt++; + + if (walArchiveSize - (size += e.getValue()) < minWalArchiveSize) + break; + } + } + + notifyAll(); + } + } + + if (releaseIdx != -1) { + if (log.isInfoEnabled()) { + log.info("Maximum size of the WAL archive exceeded, the segments will be forcibly released [" + + "maxWalArchiveSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ", releasedSegmentCnt=" + + releaseCnt + ", lastReleasedSegmentIdx=" + releaseIdx + ']'); + } + + reservationStorage.forceRelease(releaseIdx); + } } /** * Reset the current and reserved WAL archive sizes. */ synchronized void resetSizes() { - curr = 0; - reserved = 0; + walArchiveSize = 0; + + if (!walArchiveUnlimited) + segmentSizes.clear(); } /** * Waiting for exceeding the maximum WAL archive size. - * To track size of WAL archive, need to use {@link #addCurrentSize} and {@link #addReservedSize}. + * To track size of WAL archive, need to use {@link #changeSize}. * * @param max Maximum WAL archive size in bytes. * @throws IgniteInterruptedCheckedException If it was interrupted. */ synchronized void awaitExceedMaxSize(long max) throws IgniteInterruptedCheckedException { try { - while (max - (curr + reserved) > 0 && !interrupted) + while (max - walArchiveSize > 0 && !interrupted) wait(); } catch (InterruptedException e) { @@ -98,7 +187,31 @@ class SegmentArchiveSizeStorage { /** * Reset interrupted flag. */ - void reset() { + synchronized void reset() { interrupted = false; } + + /** + * Getting current WAL archive size in bytes. + * + * @return Size in bytes. + */ + synchronized long currentSize() { + return walArchiveSize; + } + + /** + * Getting the size of the WAL segment of the archive in bytes. 
+ * + * @return Size in bytes or {@code null} if the segment is absent or the archive is unlimited. + */ + @Nullable Long segmentSize(long idx) { + if (walArchiveUnlimited) + return null; + else { + synchronized (this) { + return segmentSizes.get(idx); + } + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java index 48b16b0..ee43249 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; /** @@ -48,17 +49,33 @@ public class SegmentAware { /** * Constructor. * + * @param log Logger. * @param walSegmentsCnt Total WAL segments count. * @param compactionEnabled Is wal compaction enabled. - * @param log Logger. - */ - public SegmentAware(int walSegmentsCnt, boolean compactionEnabled, IgniteLogger log) { + * @param minWalArchiveSize Minimum size of the WAL archive in bytes + * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}. + * @param maxWalArchiveSize Maximum size of the WAL archive in bytes + * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}. 
+ */ + public SegmentAware( + IgniteLogger log, + int walSegmentsCnt, + boolean compactionEnabled, + long minWalArchiveSize, + long maxWalArchiveSize + ) { segmentArchivedStorage = new SegmentArchivedStorage(segmentLockStorage); segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt); segmentCompressStorage = new SegmentCompressStorage(log, compactionEnabled); - archiveSizeStorage = new SegmentArchiveSizeStorage(); + archiveSizeStorage = new SegmentArchiveSizeStorage( + log, + minWalArchiveSize, + maxWalArchiveSize, + reservationStorage + ); + truncateStorage = new SegmentTruncateStorage(); segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived); @@ -318,22 +335,13 @@ public class SegmentAware { } /** - * Adding current WAL archive size in bytes. - * - * @param size Size in bytes. - */ - public void addCurrentWalArchiveSize(long size) { - archiveSizeStorage.addCurrentSize(size); - } - - /** - * Adding reserved WAL archive size in bytes. - * Defines a hint to determine if the maximum size is exceeded before a new segment is archived. + * Adding the WAL segment size in the archive. * - * @param size Size in bytes. + * @param idx Absolute segment index. + * @param sizeChange Segment size change in bytes. */ - public void addReservedWalArchiveSize(long size) { - archiveSizeStorage.addReservedSize(size); + public void addSize(long idx, long sizeChange) { + archiveSizeStorage.changeSize(idx, sizeChange); } /** @@ -344,8 +352,8 @@ public class SegmentAware { } /** - * Waiting for exceeding the maximum WAL archive size. To track size of WAL archive, - * need to use {@link #addCurrentWalArchiveSize} and {@link #addReservedWalArchiveSize}. + * Waiting for exceeding the maximum WAL archive size. + * To track size of WAL archive, need to use {@link #addSize}. * * @param max Maximum WAL archive size in bytes. * @throws IgniteInterruptedCheckedException If it was interrupted. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java index 1f0979a..ee81bff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java @@ -28,12 +28,12 @@ import org.jetbrains.annotations.Nullable; */ class SegmentReservationStorage extends SegmentObservable { /** - * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments which has - * index >= reserved segment index. Guarded by {@code this}. + * Maps absolute segment index to reservation counter. Guarded by {@code this}. + * If counter > 0 then we wouldn't delete all segments which has index >= reserved segment index. */ private final NavigableMap<Long, Integer> reserved = new TreeMap<>(); - /** Maximum segment index that can be reserved. */ + /** Maximum segment index that can be reserved. Guarded by {@code this}. */ private long minReserveIdx = -1; /** @@ -71,6 +71,8 @@ class SegmentReservationStorage extends SegmentObservable { } /** + * Segment release. + * * @param absIdx Reserved index. */ void release(long absIdx) { @@ -122,4 +124,33 @@ class SegmentReservationStorage extends SegmentObservable { return Objects.equals(oldMin, newMin) ? null : newMin == null ? -1 : newMin; } + + /** + * Forces the release of reserved segments. + * Also increases minimum segment index that can be reserved. + * + * @param absIdx Absolute segment index up (and including) to which the + * segments will be released, and it will also not be possible to reserve segments. 
+ */ + void forceRelease(long absIdx) { + Long minReservedIdx; + + synchronized (this) { + minReservedIdx = trackingMinReservedIdx(reserved -> reserved.headMap(absIdx, true).clear()); + + minReserveIdx = Math.max(minReserveIdx, absIdx); + } + + if (minReservedIdx != null) + notifyObservers(minReservedIdx); + } + + /** + * Getting maximum segment index that can be reserved. + * + * @return Absolute segment index. + */ + synchronized long minReserveIdx() { + return minReserveIdx; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java index 08a3741..1652d35 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -18,16 +18,20 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.logger.NullLogger; -import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; 
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -44,12 +48,12 @@ public class SegmentAwareTest { */ @Test public void testAvoidDeadlockArchiverAndLockStorage() throws IgniteCheckedException { - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); int iterationCnt = 100_000; int segmentToHandle = 1; - IgniteInternalFuture archiverThread = GridTestUtils.runAsync(() -> { + IgniteInternalFuture archiverThread = runAsync(() -> { int i = iterationCnt; while (i-- > 0) { @@ -62,7 +66,7 @@ public class SegmentAwareTest { } }); - IgniteInternalFuture lockerThread = GridTestUtils.runAsync(() -> { + IgniteInternalFuture lockerThread = runAsync(() -> { int i = iterationCnt; while (i-- > 0) { @@ -81,7 +85,7 @@ public class SegmentAwareTest { @Test public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -98,7 +102,7 @@ public class SegmentAwareTest { @Test public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -115,7 +119,7 @@ public class SegmentAwareTest { @Test public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -138,7 +142,7 @@ public class SegmentAwareTest { @Test public void testFinishAwaitSegment_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -155,7 +159,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -175,7 +179,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -195,7 +199,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -215,7 +219,7 @@ public class SegmentAwareTest { @Test public void testCorrectCalculateNextSegmentIndex() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); aware.curAbsWalIdx(5); @@ -232,7 +236,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(2, false, new NullLogger()); + SegmentAware aware = segmentAware(2); aware.curAbsWalIdx(1); aware.setLastArchivedAbsoluteIndex(-1); @@ -252,7 +256,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(2, false, new NullLogger()); + SegmentAware aware = segmentAware(2); aware.curAbsWalIdx(1); aware.setLastArchivedAbsoluteIndex(-1); @@ -272,7 +276,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(2, false, new NullLogger()); + SegmentAware aware = segmentAware(2); aware.curAbsWalIdx(2); aware.setLastArchivedAbsoluteIndex(-1); @@ -298,7 +302,7 @@ public class SegmentAwareTest { @Test public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -315,7 +319,7 @@ public class SegmentAwareTest { @Test public void testFinishSegmentArchived_WhenMarkExactWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -332,7 +336,7 @@ public class SegmentAwareTest { @Test public void testFinishSegmentArchived_WhenSetGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -349,7 +353,7 @@ public class SegmentAwareTest { @Test public void testFinishSegmentArchived_WhenMarkGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -366,7 +370,7 @@ public class SegmentAwareTest { @Test public void testFinishSegmentArchived_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -386,7 +390,7 @@ public class SegmentAwareTest { @Test public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); assertTrue(aware.lock(5)); @@ -405,7 +409,7 @@ public class SegmentAwareTest { @Test public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); assertTrue(aware.lock(5)); @@ -427,7 +431,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, true, new NullLogger()); + SegmentAware aware = segmentAware(10, true); aware.onSegmentCompressed(5); @@ -446,7 +450,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, true, new NullLogger()); + SegmentAware aware = segmentAware(10, true); aware.onSegmentCompressed(5); @@ -464,7 +468,7 @@ public class SegmentAwareTest { */ @Test public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException { - SegmentAware aware = new SegmentAware(10, true, new NullLogger()); + SegmentAware aware = segmentAware(10, true); aware.setLastArchivedAbsoluteIndex(6); @@ -478,7 +482,7 @@ public class SegmentAwareTest { @Test public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10, true, new NullLogger()); + SegmentAware aware = segmentAware(10, true); aware.onSegmentCompressed(5); IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); @@ -495,7 +499,7 @@ public class SegmentAwareTest { */ @Test public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException { - SegmentAware aware = new SegmentAware(10, true, new NullLogger()); + SegmentAware aware = segmentAware(10, true); for (int i = 0; i < 5; i++) { aware.setLastArchivedAbsoluteIndex(i); @@ -520,7 +524,7 @@ public class SegmentAwareTest { @Test public void testReserveCorrectly() { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); // Set limits. aware.curAbsWalIdx(10); @@ -563,15 +567,14 @@ public class SegmentAwareTest { } /** - * Shouldn't fail when release unreserved segment. + * Check that there will be no error if a non-reserved segment is released. */ @Test public void testReleaseUnreservedSegment() { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); aware.reserve(5); - aware.release(7); } @@ -581,7 +584,7 @@ public class SegmentAwareTest { @Test public void testReserveWorkSegmentCorrectly() { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); //when: lock one segment twice. assertTrue(aware.lock(5)); @@ -615,7 +618,7 @@ public class SegmentAwareTest { @Test public void testAssertFail_WhenReleaseUnreservedWorkSegment() { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); assertTrue(aware.lock(5)); try { @@ -633,7 +636,7 @@ public class SegmentAwareTest { */ @Test public void testReservationBorder() { - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); assertTrue(aware.reserve(0)); assertTrue(aware.reserve(1)); @@ -655,7 +658,7 @@ public class SegmentAwareTest { */ @Test public void testLockBorder() { - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); assertTrue(aware.lock(0)); assertTrue(aware.lock(1)); @@ -679,30 +682,30 @@ public class SegmentAwareTest { */ @Test public void testWalArchiveSize() throws Exception { - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture<?> fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10)); - aware.addCurrentWalArchiveSize(4); + aware.addSize(0, 4); assertFutureIsNotFinish(fut); - aware.addReservedWalArchiveSize(4); + aware.addSize(0, 4); assertFutureIsNotFinish(fut); - aware.addCurrentWalArchiveSize(4); + aware.addSize(0, 4); fut.get(20); aware.resetWalArchiveSizes(); fut = awaitThread(() -> aware.awaitExceedMaxArchiveSize(10)); - aware.addCurrentWalArchiveSize(4); + aware.addSize(1, 4); assertFutureIsNotFinish(fut); - aware.addReservedWalArchiveSize(4); + aware.addSize(1, 4); assertFutureIsNotFinish(fut); - aware.addReservedWalArchiveSize(4); + aware.addSize(1, 4); fut.get(20); aware.resetWalArchiveSizes(); @@ -727,7 +730,7 @@ public class SegmentAwareTest { */ @Test public void testTruncate() throws Exception { - SegmentAware aware = new SegmentAware(10, false, new NullLogger()); + SegmentAware aware = segmentAware(10); IgniteInternalFuture<?> fut = awaitThread(aware::awaitAvailableTruncateArchive); @@ -768,6 +771,165 @@ public class SegmentAwareTest { } /** + * Checking the correct 
calculation of the WAL archive size for an unlimited WAL archive. + */ + @Test + public void testArchiveSizeForUnlimitedWalArchive() { + SegmentAware aware = segmentAware(1, false, 0, UNLIMITED_WAL_ARCHIVE); + SegmentArchiveSizeStorage sizeStorage = archiveSizeStorage(aware); + + aware.addSize(0, 10); + + assertEquals(10, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + + aware.addSize(0, 20); + + assertEquals(30, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + + aware.addSize(1, 10); + + assertEquals(40, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + assertNull(sizeStorage.segmentSize(1)); + + aware.addSize(0, -10); + + assertEquals(30, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + assertNull(sizeStorage.segmentSize(1)); + + aware.addSize(1, -10); + + assertEquals(20, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + assertNull(sizeStorage.segmentSize(1)); + + aware.addSize(0, -20); + + assertEquals(0, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + assertNull(sizeStorage.segmentSize(1)); + } + + /** + * Checking the correct calculation of the WAL archive size for a limited WAL archive. 
+ */ + @Test + public void testArchiveSizeForLimitedWalArchive() { + SegmentAware aware = segmentAware(1, false, 100, 200); + SegmentArchiveSizeStorage sizeStorage = archiveSizeStorage(aware); + + aware.addSize(0, 10); + + assertEquals(10, sizeStorage.currentSize()); + assertEquals(Long.valueOf(10), sizeStorage.segmentSize(0)); + + aware.addSize(0, 20); + + assertEquals(30, sizeStorage.currentSize()); + assertEquals(Long.valueOf(30), sizeStorage.segmentSize(0)); + + aware.addSize(1, 5); + + assertEquals(35, sizeStorage.currentSize()); + assertEquals(Long.valueOf(30), sizeStorage.segmentSize(0)); + assertEquals(Long.valueOf(5), sizeStorage.segmentSize(1)); + + aware.addSize(0, -5); + + assertEquals(30, sizeStorage.currentSize()); + assertEquals(Long.valueOf(25), sizeStorage.segmentSize(0)); + assertEquals(Long.valueOf(5), sizeStorage.segmentSize(1)); + + aware.addSize(0, -10); + + assertEquals(20, sizeStorage.currentSize()); + assertEquals(Long.valueOf(15), sizeStorage.segmentSize(0)); + assertEquals(Long.valueOf(5), sizeStorage.segmentSize(1)); + + aware.addSize(1, -3); + + assertEquals(17, sizeStorage.currentSize()); + assertEquals(Long.valueOf(15), sizeStorage.segmentSize(0)); + assertEquals(Long.valueOf(2), sizeStorage.segmentSize(1)); + + aware.addSize(0, -15); + + assertEquals(2, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + assertEquals(Long.valueOf(2), sizeStorage.segmentSize(1)); + + aware.addSize(1, -2); + + assertEquals(0, sizeStorage.currentSize()); + assertNull(sizeStorage.segmentSize(0)); + assertNull(sizeStorage.segmentSize(1)); + } + + /** + * Checking that when the {@code SegmentArchiveSizeStorage#maxWalArchiveSize} is reached, + * the segments will be released to the {@code SegmentArchiveSizeStorage#minWalArchiveSize}, + * and it will also not be possible to reserve them. 
+ */ + @Test + public void testReleaseSegmentsOnExceedMaxWalArchiveSize() { + SegmentAware aware = segmentAware(1, false, 50, 100); + SegmentReservationStorage reservationStorage = reservationStorage(aware); + + for (int i = 0; i < 9; i++) + aware.addSize(i, 10); + + assertTrue(aware.reserve(0)); + assertTrue(aware.reserve(1)); + assertTrue(aware.reserve(8)); + + aware.addSize(9, 10); + + assertFalse(aware.reserved(0)); + assertFalse(aware.reserved(1)); + assertTrue(aware.reserved(8)); + + assertEquals(5, reservationStorage.minReserveIdx()); + + for (int i = 0; i <= 5; i++) { + assertFalse(aware.reserve(i)); + assertFalse(aware.reserved(i)); + + assertTrue(aware.minReserveIndex(i)); + } + + for (int i = 6; i < 10; i++) { + assertTrue(aware.reserve(i)); + + assertFalse(aware.minReserveIndex(i)); + } + } + + /** + * Check that if the size of the segments does not reach the {@code SegmentArchiveSizeStorage#maxWalArchiveSize} + * then there will be no release of the segments. + */ + @Test + public void testNoReleaseSegmentNearMaxWalArchiveSize() { + SegmentAware aware = segmentAware(1, false, 50, 100); + + for (int i = 0; i < 9; i++) + aware.addSize(i, 10); + + assertTrue(aware.reserve(0)); + assertTrue(aware.reserve(1)); + assertTrue(aware.reserve(8)); + + aware.addSize(9, 9); + + assertTrue(aware.reserve(0)); + assertTrue(aware.reserve(1)); + assertTrue(aware.reserve(8)); + } + + /** * Assert that future is still not finished. * * @param future Future to check. 
@@ -791,7 +953,7 @@ public class SegmentAwareTest { */ private IgniteInternalFuture awaitThread(Waiter waiter) throws IgniteCheckedException, InterruptedException { CountDownLatch latch = new CountDownLatch(1); - IgniteInternalFuture<Object> future = GridTestUtils.runAsync( + IgniteInternalFuture<Object> future = runAsync( () -> { latch.countDown(); try { @@ -821,4 +983,71 @@ public class SegmentAwareTest { */ void await() throws IgniteInterruptedCheckedException; } + + /** + * Factory method for the {@link SegmentAware}. + * + * @param walSegmentsCnt Total WAL segments count. + * @return New instance. + */ + private SegmentAware segmentAware(int walSegmentsCnt) { + return segmentAware(walSegmentsCnt, false); + } + + /** + * Factory method for the {@link SegmentAware}. + * + * @param walSegmentsCnt Total WAL segments count. + * @param compactionEnabled Is wal compaction enabled. + * @return New instance. + */ + private SegmentAware segmentAware(int walSegmentsCnt, boolean compactionEnabled) { + return segmentAware(walSegmentsCnt, compactionEnabled, UNLIMITED_WAL_ARCHIVE, UNLIMITED_WAL_ARCHIVE); + } + + /** + * Factory method for the {@link SegmentAware}. + * + * @param walSegmentsCnt Total WAL segments count. + * @param compactionEnabled Is wal compaction enabled. + * @param minWalArchiveSize Minimum size of the WAL archive in bytes + * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}. + * @param maxWalArchiveSize Maximum size of the WAL archive in bytes + * or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}. + * @return New instance. + */ + private SegmentAware segmentAware( + int walSegmentsCnt, + boolean compactionEnabled, + long minWalArchiveSize, + long maxWalArchiveSize + ) { + return new SegmentAware( + new NullLogger(), + walSegmentsCnt, + compactionEnabled, + minWalArchiveSize, + maxWalArchiveSize + ); + } + + /** + * Getting {@code SegmentAware#archiveSizeStorage}. + * + * @param aware Segment aware. 
+ * @return Instance of {@link SegmentArchiveSizeStorage}. + */ + private SegmentArchiveSizeStorage archiveSizeStorage(SegmentAware aware) { + return getFieldValue(aware, "archiveSizeStorage"); + } + + /** + * Getting {@code SegmentAware#reservationStorage}. + * + * @param aware Segment aware. + * @return Instance of {@link SegmentReservationStorage}. + */ + private SegmentReservationStorage reservationStorage(SegmentAware aware) { + return getFieldValue(aware, "reservationStorage"); + } }