IGNITE-8661 WALIterator should be stopped if it fails to deserialize a record - 
Fixes #4155.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6ab2ae6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6ab2ae6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6ab2ae6

Branch: refs/heads/ignite-8446
Commit: d6ab2ae684d04431fbe0d8bc52bd6634d54e52b2
Parents: c56d16f
Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Authored: Thu Jun 28 17:39:38 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu Jun 28 17:49:45 2018 +0300

----------------------------------------------------------------------
 .../pagemem/wal/IgniteWriteAheadLogManager.java |    5 +
 .../GridCacheDatabaseSharedManager.java         |  221 +++-
 .../wal/AbstractWalRecordsIterator.java         |   27 +-
 .../wal/FileWriteAheadLogManager.java           |   39 +-
 .../wal/FsyncModeFileWriteAheadLogManager.java  |    5 +
 .../SingleSegmentLogicalRecordsIterator.java    |    4 +-
 .../wal/reader/IgniteWalIteratorFactory.java    |  617 +++++++---
 .../reader/StandaloneWalRecordsIterator.java    |  246 ++--
 .../serializer/RecordSerializerFactoryImpl.java |   39 +-
 .../wal/serializer/RecordV1Serializer.java      |   27 +-
 .../wal/serializer/RecordV2Serializer.java      |    3 +
 ...gniteWalIteratorExceptionDuringReadTest.java |  150 +++
 .../db/wal/reader/IgniteWalReaderTest.java      | 1050 ++++++++++--------
 .../persistence/pagemem/NoOpWALManager.java     |    5 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |    3 +
 .../development/utils/IgniteWalConverter.java   |   10 +-
 16 files changed, 1589 insertions(+), 862 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index fd5d53b..2b6358b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -122,6 +122,11 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
     public int walArchiveSegments();
 
     /**
+     * @return Last archived segment index.
+     */
+    public long lastArchivedSegment();
+
+    /**
      * Checks if WAL segment is under lock or reserved
      *
      * @param ptr Pointer to check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 9892b8c..7c83c1e 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -131,6 +131,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.Snapshot
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
@@ -166,6 +167,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
 
 /**
@@ -1930,39 +1932,26 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             cctx.wal().allowCompressionUntil(status.startPtr);
 
         long start = U.currentTimeMillis();
-        int applied = 0;
-        WALPointer lastRead = null;
+
+        long lastArchivedSegment = cctx.wal().lastArchivedSegment();
+
+        RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, 
lastArchivedSegment, log);
 
         Collection<Integer> ignoreGrps = metastoreOnly ? 
Collections.emptySet() :
             F.concat(false, initiallyGlobalWalDisabledGrps, 
initiallyLocalWalDisabledGrps);
 
+        int applied = 0;
+
         try (WALIterator it = cctx.wal().replay(status.endPtr)) {
             while (it.hasNextX()) {
-                IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
-
-                WALRecord rec = tup.get2();
+                WALRecord rec = restoreBinaryState.next(it);
 
-                lastRead = tup.get1();
+                if (rec == null)
+                    break;
 
                 switch (rec.type()) {
-                    case CHECKPOINT_RECORD:
-                        CheckpointRecord cpRec = (CheckpointRecord)rec;
-
-                        // We roll memory up until we find a checkpoint start 
record registered in the status.
-                        if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
-                            log.info("Found last checkpoint marker [cpId=" + 
cpRec.checkpointId() +
-                                ", pos=" + tup.get1() + ']');
-
-                            apply = false;
-                        }
-                        else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
-                            U.warn(log, "Found unexpected checkpoint marker, 
skipping [cpId=" + cpRec.checkpointId() +
-                                ", expCpId=" + status.cpStartId + ", pos=" + 
tup.get1() + ']');
-
-                        break;
-
                     case PAGE_RECORD:
-                        if (apply) {
+                        if (restoreBinaryState.needApplyBinaryUpdate()) {
                             PageSnapshot pageRec = (PageSnapshot)rec;
 
                             // Here we do not require tag check because we may 
be applying memory changes after
@@ -2045,7 +2034,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         break;
 
                     default:
-                        if (apply && rec instanceof PageDeltaRecord) {
+                        if (restoreBinaryState.needApplyBinaryUpdate() && rec 
instanceof PageDeltaRecord) {
                             PageDeltaRecord r = (PageDeltaRecord)rec;
 
                             int grpId = r.groupId();
@@ -2086,11 +2075,13 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         if (metastoreOnly)
             return null;
 
+        WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
+
         if (status.needRestoreMemory()) {
-            if (apply)
+            if (restoreBinaryState.needApplyBinaryUpdate())
                 throw new StorageException("Failed to restore memory state 
(checkpoint marker is present " +
                     "on disk, but checkpoint record is missed in WAL) " +
-                    "[cpStatus=" + status + ", lastRead=" + lastRead + "]");
+                    "[cpStatus=" + status + ", lastRead=" + lastReadPtr + "]");
 
             log.info("Finished applying memory changes [changesApplied=" + 
applied +
                 ", time=" + (U.currentTimeMillis() - start) + "ms]");
@@ -2101,7 +2092,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         cpHistory.initialize(retreiveHistory());
 
-        return lastRead == null ? null : lastRead.next();
+        return lastReadPtr == null ? null : lastReadPtr.next();
     }
 
     /**
@@ -2210,6 +2201,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         if (!metastoreOnly)
             cctx.kernalContext().query().skipFieldLookup(true);
 
+        long lastArchivedSegment = cctx.wal().lastArchivedSegment();
+
+        RestoreLogicalState restoreLogicalState = new 
RestoreLogicalState(lastArchivedSegment, log);
+
         long start = U.currentTimeMillis();
         int applied = 0;
 
@@ -2220,9 +2215,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new 
HashMap<>();
 
             while (it.hasNextX()) {
-                IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
+                WALRecord rec = restoreLogicalState.next(it);
 
-                WALRecord rec = next.get2();
+                if (rec == null)
+                    break;
 
                 switch (rec.type()) {
                     case DATA_RECORD:
@@ -4365,4 +4361,171 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         else
             return null;
     }
+
+    /**
+     * Abstract base class for restore state contexts.
+     */
+    public abstract static class RestoreStateContext {
+        /** */
+        protected final IgniteLogger log;
+
+        /** Last archived segment. */
+        protected final long lastArchivedSegment;
+
+        /** Last read record WAL pointer. */
+        protected FileWALPointer lastRead;
+
+        /**
+         * @param lastArchivedSegment Last archived segment index.
+         * @param log Ignite logger.
+         */
+        public RestoreStateContext(long lastArchivedSegment, IgniteLogger log) 
{
+            this.lastArchivedSegment = lastArchivedSegment;
+            this.log = log;
+        }
+
+        /**
+         * Advance iterator to the next record.
+         *
+         * @param it WAL iterator.
+         * @return WALRecord entry.
+         * @throws IgniteCheckedException If the CRC check fails during binary 
recovery or another exception occurs.
+         */
+        public WALRecord next(WALIterator it) throws IgniteCheckedException {
+            try {
+                IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
+
+                WALRecord rec = tup.get2();
+
+                WALPointer ptr = tup.get1();
+
+                lastRead = (FileWALPointer)ptr;
+
+                rec.position(ptr);
+
+                return rec;
+            }
+            catch (IgniteCheckedException e) {
+                boolean throwsCRCError = throwsCRCError();
+
+                if (X.hasCause(e, 
IgniteDataIntegrityViolationException.class)) {
+                    if (throwsCRCError)
+                        throw e;
+                    else
+                        return null;
+                }
+
+                log.error("Catch error during restore state, throwsCRCError=" 
+ throwsCRCError, e);
+
+                throw e;
+            }
+        }
+
+        /**
+         *
+         * @return Last read WAL record pointer.
+         */
+        public WALPointer lastReadRecordPointer() {
+            return lastRead;
+        }
+
+        /**
+         *
+         * @return {@code true} if a CRC error must be rethrown, {@code false} if it can be ignored.
+         */
+        public boolean throwsCRCError(){
+            FileWALPointer lastReadPtr = lastRead;
+
+            return lastReadPtr != null && lastReadPtr.index() <= 
lastArchivedSegment;
+        }
+    }
+
+    /**
+     * Restore memory context. Tracks the safety of binary recovery.
+     */
+    public static class RestoreBinaryState extends RestoreStateContext {
+        /** Checkpoint status. */
+        private final CheckpointStatus status;
+
+        /** Flag indicating whether binary updates still need to be applied. */
+        private boolean needApplyBinaryUpdates;
+
+        /**
+         * @param status Checkpoint status.
+         * @param lastArchivedSegment Last archived segment index.
+         * @param log Ignite logger.
+         */
+        public RestoreBinaryState(CheckpointStatus status, long 
lastArchivedSegment, IgniteLogger log) {
+            super(lastArchivedSegment, log);
+
+            this.status = status;
+            needApplyBinaryUpdates = status.needRestoreMemory();
+        }
+
+        /**
+         * Advance iterator to the next record.
+         *
+         * @param it WAL iterator.
+         * @return WALRecord entry.
+         * @throws IgniteCheckedException If the CRC check fails during binary 
recovery or another exception occurs.
+         */
+        @Override public WALRecord next(WALIterator it) throws 
IgniteCheckedException {
+            WALRecord rec = super.next(it);
+
+            if (rec == null)
+                return null;
+
+            if (rec.type() == CHECKPOINT_RECORD) {
+                CheckpointRecord cpRec = (CheckpointRecord)rec;
+
+                // We roll memory up until we find a checkpoint start record 
registered in the status.
+                if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
+                    log.info("Found last checkpoint marker [cpId=" + 
cpRec.checkpointId() +
+                        ", pos=" + rec.position() + ']');
+
+                    needApplyBinaryUpdates = false;
+                }
+                else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
+                    U.warn(log, "Found unexpected checkpoint marker, skipping 
[cpId=" + cpRec.checkpointId() +
+                        ", expCpId=" + status.cpStartId + ", pos=" + 
rec.position() + ']');
+            }
+
+            return rec;
+        }
+
+        /**
+         *
+         * @return {@code true} if binary updates still need to be applied.
+         */
+        public boolean needApplyBinaryUpdate() {
+            return needApplyBinaryUpdates;
+        }
+
+        /**
+         *
+         * @return {@code true} if a CRC error must be rethrown, {@code false} if it can be ignored.
+         */
+        @Override public boolean throwsCRCError() {
+            log.info("Throws CRC error check, needApplyBinaryUpdates=" + 
needApplyBinaryUpdates +
+                ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" 
+ lastRead);
+
+            if (needApplyBinaryUpdates)
+                return true;
+
+            return super.throwsCRCError();
+        }
+    }
+
+    /**
+     * Restore logical state context. Tracks the safety of logical recovery.
+     */
+    public static class RestoreLogicalState extends RestoreStateContext {
+        /**
+         * @param lastArchivedSegment Last archived segment index.
+         * @param log Ignite logger.
+         */
+        public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) 
{
+            super(lastArchivedSegment, log);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index e442386..01b0933 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -91,21 +91,21 @@ public abstract class AbstractWalRecordsIterator
      * @param sharedCtx Shared context.
      * @param serializerFactory Serializer of current version to read headers.
      * @param ioFactory ioFactory for file IO access.
-     * @param bufSize buffer for reading records size.
+     * @param initialReadBufferSize Initial size of the buffer used for reading records.
      */
     protected AbstractWalRecordsIterator(
         @NotNull final IgniteLogger log,
         @NotNull final GridCacheSharedContext sharedCtx,
         @NotNull final RecordSerializerFactory serializerFactory,
         @NotNull final FileIOFactory ioFactory,
-        final int bufSize
+        final int initialReadBufferSize
     ) {
         this.log = log;
         this.sharedCtx = sharedCtx;
         this.serializerFactory = serializerFactory;
         this.ioFactory = ioFactory;
 
-        buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
+        buf = new ByteBufferExpander(initialReadBufferSize, 
ByteOrder.nativeOrder());
     }
 
     /** {@inheritDoc} */
@@ -225,8 +225,12 @@ public abstract class AbstractWalRecordsIterator
             if (e instanceof WalSegmentTailReachedException)
                 throw (WalSegmentTailReachedException)e;
 
-            if (!(e instanceof SegmentEofException))
-                handleRecordException(e, actualFilePtr);
+            if (!(e instanceof SegmentEofException) && !(e instanceof 
EOFException)) {
+                IgniteCheckedException e0 = handleRecordException(e, 
actualFilePtr);
+
+                if (e0 != null)
+                    throw e0;
+            }
 
             return null;
         }
