IGNITE-9693 Scale up WAL compression workers to increase performance - Fixes #4831.
Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/036bd074 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/036bd074 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/036bd074 Branch: refs/heads/ignite-5797 Commit: 036bd074d8bfd25a2c4c463a60dde00604d11b9d Parents: a748090 Author: Ivan Daschinskiy <[email protected]> Authored: Fri Sep 28 18:03:45 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Fri Sep 28 18:10:01 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 5 + .../pagemem/wal/IgniteWriteAheadLogManager.java | 3 +- .../GridCacheDatabaseSharedManager.java | 11 +- .../wal/FileWriteAheadLogManager.java | 208 ++++++++++++------- .../wal/FsyncModeFileWriteAheadLogManager.java | 7 +- .../persistence/wal/aware/SegmentAware.java | 28 ++- .../wal/aware/SegmentCompressStorage.java | 80 +++++-- ...PdsReserveWalSegmentsWithCompactionTest.java | 34 +++ .../persistence/pagemem/NoOpWALManager.java | 2 +- .../persistence/wal/aware/SegmentAwareTest.java | 90 +++++--- .../ignite/testsuites/IgnitePdsTestSuite2.java | 2 + 11 files changed, 324 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 5932de0..01fb02a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -901,6 +901,11 @@ public final class IgniteSystemProperties { public 
static final String IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = "IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE"; /** + * Count of WAL compressor worker threads. Default value is 4. + */ + public static final String IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT = "IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT"; + + /** * Whenever read load balancing is enabled, that means 'get' requests will be distributed between primary and backup * nodes if it is possible and {@link CacheConfiguration#readFromBackup} is {@code true}. * http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index 12fd3e9..4ffa347 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -86,9 +86,8 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni * Invoke this method to reserve WAL history since provided pointer and prevent it's deletion. * * @param start WAL pointer. - * @throws IgniteException If failed to reserve. */ - public boolean reserve(WALPointer start) throws IgniteCheckedException; + public boolean reserve(WALPointer start); /** * Invoke this method to release WAL history since provided pointer that was previously reserved. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 158c3b1..5e0b7cb 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1758,16 +1758,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (ptr == null) return false; - boolean reserved; - - try { - reserved = cctx.wal().reserve(ptr); - } - catch (IgniteCheckedException e) { - U.error(log, "Error while trying to reserve history", e); - - reserved = false; - } + boolean reserved = cctx.wal().reserve(ptr); if (reserved) reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr)); http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5d165fd..43dfb8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -134,6 +134,7 @@ import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; @@ -257,6 +258,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 0.5); + /** + * Number of WAL compressor worker threads. 
+ */ + private final int WAL_COMPRESSOR_WORKER_THREAD_CNT = + IgniteSystemProperties.getInteger(IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT, 4); + /** */ private final boolean alwaysWriteFullPages; @@ -415,7 +422,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl evt = ctx.event(); failureProcessor = ctx.failure(); - segmentAware = new SegmentAware(dsCfg.getWalSegments()); + segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled()); } /** @@ -486,7 +493,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); if (dsCfg.isWalCompactionEnabled()) { - compressor = new FileCompressor(); + if (compressor == null) + compressor = new FileCompressor(log); if (decompressor == null) { // Preventing of two file-decompressor thread instantiations. decompressor = new FileDecompressor(log); @@ -895,7 +903,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - @Override public boolean reserve(WALPointer start) throws IgniteCheckedException { + @Override public boolean reserve(WALPointer start) { assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; if (mode == WALMode.NONE) @@ -1005,7 +1013,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void notchLastCheckpointPtr(WALPointer ptr) { if (compressor != null) - compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index()); + segmentAware.keepUncompressedIdxFrom(((FileWALPointer)ptr).index()); } /** {@inheritDoc} */ @@ -1910,16 +1918,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Responsible for compressing WAL archive segments. * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved. 
*/ - private class FileCompressor extends Thread { - /** Current thread stopping advice. */ - private volatile boolean stopped; - - /** All segments prior to this (inclusive) can be compressed. */ - private volatile long minUncompressedIdxToKeep = -1L; + private class FileCompressor extends FileCompressorWorker { + /** Workers queue. */ + List<FileCompressorWorker> workers = new ArrayList<>(); /** */ - FileCompressor() { - super("wal-file-compressor%" + cctx.igniteInstanceName()); + FileCompressor(IgniteLogger log) { + super(0, log); } /** */ @@ -1927,7 +1932,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER); for (File f : toDel) { - if (stopped) + if (isCancelled()) return; f.delete(); @@ -1936,82 +1941,118 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); if (alreadyCompressed.length > 0) - segmentAware.lastCompressedIdx(alreadyCompressed[alreadyCompressed.length - 1].idx()); + segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx()); + + for (int i = 1; i < calculateThreadCount(); i++) { + FileCompressorWorker worker = new FileCompressorWorker(i, log); + + worker.start(); + + workers.add(worker); + } } /** - * @param idx Minimum raw segment index that should be preserved from deletion. + * Calculate optimal additional compressor worker threads count. If quarter of proc threads greater + * than WAL_COMPRESSOR_WORKER_THREAD_CNT, use this value. Otherwise, reduce number of threads. + * + * @return Optimal number of compressor threads. 
*/ - void keepUncompressedIdxFrom(long idx) { - minUncompressedIdxToKeep = idx; + private int calculateThreadCount() { + int procNum = Runtime.getRuntime().availableProcessors(); + + // If quarter of proc threads greater than WAL_COMPRESSOR_WORKER_THREAD_CNT, + // use this value. Otherwise, reduce number of threads. + if (procNum >> 2 >= WAL_COMPRESSOR_WORKER_THREAD_CNT) + return WAL_COMPRESSOR_WORKER_THREAD_CNT; + else + return procNum >> 2; } - /** - * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. - * Waits if there's no segment to archive right now. - */ - private long tryReserveNextSegmentOrWait() throws IgniteCheckedException { - long segmentToCompress = segmentAware.waitNextSegmentToCompress(); - boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); + /** {@inheritDoc} */ + @Override public void body() throws InterruptedException, IgniteInterruptedCheckedException{ + init(); - return reserved ? segmentToCompress : -1; + super.body0(); } /** - * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves. + * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. */ - private void deleteObsoleteRawSegments() { - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); + private void shutdown() throws IgniteInterruptedCheckedException { + synchronized (this) { + for (FileCompressorWorker worker: workers) + U.cancel(worker); - Set<Long> indices = new HashSet<>(); - Set<Long> duplicateIndices = new HashSet<>(); + for (FileCompressorWorker worker: workers) + U.join(worker); - for (FileDescriptor desc : descs) { - if (!indices.add(desc.idx)) - duplicateIndices.add(desc.idx); + U.cancel(this); } - for (FileDescriptor desc : descs) { - if (desc.isCompressed()) - continue; + U.join(this); + } + } - // Do not delete reserved or locked segment and any segment after it. 
- if (segmentReservedOrLocked(desc.idx)) - return; + /** */ + private class FileCompressorWorker extends GridWorker { + /** */ + private Thread thread; - if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) { - if (!desc.file.delete()) - U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + - desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); - } - } + /** */ + FileCompressorWorker(int idx, IgniteLogger log) { + super(cctx.igniteInstanceName(), "wal-file-compressor-%" + cctx.igniteInstanceName() + "%-" + idx, log); + } + + /** */ + void start() { + thread = new IgniteThread(this); + + thread.start(); + } + + /** + * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. + * Waits if there's no segment to archive right now. + */ + private long tryReserveNextSegmentOrWait() throws IgniteInterruptedCheckedException{ + long segmentToCompress = segmentAware.waitNextSegmentToCompress(); + + boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); + + return reserved ? 
segmentToCompress : -1; } /** {@inheritDoc} */ - @Override public void run() { - init(); + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + body0(); + } - while (!Thread.currentThread().isInterrupted() && !stopped) { - long currReservedSegment = -1; + /** */ + private void body0() { + while (!isCancelled()) { + long segIdx = -1L; try { - deleteObsoleteRawSegments(); + segIdx = tryReserveNextSegmentOrWait(); - currReservedSegment = tryReserveNextSegmentOrWait(); - if (currReservedSegment == -1) + if (segIdx <= segmentAware.lastCompressedIdx()) continue; - File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) - + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX); + deleteObsoleteRawSegments(); + + File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) + + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX); - File zip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) + FilePageStoreManager.ZIP_SUFFIX); + File zip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX); + + File raw = new File(walArchiveDir, FileDescriptor.fileName(segIdx)); - File raw = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)); if (!Files.exists(raw.toPath())) throw new IgniteCheckedException("WAL archive segment is missing: " + raw); - compressSegmentToFile(currReservedSegment, raw, tmpZip); + compressSegmentToFile(segIdx, raw, tmpZip); Files.move(tmpZip.toPath(), zip.toPath()); @@ -2022,27 +2063,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED)) { evt.record(new WalSegmentCompactedEvent( - cctx.discovery().localNode(), - currReservedSegment, - zip.getAbsoluteFile()) + cctx.localNode(), + segIdx, + zip.getAbsoluteFile()) ); } } - segmentAware.lastCompressedIdx(currReservedSegment); + 
segmentAware.onSegmentCompressed(segIdx); } catch (IgniteInterruptedCheckedException ignore) { Thread.currentThread().interrupt(); } catch (IgniteCheckedException | IOException e) { - U.error(log, "Compression of WAL segment [idx=" + currReservedSegment + - "] was skipped due to unexpected error", e); + U.error(log, "Compression of WAL segment [idx=" + segIdx + + "] was skipped due to unexpected error", e); - segmentAware.lastCompressedIdx(currReservedSegment); + segmentAware.onSegmentCompressed(segIdx); } finally { - if (currReservedSegment != -1) - release(new FileWALPointer(currReservedSegment, 0, 0)); + if (segIdx != -1L) + release(new FileWALPointer(segIdx, 0, 0)); } } } @@ -2053,7 +2094,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param zip Zip file. */ private void compressSegmentToFile(long nextSegment, File raw, File zip) - throws IOException, IgniteCheckedException { + throws IOException, IgniteCheckedException { int segmentSerializerVer; try (FileIO fileIO = ioFactory.create(raw)) { @@ -2083,7 +2124,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl }; try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator( - log, cctx, ioFactory, BUF_SIZE, nextSegment, walArchiveDir, appendToZipC)) { + log, cctx, ioFactory, BUF_SIZE, nextSegment, walArchiveDir, appendToZipC)) { while (iter.hasNextX()) iter.nextX(); @@ -2102,7 +2143,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ser Record Serializer. 
*/ @NotNull private ByteBuffer prepareSwitchSegmentRecordBuffer(long nextSegment, RecordSerializer ser) - throws IgniteCheckedException { + throws IgniteCheckedException { SwitchSegmentRecord switchRecord = new SwitchSegmentRecord(); int switchRecordSize = ser.size(switchRecord); @@ -2117,16 +2158,33 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. + * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves. */ - private void shutdown() throws IgniteInterruptedCheckedException { - synchronized (this) { - stopped = true; + private void deleteObsoleteRawSegments() { + FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); - notifyAll(); + Set<Long> indices = new HashSet<>(); + Set<Long> duplicateIndices = new HashSet<>(); + + for (FileDescriptor desc : descs) { + if (!indices.add(desc.idx)) + duplicateIndices.add(desc.idx); } - U.join(this); + for (FileDescriptor desc : descs) { + if (desc.isCompressed()) + continue; + + // Do not delete reserved or locked segment and any segment after it. 
+ if (segmentReservedOrLocked(desc.idx)) + return; + + if (desc.idx < segmentAware.keepUncompressedIdxFrom() && duplicateIndices.contains(desc.idx)) { + if (desc.file.exists() && !desc.file.delete()) + U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + + desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index df8f4de..1c0325e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -783,7 +783,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** {@inheritDoc} */ - @Override public boolean reserve(WALPointer start) throws IgniteCheckedException { + @Override public boolean reserve(WALPointer start) { assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; if (mode == WALMode.NONE) @@ -791,8 +791,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda FileArchiver archiver0 = archiver; - if (archiver0 == null) - throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null"); + assert archiver0 != null : "Could not reserve WAL segment: archiver == null"; archiver0.reserve(((FileWALPointer)start).index()); @@ -1912,7 +1911,7 @@ public class 
FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. * Waits if there's no segment to archive right now. */ - private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException { + private long tryReserveNextSegmentOrWait() throws InterruptedException { long segmentToCompress = lastCompressedIdx + 1; synchronized (this) { http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java index 3379b74..6ba0399 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java @@ -36,15 +36,17 @@ public class SegmentAware { /** Manages last archived index, emulates archivation in no-archiver mode. */ private final SegmentArchivedStorage segmentArchivedStorage = buildArchivedStorage(segmentLockStorage); /** Storage of actual information about current index of compressed segments. */ - private final SegmentCompressStorage segmentCompressStorage = buildCompressStorage(segmentArchivedStorage); + private final SegmentCompressStorage segmentCompressStorage; /** Storage of absolute current segment index. */ private final SegmentCurrentStateStorage segmentCurrStateStorage; /** * @param walSegmentsCnt Total WAL segments count. + * @param compactionEnabled Is wal compaction enabled. 
*/ - public SegmentAware(int walSegmentsCnt) { + public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) { segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage); + segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, compactionEnabled); } /** @@ -108,12 +110,12 @@ public class SegmentAware { } /** - * Force set last compressed segment. + * Callback after segment compression finish. * - * @param lastCompressedIdx Segment which was last compressed. + * @param compressedIdx Index of compressed segment. */ - public void lastCompressedIdx(long lastCompressedIdx) { - segmentCompressStorage.lastCompressedIdx(lastCompressedIdx); + public void onSegmentCompressed(long compressedIdx) { + segmentCompressStorage.onSegmentCompressed(compressedIdx); } /** @@ -124,6 +126,20 @@ public class SegmentAware { } /** + * @param idx Minimum raw segment index that should be preserved from deletion. + */ + public void keepUncompressedIdxFrom(long idx) { + segmentCompressStorage.keepUncompressedIdxFrom(idx); + } + + /** + * @return Minimum raw segment index that should be preserved from deletion. + */ + public long keepUncompressedIdxFrom() { + return segmentCompressStorage.keepUncompressedIdxFrom(); + } + + /** * Update current WAL index. * * @param curAbsWalIdx New current WAL index. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java index 30c9a2d..174fb46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java @@ -18,6 +18,10 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.aware; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; /** * Storage of actual information about current index of compressed segments. @@ -25,25 +29,50 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; public class SegmentCompressStorage { /** Flag of interrupt waiting on this object. */ private volatile boolean interrupted; + /** Manages last archived index, emulates archivation in no-archiver mode. */ private final SegmentArchivedStorage segmentArchivedStorage; + + /** If WAL compaction enabled. */ + private final boolean compactionEnabled; + /** Last successfully compressed segment. */ private volatile long lastCompressedIdx = -1L; + /** Last enqueued to compress segment. */ + private long lastEnqueuedToCompressIdx = -1L; + + /** Segments to compress queue. */ + private final Queue<Long> segmentsToCompress = new ArrayDeque<>(); + + /** List of currently compressing segments. 
*/ + private final List<Long> compressingSegments = new ArrayList<>(); + + /** Compressed segment with maximal index. */ + private long lastMaxCompressedIdx = -1L; + + /** Min uncompressed index to keep. */ + private volatile long minUncompressedIdxToKeep = -1L; + /** * @param segmentArchivedStorage Storage of last archived segment. + * @param compactionEnabled If WAL compaction enabled. */ - private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage) { + private SegmentCompressStorage(SegmentArchivedStorage segmentArchivedStorage, boolean compactionEnabled) { this.segmentArchivedStorage = segmentArchivedStorage; + this.compactionEnabled = compactionEnabled; + this.segmentArchivedStorage.addObserver(this::onSegmentArchived); } /** * @param segmentArchivedStorage Storage of last archived segment. + * @param compactionEnabled If WAL compaction enabled. */ - static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage) { - SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage); + static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage segmentArchivedStorage, + boolean compactionEnabled) { + SegmentCompressStorage storage = new SegmentCompressStorage(segmentArchivedStorage, compactionEnabled); segmentArchivedStorage.addObserver(storage::onSegmentArchived); @@ -51,12 +80,20 @@ public class SegmentCompressStorage { } /** - * Force set last compressed segment. + * Callback after segment compression finish. * - * @param lastCompressedIdx Segment which was last compressed. + * @param compressedIdx Index of compressed segment. 
*/ - void lastCompressedIdx(long lastCompressedIdx) { - this.lastCompressedIdx = lastCompressedIdx; + synchronized void onSegmentCompressed(long compressedIdx) { + if (compressedIdx > lastMaxCompressedIdx) + lastMaxCompressedIdx = compressedIdx; + + compressingSegments.remove(compressedIdx); + + if (!compressingSegments.isEmpty()) + this.lastCompressedIdx = Math.min(lastMaxCompressedIdx, compressingSegments.get(0) - 1); + else + this.lastCompressedIdx = lastMaxCompressedIdx; } /** @@ -71,13 +108,8 @@ public class SegmentCompressStorage { * there's no segment to archive right now. */ synchronized long nextSegmentToCompressOrWait() throws IgniteInterruptedCheckedException { - long segmentToCompress = lastCompressedIdx + 1; - try { - while ( - segmentToCompress > segmentArchivedStorage.lastArchivedAbsoluteIndex() - && !interrupted - ) + while (segmentsToCompress.peek() == null && !interrupted) wait(); } catch (InterruptedException e) { @@ -86,7 +118,11 @@ public class SegmentCompressStorage { checkInterrupted(); - return segmentToCompress; + Long idx = segmentsToCompress.poll(); + + compressingSegments.add(idx); + + return idx == null ? -1L : idx; } /** @@ -110,7 +146,23 @@ public class SegmentCompressStorage { * Callback for waking up compressor when new segment is archived. */ private synchronized void onSegmentArchived(long lastAbsArchivedIdx) { + while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) + segmentsToCompress.add(++lastEnqueuedToCompressIdx); + notifyAll(); } + /** + * @param idx Minimum raw segment index that should be preserved from deletion. + */ + void keepUncompressedIdxFrom(long idx) { + minUncompressedIdxToKeep = idx; + } + + /** + * @return Minimum raw segment index that should be preserved from deletion. 
+ */ + long keepUncompressedIdxFrom() { + return minUncompressedIdxToKeep; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java new file mode 100644 index 0000000..bc34f29 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public class IgnitePdsReserveWalSegmentsWithCompactionTest extends IgnitePdsReserveWalSegmentsTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getDataStorageConfiguration().setWalCompactionEnabled(true); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 811a231..df89419 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -68,7 +68,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { } /** {@inheritDoc} */ - @Override public boolean reserve(WALPointer start) throws IgniteCheckedException { + @Override public boolean reserve(WALPointer start) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java index 8287684..7840b09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java @@ -31,13 +31,12 @@ import static org.junit.Assert.assertThat; * Test for {@link SegmentAware}. */ public class SegmentAwareTest extends TestCase { - /** * Waiting finished when work segment is set. */ public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -53,7 +52,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -69,7 +68,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -91,7 +90,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishAwaitSegment_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5)); @@ -107,7 +106,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -126,7 +125,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -145,7 +144,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -164,7 +163,7 @@ public class SegmentAwareTest extends TestCase { */ public void testCorrectCalculateNextSegmentIndex() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.curAbsWalIdx(5); @@ -180,7 +179,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(2); + SegmentAware aware = new SegmentAware(2, false); aware.curAbsWalIdx(1); aware.setLastArchivedAbsoluteIndex(-1); @@ -199,7 +198,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(2); + SegmentAware aware = new SegmentAware(2, false); aware.curAbsWalIdx(1); aware.setLastArchivedAbsoluteIndex(-1); @@ -218,7 +217,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(2); + SegmentAware aware = new SegmentAware(2, false); aware.curAbsWalIdx(2); aware.setLastArchivedAbsoluteIndex(-1); @@ -243,7 +242,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -257,9 +256,9 @@ public class SegmentAwareTest extends TestCase { /** * Waiting finished when segment archived. */ - public void testFinishSegmentArchived_WhenMarkExactWatingSegment() throws IgniteCheckedException, InterruptedException { + public void testFinishSegmentArchived_WhenMarkExactWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -273,9 +272,9 @@ public class SegmentAwareTest extends TestCase { /** * Waiting finished when segment archived. */ - public void testFinishSegmentArchived_WhenSetGreaterThanWatingSegment() throws IgniteCheckedException, InterruptedException { + public void testFinishSegmentArchived_WhenSetGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -289,9 +288,9 @@ public class SegmentAwareTest extends TestCase { /** * Waiting finished when segment archived. */ - public void testFinishSegmentArchived_WhenMarkGreaterThanWatingSegment() throws IgniteCheckedException, InterruptedException { + public void testFinishSegmentArchived_WhenMarkGreaterThanWaitingSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); IgniteInternalFuture future = awaitThread(() -> aware.awaitSegmentArchived(5)); @@ -307,7 +306,7 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishSegmentArchived_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.curAbsWalIdx(5); aware.setLastArchivedAbsoluteIndex(4); @@ -326,7 +325,7 @@ public class SegmentAwareTest extends TestCase { */ public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.checkCanReadArchiveOrReserveWorkSegment(5); @@ -344,7 +343,7 @@ public class SegmentAwareTest extends TestCase { */ public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.checkCanReadArchiveOrReserveWorkSegment(5); IgniteInternalFuture future = awaitThread(() -> aware.markAsMovedToArchive(5)); @@ -364,9 +363,9 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, true); - aware.lastCompressedIdx(5); + aware.onSegmentCompressed(5); IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); @@ -382,9 +381,9 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, true); - aware.lastCompressedIdx(5); + aware.onSegmentCompressed(5); IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); @@ -400,9 +399,9 @@ public class SegmentAwareTest extends TestCase { */ public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, true); - aware.lastCompressedIdx(5); + aware.onSegmentCompressed(5); aware.setLastArchivedAbsoluteIndex(6); aware.lastTruncatedArchiveIdx(7); @@ -418,8 +417,8 @@ public class SegmentAwareTest extends TestCase { */ public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws IgniteCheckedException, InterruptedException { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); - aware.lastCompressedIdx(5); + SegmentAware aware = new SegmentAware(10, true); + aware.onSegmentCompressed(5); IgniteInternalFuture future = awaitThread(aware::waitNextSegmentToCompress); @@ -431,11 +430,34 @@ public class SegmentAwareTest extends TestCase { } /** + * Tests that {@link SegmentAware#onSegmentCompressed} returns segments in proper order. + */ + public void testLastCompressedIdxProperOrdering() throws IgniteInterruptedCheckedException { + SegmentAware aware = new SegmentAware(10, true); + + for (int i = 0; i < 5 ; i++) { + aware.setLastArchivedAbsoluteIndex(i); + aware.waitNextSegmentToCompress(); + } + + aware.onSegmentCompressed(0); + + aware.onSegmentCompressed(4); + assertEquals(0, aware.lastCompressedIdx()); + aware.onSegmentCompressed(1); + assertEquals(1, aware.lastCompressedIdx()); + aware.onSegmentCompressed(3); + assertEquals(1, aware.lastCompressedIdx()); + aware.onSegmentCompressed(2); + assertEquals(4, aware.lastCompressedIdx()); + } + + /** * Segment reserve correctly. */ public void testReserveCorrectly() { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); //when: reserve one segment twice and one segment once. aware.reserve(5); @@ -478,7 +500,7 @@ public class SegmentAwareTest extends TestCase { */ public void testAssertFail_WhenReleaseUnreservedSegment() { //given: thread which awaited segment. 
- SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.reserve(5); try { @@ -497,7 +519,7 @@ public class SegmentAwareTest extends TestCase { */ public void testReserveWorkSegmentCorrectly() { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); //when: lock one segment twice. aware.checkCanReadArchiveOrReserveWorkSegment(5); @@ -530,7 +552,7 @@ public class SegmentAwareTest extends TestCase { */ public void testAssertFail_WhenReleaseUnreservedWorkSegment() { //given: thread which awaited segment. - SegmentAware aware = new SegmentAware(10); + SegmentAware aware = new SegmentAware(10, false); aware.checkCanReadArchiveOrReserveWorkSegment(5); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index a9f2601..7631834 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOf import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest; import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest; @@ -156,6 +157,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgnitePdsExchangeDuringCheckpointTest.class); suite.addTestSuite(IgnitePdsReserveWalSegmentsTest.class); + suite.addTestSuite(IgnitePdsReserveWalSegmentsWithCompactionTest.class); // new style folders with generated consistent ID test suite.addTestSuite(IgniteUidAsConsistentIdMigrationTest.class);
