This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 214044a  IGNITE-12459 Searching checkpoint record in WAL doesn't work 
with segment compaction - Fixes #7148.
214044a is described below

commit 214044ae2e6dedb1ee5fc219f54a4de51f889a08
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Mon Jan 20 20:20:13 2020 +0300

    IGNITE-12459 Searching checkpoint record in WAL doesn't work with segment 
compaction - Fixes #7148.
    
    Signed-off-by: Ivan Rakov <[email protected]>
---
 .../persistence/wal/FileWriteAheadLogManager.java  |  2 +-
 .../persistence/db/wal/WalCompactionTest.java      | 77 +++++++++++-----------
 2 files changed, 38 insertions(+), 41 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 851526e..f534662 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -908,7 +908,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         try (WALIterator it = replay(ptr)) {
             IgniteBiTuple<WALPointer, WALRecord> rec = it.next();
 
-            if (rec.get1().equals(ptr))
+            if (rec != null && rec.get2().position().equals(ptr))
                 return rec.get2();
             else
                 throw new StorageException("Failed to read record by pointer 
[ptr=" + ptr + ", rec=" + rec + "]");
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 9ef9363..5b9fd1d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -20,8 +20,9 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.file.Paths;
 import java.util.Arrays;
-import java.util.Comparator;
+import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -41,13 +42,16 @@ import 
org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
 import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
+
 /**
  *
  */
@@ -184,17 +188,17 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
 
-        Thread.sleep(15_000); // Allow compressor to compress WAL segments.
-
         String nodeFolderName = 
ig.context().pdsFolderResolver().resolveFolders().folderName();
 
         File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", 
false);
         File walDir = new File(dbDir, "wal");
         File archiveDir = new File(walDir, "archive");
         File nodeArchiveDir = new File(archiveDir, nodeFolderName);
-        File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0) 
+ FilePageStoreManager.ZIP_SUFFIX);
+        File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0) 
+ ZIP_SUFFIX);
+
+        // Allow compressor to compress WAL segments.
+        assertTrue(GridTestUtils.waitForCondition(walSegment::exists, 15_000));
 
-        assertTrue(walSegment.exists());
         assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be 
compressed at least in half.
 
         stopAllGrids();
@@ -327,7 +331,8 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
 
         do {
             Thread.yield();
-        } while(walMgr.lastArchivedSegment() < 0 && 
(System.currentTimeMillis() - start < 15_000));
+        }
+        while (walMgr.lastArchivedSegment() < 0 && (System.currentTimeMillis() 
- start < 15_000));
 
         assertTrue(System.currentTimeMillis() - start < 15_000);
 
@@ -379,6 +384,7 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         File archiveDir = new File(walDir, "archive");
         File nodeArchiveDir = new File(archiveDir, nodeFolderName);
         File walSegment = new File(nodeArchiveDir, 
FileDescriptor.fileName(emptyIdx));
+        File zippedWalSegment = new File(nodeArchiveDir, 
FileDescriptor.fileName(emptyIdx + 1) + ZIP_SUFFIX);
 
         long start = U.currentTimeMillis();
         do {
@@ -393,14 +399,16 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
                 throw new IgniteCheckedException("Can't trucate: " + 
walSegment.getAbsolutePath());
 
             Thread.yield();
-        } while (true);
+        }
+        while (true);
 
         compactionEnabled = true;
 
         ig = startGrid(0);
         ig.cluster().active(true);
 
