Repository: ignite
Updated Branches:
  refs/heads/master c4d859c92 -> 47dcc2c9b


IGNITE-8393 Unexpected error during WAL compression fixed

Signed-off-by: Andrey Gura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47dcc2c9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47dcc2c9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47dcc2c9

Branch: refs/heads/master
Commit: 47dcc2c9b6986f8ff189765125edba610072043e
Parents: c4d859c
Author: Ivan Rakov <[email protected]>
Authored: Fri Apr 27 15:06:08 2018 +0300
Committer: Andrey Gura <[email protected]>
Committed: Fri Apr 27 15:06:54 2018 +0300

----------------------------------------------------------------------
 .../GridCacheDatabaseSharedManager.java         |  6 +-
 .../wal/FileWriteAheadLogManager.java           | 53 ++++++++---
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 59 ++++++++----
 .../persistence/db/wal/WalCompactionTest.java   | 99 +++++++++++++++++++-
 4 files changed, 180 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 7c23cad..8c5e32f 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -780,8 +780,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
             WALPointer restore = restoreMemory(status);
 
-            if (restore == null && status.endPtr != CheckpointStatus.NULL_PTR)
-                throw new StorageException("Restore wal pointer = " + restore 
+ ", while status.endPtr = " + status.endPtr + ".");
+            if (restore == null && status.endPtr != CheckpointStatus.NULL_PTR) 
{
+                throw new StorageException("Restore wal pointer = " + restore 
+ ", while status.endPtr = " +
+                    status.endPtr + ". Can't restore memory - critical part of 
WAL archive is missing.");
+            }
 
             // First, bring memory to the last consistent checkpoint state if 
needed.
             // This method should return a pointer to the last valid record in 
the WAL.

http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a28b73b..7795344 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -39,8 +39,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -1869,17 +1871,28 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
-         *
+         * Deletes raw WAL segments if they aren't locked and already have 
compressed copies of themselves.
          */
         private void deleteObsoleteRawSegments() {
-            FileDescriptor[] descs = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+            FileDescriptor[] descs = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
+
+            Set<Long> indices = new HashSet<>();
+            Set<Long> duplicateIndices = new HashSet<>();
+
+            for (FileDescriptor desc : descs) {
+                if (!indices.add(desc.idx))
+                    duplicateIndices.add(desc.idx);
+            }
 
             for (FileDescriptor desc : descs) {
+                if (desc.isCompressed())
+                    continue;
+
                 // Do not delete reserved or locked segment and any segment 
after it.
                 if (segmentReservedOrLocked(desc.idx))
                     return;
 
-                if (desc.idx < lastCompressedIdx) {
+                if (desc.idx < lastCompressedIdx && 
duplicateIndices.contains(desc.idx)) {
                     if (!desc.file.delete())
                         U.warn(log, "Failed to remove obsolete WAL segment 
(make sure the process has enough rights): " +
                             desc.file.getAbsolutePath() + ", exists: " + 
desc.file.exists());
@@ -1892,22 +1905,24 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             init();
 
             while (!Thread.currentThread().isInterrupted() && !stopped) {
+                long currReservedSegment = -1;
+
                 try {
                     deleteObsoleteRawSegments();
 
-                    long nextSegment = tryReserveNextSegmentOrWait();
-                    if (nextSegment == -1)
+                    currReservedSegment = tryReserveNextSegmentOrWait();
+                    if (currReservedSegment == -1)
                         continue;
 
-                    File tmpZip = new File(walArchiveDir, 
FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp");
+                    File tmpZip = new File(walArchiveDir, 
FileDescriptor.fileName(currReservedSegment) + ".zip" + ".tmp");
 
-                    File zip = new File(walArchiveDir, 
FileDescriptor.fileName(nextSegment) + ".zip");
+                    File zip = new File(walArchiveDir, 
FileDescriptor.fileName(currReservedSegment) + ".zip");
 
-                    File raw = new File(walArchiveDir, 
FileDescriptor.fileName(nextSegment));
+                    File raw = new File(walArchiveDir, 
FileDescriptor.fileName(currReservedSegment));
                     if (!Files.exists(raw.toPath()))
                         throw new IgniteCheckedException("WAL archive segment 
is missing: " + raw);
 
-                    compressSegmentToFile(nextSegment, raw, tmpZip);
+                    compressSegmentToFile(currReservedSegment, raw, tmpZip);
 
                     Files.move(tmpZip.toPath(), zip.toPath());
 
@@ -1917,14 +1932,27 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         }
                     }
 
-                    lastCompressedIdx = nextSegment;
+                    lastCompressedIdx = currReservedSegment;
                 }
                 catch (IgniteCheckedException | IOException e) {
-                    U.error(log, "Unexpected error during WAL compression", e);
+                    U.error(log, "Compression of WAL segment [idx=" + 
currReservedSegment +
+                        "] was skipped due to unexpected error", e);
+
+                    lastCompressedIdx++;
                 }
                 catch (InterruptedException ignore) {
                     Thread.currentThread().interrupt();
                 }
+                finally {
+                    try {
+                        if (currReservedSegment != -1)
+                            release(new FileWALPointer(currReservedSegment, 0, 
0));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Can't release raw WAL segment [idx=" + 
currReservedSegment +
+                            "] after compression", e);
+                    }
+                }
             }
         }
 
@@ -1977,9 +2005,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                 zos.write(heapBuf.array());
             }
-            finally {
-                release(new FileWALPointer(nextSegment, 0, 0));
-            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index e354b43..dfb1c41 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -34,9 +34,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -1703,19 +1705,30 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
         }
 
         /**
-         *
+         * Deletes raw WAL segments if they aren't locked and already have 
compressed copies of themselves.
          */
         private void deleteObsoleteRawSegments() {
-            FileDescriptor[] descs = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+            FsyncModeFileWriteAheadLogManager.FileDescriptor[] descs = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
+
+            Set<Long> indices = new HashSet<>();
+            Set<Long> duplicateIndices = new HashSet<>();
+
+            for (FsyncModeFileWriteAheadLogManager.FileDescriptor desc : 
descs) {
+                if (!indices.add(desc.idx))
+                    duplicateIndices.add(desc.idx);
+            }
 
             FileArchiver archiver0 = archiver;
 
-            for (FileDescriptor desc : descs) {
+            for (FsyncModeFileWriteAheadLogManager.FileDescriptor desc : 
descs) {
+                if (desc.isCompressed())
+                    continue;
+
                 // Do not delete reserved or locked segment and any segment 
after it.
                 if (archiver0 != null && archiver0.reserved(desc.idx))
                     return;
 
-                if (desc.idx < lastCompressedIdx) {
+                if (desc.idx < lastCompressedIdx && 
duplicateIndices.contains(desc.idx)) {
                     if (!desc.file.delete())
                         U.warn(log, "Failed to remove obsolete WAL segment 
(make sure the process has enough rights): " +
                             desc.file.getAbsolutePath() + ", exists: " + 
desc.file.exists());
@@ -1728,39 +1741,54 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             init();
 
             while (!Thread.currentThread().isInterrupted() && !stopped) {
+                long currReservedSegment = -1;
+
                 try {
                     deleteObsoleteRawSegments();
 
-                    long nextSegment = tryReserveNextSegmentOrWait();
-                    if (nextSegment == -1)
+                    currReservedSegment = tryReserveNextSegmentOrWait();
+                    if (currReservedSegment == -1)
                         continue;
 
-                    File tmpZip = new File(walArchiveDir, 
FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp");
+                    File tmpZip = new File(walArchiveDir, 
FileWriteAheadLogManager.FileDescriptor.fileName(currReservedSegment) + ".zip" 
+ ".tmp");
 
-                    File zip = new File(walArchiveDir, 
FileDescriptor.fileName(nextSegment) + ".zip");
+                    File zip = new File(walArchiveDir, 
FileWriteAheadLogManager.FileDescriptor.fileName(currReservedSegment) + ".zip");
 
-                    File raw = new File(walArchiveDir, 
FileDescriptor.fileName(nextSegment));
+                    File raw = new File(walArchiveDir, 
FileWriteAheadLogManager.FileDescriptor.fileName(currReservedSegment));
                     if (!Files.exists(raw.toPath()))
                         throw new IgniteCheckedException("WAL archive segment 
is missing: " + raw);
 
-                    compressSegmentToFile(nextSegment, raw, tmpZip);
+                    compressSegmentToFile(currReservedSegment, raw, tmpZip);
 
                     Files.move(tmpZip.toPath(), zip.toPath());
 
-                    if (mode == WALMode.FSYNC) {
+                    if (mode != WALMode.NONE) {
                         try (FileIO f0 = ioFactory.create(zip, CREATE, READ, 
WRITE)) {
                             f0.force();
                         }
                     }
 
-                    lastCompressedIdx = nextSegment;
+                    lastCompressedIdx = currReservedSegment;
                 }
                 catch (IgniteCheckedException | IOException e) {
-                    U.error(log, "Unexpected error during WAL compression", e);
+                    U.error(log, "Compression of WAL segment [idx=" + 
currReservedSegment +
+                        "] was skipped due to unexpected error", e);
+
+                    lastCompressedIdx++;
                 }
-                catch (InterruptedException e) {
+                catch (InterruptedException ignore) {
                     Thread.currentThread().interrupt();
                 }
+                finally {
+                    try {
+                        if (currReservedSegment != -1)
+                            release(new FileWALPointer(currReservedSegment, 0, 
0));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Can't release raw WAL segment [idx=" + 
currReservedSegment +
+                            "] after compression", e);
+                    }
+                }
             }
         }
 
@@ -1804,9 +1832,6 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                         iter.nextX();
                 }
             }
-            finally {
-                release(new FileWALPointer(nextSegment, 0, 0));
-            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/47dcc2c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index c1e8967..26c5ac5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -18,6 +18,7 @@ package 
org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
 import java.io.File;
 import java.io.FilenameFilter;
+import java.io.RandomAccessFile;
 import java.util.Arrays;
 import java.util.Comparator;
 import org.apache.ignite.IgniteCache;
@@ -57,6 +58,12 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
     /** Entries count. */
     public static final int ENTRIES = 1000;
 
+    /** Compaction enabled flag. */
+    private boolean compactionEnabled;
+
+    /** Wal mode. */
+    private WALMode walMode;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(name);
@@ -67,10 +74,10 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setPersistenceEnabled(true)
                 .setMaxSize(200 * 1024 * 1024))
-            .setWalMode(WALMode.LOG_ONLY)
+            .setWalMode(walMode)
             .setWalSegmentSize(WAL_SEGMENT_SIZE)
             .setWalHistorySize(500)
-            .setWalCompactionEnabled(true));
+            .setWalCompactionEnabled(compactionEnabled));
 
         CacheConfiguration ccfg = new CacheConfiguration();
 
@@ -91,6 +98,10 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         stopAllGrids();
 
         cleanPersistenceDir();
+
+        compactionEnabled = true;
+
+        walMode = WALMode.LOG_ONLY;
     }
 
     /** {@inheritDoc} */
@@ -125,7 +136,7 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
 
-        Thread.sleep(15_000); // Allow compressor to archive WAL segments.
+        Thread.sleep(15_000); // Allow compressor to compress WAL segments.
 
         String nodeFolderName = 
ig.context().pdsFolderResolver().resolveFolders().folderName();
 
@@ -189,6 +200,86 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    public void testCompressorToleratesEmptyWalSegmentsFsync() throws 
Exception {
+        testCompressorToleratesEmptyWalSegments(WALMode.FSYNC);
+    }
+
+    /**
+     *
+     */
+    public void testCompressorToleratesEmptyWalSegmentsLogOnly() throws 
Exception {
+        testCompressorToleratesEmptyWalSegments(WALMode.LOG_ONLY);
+    }
+
+    /**
+     * Tests that WAL compaction won't be stopped by single broken WAL segment.
+     */
+    private void testCompressorToleratesEmptyWalSegments(WALMode walMode) 
throws Exception {
+        this.walMode = walMode;
+        compactionEnabled = false;
+
+        IgniteEx ig = startGrid(0);
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, byte[]> cache = ig.cache("cache");
+
+        for (int i = 0; i < 2500; i++) { // At least 50MB of raw data in total.
+            final byte[] val = new byte[20000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        // WAL archive segment is allowed to be compressed when it's at least 
one checkpoint away from current WAL head.
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
+        ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
+
+        String nodeFolderName = 
ig.context().pdsFolderResolver().resolveFolders().folderName();
+
+        stopAllGrids();
+
+        int emptyIdx = 5;
+
+        File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", 
false);
+        File walDir = new File(dbDir, "wal");
+        File archiveDir = new File(walDir, "archive");
+        File nodeArchiveDir = new File(archiveDir, nodeFolderName);
+        File walSegment = new File(nodeArchiveDir, 
FileWriteAheadLogManager.FileDescriptor.fileName(emptyIdx));
+
+        try (RandomAccessFile raf = new RandomAccessFile(walSegment, "rw")) {
+            raf.setLength(0); // Clear wal segment, but don't delete.
+        }
+
+        compactionEnabled = true;
+
+        ig = startGrid(0);
+        ig.cluster().active(true);
+
+        Thread.sleep(15_000); // Allow compressor to compress WAL segments.
+
+        File[] compressedSegments = nodeArchiveDir.listFiles(new 
FilenameFilter() {
+            @Override public boolean accept(File dir, String name) {
+                return name.endsWith(".wal.zip");
+            }
+        });
+
+        long maxIdx = -1;
+        for (File f : compressedSegments) {
+            String idxPart = f.getName().substring(0, f.getName().length() - 
".wal.zip".length());
+
+            maxIdx = Math.max(maxIdx, Long.parseLong(idxPart));
+        }
+
+        System.out.println("Max compressed index: " + maxIdx);
+        assertTrue(maxIdx > emptyIdx);
+
+        assertTrue(walSegment.exists()); // Failed to compress WAL segment 
shouldn't be deleted.
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testSeekingStartInCompactedSegment() throws Exception {
@@ -241,7 +332,7 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
 
-        Thread.sleep(15_000); // Allow compressor to archive WAL segments.
+        Thread.sleep(15_000); // Allow compressor to compress WAL segments.
 
         File walDir = new File(dbDir, "wal");
         File archiveDir = new File(walDir, "archive");

Reply via email to