This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf1c7825d7 IGNITE-20097 Fixed WAL logging to an archived segment 
after node restart (#10887)
dbf1c7825d7 is described below

commit dbf1c7825d74809cd6859c85a8ac9ed9ac071e39
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Aug 31 22:43:09 2023 +0300

    IGNITE-20097 Fixed WAL logging to an archived segment after node restart 
(#10887)
---
 .../persistence/wal/FileWriteAheadLogManager.java  |  53 +++++-
 .../cdc/RestartWithWalForceArchiveTimeoutTest.java |  46 ++++-
 .../apache/ignite/cdc/WalRolloverOnStopTest.java   | 202 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   2 +
 4 files changed, 292 insertions(+), 11 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f68b366fbb6..c1f04bd3baa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1455,16 +1455,25 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @throws StorageException If failed to initialize WAL write handle.
      */
     private FileWriteHandle restoreWriteHandle(@Nullable WALPointer 
lastReadPtr) throws StorageException {
-        long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
-
         @Nullable FileArchiver archiver0 = archiver;
 
-        long segNo = archiver0 == null ? absIdx : absIdx % 
dsCfg.getWalSegments();
+        long absIdx;
+        int off;
 
-        File curFile = new File(walWorkDir, fileName(segNo));
+        if (lastReadPtr == null) {
+            absIdx = 0;
+            off = 0;
+        }
+        else if (nextSegmentInited(lastReadPtr)) {
+            absIdx = lastReadPtr.index() + 1;
+            off = HEADER_RECORD_SIZE;
+        }
+        else {
+            absIdx = lastReadPtr.index();
+            off = lastReadPtr.fileOffset() + lastReadPtr.length();
+        }
 
-        int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
-        int len = lastReadPtr == null ? 0 : lastReadPtr.length();
+        File curFile = segmentFile(absIdx);
 
         try {
             SegmentIO fileIO = new SegmentIO(absIdx, 
ioFactory.create(curFile));
@@ -1494,7 +1503,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         ", offset=" + off + ", ver=" + serVer + ']');
                 }
 
-                FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off 
+ len, ser);
+                FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, 
off, ser);
 
                 segmentAware.curAbsWalIdx(absIdx);
 
@@ -1545,6 +1554,36 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
     }
 
+    /** */
+    private File segmentFile(long absIdx) {
+        long segNo = archiver == null ? absIdx : absIdx % 
dsCfg.getWalSegments();
+
+        return new File(walWorkDir, fileName(segNo));
+    }
+
+    /** @return {@code True} if the given pointer is the last in a segment and 
a next segment has been initialized. */
+    private boolean nextSegmentInited(WALPointer ptr) {
+        try {
+            try (WALIterator iter = replay(new WALPointer(ptr.index(), 
ptr.fileOffset() + ptr.length(), 0))) {
+                if (iter.hasNext())
+                    return false;
+            }
+
+            long nextIdx = ptr.index() + 1;
+
+            try (SegmentIO fileIO = new SegmentIO(nextIdx, 
ioFactory.create(segmentFile(nextIdx), READ))) {
+                readSegmentHeader(fileIO, segmentFileInputFactory);
+            }
+
+            return true;
+        }
+        catch (Exception ignored) {
+            // No-op.
+        }
+
+        return false;
+    }
+
     /**
      * Fills the file header for a new segment. Calling this method signals we 
are done with the segment and it can be
      * archived. If we don't have prepared file yet and achiever is busy this 
method blocks.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
index d28abb3e5b1..57cdee034db 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cdc/RestartWithWalForceArchiveTimeoutTest.java
@@ -41,6 +41,9 @@ public class RestartWithWalForceArchiveTimeoutTest extends 
GridCommonAbstractTes
     @Parameterized.Parameter
     public WALMode walMode;
 
+    /** */
+    private long walForceArchiveTimeout;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -49,7 +52,7 @@ public class RestartWithWalForceArchiveTimeoutTest extends 
GridCommonAbstractTes
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setWalMode(walMode)
-            .setWalForceArchiveTimeout(60 * 60 * 1000) // 1 hour to make sure 
auto archive will not work.
+            .setWalForceArchiveTimeout(walForceArchiveTimeout)
             .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setPersistenceEnabled(true)));
 
         return cfg;
@@ -61,12 +64,17 @@ public class RestartWithWalForceArchiveTimeoutTest extends 
GridCommonAbstractTes
         return EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND);
     }
 
