IGNITE-8661 WALIterator should be stopped if it fails to deserialize a record - Fixes #4155.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6ab2ae6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6ab2ae6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6ab2ae6 Branch: refs/heads/ignite-8446 Commit: d6ab2ae684d04431fbe0d8bc52bd6634d54e52b2 Parents: c56d16f Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Authored: Thu Jun 28 17:39:38 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu Jun 28 17:49:45 2018 +0300 ---------------------------------------------------------------------- .../pagemem/wal/IgniteWriteAheadLogManager.java | 5 + .../GridCacheDatabaseSharedManager.java | 221 +++- .../wal/AbstractWalRecordsIterator.java | 27 +- .../wal/FileWriteAheadLogManager.java | 39 +- .../wal/FsyncModeFileWriteAheadLogManager.java | 5 + .../SingleSegmentLogicalRecordsIterator.java | 4 +- .../wal/reader/IgniteWalIteratorFactory.java | 617 +++++++--- .../reader/StandaloneWalRecordsIterator.java | 246 ++-- .../serializer/RecordSerializerFactoryImpl.java | 39 +- .../wal/serializer/RecordV1Serializer.java | 27 +- .../wal/serializer/RecordV2Serializer.java | 3 + ...gniteWalIteratorExceptionDuringReadTest.java | 150 +++ .../db/wal/reader/IgniteWalReaderTest.java | 1050 ++++++++++-------- .../persistence/pagemem/NoOpWALManager.java | 5 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + .../development/utils/IgniteWalConverter.java | 10 +- 16 files changed, 1589 insertions(+), 862 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index fd5d53b..2b6358b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -122,6 +122,11 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public int walArchiveSegments(); /** + * @return Last archived segment index. + */ + public long lastArchivedSegment(); + + /** * Checks if WAL segment is under lock or reserved * * @param ptr Pointer to check. http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 9892b8c..7c83c1e 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -131,6 +131,7 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.Snapshot import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; @@ -166,6 +167,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID; /** @@ -1930,39 +1932,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cctx.wal().allowCompressionUntil(status.startPtr); long start = U.currentTimeMillis(); - int applied = 0; - WALPointer lastRead = null; + + long lastArchivedSegment = cctx.wal().lastArchivedSegment(); + + RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, log); Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() : F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps); + int applied = 0; + try (WALIterator it = cctx.wal().replay(status.endPtr)) { while (it.hasNextX()) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); - - WALRecord rec = tup.get2(); + WALRecord rec = restoreBinaryState.next(it); - lastRead = tup.get1(); + if (rec == null) + break; switch (rec.type()) { - case CHECKPOINT_RECORD: - CheckpointRecord cpRec = (CheckpointRecord)rec; - - // We roll memory up until we find a checkpoint start record registered in the status. - if (F.eq(cpRec.checkpointId(), status.cpStartId)) { - log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() + - ", pos=" + tup.get1() + ']'); - - apply = false; - } - else if (!F.eq(cpRec.checkpointId(), status.cpEndId)) - U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() + - ", expCpId=" + status.cpStartId + ", pos=" + tup.get1() + ']'); - - break; - case PAGE_RECORD: - if (apply) { + if (restoreBinaryState.needApplyBinaryUpdate()) { PageSnapshot pageRec = (PageSnapshot)rec; // Here we do not require tag check because we may be applying memory changes after @@ -2045,7 +2034,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; default: - if (apply && rec instanceof PageDeltaRecord) { + if (restoreBinaryState.needApplyBinaryUpdate() && rec instanceof PageDeltaRecord) { PageDeltaRecord r = (PageDeltaRecord)rec; int grpId = r.groupId(); @@ -2086,11 +2075,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (metastoreOnly) return null; + WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer(); + if (status.needRestoreMemory()) { - if (apply) + if (restoreBinaryState.needApplyBinaryUpdate()) throw new StorageException("Failed to restore memory state (checkpoint marker is present " + "on disk, but checkpoint record is missed in WAL) " + - "[cpStatus=" + status + ", lastRead=" + lastRead + "]"); + "[cpStatus=" + status + ", lastRead=" + lastReadPtr + "]"); log.info("Finished applying memory changes [changesApplied=" + applied + ", time=" + (U.currentTimeMillis() - start) + "ms]"); @@ -2101,7 +2092,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cpHistory.initialize(retreiveHistory()); - return lastRead == null ? null : lastRead.next(); + return lastReadPtr == null ? null : lastReadPtr.next(); } /** @@ -2210,6 +2201,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!metastoreOnly) cctx.kernalContext().query().skipFieldLookup(true); + long lastArchivedSegment = cctx.wal().lastArchivedSegment(); + + RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, log); + long start = U.currentTimeMillis(); int applied = 0; @@ -2220,9 +2215,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>(); while (it.hasNextX()) { - IgniteBiTuple<WALPointer, WALRecord> next = it.nextX(); + WALRecord rec = restoreLogicalState.next(it); - WALRecord rec = next.get2(); + if (rec == null) + break; switch (rec.type()) { case DATA_RECORD: @@ -4365,4 +4361,171 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan else return null; } + + /** + * Abstract class for create restore context. + */ + public abstract static class RestoreStateContext { + /** */ + protected final IgniteLogger log; + + /** Last archived segment. */ + protected final long lastArchivedSegment; + + /** Last read record WAL pointer. */ + protected FileWALPointer lastRead; + + /** + * @param lastArchivedSegment Last archived segment index. + * @param log Ignite logger. + */ + public RestoreStateContext(long lastArchivedSegment, IgniteLogger log) { + this.lastArchivedSegment = lastArchivedSegment; + this.log = log; + } + + /** + * Advance iterator to the next record. + * + * @param it WAL iterator. + * @return WALRecord entry. + * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring. + */ + public WALRecord next(WALIterator it) throws IgniteCheckedException { + try { + IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); + + WALRecord rec = tup.get2(); + + WALPointer ptr = tup.get1(); + + lastRead = (FileWALPointer)ptr; + + rec.position(ptr); + + return rec; + } + catch (IgniteCheckedException e) { + boolean throwsCRCError = throwsCRCError(); + + if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) { + if (throwsCRCError) + throw e; + else + return null; + } + + log.error("Catch error during restore state, throwsCRCError=" + throwsCRCError, e); + + throw e; + } + } + + /** + * + * @return Last read WAL record pointer. + */ + public WALPointer lastReadRecordPointer() { + return lastRead; + } + + /** + * + * @return Flag indicates need throws CRC exception or not. + */ + public boolean throwsCRCError(){ + FileWALPointer lastReadPtr = lastRead; + + return lastReadPtr != null && lastReadPtr.index() <= lastArchivedSegment; + } + } + + /** + * Restore memory context. Tracks the safety of binary recovery. + */ + public static class RestoreBinaryState extends RestoreStateContext { + /** Checkpoint status. */ + private final CheckpointStatus status; + + /** The flag indicates need to apply the binary update or no needed. */ + private boolean needApplyBinaryUpdates; + + /** + * @param status Checkpoint status. + * @param lastArchivedSegment Last archived segment index. + * @param log Ignite logger. + */ + public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, IgniteLogger log) { + super(lastArchivedSegment, log); + + this.status = status; + needApplyBinaryUpdates = status.needRestoreMemory(); + } + + /** + * Advance iterator to the next record. + * + * @param it WAL iterator. + * @return WALRecord entry. + * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring. + */ + @Override public WALRecord next(WALIterator it) throws IgniteCheckedException { + WALRecord rec = super.next(it); + + if (rec == null) + return null; + + if (rec.type() == CHECKPOINT_RECORD) { + CheckpointRecord cpRec = (CheckpointRecord)rec; + + // We roll memory up until we find a checkpoint start record registered in the status. + if (F.eq(cpRec.checkpointId(), status.cpStartId)) { + log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() + + ", pos=" + rec.position() + ']'); + + needApplyBinaryUpdates = false; + } + else if (!F.eq(cpRec.checkpointId(), status.cpEndId)) + U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() + + ", expCpId=" + status.cpStartId + ", pos=" + rec.position() + ']'); + } + + return rec; + } + + /** + * + * @return Flag indicates need apply binary record or not. + */ + public boolean needApplyBinaryUpdate() { + return needApplyBinaryUpdates; + } + + /** + * + * @return Flag indicates need throws CRC exception or not. + */ + @Override public boolean throwsCRCError() { + log.info("Throws CRC error check, needApplyBinaryUpdates=" + needApplyBinaryUpdates + + ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" + lastRead); + + if (needApplyBinaryUpdates) + return true; + + return super.throwsCRCError(); + } + } + + /** + * Restore logical state context. Tracks the safety of logical recovery. + */ + public static class RestoreLogicalState extends RestoreStateContext { + /** + * @param lastArchivedSegment Last archived segment index. + * @param log Ignite logger. + */ + public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) { + super(lastArchivedSegment, log); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index e442386..01b0933 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -91,21 +91,21 @@ public abstract class AbstractWalRecordsIterator * @param sharedCtx Shared context. * @param serializerFactory Serializer of current version to read headers. * @param ioFactory ioFactory for file IO access. - * @param bufSize buffer for reading records size. + * @param initialReadBufferSize buffer for reading records size. */ protected AbstractWalRecordsIterator( @NotNull final IgniteLogger log, @NotNull final GridCacheSharedContext sharedCtx, @NotNull final RecordSerializerFactory serializerFactory, @NotNull final FileIOFactory ioFactory, - final int bufSize + final int initialReadBufferSize ) { this.log = log; this.sharedCtx = sharedCtx; this.serializerFactory = serializerFactory; this.ioFactory = ioFactory; - buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder()); + buf = new ByteBufferExpander(initialReadBufferSize, ByteOrder.nativeOrder()); } /** {@inheritDoc} */ @@ -225,8 +225,12 @@ public abstract class AbstractWalRecordsIterator if (e instanceof WalSegmentTailReachedException) throw (WalSegmentTailReachedException)e; - if (!(e instanceof SegmentEofException)) - handleRecordException(e, actualFilePtr); + if (!(e instanceof SegmentEofException) && !(e instanceof EOFException)) { + IgniteCheckedException e0 = handleRecordException(e, actualFilePtr); + + if (e0 != null) + throw e0; + } return null; } @@ -248,12 +252,15 @@ public abstract class AbstractWalRecordsIterator * * @param e problem from records reading * @param ptr file pointer was accessed + * + * @return {@code null} if the error was handled and we can go ahead, + * {@code IgniteCheckedException} if the error was not handled, and we should stop the iteration. */ - protected void handleRecordException( - @NotNull final Exception e, - @Nullable final FileWALPointer ptr) { + protected IgniteCheckedException handleRecordException(@NotNull final Exception e, @Nullable final FileWALPointer ptr) { if (log.isInfoEnabled()) log.info("Stopping WAL iteration due to an exception: " + e.getMessage() + ", ptr=" + ptr); + + return new IgniteCheckedException(e); } /** @@ -265,8 +272,8 @@ public abstract class AbstractWalRecordsIterator */ protected AbstractReadFileHandle initReadHandle( @NotNull final AbstractFileDescriptor desc, - @Nullable final FileWALPointer start) - throws IgniteCheckedException, FileNotFoundException { + @Nullable final FileWALPointer start + ) throws IgniteCheckedException, FileNotFoundException { try { FileIO fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file()) : ioFactory.create(desc.file()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 3ca51f3..96387eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; +import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator.AbstractFileDescriptor; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; @@ -126,6 +127,7 @@ import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; import static org.apache.ignite.configuration.WALMode.LOG_ONLY; +import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD; @@ -170,7 +172,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final byte[] FILL_BUF = new byte[1024 * 1024]; /** Pattern for segment file names */ - private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); + public static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); /** */ private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); @@ -190,7 +192,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl }; /** */ - private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip"); + public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip"); /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() { @@ -951,6 +953,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ + @Override public long lastArchivedSegment() { + return archivedMonitor.lastArchivedAbsoluteIndex(); + } + + /** {@inheritDoc} */ @Override public boolean reserved(WALPointer ptr) { FileWALPointer fPtr = (FileWALPointer)ptr; @@ -1655,9 +1662,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl notifyAll(); } - if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED)) { - evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(), - res.getAbsIdx(), res.getDstArchiveFile())); + if (evt.isRecordable(EVT_WAL_SEGMENT_ARCHIVED)) { + evt.record(new WalSegmentArchivedEvent( + cctx.discovery().localNode(), + res.getAbsIdx(), + res.getDstArchiveFile()) + ); } } } @@ -1911,7 +1921,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); if (alreadyCompressed.length > 0) - lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx(); + lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].idx(); } /** @@ -2319,7 +2329,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * WAL file descriptor. */ - public static class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRecordsIterator.AbstractFileDescriptor { + public static class FileDescriptor implements + Comparable<FileDescriptor>, AbstractFileDescriptor { /** */ protected final File file; @@ -2390,20 +2401,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @return Absolute WAL segment file index - */ - public long getIdx() { - return idx; - } - - /** - * @return absolute pathname string of this file descriptor pathname. - */ - public String getAbsolutePath() { - return file.getAbsolutePath(); - } - - /** * @return True if segment is ZIP compressed. */ @Override public boolean isCompressed() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index 6e59ad3..5db21d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -846,6 +846,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** {@inheritDoc} */ + @Override public long lastArchivedSegment() { + return archiver.lastArchivedAbsoluteIndex(); + } + + /** {@inheritDoc} */ @Override public boolean reserved(WALPointer ptr) { FileWALPointer fPtr = (FileWALPointer)ptr; http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java index 36e5b0e..f688bb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java @@ -86,9 +86,7 @@ public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsItera private static RecordSerializerFactory initLogicalRecordsSerializerFactory(GridCacheSharedContext sharedCtx) throws IgniteCheckedException { - return new RecordSerializerFactoryImpl(sharedCtx) - .recordDeserializeFilter(new LogicalRecordsFilter()) - .marshalledMode(true); + return new RecordSerializerFactoryImpl(sharedCtx, new LogicalRecordsFilter()).marshalledMode(true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 0c7bbb3..2bfc22d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -17,17 +17,49 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader; +import java.io.DataInput; import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteOrder; +import java.nio.file.FileVisitResult; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.FileDescriptor; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.lang.System.arraycopy; +import static java.nio.file.Files.walkFileTree; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition; + /** * Factory for creating iterator over WAL files */ @@ -35,189 +67,522 @@ public class IgniteWalIteratorFactory { /** Logger. */ private final IgniteLogger log; - /** Page size, in standalone iterator mode this value can't be taken from memory configuration. */ - private final int pageSize; - - /** - * Folder specifying location of metadata File Store. {@code null} means no specific folder is configured. <br> - * This folder should be specified for converting data entries into BinaryObjects - */ - @Nullable private File binaryMetadataFileStoreDir; - /** - * Folder specifying location of marshaller mapping file store. {@code null} means no specific folder is configured. - * <br> This folder should be specified for converting data entries into BinaryObjects. Providing {@code null} will - * disable unmarshall for non primitive objects, BinaryObjects will be provided + * Creates WAL files iterator factory. + * WAL iterator supports automatic converting from CacheObjects and KeyCacheObject into BinaryObjects */ - @Nullable private File marshallerMappingFileStoreDir; - - /** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */ - private boolean keepBinary; - - /** Factory to provide I/O interfaces for read/write operations with files */ - private FileIOFactory ioFactory; - - /** Wal records iterator buffer size */ - private int bufSize = StandaloneWalRecordsIterator.DFLT_BUF_SIZE; + public IgniteWalIteratorFactory() { + this(ConsoleLogger.INSTANCE); + } /** * Creates WAL files iterator factory. * WAL iterator supports automatic converting from CacheObjects and KeyCacheObject into BinaryObjects * * @param log Logger. - * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated - * according its boundaries. - * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. Should include "binary_meta" - * subfolder and consistent ID subfolder. Note Consistent ID should be already masked and should not contain special - * symbols. Providing {@code null} means no specific folder is configured. <br> - * @param marshallerMappingFileStoreDir Folder specifying location of marshaller mapping file store. Should include - * "marshaller" subfolder. Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects - * will be provided - * @param keepBinary {@code true} disables complex object unmarshall into source classes */ - public IgniteWalIteratorFactory( - @NotNull final IgniteLogger log, - final int pageSize, - @Nullable final File binaryMetadataFileStoreDir, - @Nullable final File marshallerMappingFileStoreDir, - final boolean keepBinary) { + public IgniteWalIteratorFactory(@NotNull final IgniteLogger log) { this.log = log; - this.pageSize = pageSize; - this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir; - this.marshallerMappingFileStoreDir = marshallerMappingFileStoreDir; - this.keepBinary = keepBinary; - this.ioFactory = new DataStorageConfiguration().getFileIOFactory(); - new DataStorageConfiguration().setPageSize(pageSize); // just for validate } /** - * Creates WAL files iterator factory. - * WAL iterator supports automatic converting from CacheObjects and KeyCacheObject into BinaryObjects + * Creates iterator for file by file scan mode. + * This method may be used for work folder, file indexes are scanned from the file context. + * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored. * - * @param log Logger. - * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated - * according its boundaries. - * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. Should include "binary_meta" - * subfolder and consistent ID subfolder. Note Consistent ID should be already masked and should not contain special - * symbols. Providing {@code null} means no specific folder is configured. <br> - * @param marshallerMappingFileStoreDir Folder specifying location of marshaller mapping file store. Should include - * "marshaller" subfolder. Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects - * will be provided + * @param filesOrDirs files to scan. A file can be the path to '.wal' file, or directory with '.wal' files. + * Order is not important, but it is significant to provide all segments without omissions. + * Path should not contain special symbols. Special symbols should be already masked. + * @return closable WAL records iterator, should be closed when non needed. + * @throws IgniteCheckedException if failed to read files + * @throws IllegalArgumentException If parameter validation failed. */ - public IgniteWalIteratorFactory( - @NotNull final IgniteLogger log, - final int pageSize, - @Nullable final File binaryMetadataFileStoreDir, - @Nullable final File marshallerMappingFileStoreDir) { - this(log, pageSize, binaryMetadataFileStoreDir, marshallerMappingFileStoreDir, false); + public WALIterator iterator( + @NotNull File... filesOrDirs + ) throws IgniteCheckedException, IllegalArgumentException { + return iterator(new IteratorParametersBuilder().filesOrDirs(filesOrDirs)); } /** - * Creates WAL files iterator factory. This constructor does not allow WAL iterators access to data entries key and value. + * Creates iterator for file by file scan mode. + * This method may be used for work folder, file indexes are scanned from the file context. + * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored. * - * @param log Logger. - * @param ioFactory Custom factory for non-standard file API to be used in WAL reading. - * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated - * according its boundaries. + * @param filesOrDirs paths to scan. A path can be direct to '.wal' file, or directory with '.wal' files. + * Order is not important, but it is significant to provide all segments without omissions. + * Path should not contain special symbols. Special symbols should be already masked. + * @return closable WAL records iterator, should be closed when non needed. + * @throws IgniteCheckedException If failed to read files. + * @throws IllegalArgumentException If parameter validation failed. */ - public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, @NotNull final FileIOFactory ioFactory, int pageSize) { - this.log = log; - this.pageSize = pageSize; - this.ioFactory = ioFactory; - new DataStorageConfiguration().setPageSize(pageSize); // just for validate + public WALIterator iterator( + @NotNull String... filesOrDirs + ) throws IgniteCheckedException, IllegalArgumentException { + return iterator(new IteratorParametersBuilder().filesOrDirs(filesOrDirs)); } /** - * Creates WAL files iterator factory. This constructor does not allow WAL iterators access to data entries key and - * value. - * - * @param log Logger. - * @param pageSize Page size which was used in Ignite Persistent Data store to read WAL from, size is validated - * according its boundaries. + * @param iteratorParametersBuilder Iterator parameters builder. + * @return closable WAL records iterator, should be closed when non needed */ - public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, int pageSize) { - this(log, new DataStorageConfiguration().getFileIOFactory(), pageSize); + public WALIterator iterator( + @NotNull IteratorParametersBuilder iteratorParametersBuilder + ) throws IgniteCheckedException, IllegalArgumentException { + iteratorParametersBuilder.validate(); + + return new StandaloneWalRecordsIterator(log, + prepareSharedCtx(iteratorParametersBuilder), + iteratorParametersBuilder.ioFactory, + resolveWalFiles( + iteratorParametersBuilder.filesOrDirs, + iteratorParametersBuilder + ), + iteratorParametersBuilder.filter, + iteratorParametersBuilder.keepBinary, + iteratorParametersBuilder.bufferSize + ); } /** - * Creates iterator for (archive) directory scan mode. - * Note in this mode total scanned files at end of iteration may be wider that initial files in directory. - * This mode does not support work directory scan because work directory contains unpredictable number in file name. - * Such file may broke iteration. + * Find WAL gaps, for example: + * 0 1 2 3 4 7 8 10 - WAL segment files in directory, this method will return + * List with two tuples [(4,7),(8,10)]. * - * @param walDirWithConsistentId directory with WAL files. Should already contain node consistent ID as subfolder. - * Note: 'Consistent ID'-based subfolder name (if any) should not contain special symbols. - * @return closable WAL records iterator, should be closed when non needed - * @throws IgniteCheckedException if failed to read folder + * @param filesOrDirs Paths to files or directories for scan. + * @return List of tuples, low and high index segments with gap. */ - public WALIterator iteratorArchiveDirectory( - @NotNull final File walDirWithConsistentId) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator( - walDirWithConsistentId, log, prepareSharedCtx(), ioFactory, keepBinary, bufSize); + public List<T2<Long, Long>> hasGaps( + @NotNull String... filesOrDirs + ) throws IllegalArgumentException { + return hasGaps(new IteratorParametersBuilder().filesOrDirs(filesOrDirs)); } /** - * Creates iterator for file by file scan mode. - * This method may be used only for archive folder (not for work). - * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored + * Find WAL gaps, for example: + * 0 1 2 3 4 7 8 10 - WAL segment files in directory, this method will return + * List with two tuples [(4,7),(8,10)]. * - * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions. - * Parameter should contain direct file links to '.wal' files from archive directory. - * 'Consistent ID'-based subfolder name (if any) should not contain special symbols. - * Special symbols should be already masked. - * - * @return closable WAL records iterator, should be closed when non needed - * @throws IgniteCheckedException if failed to read files + * @param filesOrDirs Files or directories to scan. + * @return List of tuples, low and high index segments with gap. */ - public WALIterator iteratorArchiveFiles(@NotNull final File... files) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, false, keepBinary, bufSize, files); + public List<T2<Long, Long>> hasGaps( + @NotNull File... filesOrDirs + ) throws IllegalArgumentException { + return hasGaps(new IteratorParametersBuilder().filesOrDirs(filesOrDirs)); } /** - * Creates iterator for file by file scan mode. - * This method may be used for work folder, file indexes are scanned from the file context. - * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored. - * - * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions. - * Parameter should contain direct file links to '.wal' files from work directory. - * 'Consistent ID'-based subfolder name (if any) should not contain special symbols. - * Special symbols should be already masked. + * @param iteratorParametersBuilder Iterator parameters builder. + * @return List of tuples, low and high index segments with gap. + */ + public List<T2<Long, Long>> hasGaps( + @NotNull IteratorParametersBuilder iteratorParametersBuilder + ) throws IllegalArgumentException { + iteratorParametersBuilder.validate(); + + List<T2<Long, Long>> gaps = new ArrayList<>(); + + List<FileDescriptor> descriptors = resolveWalFiles( + iteratorParametersBuilder.filesOrDirs, + iteratorParametersBuilder + ); + + Iterator<FileDescriptor> it = descriptors.iterator(); + + FileDescriptor prevFd = null; + + while (it.hasNext()) { + FileDescriptor nextFd = it.next(); + + if (prevFd == null) { + prevFd = nextFd; + + continue; + } + + if (prevFd.idx() + 1 != nextFd.idx()) + gaps.add(new T2<>(prevFd.idx(), nextFd.idx())); + + prevFd = nextFd; + } + + return gaps; + } + + /** + * This methods checks all provided files to be correct WAL segment. + * Header record and its position is checked. WAL position is used to determine real index. + * File index from file name is ignored. * - * @return closable WAL records iterator, should be closed when non needed - * @throws IgniteCheckedException if failed to read files + * @param iteratorParametersBuilder IteratorParametersBuilder. + * @return list of file descriptors with checked header records, having correct file index is set + */ + private List<FileDescriptor> resolveWalFiles( + File[] filesOrDirs, + IteratorParametersBuilder iteratorParametersBuilder + ) { + if (filesOrDirs == null || filesOrDirs.length == 0) + return Collections.emptyList(); + + final FileIOFactory ioFactory = iteratorParametersBuilder.ioFactory; + + final TreeSet<FileDescriptor> descriptors = new TreeSet<>(); + + for (File file : filesOrDirs) { + if (file.isDirectory()) { + try { + walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) { + addFileDescriptor(path.toFile(), ioFactory, descriptors); + + return FileVisitResult.CONTINUE; + } + }); + } + catch (IOException e) { + U.error(log, "Failed to walk directories from root [" + file + "]. Skipping this directory.", e); + } + + continue; + } + + addFileDescriptor(file, ioFactory, descriptors); + } + + return new ArrayList<>(descriptors); + } + + /** + * @param file File. + * @param ioFactory IO factory. + * @param descriptors List of descriptors. + */ + private void addFileDescriptor(File file, FileIOFactory ioFactory, TreeSet<FileDescriptor> descriptors) { + if (file.length() < HEADER_RECORD_SIZE) + return; // Filter out this segment as it is too short. + + String fileName = file.getName(); + + if (!WAL_NAME_PATTERN.matcher(fileName).matches() && + !WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(fileName).matches()) + return; // Filter out this because it is not segment file. + + FileDescriptor desc = readFileDescriptor(file, ioFactory); + + if (desc != null) + descriptors.add(desc); + } + + /** + * @param file File to read. + * @param ioFactory IO factory. */ - public WALIterator iteratorWorkFiles(@NotNull final File... files) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, true, keepBinary, bufSize, files); + private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) { + FileDescriptor ds = new FileDescriptor(file); + + try ( + FileIO fileIO = ds.isCompressed() ? new UnzipFileIO(file) : ioFactory.create(file); + ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()) + ) { + final DataInput in = new FileInput(fileIO, buf); + + // Header record must be agnostic to the serializer version. + final int type = in.readUnsignedByte(); + + if (type == RecordType.STOP_ITERATION_RECORD_TYPE) { + if (log.isInfoEnabled()) + log.info("Reached logical end of the segment for file " + file); + + return null; + } + + FileWALPointer ptr = readPosition(in); + + return new FileDescriptor(file, ptr.index()); + } + catch (IOException e) { + U.warn(log, "Failed to scan index from file [" + file + "]. Skipping this file during iteration", e); + + return null; + } } /** - * @return fake shared context required for create minimal services for record reading + * @return Fake shared context required for create minimal services for record reading. */ - @NotNull private GridCacheSharedContext prepareSharedCtx() throws IgniteCheckedException { - final GridKernalContext kernalCtx = new StandaloneGridKernalContext(log, binaryMetadataFileStoreDir, marshallerMappingFileStoreDir); + @NotNull private GridCacheSharedContext prepareSharedCtx( + IteratorParametersBuilder iteratorParametersBuilder + ) throws IgniteCheckedException { + GridKernalContext kernalCtx = new StandaloneGridKernalContext(log, + iteratorParametersBuilder.binaryMetadataFileStoreDir, + iteratorParametersBuilder.marshallerMappingFileStoreDir + ); - final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(); + StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(); - dbMgr.setPageSize(pageSize); + dbMgr.setPageSize(iteratorParametersBuilder.pageSize); return new GridCacheSharedContext<>( kernalCtx, null, null, null, null, null, null, dbMgr, null, null, null, null, null, - null, null, null); + null, null, null + ); } /** - * @param ioFactory New factory to provide I/O interfaces for read/write operations with files + * Wal iterator parameter builder. */ - public void ioFactory(FileIOFactory ioFactory) { - this.ioFactory = ioFactory; + public static class IteratorParametersBuilder { + /** */ + private File[] filesOrDirs; + + /** */ + private int pageSize = DataStorageConfiguration.DFLT_PAGE_SIZE; + + /** Wal records iterator buffer size. */ + private int bufferSize = StandaloneWalRecordsIterator.DFLT_BUF_SIZE; + + /** Keep binary. This flag disables converting of non primitive types (BinaryObjects). */ + private boolean keepBinary; + + /** Factory to provide I/O interfaces for read/write operations with files. */ + private FileIOFactory ioFactory = new DataStorageConfiguration().getFileIOFactory(); + + /** + * Folder specifying location of metadata File Store. {@code null} means no specific folder is configured. <br> + * This folder should be specified for converting data entries into BinaryObjects + */ + @Nullable private File binaryMetadataFileStoreDir; + + /** + * Folder specifying location of marshaller mapping file store. {@code null} means no specific folder is configured. + * <br> This folder should be specified for converting data entries into BinaryObjects. Providing {@code null} will + * disable unmarshall for non primitive objects, BinaryObjects will be provided + */ + @Nullable private File marshallerMappingFileStoreDir; + + /** */ + @Nullable private IgniteBiPredicate<RecordType, WALPointer> filter; + + /** + * @param filesOrDirs Paths to files or directories. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder filesOrDirs(String... filesOrDirs) { + File[] filesOrDirs0 = new File[filesOrDirs.length]; + + for (int i = 0; i < filesOrDirs.length; i++) { + filesOrDirs0[i] = new File(filesOrDirs[i]); + } + + return filesOrDirs(filesOrDirs0); + } + + /** + * @param filesOrDirs Files or directories. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder filesOrDirs(File... filesOrDirs) { + if (this.filesOrDirs == null) + this.filesOrDirs = filesOrDirs; + else + this.filesOrDirs = merge(this.filesOrDirs, filesOrDirs); + + return this; + } + + /** + * @param pageSize Page size. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder pageSize(int pageSize) { + this.pageSize = pageSize; + + return this; + } + + /** + * @param bufferSize Initial size of buffer for reading segments. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + + return this; + } + + /** + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder keepBinary(boolean keepBinary) { + this.keepBinary = keepBinary; + + return this; + } + + /** + * @param ioFactory Custom IO factory for reading files. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder ioFactory(FileIOFactory ioFactory) { + this.ioFactory = ioFactory; + + return this; + } + + /** + * @param binaryMetadataFileStoreDir Path to the binary metadata. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder binaryMetadataFileStoreDir(File binaryMetadataFileStoreDir) { + this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir; + + return this; + } + + /** + * @param marshallerMappingFileStoreDir Path to the marshaller mapping. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder marshallerMappingFileStoreDir(File marshallerMappingFileStoreDir) { + this.marshallerMappingFileStoreDir = marshallerMappingFileStoreDir; + + return this; + } + + /** + * @param filter Record filter for skip records during iteration. + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder filter(IgniteBiPredicate<RecordType, WALPointer> filter) { + this.filter = filter; + + return this; + } + + /** + * Copy current state of builder to new instance. + * + * @return IteratorParametersBuilder Self reference. + */ + public IteratorParametersBuilder copy() { + return new IteratorParametersBuilder() + .filesOrDirs(filesOrDirs) + .pageSize(pageSize) + .bufferSize(bufferSize) + .keepBinary(keepBinary) + .ioFactory(ioFactory) + .binaryMetadataFileStoreDir(binaryMetadataFileStoreDir) + .marshallerMappingFileStoreDir(marshallerMappingFileStoreDir) + .filter(filter); + } + + /** + * @throws IllegalArgumentException If validation failed. + */ + public void validate() throws IllegalArgumentException { + A.ensure(pageSize >= 1024 && pageSize <= 16 * 1024, "Page size must be between 1kB and 16kB."); + A.ensure(U.isPow2(pageSize), "Page size must be a power of 2."); + + A.ensure(bufferSize >= pageSize * 2, "Buffer to small."); + } + + /** + * Merge file arrays. + * + * @param f1 Files array one. + * @param f2 Files array two. + * @return Merged arrays from one and two arrays. + */ + private File[] merge(File[] f1, File[] f2) { + File[] merged = new File[f1.length + f2.length]; + + arraycopy(f1, 0, merged, 0, f1.length); + arraycopy(f2, 0, merged, f1.length, f2.length); + + return merged; + } } /** - * @param bufSize New wal records iterator buffer size + * */ - public void bufferSize(int bufSize) { - this.bufSize = bufSize; + private static class ConsoleLogger implements IgniteLogger { + /** */ + private static final ConsoleLogger INSTANCE = new ConsoleLogger(); + + /** */ + private static final PrintStream OUT = System.out; + + /** */ + private static final PrintStream ERR = System.err; + + /** */ + private ConsoleLogger() { + + } + + /** {@inheritDoc} */ + @Override public IgniteLogger getLogger(Object ctgr) { + return this; + } + + /** {@inheritDoc} */ + @Override public void trace(String msg) { + + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + + } + + /** {@inheritDoc} */ + @Override public void info(String msg) { + OUT.println(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg, @Nullable Throwable e) { + OUT.println(msg); + + if (e != null) + e.printStackTrace(OUT); + } + + /** {@inheritDoc} */ + @Override public void error(String msg, @Nullable Throwable e) { + ERR.println(msg); + + if (e != null) + e.printStackTrace(ERR); + } + + /** {@inheritDoc} */ + @Override public boolean isTraceEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isInfoEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isQuiet() { + return false; + } + + /** {@inheritDoc} */ + @Override public String fileName() { + return "SYSTEM.OUT"; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 712517b..9df4468 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -17,44 +17,41 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.reader; -import java.io.DataInput; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.ByteOrder; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; -import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; -import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader; /** * WAL reader iterator, for creation in standalone WAL reader tool @@ -66,78 +63,52 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { /** */ private static final long serialVersionUID = 0L; - - /** - * WAL files directory. Should already contain 'consistent ID' as subfolder. - * <code>null</code> value means file-by-file iteration mode - */ - @Nullable - private File walFilesDir; - /** * File descriptors remained to scan. * <code>null</code> value means directory scan mode */ @Nullable - private List<FileWriteAheadLogManager.FileDescriptor> walFileDescriptors; + private final List<FileDescriptor> walFileDescriptors; + + /** */ + private int curIdx = -1; /** Keep binary. This flag disables converting of non primitive types (BinaryObjects) */ private boolean keepBinary; /** - * Creates iterator in directory scan mode - * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder - * @param log Logger. - * @param sharedCtx Shared context. Cache processor is to be configured if Cache Object Key & Data Entry is required. - * @param ioFactory File I/O factory. - * @param keepBinary Keep binary. This flag disables converting of non primitive types - * @param bufSize Buffer size. - */ - StandaloneWalRecordsIterator( - @NotNull File walFilesDir, - @NotNull IgniteLogger log, - @NotNull GridCacheSharedContext sharedCtx, - @NotNull FileIOFactory ioFactory, - boolean keepBinary, - int bufSize - ) throws IgniteCheckedException { - super(log, - sharedCtx, - new RecordSerializerFactoryImpl(sharedCtx), - ioFactory, - bufSize); - this.keepBinary = keepBinary; - init(walFilesDir, false, null); - advance(); - } - - /** * Creates iterator in file-by-file iteration mode. Directory * @param log Logger. * @param sharedCtx Shared context. Cache processor is to be configured if Cache Object Key & Data Entry is * required. * @param ioFactory File I/O factory. - * @param workDir Work directory is scanned, false - archive * @param keepBinary Keep binary. This flag disables converting of non primitive types * (BinaryObjects will be used instead) * @param walFiles Wal files. */ StandaloneWalRecordsIterator( - @NotNull IgniteLogger log, - @NotNull GridCacheSharedContext sharedCtx, - @NotNull FileIOFactory ioFactory, - boolean workDir, - boolean keepBinary, - int bufSize, - @NotNull File... walFiles) throws IgniteCheckedException { - super(log, + @NotNull IgniteLogger log, + @NotNull GridCacheSharedContext sharedCtx, + @NotNull FileIOFactory ioFactory, + @NotNull List<FileDescriptor> walFiles, + IgniteBiPredicate<RecordType, WALPointer> readTypeFilter, + boolean keepBinary, + int initialReadBufferSize + ) throws IgniteCheckedException { + super( + log, sharedCtx, - new RecordSerializerFactoryImpl(sharedCtx), + new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter), ioFactory, - bufSize); + initialReadBufferSize + ); this.keepBinary = keepBinary; - init(null, workDir, walFiles); + + walFileDescriptors = walFiles; + + init(walFiles); + advance(); } @@ -145,119 +116,42 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * For directory mode sets oldest file as initial segment, * for file by file mode, converts all files to descriptors and gets oldest as initial. * - * @param walFilesDir directory for directory scan mode - * @param workDir work directory, only for file-by-file mode * @param walFiles files for file-by-file iteration mode */ - private void init( - @Nullable final File walFilesDir, - final boolean workDir, - @Nullable final File[] walFiles) throws IgniteCheckedException { - if (walFilesDir != null) { - FileWriteAheadLogManager.FileDescriptor[] descs = FileWriteAheadLogManager.loadFileDescriptors(walFilesDir); - curWalSegmIdx = !F.isEmpty(descs) ? descs[0].getIdx() : 0; - this.walFilesDir = walFilesDir; - } - else { + private void init(List<FileDescriptor> walFiles) { + if (walFiles == null || walFiles.isEmpty()) + return; - if (workDir) - walFileDescriptors = scanIndexesFromFileHeaders(walFiles); - else - walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles))); - - curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0; - } - curWalSegmIdx--; + curWalSegmIdx = walFiles.get(curIdx + 1).idx() - 1; if (log.isDebugEnabled()) log.debug("Initialized WAL cursor [curWalSegmIdx=" + curWalSegmIdx + ']'); } - /** - * This methods checks all provided files to be correct WAL segment. - * Header record and its position is checked. WAL position is used to determine real index. - * File index from file name is ignored. - * - * @param allFiles files to scan. - * @return list of file descriptors with checked header records, having correct file index is set - */ - private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders( - @Nullable final File[] allFiles) { - if (allFiles == null || allFiles.length == 0) - return Collections.emptyList(); - - final List<FileWriteAheadLogManager.FileDescriptor> resultingDescs = new ArrayList<>(); - - for (File file : allFiles) { - if (file.length() < HEADER_RECORD_SIZE) - continue; //filter out this segment as it is too short - - FileWALPointer ptr; - - try ( - FileIO fileIO = ioFactory.create(file); - ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()) - ) { - final DataInput in = new FileInput(fileIO, buf); - - // Header record must be agnostic to the serializer version. - final int type = in.readUnsignedByte(); - - if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) { - if (log.isInfoEnabled()) - log.info("Reached logical end of the segment for file " + file); - - continue; //filter out this segment - } - ptr = RecordV1Serializer.readPosition(in); - } - catch (IOException e) { - U.warn(log, "Failed to scan index from file [" + file + "]. Skipping this file during iteration", e); - - continue; //filter out this segment - } - - resultingDescs.add(new FileWriteAheadLogManager.FileDescriptor(file, ptr.index())); - } - Collections.sort(resultingDescs); - - return resultingDescs; - } - /** {@inheritDoc} */ @Override protected AbstractReadFileHandle advanceSegment( - @Nullable final AbstractReadFileHandle curWalSegment) throws IgniteCheckedException { + @Nullable final AbstractReadFileHandle curWalSegment + ) throws IgniteCheckedException { if (curWalSegment != null) curWalSegment.close(); curWalSegmIdx++; - // curHandle.workDir is false - final FileWriteAheadLogManager.FileDescriptor fd; - - if (walFilesDir != null) { - File segmentFile = new File(walFilesDir, - FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx)); - if (!segmentFile.exists()) - segmentFile = new File(walFilesDir, - FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx) + ".zip"); + curIdx++; - fd = new FileWriteAheadLogManager.FileDescriptor(segmentFile); - } - else { - if (walFileDescriptors.isEmpty()) - return null; //no files to read, stop iteration + if (curIdx >= walFileDescriptors.size()) + return null; - fd = walFileDescriptors.remove(0); - } + FileDescriptor fd = walFileDescriptors.get(curIdx); if (log.isDebugEnabled()) - log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.getAbsolutePath() + ']'); + log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file().getAbsolutePath() + ']'); assert fd != null; curRec = null; + try { return initReadHandle(fd, null); } @@ -270,11 +164,42 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { } /** {@inheritDoc} */ + @Override protected AbstractReadFileHandle initReadHandle( + @NotNull AbstractFileDescriptor desc, + @Nullable FileWALPointer start + ) throws IgniteCheckedException, FileNotFoundException { + + AbstractFileDescriptor fd = desc; + + while (true) { + try { + FileIO fileIO = fd.isCompressed() ? new UnzipFileIO(fd.file()) : ioFactory.create(fd.file()); + + readSegmentHeader(fileIO, curWalSegmIdx); + + break; + } + catch (IOException | IgniteCheckedException e) { + log.error("Failed to init segment curWalSegmIdx=" + curWalSegmIdx + ", curIdx=" + curIdx, e); + + curIdx++; + + if (curIdx >= walFileDescriptors.size()) + return null; + + fd = walFileDescriptors.get(curIdx); + } + } + + return super.initReadHandle(fd, start); + } + + /** {@inheritDoc} */ @NotNull @Override protected WALRecord postProcessRecord(@NotNull final WALRecord rec) { - final GridKernalContext kernalCtx = sharedCtx.kernalContext(); - final IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects(); + GridKernalContext kernalCtx = sharedCtx.kernalContext(); + IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects(); - if (processor != null && rec.type() == WALRecord.RecordType.DATA_RECORD) { + if (processor != null && rec.type() == RecordType.DATA_RECORD) { try { return postProcessDataRecord((DataRecord)rec, kernalCtx, processor); } @@ -296,11 +221,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * @throws IgniteCheckedException if failed. */ @NotNull private WALRecord postProcessDataRecord( - @NotNull final DataRecord dataRec, - final GridKernalContext kernalCtx, - final IgniteCacheObjectProcessor processor) throws IgniteCheckedException { - final CacheObjectContext fakeCacheObjCtx = new CacheObjectContext(kernalCtx, - null, null, false, false, false); + @NotNull DataRecord dataRec, + GridKernalContext kernalCtx, + IgniteCacheObjectProcessor processor + ) throws IgniteCheckedException { + final CacheObjectContext fakeCacheObjCtx = new CacheObjectContext( + kernalCtx, null, null, false, false, false); final List<DataEntry> entries = dataRec.writeEntries(); final List<DataEntry> postProcessedEntries = new ArrayList<>(entries.size()); @@ -327,8 +253,7 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * @return post precessed entry * @throws IgniteCheckedException if failed */ - @NotNull - private DataEntry postProcessDataEntry( + @NotNull private DataEntry postProcessDataEntry( final IgniteCacheObjectProcessor processor, final CacheObjectContext fakeCacheObjCtx, final DataEntry dataEntry) throws IgniteCheckedException { @@ -383,8 +308,9 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { } /** {@inheritDoc} */ - @Override protected AbstractReadFileHandle createReadFileHandle(FileIO fileIO, long idx, - RecordSerializer ser, FileInput in) { - return new FileWriteAheadLogManager.ReadFileHandle(fileIO, idx, ser, in); + @Override protected AbstractReadFileHandle createReadFileHandle( + FileIO fileIO, long idx, RecordSerializer ser, FileInput in + ) { + return new ReadFileHandle(fileIO, idx, ser, in); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java index 468392a..2e2e2f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java @@ -31,7 +31,7 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { private GridCacheSharedContext cctx; /** Write pointer. */ - private boolean writePointer; + private boolean needWritePointer; /** Read record filter. */ private IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordDeserializeFilter; @@ -49,7 +49,17 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { * @param cctx Cctx. */ public RecordSerializerFactoryImpl(GridCacheSharedContext cctx) { + this(cctx, null); + } + /** + * @param cctx Cctx. + */ + public RecordSerializerFactoryImpl( + GridCacheSharedContext cctx, + IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter + ) { this.cctx = cctx; + this.recordDeserializeFilter = readTypeFilter; } /** {@inheritDoc} */ @@ -59,14 +69,24 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { switch (ver) { case 1: - return new RecordV1Serializer(new RecordDataV1Serializer(cctx), - writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter); + return new RecordV1Serializer( + new RecordDataV1Serializer(cctx), + needWritePointer, + marshalledMode, + skipPositionCheck, + recordDeserializeFilter); case 2: - RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx)); + RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer( + new RecordDataV1Serializer(cctx)); - return new RecordV2Serializer(dataV2Serializer, - writePointer, marshalledMode, skipPositionCheck, recordDeserializeFilter); + return new RecordV2Serializer( + dataV2Serializer, + needWritePointer, + marshalledMode, + skipPositionCheck, + recordDeserializeFilter + ); default: throw new IgniteCheckedException("Failed to create a serializer with the given version " + @@ -78,12 +98,12 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { * @return Write pointer. */ public boolean writePointer() { - return writePointer; + return needWritePointer; } /** {@inheritDoc} */ @Override public RecordSerializerFactoryImpl writePointer(boolean writePointer) { - this.writePointer = writePointer; + this.needWritePointer = writePointer; return this; } @@ -97,7 +117,8 @@ public class RecordSerializerFactoryImpl implements RecordSerializerFactory { /** {@inheritDoc} */ @Override public RecordSerializerFactoryImpl recordDeserializeFilter( - IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter) { + IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter + ) { this.recordDeserializeFilter = readTypeFilter; return this; http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index caa0962..ca484ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -134,6 +134,9 @@ public class RecordV1Serializer implements RecordSerializer { throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr + ", readPtr=" + ptr + ']', null); + if (recType == null) + throw new IOException("Unknown record type: " + recType); + final WALRecord rec = dataSerializer.readRecord(recType, in); rec.position(ptr); @@ -341,12 +344,7 @@ public class RecordV1Serializer implements RecordSerializer { if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) throw new SegmentEofException("Reached logical end of the segment", null); - RecordType recType = RecordType.fromOrdinal(type - 1); - - if (recType == null) - throw new IOException("Unknown record type: " + type); - - return recType; + return RecordType.fromOrdinal(type - 1); } /** @@ -359,7 +357,11 @@ public class RecordV1Serializer implements RecordSerializer { * @throws EOFException In case of end of file. * @throws IgniteCheckedException If it's unable to read record. */ - static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO reader) throws EOFException, IgniteCheckedException { + static WALRecord readWithCrc( + FileInput in0, + WALPointer expPtr, + RecordIO reader + ) throws EOFException, IgniteCheckedException { long startPos = -1; try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) { @@ -377,7 +379,16 @@ public class RecordV1Serializer implements RecordSerializer { throw e; } catch (Exception e) { - throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos, e); + long size = -1; + + try { + size = in0.io().size(); + } + catch (IOException ignore) { + // No-op. It just for information. Fail calculate file size. + } + + throw new IgniteCheckedException("Failed to read WAL record at position: " + startPos + " size: " + size, e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index 2b81210..2c65ebe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -115,6 +115,9 @@ public class RecordV2Serializer implements RecordSerializer { FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, skipPositionCheck); + if (recType == null) + throw new IOException("Unknown record type: " + recType); + if (recordFilter != null && !recordFilter.apply(recType, ptr)) { int toSkip = ptr.length() - REC_TYPE_SIZE - FILE_WAL_POINTER_SIZE - CRC_SIZE;