This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new dbf1c7825d7 IGNITE-20097 Fixed WAL logging to an archived segment
after node restart (#10887)
dbf1c7825d7 is described below
commit dbf1c7825d74809cd6859c85a8ac9ed9ac071e39
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Aug 31 22:43:09 2023 +0300
IGNITE-20097 Fixed WAL logging to an archived segment after node restart
(#10887)
---
.../persistence/wal/FileWriteAheadLogManager.java | 53 +++++-
.../cdc/RestartWithWalForceArchiveTimeoutTest.java | 46 ++++-
.../apache/ignite/cdc/WalRolloverOnStopTest.java | 202 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite2.java | 2 +
4 files changed, 292 insertions(+), 11 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f68b366fbb6..c1f04bd3baa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1455,16 +1455,25 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
* @throws StorageException If failed to initialize WAL write handle.
*/
private FileWriteHandle restoreWriteHandle(@Nullable WALPointer
lastReadPtr) throws StorageException {
- long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
-
@Nullable FileArchiver archiver0 = archiver;
- long segNo = archiver0 == null ? absIdx : absIdx %
dsCfg.getWalSegments();
+ long absIdx;
+ int off;
- File curFile = new File(walWorkDir, fileName(segNo));
+ if (lastReadPtr == null) {
+ absIdx = 0;
+ off = 0;
+ }
+ else if (nextSegmentInited(lastReadPtr)) {
+ absIdx = lastReadPtr.index() + 1;
+ off = HEADER_RECORD_SIZE;
+ }
+ else {
+ absIdx = lastReadPtr.index();
+ off = lastReadPtr.fileOffset() + lastReadPtr.length();
+ }
- int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
- int len = lastReadPtr == null ? 0 : lastReadPtr.length();
+ File curFile = segmentFile(absIdx);
try {
SegmentIO fileIO = new SegmentIO(absIdx,
ioFactory.create(curFile));
@@ -1494,7 +1503,7 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
", offset=" + off + ", ver=" + serVer + ']');
}
- FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off
+ len, ser);
+ FileWriteHandle hnd = fileHandleManager.initHandle(fileIO,
off, ser);
segmentAware.curAbsWalIdx(absIdx);
@@ -1545,6 +1554,36 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
}
}
+ /**
+ * Resolves the file in {@code walWorkDir} for a segment.
+ *
+ * @param absIdx Absolute WAL segment index.
+ * @return File named after {@code absIdx} itself when {@code archiver} is {@code null}, otherwise after
+ * {@code absIdx} modulo {@code dsCfg.getWalSegments()}.
+ */
+ private File segmentFile(long absIdx) {
+ long segNo = archiver == null ? absIdx : absIdx %
dsCfg.getWalSegments();
+
+ return new File(walWorkDir, fileName(segNo));
+ }
+
+ /**
+ * Replays the WAL starting right after {@code ptr}; if a record is found there, the pointer is not the last one in
+ * its segment. Otherwise tries to read the header of the file returned by {@code segmentFile(ptr.index() + 1)}.
+ * Any exception thrown by either step is swallowed and the result is {@code false}.
+ *
+ * @param ptr Pointer to check.
+ * @return {@code True} if the given pointer is the last in a segment and a next segment has been initialized.
+ */
+ private boolean nextSegmentInited(WALPointer ptr) {
+ try {
+ try (WALIterator iter = replay(new WALPointer(ptr.index(),
ptr.fileOffset() + ptr.length(), 0))) {
+ if (iter.hasNext())
+ return false;
+ }
+
+ long nextIdx = ptr.index() + 1;
+
+ try (SegmentIO fileIO = new SegmentIO(nextIdx,
ioFactory.create(segmentFile(nextIdx), READ))) {
+ readSegmentHeader(fileIO, segmentFileInputFactory);
+ }
+
+ return true;
+ }
+ catch (Exception ignored) {
+ // Deliberately ignored: any failure is reported as "next segment is not initialized".
+ }
+
+ return false;
+ }
+
/**
* Fills the file header for a new segment. Calling this method signals we
are done with the segment and it can be
* archived. If we don't have prepared file yet and achiever is busy this
method blocks.
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
index d28abb3e5b1..57cdee034db 100644
---
a/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
@@ -41,6 +41,9 @@ public class RestartWithWalForceArchiveTimeoutTest extends
GridCommonAbstractTes
@Parameterized.Parameter
public WALMode walMode;
+ /** */
+ private long walForceArchiveTimeout;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -49,7 +52,7 @@ public class RestartWithWalForceArchiveTimeoutTest extends
GridCommonAbstractTes
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalMode(walMode)
- .setWalForceArchiveTimeout(60 * 60 * 1000) // 1 hour to make sure
auto archive will not work.
+ .setWalForceArchiveTimeout(walForceArchiveTimeout)
.setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setPersistenceEnabled(true)));
return cfg;
@@ -61,12 +64,17 @@ public class RestartWithWalForceArchiveTimeoutTest extends
GridCommonAbstractTes
return EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND);
}
- /** */
- @Test
- public void testRestart() throws Exception {
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
stopAllGrids(true);
cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void testRestart() throws Exception {
+ walForceArchiveTimeout = 60 * 60 * 1000; // 1 hour to make sure auto
archive will not work.
Supplier<IgniteEx> restart = () -> {
stopAllGrids(true);
@@ -92,4 +100,34 @@ public class RestartWithWalForceArchiveTimeoutTest extends
GridCommonAbstractTes
for (int i = 0; i < 5; i++)
restart.get();
}
+
+ /**
+ * Restart scenario with a short force-archive timeout ({@code walForceArchiveTimeout} is set to 1000):
+ * puts one entry, forces a checkpoint, sleeps for twice the timeout, restarts the node, puts a second entry,
+ * restarts the node again and checks that the cache contains both entries.
+ * NOTE(review): the sleep is presumably there to let the forced archive of the segment happen before the
+ * first restart - confirm.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRestartAfterArchive() throws Exception {
+ walForceArchiveTimeout = 1000;
+
+ IgniteEx srv = startGrid(0);
+
+ srv.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, Integer> cache =
srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ cache.put(1, 1);
+
+ forceCheckpoint();
+
+ Thread.sleep(2 * walForceArchiveTimeout);
+
+ stopGrid(0);
+ srv = startGrid(0);
+ cache = srv.cache(DEFAULT_CACHE_NAME);
+
+ cache.put(2, 2);
+
+ stopGrid(0);
+ srv = startGrid(0);
+ cache = srv.cache(DEFAULT_CACHE_NAME);
+
+ assertEquals(2, cache.size());
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java
new file mode 100644
index 00000000000..2e21eb86f0a
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
+import static
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * Checks that the WAL rollover-on-stop scenario described on {@link #testWallRollover()} works correctly.
+ */
+@RunWith(Parameterized.class)
+public class WalRolloverOnStopTest extends GridCommonAbstractTest {
+ /**
+ * WAL mode.
+ * NOTE(review): this parameter is not read anywhere in this class ({@link #getConfiguration(String)} does not
+ * call {@code setWalMode}) - confirm whether that is intended.
+ */
+ @Parameterized.Parameter
+ public WALMode walMode;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "walMode={0}")
+ public static Collection<?> parameters() {
+ return Arrays.asList(new Object[][] {{WALMode.BACKGROUND},
{WALMode.LOG_ONLY}, {WALMode.FSYNC}});
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setWalAutoArchiveAfterInactivity(1500L)
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration()
+ .setPersistenceEnabled(true)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * Test scenario:
+ *
+ * 0. {@link DataStorageConfiguration#getWalAutoArchiveAfterInactivity()} > 0.
+ * 1. The node is gracefully stopped using {@link G#stop(String, boolean)}.
+ * 2. T0: {@code Checkpointer#doCheckpoint()} executes the last checkpoint on stop and freezes.
+ * 3. T1: The segment is rolled over after the inactivity timeout.
+ * 4. T2: The segment is archived.
+ *
+ * After restart, the WAL should log to the next segment.
+ *
+ * The scenario is run twice on the same node index; after each stop the WAL archive directory must contain
+ * data records for every key written so far.
+ * */
+ @Test
+ public void testWallRollover() throws Exception {
+ AtomicLong curIdx = new AtomicLong();
+
+ for (int i = 0; i < 2; i++) {
+ IgniteEx ign = startGrid(0);
+
+ GridCacheDatabaseSharedManager db =
+
(GridCacheDatabaseSharedManager)ign.context().cache().context().database();
+
+ SegmentAware aware =
GridTestUtils.getFieldValue(ign.context().cache().context().wal(),
"segmentAware");
+
+ ign.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, Integer> cache =
ign.getOrCreateCache("my-cache");
+
+ CountDownLatch waitAfterCp = new CountDownLatch(1);
+ AtomicLong cntr = new AtomicLong(0);
+
+ db.addCheckpointListener(new CheckpointListener() {
+ @Override public void afterCheckpointEnd(Context ctx) {
+ if (!ign.context().isStopping())
+ return;
+
+ try {
+ waitAfterCp.await(getTestTimeout(),
TimeUnit.MILLISECONDS);
+
+ cntr.incrementAndGet();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ @Override public void onCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ // No-op.
+ }
+ });
+
+ int maxKey = (i + 1) * 3;
+
+ for (int j = i * 3; j < maxKey; j++)
+ cache.put(j, j);
+
+ curIdx.set(aware.curAbsWalIdx());
+
+ IgniteInternalFuture<?> fut = runAsync(() -> {
+ try {
+ aware.awaitSegmentArchived(curIdx.get());
+
+ cntr.incrementAndGet();
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ waitAfterCp.countDown();
+ }
+ });
+
+ G.stop(ign.name(), false);
+
+ fut.get(getTestTimeout());
+
+ // The checkpoint happens twice because of segment archiving; the third increment comes from the archive waiter.
+ assertEquals("Should successfully wait for current segment
archivation", 3, cntr.get());
+
+ IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+ new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .log(ign.log())
+ .filesOrDirs(
+ U.resolveWorkDirectory(U.defaultWorkDirectory(),
DFLT_WAL_ARCHIVE_PATH, false))
+ .filter((type, ptr) -> type == DATA_RECORD_V2);
+
+ Set<Integer> keys = new HashSet<>();
+
+ try (WALIterator it = new
IgniteWalIteratorFactory().iterator(builder)) {
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+ DataRecord rec = (DataRecord)tup.get2();
+
+ for (DataEntry entry : rec.writeEntries())
+ keys.add(entry.key().value(null, false));
+ }
+ }
+
+ for (int j = 0; j < maxKey; j++)
+ assertTrue(keys.contains(j));
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 8b39f3e54ed..2d3e06fffd4 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
import org.apache.ignite.cdc.CdcSelfTest;
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
import org.apache.ignite.cdc.WalForCdcTest;
+import org.apache.ignite.cdc.WalRolloverOnStopTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceCheckpointTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceTwoPartsInDifferentCheckpointsTest;
@@ -155,6 +156,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, CdcSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CdcCacheConfigOnRestartTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class,
ignoredTests);