This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 055e138d0da IGNITE-18424 Fix WAL timeout rollover (#10448)
055e138d0da is described below
commit 055e138d0da6e98c0727c7b9b685d74c419b0575
Author: Nikolay <[email protected]>
AuthorDate: Fri Dec 16 14:53:18 2022 +0300
IGNITE-18424 Fix WAL timeout rollover (#10448)
---
.../persistence/wal/FileWriteAheadLogManager.java | 3 --
.../java/org/apache/ignite/cdc/WalForCdcTest.java | 57 +++++++++++++++++++++-
2 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 90a9b30d534..a30511728dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -851,9 +851,6 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
if (walForceArchiveTimeout > 0) {
if (lastDataRecordLoggedMs.get() == 0)
return; //no data records were logged to current segment, do not rollover.
-
- if (!checkTimeout(lastDataRecordLoggedMs, walForceArchiveTimeout))
- return;
}
else if (!checkTimeout(lastRecordLoggedMs, walAutoArchiveAfterInactivity))
return;
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
index 9f39ede2252..401b02f3161 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
@@ -62,6 +63,7 @@ import static
org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED
import static org.apache.ignite.internal.util.IgniteUtils.KB;
import static org.apache.ignite.internal.util.IgniteUtils.MB;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
@@ -70,6 +72,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
/** */
private static final int RECORD_COUNT = 10;
+ /** */
+ public static final int DFLT_WAL_SGMNT_SZ = (int)(2 * MB);
+
/** */
@Parameterized.Parameter
public CacheMode mode;
@@ -87,6 +92,9 @@ public class WalForCdcTest extends GridCommonAbstractTest {
/** */
private long archiveSz = UNLIMITED_WAL_ARCHIVE;
+ /** */
+ private int walSgmntSz = DFLT_WAL_SGMNT_SZ;
+
/** */
@Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
public static Collection<?> parameters() {
@@ -105,7 +113,7 @@ public class WalForCdcTest extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
- .setWalSegmentSize((int)(2 * MB))
+ .setWalSegmentSize(walSgmntSz)
.setMaxWalArchiveSize(archiveSz)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
@@ -263,6 +271,53 @@ public class WalForCdcTest extends GridCommonAbstractTest {
);
}
+    /** Tests that WAL rollover by timeout still happens while concurrent cache puts are running. */
+    @Test
+    public void testTimeoutRolloverWithConcurrentLoad() throws Exception {
+        persistenceEnabled = true;
+        cdcEnabled = true;
+        walSgmntSz = Integer.MAX_VALUE; // Presumably large enough that only the timeout can trigger a rollover.
+
+        try {
+            IgniteEx ignite = startGrid(0);
+
+            ignite.cluster().state(ClusterState.ACTIVE);
+
+            IgniteCache<Long, Long> cache = ignite.getOrCreateCache(
+                new CacheConfiguration<Long, Long>(DEFAULT_CACHE_NAME)
+                    .setCacheMode(mode)
+                    .setAtomicityMode(atomicityMode));
+
+            IgniteInternalFuture<Long> genFut = runMultiThreadedAsync(() -> {
+                long cntr = 0;
+
+                while (!Thread.currentThread().isInterrupted())
+                    cache.put(cntr++, cntr);
+            }, 2, "data-generator");
+
+            try {
+                IgniteWriteAheadLogManager wal = ignite.context().cache().context().wal(true);
+
+                for (int i = 0; i < 3; i++) {
+                    log.info("Waiting for WAL rollover[iter=" + (i + 1) + ']');
+
+                    long startSgmnt = wal.currentSegment();
+
+                    assertTrue(
+                        "Timeout rollover must happen",
+                        waitForCondition(() -> startSgmnt < wal.currentSegment(), WAL_ARCHIVE_TIMEOUT * 2)
+                    );
+                }
+            }
+            finally {
+                genFut.cancel(); // Always stop the background load (presumably by interrupting the threads).
+            }
+        }
+        finally {
+            walSgmntSz = DFLT_WAL_SGMNT_SZ; // Restore the default segment size.
+        }
+    }
+
/** */
private void doTestWal(
IgniteEx ignite,