Repository: ignite
Updated Branches:
  refs/heads/master c4d859c92 -> 47dcc2c9b
IGNITE-8393 Unexpected error during WAL compression fixed Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47dcc2c9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47dcc2c9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47dcc2c9 Branch: refs/heads/master Commit: 47dcc2c9b6986f8ff189765125edba610072043e Parents: c4d859c Author: Ivan Rakov <[email protected]> Authored: Fri Apr 27 15:06:08 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Fri Apr 27 15:06:54 2018 +0300 ---------------------------------------------------------------------- .../GridCacheDatabaseSharedManager.java | 6 +- .../wal/FileWriteAheadLogManager.java | 53 ++++++++--- .../wal/FsyncModeFileWriteAheadLogManager.java | 59 ++++++++---- .../persistence/db/wal/WalCompactionTest.java | 99 +++++++++++++++++++- 4 files changed, 180 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 7c23cad..8c5e32f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -780,8 +780,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan WALPointer restore = restoreMemory(status); - if (restore 
== null && status.endPtr != CheckpointStatus.NULL_PTR) - throw new StorageException("Restore wal pointer = " + restore + ", while status.endPtr = " + status.endPtr + "."); + if (restore == null && status.endPtr != CheckpointStatus.NULL_PTR) { + throw new StorageException("Restore wal pointer = " + restore + ", while status.endPtr = " + + status.endPtr + ". Can't restore memory - critical part of WAL archive is missing."); + } // First, bring memory to the last consistent checkpoint state if needed. // This method should return a pointer to the last valid record in the WAL. http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index a28b73b..7795344 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -39,8 +39,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -1869,17 +1871,28 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * + * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves. 
*/ private void deleteObsoleteRawSegments() { - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); + + Set<Long> indices = new HashSet<>(); + Set<Long> duplicateIndices = new HashSet<>(); + + for (FileDescriptor desc : descs) { + if (!indices.add(desc.idx)) + duplicateIndices.add(desc.idx); + } for (FileDescriptor desc : descs) { + if (desc.isCompressed()) + continue; + // Do not delete reserved or locked segment and any segment after it. if (segmentReservedOrLocked(desc.idx)) return; - if (desc.idx < lastCompressedIdx) { + if (desc.idx < lastCompressedIdx && duplicateIndices.contains(desc.idx)) { if (!desc.file.delete()) U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); @@ -1892,22 +1905,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl init(); while (!Thread.currentThread().isInterrupted() && !stopped) { + long currReservedSegment = -1; + try { deleteObsoleteRawSegments(); - long nextSegment = tryReserveNextSegmentOrWait(); - if (nextSegment == -1) + currReservedSegment = tryReserveNextSegmentOrWait(); + if (currReservedSegment == -1) continue; - File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp"); + File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) + ".zip" + ".tmp"); - File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip"); + File zip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) + ".zip"); - File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment)); + File raw = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)); if (!Files.exists(raw.toPath())) throw new IgniteCheckedException("WAL archive segment is missing: " + raw); - 
compressSegmentToFile(nextSegment, raw, tmpZip); + compressSegmentToFile(currReservedSegment, raw, tmpZip); Files.move(tmpZip.toPath(), zip.toPath()); @@ -1917,14 +1932,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } - lastCompressedIdx = nextSegment; + lastCompressedIdx = currReservedSegment; } catch (IgniteCheckedException | IOException e) { - U.error(log, "Unexpected error during WAL compression", e); + U.error(log, "Compression of WAL segment [idx=" + currReservedSegment + + "] was skipped due to unexpected error", e); + + lastCompressedIdx++; } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } + finally { + try { + if (currReservedSegment != -1) + release(new FileWALPointer(currReservedSegment, 0, 0)); + } + catch (IgniteCheckedException e) { + U.error(log, "Can't release raw WAL segment [idx=" + currReservedSegment + + "] after compression", e); + } + } } } @@ -1977,9 +2005,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl zos.write(heapBuf.array()); } - finally { - release(new FileWALPointer(nextSegment, 0, 0)); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index e354b43..dfb1c41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -34,9 +34,11 @@ import 
java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.PriorityBlockingQueue; @@ -1703,19 +1705,30 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** - * + * Deletes raw WAL segments if they aren't locked and already have compressed copies of themselves. */ private void deleteObsoleteRawSegments() { - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + FsyncModeFileWriteAheadLogManager.FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); + + Set<Long> indices = new HashSet<>(); + Set<Long> duplicateIndices = new HashSet<>(); + + for (FsyncModeFileWriteAheadLogManager.FileDescriptor desc : descs) { + if (!indices.add(desc.idx)) + duplicateIndices.add(desc.idx); + } FileArchiver archiver0 = archiver; - for (FileDescriptor desc : descs) { + for (FsyncModeFileWriteAheadLogManager.FileDescriptor desc : descs) { + if (desc.isCompressed()) + continue; + // Do not delete reserved or locked segment and any segment after it. 
if (archiver0 != null && archiver0.reserved(desc.idx)) return; - if (desc.idx < lastCompressedIdx) { + if (desc.idx < lastCompressedIdx && duplicateIndices.contains(desc.idx)) { if (!desc.file.delete()) U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); @@ -1728,39 +1741,54 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda init(); while (!Thread.currentThread().isInterrupted() && !stopped) { + long currReservedSegment = -1; + try { deleteObsoleteRawSegments(); - long nextSegment = tryReserveNextSegmentOrWait(); - if (nextSegment == -1) + currReservedSegment = tryReserveNextSegmentOrWait(); + if (currReservedSegment == -1) continue; - File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp"); + File tmpZip = new File(walArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(currReservedSegment) + ".zip" + ".tmp"); - File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip"); + File zip = new File(walArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(currReservedSegment) + ".zip"); - File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment)); + File raw = new File(walArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(currReservedSegment)); if (!Files.exists(raw.toPath())) throw new IgniteCheckedException("WAL archive segment is missing: " + raw); - compressSegmentToFile(nextSegment, raw, tmpZip); + compressSegmentToFile(currReservedSegment, raw, tmpZip); Files.move(tmpZip.toPath(), zip.toPath()); - if (mode == WALMode.FSYNC) { + if (mode != WALMode.NONE) { try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { f0.force(); } } - lastCompressedIdx = nextSegment; + lastCompressedIdx = currReservedSegment; } catch (IgniteCheckedException | IOException e) { - U.error(log, "Unexpected error during WAL compression", e); + 
U.error(log, "Compression of WAL segment [idx=" + currReservedSegment + + "] was skipped due to unexpected error", e); + + lastCompressedIdx++; } - catch (InterruptedException e) { + catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } + finally { + try { + if (currReservedSegment != -1) + release(new FileWALPointer(currReservedSegment, 0, 0)); + } + catch (IgniteCheckedException e) { + U.error(log, "Can't release raw WAL segment [idx=" + currReservedSegment + + "] after compression", e); + } + } } } @@ -1804,9 +1832,6 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda iter.nextX(); } } - finally { - release(new FileWALPointer(nextSegment, 0, 0)); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java index c1e8967..26c5ac5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; import java.io.FilenameFilter; +import java.io.RandomAccessFile; import java.util.Arrays; import java.util.Comparator; import org.apache.ignite.IgniteCache; @@ -57,6 +58,12 @@ public class WalCompactionTest extends GridCommonAbstractTest { /** Entries count. */ public static final int ENTRIES = 1000; + /** Compaction enabled flag. */ + private boolean compactionEnabled; + + /** Wal mode. 
*/ + private WALMode walMode; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { IgniteConfiguration cfg = super.getConfiguration(name); @@ -67,10 +74,10 @@ public class WalCompactionTest extends GridCommonAbstractTest { .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setPersistenceEnabled(true) .setMaxSize(200 * 1024 * 1024)) - .setWalMode(WALMode.LOG_ONLY) + .setWalMode(walMode) .setWalSegmentSize(WAL_SEGMENT_SIZE) .setWalHistorySize(500) - .setWalCompactionEnabled(true)); + .setWalCompactionEnabled(compactionEnabled)); CacheConfiguration ccfg = new CacheConfiguration(); @@ -91,6 +98,10 @@ public class WalCompactionTest extends GridCommonAbstractTest { stopAllGrids(); cleanPersistenceDir(); + + compactionEnabled = true; + + walMode = WALMode.LOG_ONLY; } /** {@inheritDoc} */ @@ -125,7 +136,7 @@ public class WalCompactionTest extends GridCommonAbstractTest { ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); - Thread.sleep(15_000); // Allow compressor to archive WAL segments. + Thread.sleep(15_000); // Allow compressor to compress WAL segments. String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName(); @@ -189,6 +200,86 @@ public class WalCompactionTest extends GridCommonAbstractTest { } /** + * + */ + public void testCompressorToleratesEmptyWalSegmentsFsync() throws Exception { + testCompressorToleratesEmptyWalSegments(WALMode.FSYNC); + } + + /** + * + */ + public void testCompressorToleratesEmptyWalSegmentsLogOnly() throws Exception { + testCompressorToleratesEmptyWalSegments(WALMode.LOG_ONLY); + } + + /** + * Tests that WAL compaction won't be stopped by single broken WAL segment. 
+ */ + private void testCompressorToleratesEmptyWalSegments(WALMode walMode) throws Exception { + this.walMode = walMode; + compactionEnabled = false; + + IgniteEx ig = startGrid(0); + ig.cluster().active(true); + + IgniteCache<Integer, byte[]> cache = ig.cache("cache"); + + for (int i = 0; i < 2500; i++) { // At least 50MB of raw data in total. + final byte[] val = new byte[20000]; + + val[i] = 1; + + cache.put(i, val); + } + + // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head. + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + + String nodeFolderName = ig.context().pdsFolderResolver().resolveFolders().folderName(); + + stopAllGrids(); + + int emptyIdx = 5; + + File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); + File walDir = new File(dbDir, "wal"); + File archiveDir = new File(walDir, "archive"); + File nodeArchiveDir = new File(archiveDir, nodeFolderName); + File walSegment = new File(nodeArchiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(emptyIdx)); + + try (RandomAccessFile raf = new RandomAccessFile(walSegment, "rw")) { + raf.setLength(0); // Clear wal segment, but don't delete. + } + + compactionEnabled = true; + + ig = startGrid(0); + ig.cluster().active(true); + + Thread.sleep(15_000); // Allow compressor to compress WAL segments. 
+ + File[] compressedSegments = nodeArchiveDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.endsWith(".wal.zip"); + } + }); + + long maxIdx = -1; + for (File f : compressedSegments) { + String idxPart = f.getName().substring(0, f.getName().length() - ".wal.zip".length()); + + maxIdx = Math.max(maxIdx, Long.parseLong(idxPart)); + } + + System.out.println("Max compressed index: " + maxIdx); + assertTrue(maxIdx > emptyIdx); + + assertTrue(walSegment.exists()); // Failed to compress WAL segment shoudn't be deleted. + } + + /** * @throws Exception If failed. */ public void testSeekingStartInCompactedSegment() throws Exception { @@ -241,7 +332,7 @@ public class WalCompactionTest extends GridCommonAbstractTest { ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); ig.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); - Thread.sleep(15_000); // Allow compressor to archive WAL segments. + Thread.sleep(15_000); // Allow compressor to compress WAL segments. File walDir = new File(dbDir, "wal"); File archiveDir = new File(walDir, "archive");