-        Thread.sleep(15_000); // Allow compressor to compress WAL segments.
+        // Allow compressor to compress WAL segments.
+        assertTrue(GridTestUtils.waitForCondition(zippedWalSegment::exists, 
15_000));
 
         File[] compressedSegments = nodeArchiveDir.listFiles(new 
FilenameFilter() {
             @Override public boolean accept(File dir, String name) {
@@ -438,13 +446,11 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testSeekingStartInCompactedSegment() throws Exception {
-        IgniteEx ig = (IgniteEx)startGrids(3);
+        IgniteEx ig = startGrids(3);
         ig.cluster().active(true);
 
         IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
 
-        final int pageSize = 
ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize();
-
         for (int i = 0; i < 100; i++) {
             final byte[] val = new byte[20000];
 
@@ -462,16 +468,9 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         File nodeLfsDir = new File(dbDir, nodeFolderName);
         File cpMarkersDir = new File(nodeLfsDir, "cp");
 
-        final File[] cpMarkersToSave = cpMarkersDir.listFiles();
-
-        assert cpMarkersToSave != null;
-        assertTrue(cpMarkersToSave.length >= 2);
+        Set<String> cpMarkersToSave = 
Arrays.stream(cpMarkersDir.listFiles()).map(File::getName).collect(toSet());
 
-        Arrays.sort(cpMarkersToSave, new Comparator<File>() {
-            @Override public int compare(File o1, File o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
+        assertTrue(cpMarkersToSave.size() >= 2);
 
         for (int i = 100; i < ENTRIES; i++) { // At least 20MB of raw data in 
total.
             final byte[] val = new byte[20000];
@@ -479,8 +478,15 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
             val[i] = 1;
 
             cache.put(i, val);
+
+            // Trigger a checkpoint in the middle of the puts so that it shifts 
'keepUncompressedIdx',
+            // allowing the compressor to delete unzipped segments.
+            if (i % 100 == 0)
+                
ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
         }
 
+        final int pageSize = 
ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize();
+
         byte[] dummyPage = dummyPage(pageSize);
 
         // Spam WAL to move all data records to compressible WAL zone.
@@ -493,28 +499,19 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
         ig.context().cache().context().database().wakeupForCheckpoint("Forced 
checkpoint").get();
 
-        Thread.sleep(15_000); // Allow compressor to compress WAL segments.
+        File nodeArchiveDir = dbDir.toPath().resolve(Paths.get("wal", 
"archive", nodeFolderName)).toFile();
+        File unzippedWalSegment = new File(nodeArchiveDir, 
FileDescriptor.fileName(0));
+        File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0) 
+ ZIP_SUFFIX);
 
-        File walDir = new File(dbDir, "wal");
-        File archiveDir = new File(walDir, "archive");
-        File nodeArchiveDir = new File(archiveDir, nodeFolderName);
-        File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0) 
+ FilePageStoreManager.ZIP_SUFFIX);
+        // Allow compressor to compress WAL segments.
+        assertTrue(GridTestUtils.waitForCondition(() -> 
!unzippedWalSegment.exists(), 15_000));
 
         assertTrue(walSegment.exists());
         assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be 
compressed at least in half.
 
         stopAllGrids();
 
-        File[] cpMarkers = cpMarkersDir.listFiles(new FilenameFilter() {
-            @Override public boolean accept(File dir, String name) {
-                for (File cpMarker : cpMarkersToSave) {
-                    if (cpMarker.getName().equals(name))
-                        return false;
-                }
-
-                return true;
-            }
-        });
+        File[] cpMarkers = cpMarkersDir.listFiles((dir, name) -> 
!cpMarkersToSave.contains(name));
 
         assertNotNull(cpMarkers);
         assertTrue(cpMarkers.length > 0);
@@ -532,7 +529,7 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
         for (File f : lfsFiles)
             f.delete();
 
-        ig = (IgniteEx)startGrids(3);
+        ig = startGrids(3);
 
         awaitPartitionMapExchange();
 
@@ -545,7 +542,7 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
                 missing++;
         }
 
-        System.out.println(">>> Missing " + missing + " entries logged before 
WAL iteration start");
+        log.info(">>> Missing " + missing + " entries logged before WAL 
iteration start");
         assertTrue(missing > 0);
 
         boolean fail = false;
@@ -555,12 +552,12 @@ public class WalCompactionTest extends 
GridCommonAbstractTest {
             byte[] arr = cache.get(i);
 
             if (arr == null) {
-                System.out.println(">>> Missing: " + i);
+                log.info(">>> Missing: " + i);
 
                 fail = true;
             }
             else if (arr[i] != 1) {
-                System.out.println(">>> Corrupted: " + i);
+                log.info(">>> Corrupted: " + i);
 
                 fail = true;
             }

Reply via email to