Repository: ignite Updated Branches: refs/heads/master 1c840f590 -> dbf557400
IGNITE-9050 WAL iterator should throw an exception if segment tail is reached inside archive directory - Fixes #4429. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dbf55740 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dbf55740 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dbf55740 Branch: refs/heads/master Commit: dbf557400c77880e182bcac8358f47d2148451b1 Parents: 1c840f5 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Fri Aug 10 15:32:19 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Aug 10 15:32:19 2018 +0300 ---------------------------------------------------------------------- .../wal/AbstractWalRecordsIterator.java | 17 +- .../wal/reader/IgniteWalIteratorFactory.java | 15 +- .../wal/serializer/RecordV2Serializer.java | 19 +- ...IsReachedDuringIterationOverArchiveTest.java | 245 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 5 files changed, 279 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 01b0933..ac68ea9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -162,6 +162,13 @@ public abstract class AbstractWalRecordsIterator } } catch (WalSegmentTailReachedException e) { + AbstractReadFileHandle currWalSegment = this.currWalSegment; + + if (!currWalSegment.workDir()) + throw new IgniteCheckedException( + "WAL tail reached in archive directory, " + + "WAL segment file is corrupted.", e); + log.warning(e.getMessage()); curRec = null; @@ -197,7 +204,8 @@ public abstract class AbstractWalRecordsIterator * @throws IgniteCheckedException if reading failed */ protected abstract AbstractReadFileHandle advanceSegment( - @Nullable final AbstractReadFileHandle curWalSegment) throws IgniteCheckedException; + @Nullable final AbstractReadFileHandle curWalSegment + ) throws IgniteCheckedException; /** * Switches to new record. @@ -222,8 +230,11 @@ public abstract class AbstractWalRecordsIterator return new IgniteBiTuple<>((WALPointer)actualFilePtr, postProcessRecord(rec)); } catch (IOException | IgniteCheckedException e) { - if (e instanceof WalSegmentTailReachedException) - throw (WalSegmentTailReachedException)e; + if (e instanceof WalSegmentTailReachedException) { + throw new WalSegmentTailReachedException( + "WAL segment tail reached. [idx=" + hnd.idx() + + ", isWorkDir=" + hnd.workDir() + ", serVer=" + hnd.ser() + "]", e); + } if (!(e instanceof SegmentEofException) && !(e instanceof EOFException)) { IgniteCheckedException e0 = handleRecordException(e, actualFilePtr); http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 2bfc22d..eeb52b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -133,10 +133,7 @@ public class IgniteWalIteratorFactory { return new StandaloneWalRecordsIterator(log, prepareSharedCtx(iteratorParametersBuilder), iteratorParametersBuilder.ioFactory, - resolveWalFiles( - iteratorParametersBuilder.filesOrDirs, - iteratorParametersBuilder - ), + resolveWalFiles(iteratorParametersBuilder), iteratorParametersBuilder.filter, iteratorParametersBuilder.keepBinary, iteratorParametersBuilder.bufferSize @@ -182,10 +179,7 @@ public class IgniteWalIteratorFactory { List<T2<Long, Long>> gaps = new ArrayList<>(); - List<FileDescriptor> descriptors = resolveWalFiles( - iteratorParametersBuilder.filesOrDirs, - iteratorParametersBuilder - ); + List<FileDescriptor> descriptors = resolveWalFiles(iteratorParametersBuilder); Iterator<FileDescriptor> it = descriptors.iterator(); @@ -217,10 +211,11 @@ public class IgniteWalIteratorFactory { * @param iteratorParametersBuilder IteratorParametersBuilder. * @return list of file descriptors with checked header records, having correct file index is set */ - private List<FileDescriptor> resolveWalFiles( - File[] filesOrDirs, + public List<FileDescriptor> resolveWalFiles( IteratorParametersBuilder iteratorParametersBuilder ) { + File[] filesOrDirs = iteratorParametersBuilder.filesOrDirs; + if (filesOrDirs == null || filesOrDirs.length == 0) return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index 2c65ebe..68e55e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -113,10 +113,14 @@ public class RecordV2Serializer implements RecordSerializer { if (recType == SWITCH_SEGMENT_RECORD) throw new SegmentEofException("Reached end of segment", null); - FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck); + FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck, recType); - if (recType == null) - throw new IOException("Unknown record type: " + recType); + if (recType == null) { + FileWALPointer exp = (FileWALPointer)expPtr; + + throw new IOException("Unknown record type: " + recType + + ", expected pointer [idx=" + exp.index() + ", offset=" + exp.fileOffset() + "]"); + } if (recordFilter != null && !recordFilter.apply(recType, ptr)) { int toSkip = ptr.length() - REC_TYPE_SIZE - FILE_WAL_POINTER_SIZE - CRC_SIZE; @@ -241,7 +245,8 @@ public class RecordV2Serializer implements RecordSerializer { private static FileWALPointer readPositionAndCheckPoint( DataInput in, WALPointer expPtr, - boolean skipPositionCheck + boolean skipPositionCheck, + WALRecord.RecordType type ) throws IgniteCheckedException, IOException { long idx = in.readLong(); int fileOff = in.readInt(); @@ -251,9 +256,9 @@ public class RecordV2Serializer implements RecordSerializer { if (!F.eq(idx, p.index()) || (!skipPositionCheck && !F.eq(fileOff, p.fileOffset()))) throw new WalSegmentTailReachedException( - "WAL segment tail is reached. [ " + - "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " + - "Actual state : {Index=" + idx + ",Offset=" + fileOff + "} ]", null); + "WAL segment tail reached. [ " + + "Expected next state: {Index=" + p.index() + ",Offset=" + p.fileOffset() + "}, " + + "Actual state : {Index=" + idx + ",Offset=" + fileOff + "} ] recordType=" + type, null); return new FileWALPointer(idx, fileOff, len); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java new file mode 100644 index 0000000..a7c4081 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWALTailIsReachedDuringIterationOverArchiveTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +import static java.nio.ByteBuffer.allocate; +import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.concurrent.ThreadLocalRandom.current; + +/** + * + */ +public class IgniteWALTailIsReachedDuringIterationOverArchiveTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** WAL segment size. */ + private static final int WAL_SEGMENT_SIZE = 10 * 1024 * 1024; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalSegments(2) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + + Ignite ig = startGrid(); + + ig.cluster().active(true); + + try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)){ + st.allowOverwrite(true); + + byte[] payload = new byte[1024]; + + // Generate WAL segment files. + for (int i = 0; i < 100 * 1024; i++) + st.addData(i, payload); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + public void testStandAloneIterator() throws Exception { + IgniteEx ig = grid(); + + IgniteWriteAheadLogManager wal = ig.context().cache().context().wal(); + + File walArchiveDir = U.field(wal, "walArchiveDir"); + + IgniteWalIteratorFactory iteratorFactory = new IgniteWalIteratorFactory(); + + doTest(wal, iteratorFactory.iterator(walArchiveDir)); + } + + /** + * @throws Exception If failed. + */ + public void testWALManagerIterator() throws Exception { + IgniteEx ig = grid(); + + IgniteWriteAheadLogManager wal = ig.context().cache().context().wal(); + + doTest(wal, wal.replay(null)); + } + + /** + * + * @param walMgr WAL manager. + * @param it WAL iterator. + * @throws IOException If IO exception. + * @throws IgniteCheckedException If WAL iterator failed. + */ + private void doTest(IgniteWriteAheadLogManager walMgr, WALIterator it) throws IOException, IgniteCheckedException { + File walArchiveDir = U.field(walMgr, "walArchiveDir"); + + IgniteWalIteratorFactory iteratorFactory = new IgniteWalIteratorFactory(); + + List<FileDescriptor> descs = iteratorFactory.resolveWalFiles( + new IteratorParametersBuilder() + .filesOrDirs(walArchiveDir) + ); + + int maxIndex = descs.size() - 1; + int minIndex = 1; + + int corruptedIdx = current().nextInt(minIndex, maxIndex); + + log.info("Corrupted segment with idx:" + corruptedIdx); + + FileWALPointer corruptedPtr = corruptedWAlSegmentFile( + descs.get(corruptedIdx), + new RandomAccessFileIOFactory(), + iteratorFactory + ); + + log.info("Should fail on ptr " + corruptedPtr); + + FileWALPointer lastReadPtr = null; + + boolean exception = false; + + try (WALIterator it0 = it) { + while (it0.hasNextX()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it0.nextX(); + + lastReadPtr = (FileWALPointer)tup.get1(); + } + } + catch (IgniteCheckedException e) { + if (e.getMessage().contains("WAL tail reached in archive directory, WAL segment file is corrupted.")) + exception = true; + } + + Assert.assertNotNull(lastReadPtr); + + if (!exception) { + fail("Last read ptr=" + lastReadPtr + ", corruptedPtr=" + corruptedPtr); + } + } + + /** + * + * @param desc WAL segment descriptor. + * @param ioFactory IO factory. + * @param iteratorFactory Iterator factory. + * @return Corrupted position/ + * @throws IOException If IO exception. + * @throws IgniteCheckedException If iterator failed. + */ + private FileWALPointer corruptedWAlSegmentFile( + FileDescriptor desc, + FileIOFactory ioFactory, + IgniteWalIteratorFactory iteratorFactory + ) throws IOException, IgniteCheckedException { + LinkedList<FileWALPointer> pointers = new LinkedList<>(); + + try (WALIterator it = iteratorFactory.iterator(desc.file())) { + while (it.hasNext()) { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + + pointers.add((FileWALPointer)tup.get1()); + } + } + + int pointToCorrupt = current().nextInt(pointers.size()); + + FileWALPointer ptr = pointers.get(pointToCorrupt); + + int offset = ptr.fileOffset(); + + // 20 pointer size, 8 idx, 4 offset, 4 length. + ByteBuffer buf = allocate(20); + + Random r = new Random(); + + // Corrupt record pointer. + r.nextBytes(buf.array()); + + try (FileIO io = ioFactory.create(desc.file(), WRITE)) { + io.write(buf, offset + 1); + + io.force(true); + } + + return ptr; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dbf55740/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index e8a8576..a3905f1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.Ign import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsUnusedWalSegmentsTest; import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteNodeStoppedDuringDisableWALTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALTailIsReachedDuringIterationOverArchiveTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundWithMmapBufferSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushFsyncSelfTest; @@ -192,5 +193,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { //suite.addTestSuite(IgniteWalRecoverySeveralRestartsTest.class); suite.addTestSuite(IgniteRebalanceScheduleResendPartitionsTest.class); + + suite.addTestSuite(IgniteWALTailIsReachedDuringIterationOverArchiveTest.class); } }