@@ -248,12 +252,15 @@ public abstract class AbstractWalRecordsIterator
      *
      * @param e problem from records reading
      * @param ptr file pointer was accessed
+     *
+     * @return {@code null} if the error was handled and we can go ahead,
+     *  {@code IgniteCheckedException} if the error was not handled, and we 
should stop the iteration.
      */
-    protected void handleRecordException(
-        @NotNull final Exception e,
-        @Nullable final FileWALPointer ptr) {
+    protected IgniteCheckedException handleRecordException(@NotNull final 
Exception e, @Nullable final FileWALPointer ptr) {
         if (log.isInfoEnabled())
             log.info("Stopping WAL iteration due to an exception: " + 
e.getMessage() + ", ptr=" + ptr);
+
+        return new IgniteCheckedException(e);
     }
 
     /**
@@ -265,8 +272,8 @@ public abstract class AbstractWalRecordsIterator
      */
     protected AbstractReadFileHandle initReadHandle(
         @NotNull final AbstractFileDescriptor desc,
-        @Nullable final FileWALPointer start)
-        throws IgniteCheckedException, FileNotFoundException {
+        @Nullable final FileWALPointer start
+    ) throws IgniteCheckedException, FileNotFoundException {
         try {
             FileIO fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file()) 
: ioFactory.create(desc.file());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 3ca51f3..96387eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -93,6 +93,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator.AbstractFileDescriptor;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
@@ -126,6 +127,7 @@ import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
 import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
@@ -170,7 +172,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private static final byte[] FILL_BUF = new byte[1024 * 1024];
 
     /** Pattern for segment file names */
-    private static final Pattern WAL_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.wal");
+    public static final Pattern WAL_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.wal");
 
     /** */
     private static final Pattern WAL_TEMP_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.wal\\.tmp");
@@ -190,7 +192,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     };
 
     /** */
-    private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = 
Pattern.compile("\\d{16}\\.wal\\.zip");
+    public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = 
Pattern.compile("\\d{16}\\.wal\\.zip");
 
     /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
     public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = 
new FileFilter() {
@@ -951,6 +953,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
+    @Override public long lastArchivedSegment() {
+        return archivedMonitor.lastArchivedAbsoluteIndex();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserved(WALPointer ptr) {
         FileWALPointer fPtr = (FileWALPointer)ptr;
 
@@ -1655,9 +1662,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         notifyAll();
                     }
 
-                    if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED)) {
-                        evt.record(new 
WalSegmentArchivedEvent(cctx.discovery().localNode(),
-                            res.getAbsIdx(), res.getDstArchiveFile()));
+                    if (evt.isRecordable(EVT_WAL_SEGMENT_ARCHIVED)) {
+                        evt.record(new WalSegmentArchivedEvent(
+                                cctx.discovery().localNode(),
+                                res.getAbsIdx(),
+                                res.getDstArchiveFile())
+                        );
                     }
                 }
             }
