Repository: ignite
Updated Branches:
  refs/heads/master 1c840f590 -> dbf557400


IGNITE-9050 WAL iterator should throw an exception if segment tail is reached 
inside archive directory - Fixes #4429.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dbf55740
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dbf55740
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dbf55740

Branch: refs/heads/master
Commit: dbf557400c77880e182bcac8358f47d2148451b1
Parents: 1c840f5
Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Authored: Fri Aug 10 15:32:19 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Aug 10 15:32:19 2018 +0300

----------------------------------------------------------------------
 .../wal/AbstractWalRecordsIterator.java         |  17 +-
 .../wal/reader/IgniteWalIteratorFactory.java    |  15 +-
 .../wal/serializer/RecordV2Serializer.java      |  19 +-
 ...IsReachedDuringIterationOverArchiveTest.java | 245 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 5 files changed, 279 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 01b0933..ac68ea9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -162,6 +162,13 @@ public abstract class AbstractWalRecordsIterator
                 }
             }
             catch (WalSegmentTailReachedException e) {
+                AbstractReadFileHandle currWalSegment = this.currWalSegment;
+
+                if (!currWalSegment.workDir())
+                    throw new IgniteCheckedException(
+                        "WAL tail reached in archive directory, " +
+                            "WAL segment file is corrupted.", e);
+
                 log.warning(e.getMessage());
 
                 curRec = null;
@@ -197,7 +204,8 @@ public abstract class AbstractWalRecordsIterator
      * @throws IgniteCheckedException if reading failed
      */
     protected abstract AbstractReadFileHandle advanceSegment(
-        @Nullable final AbstractReadFileHandle curWalSegment) throws 
IgniteCheckedException;
+        @Nullable final AbstractReadFileHandle curWalSegment
+    ) throws IgniteCheckedException;
 
     /**
      * Switches to new record.
@@ -222,8 +230,11 @@ public abstract class AbstractWalRecordsIterator
             return new IgniteBiTuple<>((WALPointer)actualFilePtr, 
postProcessRecord(rec));
         }
         catch (IOException | IgniteCheckedException e) {
-            if (e instanceof WalSegmentTailReachedException)
-                throw (WalSegmentTailReachedException)e;
+            if (e instanceof WalSegmentTailReachedException) {
+                throw new WalSegmentTailReachedException(
+                    "WAL segment tail reached. [idx=" + hnd.idx() +
+                        ", isWorkDir=" + hnd.workDir() + ", serVer=" + hnd.ser() + "]", e);
+            }
 
             if (!(e instanceof SegmentEofException) && !(e instanceof 
EOFException)) {
                 IgniteCheckedException e0 = handleRecordException(e, 
actualFilePtr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 2bfc22d..eeb52b3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -133,10 +133,7 @@ public class IgniteWalIteratorFactory {
         return new StandaloneWalRecordsIterator(log,
             prepareSharedCtx(iteratorParametersBuilder),
             iteratorParametersBuilder.ioFactory,
-            resolveWalFiles(
-                iteratorParametersBuilder.filesOrDirs,
-                iteratorParametersBuilder
-            ),
+            resolveWalFiles(iteratorParametersBuilder),
             iteratorParametersBuilder.filter,
             iteratorParametersBuilder.keepBinary,
             iteratorParametersBuilder.bufferSize
@@ -182,10 +179,7 @@ public class IgniteWalIteratorFactory {
 
         List<T2<Long, Long>> gaps = new ArrayList<>();
 
-        List<FileDescriptor> descriptors = resolveWalFiles(
-            iteratorParametersBuilder.filesOrDirs,
-            iteratorParametersBuilder
-        );
+        List<FileDescriptor> descriptors = 
resolveWalFiles(iteratorParametersBuilder);
 
         Iterator<FileDescriptor> it = descriptors.iterator();
 
@@ -217,10 +211,11 @@ public class IgniteWalIteratorFactory {
      * @param iteratorParametersBuilder IteratorParametersBuilder.
      * @return list of file descriptors with checked header records, having 
correct file index is set
      */
-    private List<FileDescriptor> resolveWalFiles(
-        File[] filesOrDirs,
+    public List<FileDescriptor> resolveWalFiles(
         IteratorParametersBuilder iteratorParametersBuilder
     ) {
+        File[] filesOrDirs = iteratorParametersBuilder.filesOrDirs;
+
         if (filesOrDirs == null || filesOrDirs.length == 0)
             return Collections.emptyList();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 2c65ebe..68e55e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -113,10 +113,14 @@ public class RecordV2Serializer implements 
RecordSerializer {
             if (recType == SWITCH_SEGMENT_RECORD)
                 throw new SegmentEofException("Reached end of segment", null);
 
-            FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, 
skipPositionCheck);
+            FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, 
skipPositionCheck, recType);
 
-            if (recType == null)
-                throw new IOException("Unknown record type: " + recType);
+            if (recType == null) {
+                FileWALPointer exp = (FileWALPointer)expPtr;
+
+                throw new IOException("Unknown record type: " + recType +
+                    ", expected pointer [idx=" + exp.index() + ", offset=" + 
exp.fileOffset() + "]");
+            }
 
             if (recordFilter != null && !recordFilter.apply(recType, ptr)) {
                 int toSkip = ptr.length() - REC_TYPE_SIZE - 
FILE_WAL_POINTER_SIZE - CRC_SIZE;
@@ -241,7 +245,8 @@ public class RecordV2Serializer implements RecordSerializer 
{
     private static FileWALPointer readPositionAndCheckPoint(
         DataInput in,
         WALPointer expPtr,
-        boolean skipPositionCheck
+        boolean skipPositionCheck,
+        WALRecord.RecordType type
     ) throws IgniteCheckedException, IOException {
         long idx = in.readLong();
         int fileOff = in.readInt();
@@ -251,9 +256,9 @@ public class RecordV2Serializer implements RecordSerializer 
{
 
         if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOff, 
p.fileOffset())))
             throw new WalSegmentTailReachedException(
-                "WAL segment tail is reached. [ " +
-                        "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +
-                        "Actual state : {Index=" + idx + ",Offset=" + fileOff + "} ]", null);
+                "WAL segment tail reached. [ " +
+                    "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " +
+                    "Actual state : {Index=" + idx + ",Offset=" + fileOff + "} ] recordType=" + type, null);
 
         return new FileWALPointer(idx, fileOff, len);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java
new file mode 100644
index 0000000..a7c4081
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.FileDescriptor;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import static java.nio.ByteBuffer.allocate;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.concurrent.ThreadLocalRandom.current;
+
+/**
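+ * Checks that WAL iteration fails with an exception when a corrupted record (segment tail) is hit inside the archive directory.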
+ */
+public class IgniteWALTailIsReachedDuringIterationOverArchiveTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** WAL segment size. */
+    private static final int WAL_SEGMENT_SIZE = 10 * 1024 * 1024;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setWalSegmentSize(WAL_SEGMENT_SIZE)
+                .setWalSegments(2)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+
+        Ignite ig = startGrid();
+
+        ig.cluster().active(true);
+
+        try (IgniteDataStreamer<Integer, byte[]> st = 
ig.dataStreamer(DEFAULT_CACHE_NAME)){
+            st.allowOverwrite(true);
+
+            byte[] payload = new byte[1024];
+
+            // Generate WAL segment files.
+            for (int i = 0; i < 100 * 1024; i++)
+                st.addData(i, payload);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStandAloneIterator() throws Exception {
+        IgniteEx ig = grid();
+
+        IgniteWriteAheadLogManager wal = ig.context().cache().context().wal();
+
+        File walArchiveDir = U.field(wal, "walArchiveDir");
+
+        IgniteWalIteratorFactory iteratorFactory = new 
IgniteWalIteratorFactory();
+
+        doTest(wal, iteratorFactory.iterator(walArchiveDir));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWALManagerIterator() throws Exception {
+        IgniteEx ig = grid();
+
+        IgniteWriteAheadLogManager wal = ig.context().cache().context().wal();
+
+        doTest(wal, wal.replay(null));
+    }
+
+    /**
+     *
+     * @param walMgr WAL manager.
+     * @param it WAL iterator.
+     * @throws IOException If IO exception.
+     * @throws IgniteCheckedException If WAL iterator failed.
+     */
+    private void doTest(IgniteWriteAheadLogManager walMgr, WALIterator it) 
throws IOException, IgniteCheckedException {
+        File walArchiveDir = U.field(walMgr, "walArchiveDir");
+
+        IgniteWalIteratorFactory iteratorFactory = new 
IgniteWalIteratorFactory();
+
+        List<FileDescriptor> descs = iteratorFactory.resolveWalFiles(
+            new IteratorParametersBuilder()
+                .filesOrDirs(walArchiveDir)
+        );
+
+        int maxIndex = descs.size() - 1;
+        int minIndex = 1;
+
+        int corruptedIdx = current().nextInt(minIndex, maxIndex);
+
+        log.info("Corrupted segment with idx:" + corruptedIdx);
+
+        FileWALPointer corruptedPtr = corruptedWAlSegmentFile(
+            descs.get(corruptedIdx),
+            new RandomAccessFileIOFactory(),
+            iteratorFactory
+        );
+
+        log.info("Should fail on ptr " + corruptedPtr);
+
+        FileWALPointer lastReadPtr = null;
+
+        boolean exception = false;
+
+        try (WALIterator it0 = it) {
+            while (it0.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it0.nextX();
+
+                lastReadPtr = (FileWALPointer)tup.get1();
+            }
+        }
+        catch (IgniteCheckedException e) {
+            if (e.getMessage().contains("WAL tail reached in archive directory, WAL segment file is corrupted."))
+                exception = true;
+        }
+
+        Assert.assertNotNull(lastReadPtr);
+
+        if (!exception) {
+            fail("Last read ptr=" + lastReadPtr + ", corruptedPtr=" + 
corruptedPtr);
+        }
+    }
+
+    /**
+     *
+     * @param desc WAL segment descriptor.
+     * @param ioFactory IO factory.
+     * @param iteratorFactory Iterator factory.
+     * @return Pointer to the corrupted record.
+     * @throws IOException If IO exception.
+     * @throws IgniteCheckedException If iterator failed.
+     */
+    private FileWALPointer corruptedWAlSegmentFile(
+        FileDescriptor desc,
+        FileIOFactory ioFactory,
+        IgniteWalIteratorFactory iteratorFactory
+    ) throws IOException, IgniteCheckedException {
+        LinkedList<FileWALPointer> pointers = new LinkedList<>();
+
+        try (WALIterator it = iteratorFactory.iterator(desc.file())) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                pointers.add((FileWALPointer)tup.get1());
+            }
+        }
+
+        int pointToCorrupt = current().nextInt(pointers.size());
+
+        FileWALPointer ptr = pointers.get(pointToCorrupt);
+
+        int offset = ptr.fileOffset();
+
+        // 20 pointer size, 8 idx, 4 offset, 4 length.
+        ByteBuffer buf = allocate(20);
+
+        Random r = new Random();
+
+        // Corrupt record pointer.
+        r.nextBytes(buf.array());
+
+        try (FileIO io = ioFactory.create(desc.file(), WRITE)) {
+            io.write(buf, offset + 1);
+
+            io.force(true);
+        }
+
+        return ptr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index e8a8576..a3905f1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.Ign
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsUnusedWalSegmentsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteNodeStoppedDuringDisableWALTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALTailIsReachedDuringIterationOverArchiveTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundWithMmapBufferSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushFsyncSelfTest;
@@ -192,5 +193,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         //suite.addTestSuite(IgniteWalRecoverySeveralRestartsTest.class);
 
         suite.addTestSuite(IgniteRebalanceScheduleResendPartitionsTest.class);
+
+        
suite.addTestSuite(IgniteWALTailIsReachedDuringIterationOverArchiveTest.class);
     }
 }

Reply via email to