-    /** */
-    @Test
-    public void testRestart() throws Exception {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
         stopAllGrids(true);
 
         cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testRestart() throws Exception {
+        walForceArchiveTimeout = 60 * 60 * 1000; // 1 hour to make sure auto 
archive will not work.
 
         Supplier<IgniteEx> restart = () -> {
             stopAllGrids(true);
@@ -92,4 +100,34 @@ public class RestartWithWalForceArchiveTimeoutTest extends 
GridCommonAbstractTes
         for (int i = 0; i < 5; i++)
             restart.get();
     }
+
+    /** */
+    @Test
+    public void testRestartAfterArchive() throws Exception {
+        walForceArchiveTimeout = 1000;
+
+        IgniteEx srv = startGrid(0);
+
+        srv.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, Integer> cache = 
srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        cache.put(1, 1);
+
+        forceCheckpoint();
+
+        Thread.sleep(2 * walForceArchiveTimeout);
+
+        stopGrid(0);
+        srv = startGrid(0);
+        cache = srv.cache(DEFAULT_CACHE_NAME);
+
+        cache.put(2, 2);
+
+        stopGrid(0);
+        srv = startGrid(0);
+        cache = srv.cache(DEFAULT_CACHE_NAME);
+
+        assertEquals(2, cache.size());
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java
new file mode 100644
index 00000000000..2e21eb86f0a
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cdc/WalRolloverOnStopTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * This test checks that the scenario described in {@link #testWallRollover()} works correctly.
+ */
+@RunWith(Parameterized.class)
+public class WalRolloverOnStopTest extends GridCommonAbstractTest {
+    /** WAL mode. */
+    @Parameterized.Parameter
+    public WALMode walMode;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "walMode={0}")
+    public static Collection<?> parameters() {
+        return Arrays.asList(new Object[][] {{WALMode.BACKGROUND}, 
{WALMode.LOG_ONLY}, {WALMode.FSYNC}});
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setWalAutoArchiveAfterInactivity(1500L)
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration()
+                    .setPersistenceEnabled(true)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Test scenario:
+     *
+     * 0. {@link DataStorageConfiguration#getWalAutoArchiveAfterInactivity()} 
> 0.
+     * 1. Node is gracefully stopping using {@link G#stop(String, boolean)}.
+     * 2. T0: {@code Checkpointer#doCheckpoint()} executes the last checkpoint on 
stop and freezes.
+     * 3. T1: Rollover segment after inactivity timeout.
+     * 4. T2: Archive segment.
+     *
+     * After restart, the WAL should log to the next segment.
+     * */
+    @Test
+    public void testWallRollover() throws Exception {
+        AtomicLong curIdx = new AtomicLong();
+
+        for (int i = 0; i < 2; i++) {
+            IgniteEx ign = startGrid(0);
+
+            GridCacheDatabaseSharedManager db =
+                
(GridCacheDatabaseSharedManager)ign.context().cache().context().database();
+
+            SegmentAware aware = 
GridTestUtils.getFieldValue(ign.context().cache().context().wal(), 
"segmentAware");
+
+            ign.cluster().state(ClusterState.ACTIVE);
+
+            IgniteCache<Integer, Integer> cache = 
ign.getOrCreateCache("my-cache");
+
+            CountDownLatch waitAfterCp = new CountDownLatch(1);
+            AtomicLong cntr = new AtomicLong(0);
+
+            db.addCheckpointListener(new CheckpointListener() {
+                @Override public void afterCheckpointEnd(Context ctx) {
+                    if (!ign.context().isStopping())
+                        return;
+
+                    try {
+                        waitAfterCp.await(getTestTimeout(), 
TimeUnit.MILLISECONDS);
+
+                        cntr.incrementAndGet();
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                @Override public void onMarkCheckpointBegin(Context ctx) {
+                    // No-op.
+                }
+
+                @Override public void onCheckpointBegin(Context ctx) {
+                    // No-op.
+                }
+
+                @Override public void beforeCheckpointBegin(Context ctx) {
+                    // No-op.
+                }
+            });
+
+            int maxKey = (i + 1) * 3;
+
+            for (int j = i * 3; j < maxKey; j++)
+                cache.put(j, j);
+
+            curIdx.set(aware.curAbsWalIdx());
+
+            IgniteInternalFuture<?> fut = runAsync(() -> {
+                try {
+                    aware.awaitSegmentArchived(curIdx.get());
+
+                    cntr.incrementAndGet();
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteException(e);
+                }
+                finally {
+                    waitAfterCp.countDown();
+                }
+            });
+
+            G.stop(ign.name(), false);
+
+            fut.get(getTestTimeout());
+
+            // The checkpoint happens twice because of segment archiving.
+            assertEquals("Should successfully wait for current segment 
archivation", 3, cntr.get());
+
+            IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+                new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                    .log(ign.log())
+                    .filesOrDirs(
+                        U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_WAL_ARCHIVE_PATH, false))
+                    .filter((type, ptr) -> type == DATA_RECORD_V2);
+
+            Set<Integer> keys = new HashSet<>();
+
+            try (WALIterator it = new 
IgniteWalIteratorFactory().iterator(builder)) {
+                while (it.hasNext()) {
+                    IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                    DataRecord rec = (DataRecord)tup.get2();
+
+                    for (DataEntry entry : rec.writeEntries())
+                        keys.add(entry.key().value(null, false));
+                }
+            }
+
+            for (int j = 0; j < maxKey; j++)
+                assertTrue(keys.contains(j));
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 8b39f3e54ed..2d3e06fffd4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
 import org.apache.ignite.cdc.CdcSelfTest;
 import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
 import org.apache.ignite.cdc.WalForCdcTest;
+import org.apache.ignite.cdc.WalRolloverOnStopTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceCheckpointTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceTwoPartsInDifferentCheckpointsTest;
@@ -155,6 +156,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, CdcSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcCacheVersionTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
RestartWithWalForceArchiveTimeoutTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, WalRolloverOnStopTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CdcCacheConfigOnRestartTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, 
ignoredTests);

Reply via email to