@@ -1911,7 +1921,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             FileDescriptor[] alreadyCompressed = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
 
             if (alreadyCompressed.length > 0)
-                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length 
- 1].getIdx();
+                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length 
- 1].idx();
         }
 
         /**
@@ -2319,7 +2329,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /**
      * WAL file descriptor.
      */
-    public static class FileDescriptor implements Comparable<FileDescriptor>, 
AbstractWalRecordsIterator.AbstractFileDescriptor {
+    public static class FileDescriptor implements
+        Comparable<FileDescriptor>, AbstractFileDescriptor {
         /** */
         protected final File file;
 
@@ -2390,20 +2401,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @return Absolute WAL segment file index
-         */
-        public long getIdx() {
-            return idx;
-        }
-
-        /**
-         * @return absolute pathname string of this file descriptor pathname.
-         */
-        public String getAbsolutePath() {
-            return file.getAbsolutePath();
-        }
-
-        /**
          * @return True if segment is ZIP compressed.
          */
         @Override public boolean isCompressed() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 6e59ad3..5db21d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -846,6 +846,11 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
     }
 
     /** {@inheritDoc} */
+    @Override public long lastArchivedSegment() {
+        return archiver.lastArchivedAbsoluteIndex();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean reserved(WALPointer ptr) {
         FileWALPointer fPtr = (FileWALPointer)ptr;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
index 36e5b0e..f688bb4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
@@ -86,9 +86,7 @@ public class SingleSegmentLogicalRecordsIterator extends 
AbstractWalRecordsItera
     private static RecordSerializerFactory 
initLogicalRecordsSerializerFactory(GridCacheSharedContext sharedCtx)
         throws IgniteCheckedException {
 
-        return new RecordSerializerFactoryImpl(sharedCtx)
-            .recordDeserializeFilter(new LogicalRecordsFilter())
-            .marshalledMode(true);
+        return new RecordSerializerFactoryImpl(sharedCtx, new 
LogicalRecordsFilter()).marshalledMode(true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 0c7bbb3..2bfc22d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -17,17 +17,49 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
 
+import java.io.DataInput;
 import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteOrder;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.FileDescriptor;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.lang.System.arraycopy;
+import static java.nio.file.Files.walkFileTree;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_NAME_PATTERN;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_PATTERN;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
+
 /**
  * Factory for creating iterator over WAL files
  */
@@ -35,189 +67,522 @@ public class IgniteWalIteratorFactory {
     /** Logger. */
     private final IgniteLogger log;
 
-    /** Page size, in standalone iterator mode this value can't be taken from 
memory configuration. */
-    private final int pageSize;
-
-    /**
-     * Folder specifying location of metadata File Store. {@code null} means 
no specific folder is configured. <br>
-     * This folder should be specified for converting data entries into 
BinaryObjects
-     */
-    @Nullable private File binaryMetadataFileStoreDir;
-
     /**
-     * Folder specifying location of marshaller mapping file store. {@code 
null} means no specific folder is configured.
-     * <br> This folder should be specified for converting data entries into 
BinaryObjects. Providing {@code null} will
-     * disable unmarshall for non primitive objects, BinaryObjects will be 
provided
+     * Creates WAL files iterator factory.
+     * WAL iterator supports automatic converting from CacheObjects and 
KeyCacheObject into BinaryObjects
      */
-    @Nullable private File marshallerMappingFileStoreDir;
-
-    /** Keep binary. This flag disables converting of non primitive types 
(BinaryObjects) */
-    private boolean keepBinary;
-
-    /** Factory to provide I/O interfaces for read/write operations with files 
*/
-    private FileIOFactory ioFactory;
-
-    /** Wal records iterator buffer size */
-    private int bufSize = StandaloneWalRecordsIterator.DFLT_BUF_SIZE;
+    public IgniteWalIteratorFactory() {
+        this(ConsoleLogger.INSTANCE);
+    }
 
     /**
      * Creates WAL files iterator factory.
      * WAL iterator supports automatic converting from CacheObjects and 
KeyCacheObject into BinaryObjects
      *
      * @param log Logger.
-     * @param pageSize Page size which was used in Ignite Persistent Data 
store to read WAL from, size is validated
-     * according its boundaries.
-     * @param binaryMetadataFileStoreDir folder specifying location of 
metadata File Store. Should include "binary_meta"
-     * subfolder and consistent ID subfolder. Note Consistent ID should be 
already masked and should not contain special
-     * symbols. Providing {@code null} means no specific folder is configured. 
<br>
-     * @param marshallerMappingFileStoreDir Folder specifying location of 
marshaller mapping file store. Should include
-     * "marshaller" subfolder. Providing {@code null} will disable unmarshall 
for non primitive objects, BinaryObjects
-     * will be provided
-     * @param keepBinary {@code true} disables complex object unmarshall into 
source classes
      */
-    public IgniteWalIteratorFactory(
-        @NotNull final IgniteLogger log,
-        final int pageSize,
-        @Nullable final File binaryMetadataFileStoreDir,
-        @Nullable final File marshallerMappingFileStoreDir,
-        final boolean keepBinary) {
+    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log) {
         this.log = log;
-        this.pageSize = pageSize;
-        this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir;
-        this.marshallerMappingFileStoreDir = marshallerMappingFileStoreDir;
-        this.keepBinary = keepBinary;
-        this.ioFactory = new DataStorageConfiguration().getFileIOFactory();
-        new DataStorageConfiguration().setPageSize(pageSize); // just for 
validate
     }
 
     /**
-     * Creates WAL files iterator factory.
-     * WAL iterator supports automatic converting from CacheObjects and 
KeyCacheObject into BinaryObjects
+     * Creates iterator for file by file scan mode.
+     * This method may be used for work folder, file indexes are scanned from 
the file context.
+     * In this mode only provided WAL segments will be scanned. New WAL files 
created during iteration will be ignored.
      *
-     * @param log Logger.
-     * @param pageSize Page size which was used in Ignite Persistent Data 
store to read WAL from, size is validated
-     * according its boundaries.
-     * @param binaryMetadataFileStoreDir folder specifying location of 
metadata File Store. Should include "binary_meta"
-     * subfolder and consistent ID subfolder. Note Consistent ID should be 
already masked and should not contain special
-     * symbols. Providing {@code null} means no specific folder is configured. 
<br>
-     * @param marshallerMappingFileStoreDir Folder specifying location of 
marshaller mapping file store. Should include
-     * "marshaller" subfolder. Providing {@code null} will disable unmarshall 
for non primitive objects, BinaryObjects
-     * will be provided
+     * @param filesOrDirs files to scan. A file can be the path to '.wal' 
file, or directory with '.wal' files.
+     * Order is not important, but it is significant to provide all segments 
without omissions.
+     * Path should not contain special symbols. Special symbols should be 
already masked.
+     * @return closable WAL records iterator, should be closed when no longer needed.
+     * @throws IgniteCheckedException if failed to read files
+     * @throws IllegalArgumentException If parameter validation failed.
      */
-    public IgniteWalIteratorFactory(
-        @NotNull final IgniteLogger log,
-        final int pageSize,
-        @Nullable final File binaryMetadataFileStoreDir,
-        @Nullable final File marshallerMappingFileStoreDir) {
-        this(log, pageSize, binaryMetadataFileStoreDir, 
marshallerMappingFileStoreDir, false);
+    public WALIterator iterator(
+        @NotNull File... filesOrDirs
+    ) throws IgniteCheckedException, IllegalArgumentException {
+        return iterator(new 
IteratorParametersBuilder().filesOrDirs(filesOrDirs));
     }
 
     /**
-     * Creates WAL files iterator factory. This constructor does not allow WAL 
iterators access to data entries key and value.
+     * Creates iterator for file by file scan mode.
+     * This method may be used for work folder, file indexes are scanned from 
the file context.
+     * In this mode only provided WAL segments will be scanned. New WAL files 
created during iteration will be ignored.
      *
-     * @param log Logger.
-     * @param ioFactory Custom factory for non-standard file API to be used in 
WAL reading.
-     * @param pageSize Page size which was used in Ignite Persistent Data 
store to read WAL from, size is validated
-     * according its boundaries.
+     * @param filesOrDirs paths to scan. A path can be direct to '.wal' file, 
or directory with '.wal' files.
+     * Order is not important, but it is significant to provide all segments 
without omissions.
+     * Path should not contain special symbols. Special symbols should be 
already masked.
+     * @return closable WAL records iterator, should be closed when no longer needed.
+     * @throws IgniteCheckedException If failed to read files.
+     * @throws IllegalArgumentException If parameter validation failed.
      */
-    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, @NotNull 
final FileIOFactory ioFactory, int pageSize) {
-        this.log = log;
-        this.pageSize = pageSize;
-        this.ioFactory = ioFactory;
-        new DataStorageConfiguration().setPageSize(pageSize); // just for 
validate
+    public WALIterator iterator(
+        @NotNull String... filesOrDirs
+    ) throws IgniteCheckedException, IllegalArgumentException {
+        return iterator(new 
IteratorParametersBuilder().filesOrDirs(filesOrDirs));
     }
 
     /**
-     * Creates WAL files iterator factory. This constructor does not allow WAL 
iterators access to data entries key and
-     * value.
-     *
-     * @param log Logger.
-     * @param pageSize Page size which was used in Ignite Persistent Data 
store to read WAL from, size is validated
-     * according its boundaries.
+     * @param iteratorParametersBuilder Iterator parameters builder.
+     * @return closable WAL records iterator, should be closed when no longer needed
      */
-    public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, int 
pageSize) {
-        this(log, new DataStorageConfiguration().getFileIOFactory(), pageSize);
+    public WALIterator iterator(
+        @NotNull IteratorParametersBuilder iteratorParametersBuilder
+    ) throws IgniteCheckedException, IllegalArgumentException {
+        iteratorParametersBuilder.validate();
+
+        return new StandaloneWalRecordsIterator(log,
+            prepareSharedCtx(iteratorParametersBuilder),
+            iteratorParametersBuilder.ioFactory,
+            resolveWalFiles(
+                iteratorParametersBuilder.filesOrDirs,
+                iteratorParametersBuilder
+            ),
+            iteratorParametersBuilder.filter,
+            iteratorParametersBuilder.keepBinary,
+            iteratorParametersBuilder.bufferSize
+        );
     }
 
     /**
-     * Creates iterator for (archive) directory scan mode.
-     * Note in this mode total scanned files at end of iteration may be wider 
that initial files in directory.
-     * This mode does not support work directory scan because work directory 
contains unpredictable number in file name.
-     * Such file may broke iteration.
+     * Find WAL gaps, for example:
+     * 0 1 2 3 4 7 8 10 - WAL segment files in directory, this method will 
return
+     * List with two tuples [(4,7),(8,10)].
      *
-     * @param walDirWithConsistentId directory with WAL files. Should already 
contain node consistent ID as subfolder.
-     * Note: 'Consistent ID'-based subfolder name (if any) should not contain 
special symbols.
-     * @return closable WAL records iterator, should be closed when non needed
-     * @throws IgniteCheckedException if failed to read folder
+     * @param filesOrDirs Paths to files or directories for scan.
+     * @return List of tuples, low and high index segments with gap.
      */
-    public WALIterator iteratorArchiveDirectory(
-        @NotNull final File walDirWithConsistentId) throws 
IgniteCheckedException {
-        return new StandaloneWalRecordsIterator(
-            walDirWithConsistentId, log, prepareSharedCtx(), ioFactory, 
keepBinary, bufSize);
+    public List<T2<Long, Long>> hasGaps(
+        @NotNull String... filesOrDirs
+    ) throws IllegalArgumentException {
+        return hasGaps(new 
IteratorParametersBuilder().filesOrDirs(filesOrDirs));
     }
 
     /**
-     * Creates iterator for file by file scan mode.
-     * This method may be used only for archive folder (not for work).
-     * In this mode only provided WAL segments will be scanned. New WAL files 
created during iteration will be ignored
+     * Find WAL gaps, for example:
+     * 0 1 2 3 4 7 8 10 - WAL segment files in directory, this method will 
return
+     * List with two tuples [(4,7),(8,10)].
      *
-     * @param files files to scan. Order is not important, but it is 
significant to provide all segments without omissions.
-     * Parameter should contain direct file links to '.wal' files from archive 
directory.
-     * 'Consistent ID'-based subfolder name (if any) should not contain 
special symbols.
-     * Special symbols should be already masked.
-     *
-     * @return closable WAL records iterator, should be closed when non needed
-     * @throws IgniteCheckedException if failed to read files
+     * @param filesOrDirs Files or directories to scan.
+     * @return List of tuples, low and high index segments with gap.
      */
-    public WALIterator iteratorArchiveFiles(@NotNull final File... files) 
throws IgniteCheckedException {
-        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), 
ioFactory, false, keepBinary, bufSize, files);
+    public List<T2<Long, Long>> hasGaps(
+        @NotNull File... filesOrDirs
+    ) throws IllegalArgumentException {
+        return hasGaps(new 
IteratorParametersBuilder().filesOrDirs(filesOrDirs));
     }
 
     /**
-     * Creates iterator for file by file scan mode.
-     * This method may be used for work folder, file indexes are scanned from 
the file context.
-     * In this mode only provided WAL segments will be scanned. New WAL files 
created during iteration will be ignored.
-     *
-     * @param files files to scan. Order is not important, but it is 
significant to provide all segments without omissions.
-     * Parameter should contain direct file links to '.wal' files from work 
directory.
-     * 'Consistent ID'-based subfolder name (if any) should not contain 
special symbols.
-     * Special symbols should be already masked.
+     * @param iteratorParametersBuilder Iterator parameters builder.
+     * @return List of tuples, low and high index segments with gap.
+     */
+    public List<T2<Long, Long>> hasGaps(
+        @NotNull IteratorParametersBuilder iteratorParametersBuilder
+    ) throws IllegalArgumentException {
+        iteratorParametersBuilder.validate();
+
+        List<T2<Long, Long>> gaps = new ArrayList<>();
+
+        List<FileDescriptor> descriptors = resolveWalFiles(
+            iteratorParametersBuilder.filesOrDirs,
+            iteratorParametersBuilder
+        );
+
+        Iterator<FileDescriptor> it = descriptors.iterator();
+
+        FileDescriptor prevFd = null;
+
+        while (it.hasNext()) {
+            FileDescriptor nextFd = it.next();
+
+            if (prevFd == null) {
+                prevFd = nextFd;
+
+                continue;
+            }
+
+            if (prevFd.idx() + 1 != nextFd.idx())
+                gaps.add(new T2<>(prevFd.idx(), nextFd.idx()));
+
+            prevFd = nextFd;
+        }
+
+        return gaps;
+    }
+
+    /**
+     * This method checks that all provided files are correct WAL segments.
+     * The header record and its position are checked. WAL position is used to 
determine real index.
+     * File index from file name is ignored.
      *
-     * @return closable WAL records iterator, should be closed when non needed
-     * @throws IgniteCheckedException if failed to read files
+     * @param iteratorParametersBuilder IteratorParametersBuilder.
+     * @return list of file descriptors with checked header records and the 
correct file index is set
+     */
+    private List<FileDescriptor> resolveWalFiles(
+        File[] filesOrDirs,
+        IteratorParametersBuilder iteratorParametersBuilder
+    ) {
+        if (filesOrDirs == null || filesOrDirs.length == 0)
+            return Collections.emptyList();
+
+        final FileIOFactory ioFactory = iteratorParametersBuilder.ioFactory;
+
+        final TreeSet<FileDescriptor> descriptors = new TreeSet<>();
+
+        for (File file : filesOrDirs) {
+            if (file.isDirectory()) {
+                try {
+                    walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+                        @Override
+                        public FileVisitResult visitFile(Path path, 
BasicFileAttributes attrs) {
+                            addFileDescriptor(path.toFile(), ioFactory, 
descriptors);
+
+                            return FileVisitResult.CONTINUE;
+                        }
+                    });
+                }
+                catch (IOException e) {
+                    U.error(log, "Failed to walk directories from root [" + 
file + "]. Skipping this directory.", e);
+                }
+
+                continue;
+            }
+
+            addFileDescriptor(file, ioFactory, descriptors);
+        }
+
+        return new ArrayList<>(descriptors);
+    }
+
+    /**
+     * @param file File.
+     * @param ioFactory IO factory.
+     * @param descriptors Sorted set that the resolved descriptor is added to.
+     */
+    private void addFileDescriptor(File file, FileIOFactory ioFactory, 
TreeSet<FileDescriptor> descriptors) {
+        if (file.length() < HEADER_RECORD_SIZE)
+            return; // Filter out this segment as it is too short.
+
+        String fileName = file.getName();
+
+        if (!WAL_NAME_PATTERN.matcher(fileName).matches() &&
+            !WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(fileName).matches())
+            return;  // Filter out this because it is not segment file.
+
+        FileDescriptor desc = readFileDescriptor(file, ioFactory);
+
+        if (desc != null)
+            descriptors.add(desc);
+    }
+
+    /**
+     * @param file File to read.
+     * @param ioFactory IO factory.
      */
-    public WALIterator iteratorWorkFiles(@NotNull final File... files) throws 
IgniteCheckedException {
-        return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), 
ioFactory, true, keepBinary, bufSize, files);
+    private FileDescriptor readFileDescriptor(File file, FileIOFactory 
ioFactory) {
+        FileDescriptor ds = new FileDescriptor(file);
+
+        try (
+            FileIO fileIO = ds.isCompressed() ? new UnzipFileIO(file) : 
ioFactory.create(file);
+            ByteBufferExpander buf = new 
ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
+        ) {
+            final DataInput in = new FileInput(fileIO, buf);
+
+            // Header record must be agnostic to the serializer version.
+            final int type = in.readUnsignedByte();
+
+            if (type == RecordType.STOP_ITERATION_RECORD_TYPE) {
+                if (log.isInfoEnabled())
+                    log.info("Reached logical end of the segment for file " + 
file);
+
+                return null;
+            }
+
+            FileWALPointer ptr = readPosition(in);
+
+            return new FileDescriptor(file, ptr.index());
+        }
+        catch (IOException e) {
+            U.warn(log, "Failed to scan index from file [" + file + "]. 
Skipping this file during iteration", e);
+
+            return null;
+        }
     }
 
     /**
-     * @return fake shared context required for create minimal services for 
record reading
+     * @return Fake shared context required to create minimal services for 
record reading.
      */
-    @NotNull private GridCacheSharedContext prepareSharedCtx() throws 
IgniteCheckedException {
-        final GridKernalContext kernalCtx = new 
StandaloneGridKernalContext(log, binaryMetadataFileStoreDir, 
marshallerMappingFileStoreDir);
+    @NotNull private GridCacheSharedContext prepareSharedCtx(
+        IteratorParametersBuilder iteratorParametersBuilder
+    ) throws IgniteCheckedException {
+        GridKernalContext kernalCtx = new StandaloneGridKernalContext(log,
+            iteratorParametersBuilder.binaryMetadataFileStoreDir,
+            iteratorParametersBuilder.marshallerMappingFileStoreDir
+        );
 
-        final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new 
StandaloneIgniteCacheDatabaseSharedManager();
+        StandaloneIgniteCacheDatabaseSharedManager dbMgr = new 
StandaloneIgniteCacheDatabaseSharedManager();
 
-        dbMgr.setPageSize(pageSize);
+        dbMgr.setPageSize(iteratorParametersBuilder.pageSize);
 
         return new GridCacheSharedContext<>(
             kernalCtx, null, null, null,
             null, null, null, dbMgr, null,
             null, null, null, null,
-            null, null, null);
+            null, null, null
+        );
     }
 
     /**
-     * @param ioFactory New factory to provide I/O interfaces for read/write 
operations with files
+     * Wal iterator parameter builder.
      */
-    public void ioFactory(FileIOFactory ioFactory) {
-        this.ioFactory = ioFactory;
+    public static class IteratorParametersBuilder {
+        /** */
+        private File[] filesOrDirs;
+
+        /** */
+        private int pageSize = DataStorageConfiguration.DFLT_PAGE_SIZE;
+
+        /** Wal records iterator buffer size. */
+        private int bufferSize = StandaloneWalRecordsIterator.DFLT_BUF_SIZE;
+
+        /** Keep binary. This flag disables converting of non primitive types 
(BinaryObjects). */
+        private boolean keepBinary;
+
+        /** Factory to provide I/O interfaces for read/write operations with 
files. */
+        private FileIOFactory ioFactory = new 
DataStorageConfiguration().getFileIOFactory();
+
+        /**
+         * Folder specifying location of metadata File Store. {@code null} 
means no specific folder is configured. <br>
+         * This folder should be specified for converting data entries into 
BinaryObjects
+         */
+        @Nullable private File binaryMetadataFileStoreDir;
+
+        /**
+         * Folder specifying location of marshaller mapping file store. {@code 
null} means no specific folder is configured.
+         * <br> This folder should be specified for converting data entries 
into BinaryObjects. Providing {@code null} will
+         * disable unmarshall for non primitive objects, BinaryObjects will be 
provided
+         */
+        @Nullable private File marshallerMappingFileStoreDir;
+
+        /** */
+        @Nullable private IgniteBiPredicate<RecordType, WALPointer> filter;
+
+        /**
+         * @param filesOrDirs Paths to files or directories.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder filesOrDirs(String... filesOrDirs) {
+            File[] filesOrDirs0 = new File[filesOrDirs.length];
+
+            for (int i = 0; i < filesOrDirs.length; i++) {
+                filesOrDirs0[i] = new File(filesOrDirs[i]);
+            }
+
+            return filesOrDirs(filesOrDirs0);
+        }
+
+        /**
+         * @param filesOrDirs Files or directories.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder filesOrDirs(File... filesOrDirs) {
+            if (this.filesOrDirs == null)
+                this.filesOrDirs = filesOrDirs;
+            else
+                this.filesOrDirs = merge(this.filesOrDirs, filesOrDirs);
+
+            return this;
+        }
+
+        /**
+         * @param pageSize Page size.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder pageSize(int pageSize) {
+            this.pageSize = pageSize;
+
+            return this;
+        }
+
+        /**
+         * @param bufferSize Initial size of buffer for reading segments.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder bufferSize(int bufferSize) {
+            this.bufferSize = bufferSize;
+
+            return this;
+        }
+
+        /**
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder keepBinary(boolean keepBinary) {
+            this.keepBinary = keepBinary;
+
+            return this;
+        }
+
+        /**
+         * @param ioFactory Custom IO factory for reading files.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder ioFactory(FileIOFactory ioFactory) {
+            this.ioFactory = ioFactory;
+
+            return this;
+        }
+
+        /**
+         * @param binaryMetadataFileStoreDir Path to the binary metadata.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder binaryMetadataFileStoreDir(File 
binaryMetadataFileStoreDir) {
+            this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir;
+
+            return this;
+        }
+
+        /**
+         * @param marshallerMappingFileStoreDir Path to the marshaller mapping.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder marshallerMappingFileStoreDir(File 
marshallerMappingFileStoreDir) {
+            this.marshallerMappingFileStoreDir = marshallerMappingFileStoreDir;
+
+            return this;
+        }
+
+        /**
+         * @param filter Record filter for skipping records during iteration.
+         * @return IteratorParametersBuilder Self reference.
+         */
+        public IteratorParametersBuilder filter(IgniteBiPredicate<RecordType, 
WALPointer> filter) {
+            this.filter = filter;
+
+            return this;
+        }
+
+        /**
+         * Copy current state of builder to new instance.
+         *
+         * @return New IteratorParametersBuilder instance holding a copy of the current state.
+         */
+        public IteratorParametersBuilder copy() {
+            return new IteratorParametersBuilder()
+                .filesOrDirs(filesOrDirs)
+                .pageSize(pageSize)
+                .bufferSize(bufferSize)
+                .keepBinary(keepBinary)
+                .ioFactory(ioFactory)
+                .binaryMetadataFileStoreDir(binaryMetadataFileStoreDir)
+                .marshallerMappingFileStoreDir(marshallerMappingFileStoreDir)
+                .filter(filter);
+        }
+
+        /**
+         * @throws IllegalArgumentException If validation failed.
+         */
+        public void validate() throws IllegalArgumentException {
+            A.ensure(pageSize >= 1024 && pageSize <= 16 * 1024, "Page size 
must be between 1kB and 16kB.");
+            A.ensure(U.isPow2(pageSize), "Page size must be a power of 2.");
+
+            A.ensure(bufferSize >= pageSize * 2, "Buffer to small.");
+        }
+
+        /**
+         * Merge file arrays.
+         *
+         * @param f1 Files array one.
+         * @param f2 Files array two.
+         * @return Merged arrays from one and two arrays.
+         */
+        private File[] merge(File[] f1, File[] f2) {
+            File[] merged = new File[f1.length + f2.length];
+
+            arraycopy(f1, 0, merged, 0, f1.length);
+            arraycopy(f2, 0, merged, f1.length, f2.length);
+
+            return merged;
+        }
     }
 
     /**
-     * @param bufSize New wal records iterator buffer size
+     *
      */
-    public void bufferSize(int bufSize) {
-        this.bufSize = bufSize;
+    private static class ConsoleLogger implements IgniteLogger {
+        /** */
+        private static final ConsoleLogger INSTANCE = new ConsoleLogger();
+
+        /** */
+        private static final PrintStream OUT = System.out;
+
+        /** */
+        private static final PrintStream ERR = System.err;
+
+        /** */
+        private ConsoleLogger() {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteLogger getLogger(Object ctgr) {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void trace(String msg) {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void debug(String msg) {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void info(String msg) {
+            OUT.println(msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void warning(String msg, @Nullable Throwable e) {
+            OUT.println(msg);
+
+            if (e != null)
+                e.printStackTrace(OUT);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void error(String msg, @Nullable Throwable e) {
+            ERR.println(msg);
+
+            if (e != null)
+                e.printStackTrace(ERR);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isTraceEnabled() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDebugEnabled() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isInfoEnabled() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isQuiet() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String fileName() {
+            return "SYSTEM.OUT";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 712517b..9df4468 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -17,44 +17,41 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
 
-import java.io.DataInput;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.FileDescriptor;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
 
 /**
  * WAL reader iterator, for creation in standalone WAL reader tool
@@ -66,78 +63,52 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
 
     /** */
     private static final long serialVersionUID = 0L;
-
-    /**
-     * WAL files directory. Should already contain 'consistent ID' as 
subfolder.
-     * <code>null</code> value means file-by-file iteration mode
-     */
-    @Nullable
-    private File walFilesDir;
-
     /**
      * File descriptors remained to scan.
      * <code>null</code> value means directory scan mode
      */
     @Nullable
-    private List<FileWriteAheadLogManager.FileDescriptor> walFileDescriptors;
+    private final List<FileDescriptor> walFileDescriptors;
+
+    /** */
+    private int curIdx = -1;
 
     /** Keep binary. This flag disables converting of non primitive types 
(BinaryObjects) */
     private boolean keepBinary;
 
     /**
-     * Creates iterator in directory scan mode
-     * @param walFilesDir Wal files directory. Should already contain node 
consistent ID as subfolder
-     * @param log Logger.
-     * @param sharedCtx Shared context. Cache processor is to be configured if 
Cache Object Key & Data Entry is required.
-     * @param ioFactory File I/O factory.
-     * @param keepBinary  Keep binary. This flag disables converting of non 
primitive types
-     * @param bufSize Buffer size.
-     */
-    StandaloneWalRecordsIterator(
-        @NotNull File walFilesDir,
-        @NotNull IgniteLogger log,
-        @NotNull GridCacheSharedContext sharedCtx,
-        @NotNull FileIOFactory ioFactory,
-        boolean keepBinary,
-        int bufSize
-    ) throws IgniteCheckedException {
-        super(log,
-            sharedCtx,
-            new RecordSerializerFactoryImpl(sharedCtx),
-            ioFactory,
-            bufSize);
-        this.keepBinary = keepBinary;
-        init(walFilesDir, false, null);
-        advance();
-    }
-
-    /**
      * Creates iterator in file-by-file iteration mode. Directory
      * @param log Logger.
      * @param sharedCtx Shared context. Cache processor is to be configured if 
Cache Object Key & Data Entry is
      * required.
      * @param ioFactory File I/O factory.
-     * @param workDir Work directory is scanned, false - archive
      * @param keepBinary Keep binary. This flag disables converting of non 
primitive types
      * (BinaryObjects will be used instead)
      * @param walFiles Wal files.
      */
     StandaloneWalRecordsIterator(
-            @NotNull IgniteLogger log,
-            @NotNull GridCacheSharedContext sharedCtx,
-            @NotNull FileIOFactory ioFactory,
-            boolean workDir,
-            boolean keepBinary,
-            int bufSize,
-            @NotNull File... walFiles) throws IgniteCheckedException {
-        super(log,
+        @NotNull IgniteLogger log,
+        @NotNull GridCacheSharedContext sharedCtx,
+        @NotNull FileIOFactory ioFactory,
+        @NotNull List<FileDescriptor> walFiles,
+        IgniteBiPredicate<RecordType, WALPointer> readTypeFilter,
+        boolean keepBinary,
+        int initialReadBufferSize
+    ) throws IgniteCheckedException {
+        super(
+            log,
             sharedCtx,
-            new RecordSerializerFactoryImpl(sharedCtx),
+            new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter),
             ioFactory,
-            bufSize);
+            initialReadBufferSize
+        );
 
         this.keepBinary = keepBinary;
-        init(null, workDir, walFiles);
+
+        walFileDescriptors = walFiles;
+
+        init(walFiles);
+
         advance();
     }
 
@@ -145,119 +116,42 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
      * For directory mode sets oldest file as initial segment,
      * for file by file mode, converts all files to descriptors and gets 
oldest as initial.
      *
-     * @param walFilesDir directory for directory scan mode
-     * @param workDir work directory, only for file-by-file mode
      * @param walFiles files for file-by-file iteration mode
      */
-    private void init(
-        @Nullable final File walFilesDir,
-        final boolean workDir,
-        @Nullable final File[] walFiles) throws IgniteCheckedException {
-        if (walFilesDir != null) {
-            FileWriteAheadLogManager.FileDescriptor[] descs = 
FileWriteAheadLogManager.loadFileDescriptors(walFilesDir);
-            curWalSegmIdx = !F.isEmpty(descs) ? descs[0].getIdx() : 0;
-            this.walFilesDir = walFilesDir;
-        }
-        else {
+    private void init(List<FileDescriptor> walFiles) {
+        if (walFiles == null || walFiles.isEmpty())
+            return;
 
-            if (workDir)
-                walFileDescriptors = scanIndexesFromFileHeaders(walFiles);
-            else
-                walFileDescriptors = new 
ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles)));
-
-            curWalSegmIdx = !walFileDescriptors.isEmpty() ? 
walFileDescriptors.get(0).getIdx() : 0;
-        }
-        curWalSegmIdx--;
+        curWalSegmIdx = walFiles.get(curIdx + 1).idx() - 1;
 
         if (log.isDebugEnabled())
             log.debug("Initialized WAL cursor [curWalSegmIdx=" + curWalSegmIdx 
+ ']');
     }
 
-    /**
-     * This methods checks all provided files to be correct WAL segment.
-     * Header record and its position is checked. WAL position is used to 
determine real index.
-     * File index from file name is ignored.
-     *
-     * @param allFiles files to scan.
-     * @return list of file descriptors with checked header records, having 
correct file index is set
-     */
-    private List<FileWriteAheadLogManager.FileDescriptor> 
scanIndexesFromFileHeaders(
-        @Nullable final File[] allFiles) {
-        if (allFiles == null || allFiles.length == 0)
-            return Collections.emptyList();
-
-        final List<FileWriteAheadLogManager.FileDescriptor> resultingDescs = 
new ArrayList<>();
-
-        for (File file : allFiles) {
-            if (file.length() < HEADER_RECORD_SIZE)
-                continue;  //filter out this segment as it is too short
-
-            FileWALPointer ptr;
-
-            try (
-                FileIO fileIO = ioFactory.create(file);
-                ByteBufferExpander buf = new 
ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
-            ) {
-                final DataInput in = new FileInput(fileIO, buf);
-
-                // Header record must be agnostic to the serializer version.
-                final int type = in.readUnsignedByte();
-
-                if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) {
-                    if (log.isInfoEnabled())
-                        log.info("Reached logical end of the segment for file 
" + file);
-
-                    continue; //filter out this segment
-                }
-                ptr = RecordV1Serializer.readPosition(in);
-            }
-            catch (IOException e) {
-                U.warn(log, "Failed to scan index from file [" + file + "]. 
Skipping this file during iteration", e);
-
-                continue; //filter out this segment
-            }
-
-            resultingDescs.add(new 
FileWriteAheadLogManager.FileDescriptor(file, ptr.index()));
-        }
-        Collections.sort(resultingDescs);
-
-        return resultingDescs;
-    }
-
     /** {@inheritDoc} */
     @Override protected AbstractReadFileHandle advanceSegment(
-        @Nullable final AbstractReadFileHandle curWalSegment) throws 
IgniteCheckedException {
+        @Nullable final AbstractReadFileHandle curWalSegment
+    ) throws IgniteCheckedException {
 
         if (curWalSegment != null)
             curWalSegment.close();
 
         curWalSegmIdx++;
-        // curHandle.workDir is false
-        final FileWriteAheadLogManager.FileDescriptor fd;
-
-        if (walFilesDir != null) {
-            File segmentFile = new File(walFilesDir,
-                
FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx));
 
-            if (!segmentFile.exists())
-                segmentFile = new File(walFilesDir,
-                    
FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx) + ".zip");
+        curIdx++;
 
-            fd = new FileWriteAheadLogManager.FileDescriptor(segmentFile);
-        }
-        else {
-            if (walFileDescriptors.isEmpty())
-                return null; //no files to read, stop iteration
+        if (curIdx >= walFileDescriptors.size())
+            return null;
 
-            fd = walFileDescriptors.remove(0);
-        }
+        FileDescriptor fd = walFileDescriptors.get(curIdx);
 
         if (log.isDebugEnabled())
-            log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" 
+ fd.getAbsolutePath() + ']');
+            log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" 
+ fd.file().getAbsolutePath() + ']');
 
         assert fd != null;
 
         curRec = null;
+
         try {
             return initReadHandle(fd, null);
         }
@@ -270,11 +164,42 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     }
 
     /** {@inheritDoc} */
+    @Override protected AbstractReadFileHandle initReadHandle(
+        @NotNull AbstractFileDescriptor desc,
+        @Nullable FileWALPointer start
+    ) throws IgniteCheckedException, FileNotFoundException {
+
+        AbstractFileDescriptor fd = desc;
+
+        while (true) {
+            try {
+                FileIO fileIO = fd.isCompressed() ? new UnzipFileIO(fd.file()) 
: ioFactory.create(fd.file());
+
+                readSegmentHeader(fileIO, curWalSegmIdx);
+
+                break;
+            }
+            catch (IOException | IgniteCheckedException e) {
+                log.error("Failed to init segment curWalSegmIdx=" + 
curWalSegmIdx + ", curIdx=" + curIdx, e);
+
+                curIdx++;
+
+                if (curIdx >= walFileDescriptors.size())
+                    return null;
+
+                fd = walFileDescriptors.get(curIdx);
+            }
+        }
+
+        return super.initReadHandle(fd, start);
+    }
+
+    /** {@inheritDoc} */
     @NotNull @Override protected WALRecord postProcessRecord(@NotNull final 
WALRecord rec) {
-        final GridKernalContext kernalCtx = sharedCtx.kernalContext();
-        final IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
+         GridKernalContext kernalCtx = sharedCtx.kernalContext();
+         IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
 
-        if (processor != null && rec.type() == 
WALRecord.RecordType.DATA_RECORD) {
+        if (processor != null && rec.type() == RecordType.DATA_RECORD) {
             try {
                 return postProcessDataRecord((DataRecord)rec, kernalCtx, 
processor);
             }
@@ -296,11 +221,12 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
      * @throws IgniteCheckedException if failed.
      */
     @NotNull private WALRecord postProcessDataRecord(
-        @NotNull final DataRecord dataRec,
-        final GridKernalContext kernalCtx,
-        final IgniteCacheObjectProcessor processor) throws 
IgniteCheckedException {
-        final CacheObjectContext fakeCacheObjCtx = new 
CacheObjectContext(kernalCtx,
-            null, null, false, false, false);
+        @NotNull DataRecord dataRec,
+        GridKernalContext kernalCtx,
+        IgniteCacheObjectProcessor processor
+    ) throws IgniteCheckedException {
+        final CacheObjectContext fakeCacheObjCtx = new CacheObjectContext(
+            kernalCtx, null, null, false, false, false);
 
         final List<DataEntry> entries = dataRec.writeEntries();
         final List<DataEntry> postProcessedEntries = new 
ArrayList<>(entries.size());
@@ -327,8 +253,7 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
      * @return post processed entry
      * @throws IgniteCheckedException if failed
      */
-    @NotNull
-    private DataEntry postProcessDataEntry(
+    @NotNull private DataEntry postProcessDataEntry(
         final IgniteCacheObjectProcessor processor,
         final CacheObjectContext fakeCacheObjCtx,
         final DataEntry dataEntry) throws IgniteCheckedException {
@@ -383,8 +308,9 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     }
 
     /** {@inheritDoc} */
-    @Override protected AbstractReadFileHandle createReadFileHandle(FileIO 
fileIO, long idx,
-        RecordSerializer ser, FileInput in) {
-        return new FileWriteAheadLogManager.ReadFileHandle(fileIO, idx, ser, 
in);
+    @Override protected AbstractReadFileHandle createReadFileHandle(
+        FileIO fileIO, long idx, RecordSerializer ser, FileInput in
+    ) {
+        return new ReadFileHandle(fileIO, idx, ser, in);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
index 468392a..2e2e2f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializerFactoryImpl.java
@@ -31,7 +31,7 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
     private GridCacheSharedContext cctx;
 
     /** Write pointer. */
-    private boolean writePointer;
+    private boolean needWritePointer;
 
     /** Read record filter. */
     private IgniteBiPredicate<WALRecord.RecordType, WALPointer> 
recordDeserializeFilter;
@@ -49,7 +49,17 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
      * @param cctx Cctx.
      */
     public RecordSerializerFactoryImpl(GridCacheSharedContext cctx) {
+        this(cctx, null);
+    }
+    /**
+     * @param cctx Cctx.
+     */
+    public RecordSerializerFactoryImpl(
+        GridCacheSharedContext cctx,
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter
+    ) {
         this.cctx = cctx;
+        this.recordDeserializeFilter = readTypeFilter;
     }
 
     /** {@inheritDoc} */
@@ -59,14 +69,24 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
 
         switch (ver) {
             case 1:
-                return new RecordV1Serializer(new RecordDataV1Serializer(cctx),
-                    writePointer, marshalledMode, skipPositionCheck, 
recordDeserializeFilter);
+                return new RecordV1Serializer(
+                    new RecordDataV1Serializer(cctx),
+                    needWritePointer,
+                    marshalledMode,
+                    skipPositionCheck,
+                    recordDeserializeFilter);
 
             case 2:
-                RecordDataV2Serializer dataV2Serializer = new 
RecordDataV2Serializer(new RecordDataV1Serializer(cctx));
+                RecordDataV2Serializer dataV2Serializer = new 
RecordDataV2Serializer(
+                    new RecordDataV1Serializer(cctx));
 
-                return new RecordV2Serializer(dataV2Serializer,
-                    writePointer, marshalledMode, skipPositionCheck, 
recordDeserializeFilter);
+                return new RecordV2Serializer(
+                    dataV2Serializer,
+                    needWritePointer,
+                    marshalledMode,
+                    skipPositionCheck,
+                    recordDeserializeFilter
+                );
 
             default:
                 throw new IgniteCheckedException("Failed to create a 
serializer with the given version " +
@@ -78,12 +98,12 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
      * @return Write pointer.
      */
     public boolean writePointer() {
-        return writePointer;
+        return needWritePointer;
     }
 
     /** {@inheritDoc} */
     @Override public RecordSerializerFactoryImpl writePointer(boolean 
writePointer) {
-        this.writePointer = writePointer;
+        this.needWritePointer = writePointer;
 
         return this;
     }
@@ -97,7 +117,8 @@ public class RecordSerializerFactoryImpl implements 
RecordSerializerFactory {
 
     /** {@inheritDoc} */
     @Override public RecordSerializerFactoryImpl recordDeserializeFilter(
-        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter) {
+        IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter
+    ) {
         this.recordDeserializeFilter = readTypeFilter;
 
         return this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index caa0962..ca484ce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -134,6 +134,9 @@ public class RecordV1Serializer implements RecordSerializer 
{
                 throw new SegmentEofException("WAL segment rollover detected 
(will end iteration) [expPtr=" + expPtr +
                         ", readPtr=" + ptr + ']', null);
 
+            if (recType == null)
+                throw new IOException("Unknown record type: " + recType);
+
             final WALRecord rec = dataSerializer.readRecord(recType, in);
 
             rec.position(ptr);
@@ -341,12 +344,7 @@ public class RecordV1Serializer implements 
RecordSerializer {
         if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
             throw new SegmentEofException("Reached logical end of the 
segment", null);
 
-        RecordType recType = RecordType.fromOrdinal(type - 1);
-
-        if (recType == null)
-            throw new IOException("Unknown record type: " + type);
-
-        return recType;
+        return RecordType.fromOrdinal(type - 1);
     }
 
     /**
@@ -359,7 +357,11 @@ public class RecordV1Serializer implements 
RecordSerializer {
      * @throws EOFException In case of end of file.
      * @throws IgniteCheckedException If it's unable to read record.
      */
-    static WALRecord readWithCrc(FileInput in0, WALPointer expPtr, RecordIO 
reader) throws EOFException, IgniteCheckedException {
+    static WALRecord readWithCrc(
+        FileInput in0,
+        WALPointer expPtr,
+        RecordIO reader
+    ) throws EOFException, IgniteCheckedException {
         long startPos = -1;
 
         try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
@@ -377,7 +379,16 @@ public class RecordV1Serializer implements 
RecordSerializer {
             throw e;
         }
         catch (Exception e) {
-            throw new IgniteCheckedException("Failed to read WAL record at 
position: " + startPos, e);
+            long size = -1;
+
+            try {
+                size = in0.io().size();
+            }
+            catch (IOException ignore) {
+                // No-op. The size is only used for diagnostics, so a failure to read it is ignored.
+            }
+
+            throw new IgniteCheckedException("Failed to read WAL record at 
position: " + startPos + " size: " + size, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 2b81210..2c65ebe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -115,6 +115,9 @@ public class RecordV2Serializer implements RecordSerializer 
{
 
             FileWALPointer ptr = readPositionAndCheckPoint(in, expPtr, 
skipPositionCheck);
 
+            if (recType == null)
+                throw new IOException("Unknown record type: " + recType);
+
             if (recordFilter != null && !recordFilter.apply(recType, ptr)) {
                 int toSkip = ptr.length() - REC_TYPE_SIZE - 
FILE_WAL_POINTER_SIZE - CRC_SIZE;
 

Reply via email to