This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 214044a IGNITE-12459 Searching checkpoint record in WAL doesn't work
with segment compaction - Fixes #7148.
214044a is described below
commit 214044ae2e6dedb1ee5fc219f54a4de51f889a08
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Mon Jan 20 20:20:13 2020 +0300
IGNITE-12459 Searching checkpoint record in WAL doesn't work with segment
compaction - Fixes #7148.
Signed-off-by: Ivan Rakov <[email protected]>
---
.../persistence/wal/FileWriteAheadLogManager.java | 2 +-
.../persistence/db/wal/WalCompactionTest.java | 77 +++++++++++-----------
2 files changed, 38 insertions(+), 41 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 851526e..f534662 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -908,7 +908,7 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
try (WALIterator it = replay(ptr)) {
IgniteBiTuple<WALPointer, WALRecord> rec = it.next();
- if (rec.get1().equals(ptr))
+ if (rec != null && rec.get2().position().equals(ptr))
return rec.get2();
else
throw new StorageException("Failed to read record by pointer
[ptr=" + ptr + ", rec=" + rec + "]");
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
index 9ef9363..5b9fd1d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java
@@ -20,8 +20,9 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.file.Paths;
import java.util.Arrays;
-import java.util.Comparator;
+import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -41,13 +42,16 @@ import
org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
+
/**
*
*/
@@ -184,17 +188,17 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
ig.context().cache().context().database().wakeupForCheckpoint("Forced
checkpoint").get();
ig.context().cache().context().database().wakeupForCheckpoint("Forced
checkpoint").get();
- Thread.sleep(15_000); // Allow compressor to compress WAL segments.
-
String nodeFolderName =
ig.context().pdsFolderResolver().resolveFolders().folderName();
File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db",
false);
File walDir = new File(dbDir, "wal");
File archiveDir = new File(walDir, "archive");
File nodeArchiveDir = new File(archiveDir, nodeFolderName);
- File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0)
+ FilePageStoreManager.ZIP_SUFFIX);
+ File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0)
+ ZIP_SUFFIX);
+
+ // Allow compressor to compress WAL segments.
+ assertTrue(GridTestUtils.waitForCondition(walSegment::exists, 15_000));
- assertTrue(walSegment.exists());
assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be
compressed at least in half.
stopAllGrids();
@@ -327,7 +331,8 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
do {
Thread.yield();
- } while(walMgr.lastArchivedSegment() < 0 &&
(System.currentTimeMillis() - start < 15_000));
+ }
+ while (walMgr.lastArchivedSegment() < 0 && (System.currentTimeMillis()
- start < 15_000));
assertTrue(System.currentTimeMillis() - start < 15_000);
@@ -379,6 +384,7 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
File archiveDir = new File(walDir, "archive");
File nodeArchiveDir = new File(archiveDir, nodeFolderName);
File walSegment = new File(nodeArchiveDir,
FileDescriptor.fileName(emptyIdx));
+ File zippedWalSegment = new File(nodeArchiveDir,
FileDescriptor.fileName(emptyIdx + 1) + ZIP_SUFFIX);
long start = U.currentTimeMillis();
do {
@@ -393,14 +399,16 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
throw new IgniteCheckedException("Can't trucate: " +
walSegment.getAbsolutePath());
Thread.yield();
- } while (true);
+ }
+ while (true);
compactionEnabled = true;
ig = startGrid(0);
ig.cluster().active(true);
- Thread.sleep(15_000); // Allow compressor to compress WAL segments.
+ // Allow compressor to compress WAL segments.
+ assertTrue(GridTestUtils.waitForCondition(zippedWalSegment::exists,
15_000));
File[] compressedSegments = nodeArchiveDir.listFiles(new
FilenameFilter() {
@Override public boolean accept(File dir, String name) {
@@ -438,13 +446,11 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
*/
@Test
public void testSeekingStartInCompactedSegment() throws Exception {
- IgniteEx ig = (IgniteEx)startGrids(3);
+ IgniteEx ig = startGrids(3);
ig.cluster().active(true);
IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
- final int pageSize =
ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize();
-
for (int i = 0; i < 100; i++) {
final byte[] val = new byte[20000];
@@ -462,16 +468,9 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
File nodeLfsDir = new File(dbDir, nodeFolderName);
File cpMarkersDir = new File(nodeLfsDir, "cp");
- final File[] cpMarkersToSave = cpMarkersDir.listFiles();
-
- assert cpMarkersToSave != null;
- assertTrue(cpMarkersToSave.length >= 2);
+ Set<String> cpMarkersToSave =
Arrays.stream(cpMarkersDir.listFiles()).map(File::getName).collect(toSet());
- Arrays.sort(cpMarkersToSave, new Comparator<File>() {
- @Override public int compare(File o1, File o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
+ assertTrue(cpMarkersToSave.size() >= 2);
for (int i = 100; i < ENTRIES; i++) { // At least 20MB of raw data in
total.
final byte[] val = new byte[20000];
@@ -479,8 +478,15 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
val[i] = 1;
cache.put(i, val);
+
+ //It trigger checkout in the middle of put that it shifts
'keepUncompressedIdx'
+ // to allow the compressor to delete unzipped segments.
+ if (i % 100 == 0)
+
ig.context().cache().context().database().wakeupForCheckpoint("Forced
checkpoint").get();
}
+ final int pageSize =
ig.cachex(CACHE_NAME).context().dataRegion().pageMemory().pageSize();
+
byte[] dummyPage = dummyPage(pageSize);
// Spam WAL to move all data records to compressible WAL zone.
@@ -493,28 +499,19 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
ig.context().cache().context().database().wakeupForCheckpoint("Forced
checkpoint").get();
ig.context().cache().context().database().wakeupForCheckpoint("Forced
checkpoint").get();
- Thread.sleep(15_000); // Allow compressor to compress WAL segments.
+ File nodeArchiveDir = dbDir.toPath().resolve(Paths.get("wal",
"archive", nodeFolderName)).toFile();
+ File unzippedWalSegment = new File(nodeArchiveDir,
FileDescriptor.fileName(0));
+ File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0)
+ ZIP_SUFFIX);
- File walDir = new File(dbDir, "wal");
- File archiveDir = new File(walDir, "archive");
- File nodeArchiveDir = new File(archiveDir, nodeFolderName);
- File walSegment = new File(nodeArchiveDir, FileDescriptor.fileName(0)
+ FilePageStoreManager.ZIP_SUFFIX);
+ // Allow compressor to compress WAL segments.
+ assertTrue(GridTestUtils.waitForCondition(() ->
!unzippedWalSegment.exists(), 15_000));
assertTrue(walSegment.exists());
assertTrue(walSegment.length() < WAL_SEGMENT_SIZE / 2); // Should be
compressed at least in half.
stopAllGrids();
- File[] cpMarkers = cpMarkersDir.listFiles(new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- for (File cpMarker : cpMarkersToSave) {
- if (cpMarker.getName().equals(name))
- return false;
- }
-
- return true;
- }
- });
+ File[] cpMarkers = cpMarkersDir.listFiles((dir, name) ->
!cpMarkersToSave.contains(name));
assertNotNull(cpMarkers);
assertTrue(cpMarkers.length > 0);
@@ -532,7 +529,7 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
for (File f : lfsFiles)
f.delete();
- ig = (IgniteEx)startGrids(3);
+ ig = startGrids(3);
awaitPartitionMapExchange();
@@ -545,7 +542,7 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
missing++;
}
- System.out.println(">>> Missing " + missing + " entries logged before
WAL iteration start");
+ log.info(">>> Missing " + missing + " entries logged before WAL
iteration start");
assertTrue(missing > 0);
boolean fail = false;
@@ -555,12 +552,12 @@ public class WalCompactionTest extends
GridCommonAbstractTest {
byte[] arr = cache.get(i);
if (arr == null) {
- System.out.println(">>> Missing: " + i);
+ log.info(">>> Missing: " + i);
fail = true;
}
else if (arr[i] != 1) {
- System.out.println(">>> Corrupted: " + i);
+ log.info(">>> Corrupted: " + i);
fail = true;
}