IGNITE-6539 WAL parser fails if empty log files exist in directory - Fixes #2794.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/474479c3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/474479c3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/474479c3 Branch: refs/heads/ignite-3478 Commit: 474479c3f624c3a3c67b6ae549a566caac1f6b1c Parents: 3b1cad2 Author: dpavlov <[email protected]> Authored: Thu Oct 5 18:06:27 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Oct 5 18:06:27 2017 +0300 ---------------------------------------------------------------------- .../reader/StandaloneWalRecordsIterator.java | 24 ++++-- .../db/wal/reader/IgniteWalReaderTest.java | 89 ++++++++++++++++++++ 2 files changed, 104 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/474479c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 24b2148..42bb410 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -45,10 +45,10 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExp import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; -import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -185,12 +185,11 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * Header record and its position is checked. WAL position is used to determine real index. * File index from file name is ignored. * - * @param allFiles files to scan - * @return list of file descriptors with checked header records, file index is set - * @throws IgniteCheckedException if IO error occurs + * @param allFiles files to scan. + * @return list of file descriptors with checked header records, having correct file index is set */ private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders( - @Nullable final File[] allFiles) throws IgniteCheckedException { + @Nullable final File[] allFiles) { if (allFiles == null || allFiles.length == 0) return Collections.emptyList(); @@ -198,7 +197,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { for (File file : allFiles) { if (file.length() < HEADER_RECORD_SIZE) - continue; + continue; //filter out this segment as it is too short FileWALPointer ptr; @@ -211,17 +210,24 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { // Header record must be agnostic to the serializer version. final int type = in.readUnsignedByte(); - if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) - throw new SegmentEofException("Reached logical end of the segment", null); + if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) { + if (log.isInfoEnabled()) + log.info("Reached logical end of the segment for file " + file); + + continue; //filter out this segment + } ptr = RecordV1Serializer.readPosition(in); } catch (IOException e) { - throw new IgniteCheckedException("Failed to scan index from file [" + file + "]", e); + U.warn(log, "Failed to scan index from file [" + file + "]. Skipping this file during iteration", e); + + continue; //filter out this segment } resultingDescs.add(new FileWriteAheadLogManager.FileDescriptor(file, ptr.index())); } Collections.sort(resultingDescs); + return resultingDescs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/474479c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java index 6db2784..93df8b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -75,6 +75,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.junit.Assert; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; @@ -340,6 +341,22 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { } /** + * Puts provided number of records to fill WAL + * + * @param ignite ignite instance + * @param recordsToWrite count + */ + private void putAllDummyRecords(Ignite ignite, int recordsToWrite) { + IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + + Map<Object, Object> values = new HashMap<>(); + + for (int i = 0; i < recordsToWrite; i++) + values.put(i, new IndexedObject(i)); + + cache0.putAll(values); + } + /** * Puts provided number of records to fill WAL under transactions * * @param ignite ignite instance @@ -715,6 +732,78 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { } /** + * Tests archive completed event is fired + * + * @throws Exception if failed + */ + public void testFillWalForExactSegmentsCount() throws Exception { + customWalMode = WALMode.DEFAULT; + + final CountDownLatch reqSegments = new CountDownLatch(15); + final Ignite ignite = startGrid("node0"); + + ignite.active(true); + + final IgniteEvents evts = ignite.events(); + + if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED)) + assertTrue("nothing to test", false); + + evts.localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event e) { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; + long idx = archComplEvt.getAbsWalSegmentIdx(); + log.info("Finished archive for segment [" + idx + ", " + + archComplEvt.getArchiveFile() + "]: [" + e + "]"); + + reqSegments.countDown(); + return true; + } + }, EVT_WAL_SEGMENT_ARCHIVED); + + + int totalEntries = 0; + while (reqSegments.getCount() > 0) { + final int write = 500; + putAllDummyRecords(ignite, write); + totalEntries += write; + Assert.assertTrue("Too much entries generated, but segments was not become available", + totalEntries < 10000); + } + final String subfolderName = genDbSubfolderName(ignite, 0); + + stopGrid("node0"); + + final String workDir = U.defaultWorkDirectory(); + final IgniteWalIteratorFactory factory = createWalIteratorFactory(subfolderName, workDir); + + scanIterateAndCount(factory, workDir, subfolderName, totalEntries, 0, null, null); + } + + /** + * Tests reading of empty WAL from non filled cluster + * + * @throws Exception if failed. + */ + public void testReadEmptyWal() throws Exception { + customWalMode = WALMode.DEFAULT; + + final Ignite ignite = startGrid("node0"); + + ignite.active(true); + ignite.active(false); + + final String subfolderName = genDbSubfolderName(ignite, 0); + + stopGrid("node0"); + + final String workDir = U.defaultWorkDirectory(); + final IgniteWalIteratorFactory factory = createWalIteratorFactory(subfolderName, workDir); + + scanIterateAndCount(factory, workDir, subfolderName, 0, 0, null, null); + } + + /** * Creates and fills cache with data. * * @param ig Ignite instance.
