Repository: ignite
Updated Branches:
  refs/heads/ignite-2.7 76dbc5245 -> 82df92362


http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
new file mode 100644
index 0000000..5918b0b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * File input, backed by byte buffer file input.
+ * This class allows to read data by chunks from file and then read primitives
+ */
+public class SimpleFileInput implements FileInput {
+    /**
+     * Buffer for reading blocks of data into.
+     * <b>Note:</b> biggest block requested from this input can't be longer 
than buffer capacity
+     */
+    private ByteBuffer buf;
+
+    /** I/O interface for read/write operations with file */
+    protected FileIO io;
+
+    /** */
+    private long pos;
+
+    /** */
+    private ByteBufferExpander expBuf;
+
+    /**
+     * @param io FileIO to read from.
+     * @param buf Buffer for reading blocks of data into.
+     */
+    public SimpleFileInput(FileIO io, ByteBufferExpander buf) throws 
IOException {
+        assert io != null;
+
+        this.io = io;
+        this.buf = buf.buffer();
+
+        expBuf = buf;
+
+        pos = io.position();
+
+        clearBuffer();
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileIO io() {
+        return io;
+    }
+
+    /**
+     * Clear buffer.
+     */
+    private void clearBuffer() {
+        buf.clear();
+        buf.limit(0);
+
+        assert buf.remaining() == 0; // Buffer is empty.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void seek(long pos) throws IOException {
+        if (pos > io.size())
+            throw new EOFException();
+
+        io.position(pos);
+
+        this.pos = pos;
+
+        clearBuffer();
+    }
+
+    /**
+     * @return Underlying buffer.
+     */
+    @Override public ByteBuffer buffer() {
+        return buf;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void ensure(int requested) throws IOException {
+        int available = buf.remaining();
+
+        if (available >= requested)
+            return;
+
+        if (buf.capacity() < requested) {
+            if (expBuf == null)
+                throw new IOException("Requested size is greater than buffer: 
" + requested);
+
+            buf = expBuf.expand(requested);
+
+            assert available == buf.remaining();
+        }
+
+        buf.compact();
+
+        do {
+            int read = io.read(buf);
+
+            if (read == -1)
+                throw new EOFException("EOF at position [" + io.position() + 
"] expected to read [" + requested + "] bytes");
+
+            available += read;
+
+            pos += read;
+        }
+        while (available < requested);
+
+        buf.flip();
+    }
+
+    /**
+     * @return Position in the stream.
+     */
+    @Override public long position() {
+        return pos - buf.remaining();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void readFully(@NotNull byte[] b) throws IOException {
+        ensure(b.length);
+
+        buf.get(b);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void readFully(@NotNull byte[] b, int off, int len) 
throws IOException {
+        ensure(len);
+
+        buf.get(b, off, len);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int skipBytes(int n) throws IOException {
+        if (buf.remaining() >= n)
+            buf.position(buf.position() + n);
+        else
+            seek(pos + n);
+
+        return n;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public boolean readBoolean() throws IOException {
+        return readByte() == 1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public byte readByte() throws IOException {
+        ensure(1);
+
+        return buf.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readUnsignedByte() throws IOException {
+        return readByte() & 0xFF;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public short readShort() throws IOException {
+        ensure(2);
+
+        return buf.getShort();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readUnsignedShort() throws IOException {
+        return readShort() & 0xFFFF;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public char readChar() throws IOException {
+        ensure(2);
+
+        return buf.getChar();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readInt() throws IOException {
+        ensure(4);
+
+        return buf.getInt();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public long readLong() throws IOException {
+        ensure(8);
+
+        return buf.getLong();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public float readFloat() throws IOException {
+        ensure(4);
+
+        return buf.getFloat();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public double readDouble() throws IOException {
+        ensure(8);
+
+        return buf.getDouble();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public String readLine() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String readUTF() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @param skipCheck If CRC check should be skipped.
+     * @return autoclosable fileInput, after its closing crc32 will be 
calculated and compared with saved one
+     */
+    public Crc32CheckingFileInput startRead(boolean skipCheck) {
+        return new Crc32CheckingFileInput(this, skipCheck);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java
new file mode 100644
index 0000000..2e5ea75
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleSegmentFileInputFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import java.io.IOException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+
+/**
+ * Simple implementation of {@link SegmentFileInputFactory}.
+ */
+public class SimpleSegmentFileInputFactory implements SegmentFileInputFactory {
+
+    /** {@inheritDoc} */
+    @Override public FileInput createFileInput(SegmentIO segmentIO,
+        ByteBufferExpander buf) throws IOException {
+        return new SimpleFileInput(segmentIO, buf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index aaff33a..f9388ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -39,13 +39,13 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -67,6 +67,9 @@ public class IgniteWalIteratorFactory {
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private final SegmentFileInputFactory segmentFileInputFactory = new 
SimpleSegmentFileInputFactory();
+
     /**
      * Creates WAL files iterator factory.
      * WAL iterator supports automatic converting from CacheObjects and 
KeyCacheObject into BinaryObjects
@@ -319,10 +322,10 @@ public class IgniteWalIteratorFactory {
         FileDescriptor ds = new FileDescriptor(file);
 
         try (
-            FileIO fileIO = ds.isCompressed() ? new UnzipFileIO(file) : 
ioFactory.create(file);
+            SegmentIO fileIO = ds.toIO(ioFactory);
             ByteBufferExpander buf = new 
ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())
         ) {
-            final DataInput in = new FileInput(fileIO, buf);
+            final DataInput in = 
segmentFileInputFactory.createFileInput(fileIO, buf);
 
             // Header record must be agnostic to the serializer version.
             final int type = in.readUnsignedByte();

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 7cfb66d..c33a45b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -37,15 +37,16 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
@@ -72,6 +73,9 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
 
     /** */
     private static final long serialVersionUID = 0L;
+
+    /** Factory to provide I/O interfaces for read primitives with files. */
+    private static final SegmentFileInputFactory FILE_INPUT_FACTORY = new 
SimpleSegmentFileInputFactory();
     /**
      * File descriptors remained to scan.
      * <code>null</code> value means directory scan mode
@@ -118,7 +122,8 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
             sharedCtx,
             new RecordSerializerFactoryImpl(sharedCtx, readTypeFilter),
             ioFactory,
-            initialReadBufferSize
+            initialReadBufferSize,
+            FILE_INPUT_FACTORY
         );
 
         this.lowBound = lowBound;
@@ -262,13 +267,13 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     ) throws IgniteCheckedException, FileNotFoundException {
 
         AbstractFileDescriptor fd = desc;
-        FileIO fileIO = null;
+        SegmentIO fileIO = null;
         SegmentHeader segmentHeader;
         while (true) {
             try {
-                fileIO = fd.isCompressed() ? new UnzipFileIO(fd.file()) : 
ioFactory.create(fd.file());
+                fileIO = fd.toIO(ioFactory);
 
-                segmentHeader = readSegmentHeader(fileIO, curWalSegmIdx);
+                segmentHeader = readSegmentHeader(fileIO, FILE_INPUT_FACTORY);
 
                 break;
             }
@@ -422,8 +427,8 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
 
     /** {@inheritDoc} */
     @Override protected AbstractReadFileHandle createReadFileHandle(
-        FileIO fileIO, long idx, RecordSerializer ser, FileInput in
+        SegmentIO fileIO, RecordSerializer ser, FileInput in
     ) {
-        return new ReadFileHandle(fileIO, idx, ser, in);
+        return new ReadFileHandle(fileIO, ser, in, null);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
index c5760ab..a9d793d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 
 /**
  * Record serializer.

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index ca484ce..afd770d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -29,13 +29,15 @@ import 
org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
@@ -246,14 +248,14 @@ public class RecordV1Serializer implements 
RecordSerializer {
      * NOTE: Method mutates position of {@code io}.
      *
      * @param io I/O interface for file.
-     * @param expectedIdx Expected WAL segment index for readable record.
+     * @param segmentFileInputFactory File input factory.
      * @return Instance of {@link SegmentHeader} extracted from the file.
      * @throws IgniteCheckedException If failed to read serializer version.
      */
-    public static SegmentHeader readSegmentHeader(FileIO io, long expectedIdx)
+    public static SegmentHeader readSegmentHeader(SegmentIO io, 
SegmentFileInputFactory segmentFileInputFactory)
         throws IgniteCheckedException, IOException {
         try (ByteBufferExpander buf = new 
ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
-            FileInput in = new FileInput(io, buf);
+            ByteBufferBackedDataInput in = 
segmentFileInputFactory.createFileInput(io, buf);
 
             in.ensure(HEADER_RECORD_SIZE);
 
@@ -270,7 +272,7 @@ public class RecordV1Serializer implements RecordSerializer 
{
             // Read file pointer.
             FileWALPointer ptr = readPosition(in);
 
-            if (expectedIdx != ptr.index())
+            if (io.getSegmentId() != ptr.index())
                 throw new SegmentEofException("Reached logical end of the 
segment by pointer", null);
 
             assert ptr.fileOffset() == 0 : "Header record should be placed at 
the beginning of file " + ptr;
@@ -364,7 +366,7 @@ public class RecordV1Serializer implements RecordSerializer 
{
     ) throws EOFException, IgniteCheckedException {
         long startPos = -1;
 
-        try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) {
+        try (SimpleFileInput.Crc32CheckingFileInput in = 
in0.startRead(skipCrc)) {
             startPos = in0.position();
 
             WALRecord res = reader.readWithHeaders(in, expPtr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
index 68e55e0..e112522 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java
@@ -28,7 +28,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.FilteredRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index dd10479..54a7d8c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -65,8 +65,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
                 new DataRegionConfiguration()
                     .setMaxSize(200L * 1024 * 1024)
                     .setPersistenceEnabled(true))
-                .setWalMode(WALMode.LOG_ONLY)
-                .setWalSegmentSize(512 * 1024);
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalSegmentSize(512 * 1024);
 
         cfg.setDataStorageConfiguration(memCfg);
 
@@ -124,8 +124,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
 
         log.warning("Start loading");
 
-        try (IgniteDataStreamer<Object, Object> st = 
ig0.dataStreamer("cache1")){
-            for (int k = 0; k < entryCnt; k++){
+        try (IgniteDataStreamer<Object, Object> st = 
ig0.dataStreamer("cache1")) {
+            for (int k = 0; k < entryCnt; k++) {
                 st.addData(k, k);
 
                 printProgress(k);
@@ -158,10 +158,10 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
 
         log.warning("Start loading");
 
-        try (IgniteDataStreamer<Object, Object> st = 
ig0.dataStreamer("cache1")){
+        try (IgniteDataStreamer<Object, Object> st = 
ig0.dataStreamer("cache1")) {
             st.allowOverwrite(true);
 
-            for (int k = 0; k < entryCnt; k++){
+            for (int k = 0; k < entryCnt; k++) {
                 st.addData(k, k);
 
                 printProgress(k);
@@ -193,16 +193,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
                     for (int g = 0; g < initGridCnt; g++) {
                         IgniteEx ig = grid(g);
 
-                        FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)ig.context().cache().context().wal();
-
-                        Object reservationStorage = 
GridTestUtils.getFieldValue(wal, "reservationStorage");
-
-                        synchronized (reservationStorage) {
-                            Map reserved = 
GridTestUtils.getFieldValue(reservationStorage, "reserved");
-
-                            if (reserved.isEmpty())
-                                return false;
-                        }
+                        if (isReserveListEmpty(ig))
+                            return false;
                     }
 
                     return true;
@@ -220,16 +212,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
                 for (int g = 0; g < initGridCnt; g++) {
                     IgniteEx ig = grid(g);
 
-                    FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)ig.context().cache().context().wal();
-
-                    Object reservationStorage = 
GridTestUtils.getFieldValue(wal, "reservationStorage");
-
-                    synchronized (reservationStorage) {
-                        Map reserved = 
GridTestUtils.getFieldValue(reservationStorage, "reserved");
-
-                        if (!reserved.isEmpty())
-                            return false;
-                    }
+                    if (isReserveListEmpty(ig))
+                        return false;
                 }
 
                 return true;
@@ -240,9 +224,26 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @return {@code true} if reserve list is empty.
+     */
+    private boolean isReserveListEmpty(IgniteEx ig) {
+        FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)ig.context().cache().context().wal();
+
+        Object segmentAware = GridTestUtils.getFieldValue(wal, "segmentAware");
+
+        synchronized (segmentAware) {
+            Map reserved = 
GridTestUtils.getFieldValue(GridTestUtils.getFieldValue(segmentAware, 
"reservationStorage"), "reserved");
+
+            if (reserved.isEmpty())
+                return true;
+        }
+        return false;
+    }
+
+    /**
      *
      */
-    private void printProgress(int k){
+    private void printProgress(int k) {
         if (k % 1000 == 0)
             log.warning("Keys -> " + k);
     }
@@ -253,7 +254,7 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
     public void testRemovesArePreloadedIfHistoryIsAvailable() throws Exception 
{
         int entryCnt = 10_000;
 
-        IgniteEx ig0 = (IgniteEx) startGrids(2);
+        IgniteEx ig0 = (IgniteEx)startGrids(2);
 
         ig0.active(true);
 
@@ -302,7 +303,7 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
     public void testNodeIsClearedIfHistoryIsUnavailable() throws Exception {
         int entryCnt = 10_000;
 
-        IgniteEx ig0 = (IgniteEx) startGrids(2);
+        IgniteEx ig0 = (IgniteEx)startGrids(2);
 
         ig0.active(true);
 
@@ -362,7 +363,7 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
     public void testWalHistoryPartiallyRemoved() throws Exception {
         int entryCnt = 10_000;
 
-        IgniteEx ig0 = (IgniteEx) startGrids(2);
+        IgniteEx ig0 = (IgniteEx)startGrids(2);
 
         ig0.cluster().active(true);
 
@@ -386,7 +387,7 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
         stopAllGrids();
 
         U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), walArchPath 
+ "/" +
-                nodeId0, false));
+            nodeId0, false));
 
         startGrid(0);
 
@@ -436,16 +437,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
                     for (int g = 0; g < initGridCnt; g++) {
                         IgniteEx ig = grid(g);
 
-                        FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)ig.context().cache().context().wal();
-
-                        Object reservationStorage = 
GridTestUtils.getFieldValue(wal, "reservationStorage");
-
-                        synchronized (reservationStorage) {
-                            Map reserved = 
GridTestUtils.getFieldValue(reservationStorage, "reserved");
-
-                            if (reserved.isEmpty())
-                                return false;
-                        }
+                        if (isReserveListEmpty(ig))
+                            return false;
                     }
 
                     return true;
@@ -465,16 +458,8 @@ public class IgniteWalHistoryReservationsTest extends 
GridCommonAbstractTest {
                 for (int g = 0; g < initGridCnt - 1; g++) {
                     IgniteEx ig = grid(g);
 
-                    FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)ig.context().cache().context().wal();
-
-                    Object reservationStorage = 
GridTestUtils.getFieldValue(wal, "reservationStorage");
-
-                    synchronized (reservationStorage) {
-                        Map reserved = 
GridTestUtils.getFieldValue(reservationStorage, "reserved");
-
-                        if (!reserved.isEmpty())
-                            return false;
-                    }
+                    if (isReserveListEmpty(ig))
+                        return false;
                 }
 
                 return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
index 9dbef5d..74db28f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -18,13 +18,18 @@
 package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
 import java.io.File;
+import java.nio.channels.Channel;
 import java.nio.file.Paths;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
@@ -40,8 +45,11 @@ import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDataba
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
@@ -58,6 +66,8 @@ import org.junit.Assert;
 
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+import static 
org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /***
  * Test check correct switch segment if in the tail of segment have garbage.
@@ -159,7 +169,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends 
GridCommonAbstractTest {
      *
      * @throws Exception If some thing failed.
      */
-    public void test() throws Exception {
+    public void testInvariantSwitchSegment() throws Exception {
         for (int serVer : checkSerializerVers) {
             for (Class walMgrClass : checkWalManagers) {
                 try {
@@ -175,6 +185,22 @@ public class IgniteWalIteratorSwitchSegmentTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Test for check switch segment from work dir to archive dir during 
iteration.
+     *
+     * @throws Exception If some thing failed.
+     */
+    public void testSwitchReadingSegmentFromWorkToArchive() throws Exception {
+        for (int serVer : checkSerializerVers) {
+                try {
+                    
checkSwitchReadingSegmentDuringIteration(FileWriteAheadLogManager.class, 
serVer);
+                }
+                finally {
+                    U.delete(Paths.get(U.defaultWorkDirectory()));
+                }
+        }
+    }
+
+    /**
      * @param walMgrClass WAL manager class.
      * @param serVer WAL serializer version.
      * @throws Exception If some thing failed.
@@ -299,6 +325,101 @@ public class IgniteWalIteratorSwitchSegmentTest extends 
GridCommonAbstractTest {
         Assert.assertEquals("Not all records read during iteration.", 
expectedRecords, actualRecords);
     }
 
+    /**
+     * @param walMgrClass WAL manager class.
+     * @param serVer WAL serializer version.
+     * @throws Exception If some thing failed.
+     */
+    private void checkSwitchReadingSegmentDuringIteration(Class walMgrClass, 
int serVer) throws Exception {
+        String workDir = U.defaultWorkDirectory();
+
+        T2<IgniteWriteAheadLogManager, RecordSerializer> initTup = 
initiate(walMgrClass, serVer, workDir);
+
+        IgniteWriteAheadLogManager walMgr = initTup.get1();
+
+        RecordSerializer recordSerializer = initTup.get2();
+
+        MetastoreDataRecord rec = new MetastoreDataRecord("0", new byte[100]);
+
+        int recSize = recordSerializer.size(rec);
+
+        // Add more record for rollover to the next segment.
+        int recordsToWrite = SEGMENT_SIZE / recSize + 100;
+
+        SegmentAware segmentAware = GridTestUtils.getFieldValue(walMgr, 
"segmentAware");
+
+        //guard from archivation before iterator would be created.
+        segmentAware.checkCanReadArchiveOrReserveWorkSegment(0);
+
+        for (int i = 0; i < recordsToWrite; i++)
+            walMgr.log(new MetastoreDataRecord(rec.key(), rec.value()));
+
+        walMgr.flush(null, true);
+
+        int expectedRecords = recordsToWrite;
+        AtomicInteger actualRecords = new AtomicInteger(0);
+
+        AtomicReference<String> startedSegmentPath = new AtomicReference<>();
+        AtomicReference<String> finishedSegmentPath = new AtomicReference<>();
+
+        CountDownLatch startedIteratorLatch = new CountDownLatch(1);
+        CountDownLatch finishedArchivedLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<Object> future = GridTestUtils.runAsync(
+            () -> {
+                // Check that switch segment works as expected and all record 
is reachable.
+                try (WALIterator it = walMgr.replay(null)) {
+                    Object handle = getFieldValueHierarchy(it,  
"currWalSegment");
+
+                    FileInput in = getFieldValueHierarchy(handle, "in");
+                    Object delegate = getFieldValueHierarchy(in.io(), 
"delegate");
+                    Channel ch = getFieldValueHierarchy(delegate, "ch");
+                    String path = getFieldValueHierarchy(ch, "path");
+
+                    startedSegmentPath.set(path);
+
+                    startedIteratorLatch.countDown();
+
+                    while (it.hasNext()) {
+                        IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                        WALRecord rec0 = tup.get2();
+
+                        if (rec0.type() == METASTORE_DATA_RECORD)
+                            actualRecords.incrementAndGet();
+
+                        finishedArchivedLatch.await();
+                    }
+
+                    in = getFieldValueHierarchy(handle, "in");
+                    delegate = getFieldValueHierarchy(in.io(), "delegate");
+                    ch = getFieldValueHierarchy(delegate, "ch");
+                    path = getFieldValueHierarchy(ch, "path");
+
+                    finishedSegmentPath.set(path);
+                }
+
+                return null;
+            }
+        );
+
+        startedIteratorLatch.await();
+
+        segmentAware.releaseWorkSegment(0);
+
+        waitForCondition(() -> segmentAware.lastArchivedAbsoluteIndex() == 0, 
5000);
+
+        finishedArchivedLatch.countDown();
+
+        future.get();
+
+        //should started iteration from work directory but finish from archive 
directory.
+        assertEquals(workDir + WORK_SUB_DIR + "/0000000000000000.wal", 
startedSegmentPath.get());
+        assertEquals(workDir + ARCHIVE_SUB_DIR + "/0000000000000000.wal", 
finishedSegmentPath.get());
+
+        Assert.assertEquals("Not all records read during iteration.", 
expectedRecords, actualRecords.get());
+    }
+
     /***
      * Initiate WAL manager.
      *
@@ -323,6 +444,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends 
GridCommonAbstractTest {
                 cfg.setDataStorageConfiguration(
                     new DataStorageConfiguration()
                         .setWalSegmentSize(SEGMENT_SIZE)
+                        .setWalRecordIteratorBufferSize(SEGMENT_SIZE / 2)
                         .setWalMode(WALMode.FSYNC)
                         .setWalPath(workDir + WORK_SUB_DIR)
                         .setWalArchivePath(workDir + ARCHIVE_SUB_DIR)

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
index c077b27..59dd3b7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java
@@ -27,7 +27,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 
@@ -36,7 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaC
  */
 public class IgniteDataIntegrityTests extends TestCase {
     /** File input. */
-    private FileInput fileInput;
+    private SimpleFileInput fileInput;
 
     /** Buffer expander. */
     private ByteBufferExpander expBuf;
@@ -52,7 +53,7 @@ public class IgniteDataIntegrityTests extends TestCase {
 
         FileIOFactory factory = new RandomAccessFileIOFactory();
 
-        fileInput = new FileInput(
+        fileInput = new SimpleFileInput(
                 factory.create(file),
                 expBuf
         );

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
new file mode 100644
index 0000000..8287684
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import java.util.concurrent.CountDownLatch;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link SegmentAware}.
+ */
+public class SegmentAwareTest extends TestCase {
+
+    /**
+     * Waiting finished when work segment is set.
+     */
+    public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
+
+        //when: set exact awaiting segment.
+        aware.curAbsWalIdx(5);
+
+        //then: waiting should finish immediately
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when work segment greater than expected is set.
+     */
+    public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
+
+        //when: set grater than awaiting segment.
+        aware.curAbsWalIdx(10);
+
+        //then: waiting should finish immediately
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when work segment is set.
+     */
+    public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
+
+        //when: set less than awaiting segment.
+        aware.curAbsWalIdx(4);
+
+        //then: thread still waiting the segment
+        assertFutureIsNotFinish(future);
+
+        //when: trigger next segment.
+        aware.nextAbsoluteSegmentIndex();
+
+        //then: waiting should finish immediately
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when interrupt was triggered.
+     */
+    public void testFinishAwaitSegment_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
+
+        //when: interrupt waiting.
+        aware.interrupt();
+
+        //then: IgniteInterruptedCheckedException should be throw.
+        assertTrue(future.get(20) instanceof 
IgniteInterruptedCheckedException);
+    }
+
+    /**
+     * Waiting finished when next work segment triggered.
+     */
+    public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.curAbsWalIdx(5);
+        aware.setLastArchivedAbsoluteIndex(4);
+
+        IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentForArchivation);
+
+        //when: next work segment triggered.
+        aware.nextAbsoluteSegmentIndex();
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when work segment is set.
+     */
+    public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.curAbsWalIdx(5);
+        aware.setLastArchivedAbsoluteIndex(4);
+
+        IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentForArchivation);
+
+        //when: set work segment greater than required.
+        aware.curAbsWalIdx(7);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when interrupt was triggered.
+     */
+    public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.curAbsWalIdx(5);
+        aware.setLastArchivedAbsoluteIndex(4);
+
+        IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentForArchivation);
+
+        //when: interrupt waiting.
+        aware.interrupt();
+
+        //then: IgniteInterruptedCheckedException should be throw.
+        assertTrue(future.get(20) instanceof 
IgniteInterruptedCheckedException);
+    }
+
+    /**
+     * Should correct calculate next segment.
+     */
+    public void testCorrectCalculateNextSegmentIndex() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.curAbsWalIdx(5);
+
+        //when: request next work segment.
+        long segmentIndex = aware.nextAbsoluteSegmentIndex();
+
+        //then:
+        assertThat(segmentIndex, is(6L));
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void 
testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(2);
+
+        aware.curAbsWalIdx(1);
+        aware.setLastArchivedAbsoluteIndex(-1);
+
+        IgniteInternalFuture future = 
awaitThread(aware::nextAbsoluteSegmentIndex);
+
+        //when: mark first segment as moved.
+        aware.markAsMovedToArchive(0);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(2);
+
+        aware.curAbsWalIdx(1);
+        aware.setLastArchivedAbsoluteIndex(-1);
+
+        IgniteInternalFuture future = 
awaitThread(aware::nextAbsoluteSegmentIndex);
+
+        //when: mark first segment as moved.
+        aware.setLastArchivedAbsoluteIndex(0);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when force interrupt was triggered.
+     */
+    public void 
testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(2);
+
+        aware.curAbsWalIdx(2);
+        aware.setLastArchivedAbsoluteIndex(-1);
+
+        IgniteInternalFuture future = 
awaitThread(aware::nextAbsoluteSegmentIndex);
+
+        //when: interrupt waiting.
+        aware.interrupt();
+
+        //then: nothing to happen because nextAbsoluteSegmentIndex is not 
interrupt by "interrupt" call.
+        assertFutureIsNotFinish(future);
+
+        //when: force interrupt waiting.
+        aware.forceInterrupt();
+
+        //then: IgniteInterruptedCheckedException should be throw.
+        assertTrue(future.get(20) instanceof 
IgniteInterruptedCheckedException);
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
+
+        //when: archived exact expected segment.
+        aware.setLastArchivedAbsoluteIndex(5);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishSegmentArchived_WhenMarkExactWatingSegment() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
+
+        //when: mark exact segment as moved.
+        aware.markAsMovedToArchive(5);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishSegmentArchived_WhenSetGreaterThanWatingSegment() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
+
+        //when: archived greater than expected segment.
+        aware.setLastArchivedAbsoluteIndex(7);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishSegmentArchived_WhenMarkGreaterThanWatingSegment() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
+
+        //when: moved greater than expected segment.
+        aware.markAsMovedToArchive(7);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when interrupt was triggered.
+     */
+    public void testFinishSegmentArchived_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.curAbsWalIdx(5);
+        aware.setLastArchivedAbsoluteIndex(4);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
+
+        //when: interrupt waiting.
+        aware.interrupt();
+
+        //then: IgniteInterruptedCheckedException should be throw.
+        assertTrue(future.get(20) instanceof 
IgniteInterruptedCheckedException);
+    }
+
+    /**
+     * Waiting finished when release work segment.
+     */
+    public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.checkCanReadArchiveOrReserveWorkSegment(5);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.markAsMovedToArchive(5));
+
+        //when: release exact expected work segment.
+        aware.releaseWorkSegment(5);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished and increment archived segment when interrupt was call.
+     */
+    public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+        aware.checkCanReadArchiveOrReserveWorkSegment(5);
+
+        IgniteInternalFuture future = awaitThread(() -> 
aware.markAsMovedToArchive(5));
+
+        //when: interrupt waiting.
+        aware.interrupt();
+
+        //then: IgniteInterruptedCheckedException should be throw.
+        assertFalse(future.get(20) instanceof 
IgniteInterruptedCheckedException);
+
+        //and: last archived segment should be changed.
+        assertEquals(5, aware.lastArchivedAbsoluteIndex());
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.lastCompressedIdx(5);
+
+        IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentToCompress);
+
+        //when: archived expected segment.
+        aware.setLastArchivedAbsoluteIndex(6);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Waiting finished when segment archived.
+     */
+    public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() 
throws IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.lastCompressedIdx(5);
+
+        IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentToCompress);
+
+        //when: marked expected segment.
+        aware.markAsMovedToArchive(6);
+
+        //then: waiting should finish immediately.
+        future.get(20);
+    }
+
+    /**
+     * Next segment for compress based on truncated archive idx.
+     */
+    public void testCorrectCalculateNextCompressSegment() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.lastCompressedIdx(5);
+        aware.setLastArchivedAbsoluteIndex(6);
+        aware.lastTruncatedArchiveIdx(7);
+
+        //when:
+        long segmentToCompress = aware.waitNextSegmentToCompress();
+
+        //then: segment to compress greater than truncated archive idx
+        assertEquals(8, segmentToCompress);
+    }
+
+    /**
+     * Waiting finished when interrupt was call.
+     */
+    public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+        aware.lastCompressedIdx(5);
+
+        IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentToCompress);
+
+        //when: interrupt waiting.
+        aware.interrupt();
+
+        //then: IgniteInterruptedCheckedException should be throw.
+        assertTrue(future.get(20) instanceof 
IgniteInterruptedCheckedException);
+    }
+
+    /**
+     * Segment reserve correctly.
+     */
+    public void testReserveCorrectly() {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        //when: reserve one segment twice and one segment once.
+        aware.reserve(5);
+        aware.reserve(5);
+        aware.reserve(7);
+
+        //then: segments greater than minimum should be reserved.
+        assertTrue(aware.reserved(5));
+        assertTrue(aware.reserved(10));
+        assertFalse(aware.reserved(4));
+
+        //when: release one of twice locked segment.
+        aware.release(5);
+
+        //then: nothing to change.
+        assertTrue(aware.reserved(5));
+        assertTrue(aware.reserved(10));
+        assertFalse(aware.reserved(4));
+
+        //when: again release one of twice locked segment.
+        aware.release(5);
+
+        //then: segments greater than second locked segment should be reserved.
+        assertTrue(aware.reserved(7));
+        assertTrue(aware.reserved(10));
+        assertFalse(aware.reserved(5));
+        assertFalse(aware.reserved(6));
+
+        //when: release last segment.
+        aware.release(7);
+
+        //then: all segments should be released.
+        assertFalse(aware.reserved(7));
+        assertFalse(aware.reserved(10));
+        assertFalse(aware.reserved(6));
+    }
+
+    /**
+     * Should fail when release unreserved segment.
+     */
+    public void testAssertFail_WhenReleaseUnreservedSegment() {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.reserve(5);
+        try {
+
+            aware.release(7);
+        }
+        catch (AssertionError e) {
+            return;
+        }
+
+        fail("Should fail with AssertError because this segment have not 
reserved");
+    }
+
+    /**
+     * Segment locked correctly.
+     */
+    public void testReserveWorkSegmentCorrectly() {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        //when: lock one segment twice.
+        aware.checkCanReadArchiveOrReserveWorkSegment(5);
+        aware.checkCanReadArchiveOrReserveWorkSegment(5);
+
+        //then: exact one segment should locked.
+        assertTrue(aware.locked(5));
+        assertFalse(aware.locked(6));
+        assertFalse(aware.locked(4));
+
+        //when: release segment once.
+        aware.releaseWorkSegment(5);
+
+        //then: nothing to change, segment still locked.
+        assertTrue(aware.locked(5));
+        assertFalse(aware.locked(6));
+        assertFalse(aware.locked(4));
+
+        //when: release segment.
+        aware.releaseWorkSegment(5);
+
+        //then: all segments should be unlocked.
+        assertFalse(aware.locked(5));
+        assertFalse(aware.locked(6));
+        assertFalse(aware.locked(4));
+    }
+
+    /**
+     * Should fail when release unlocked segment.
+     */
+    public void testAssertFail_WhenReleaseUnreservedWorkSegment() {
+        //given: thread which awaited segment.
+        SegmentAware aware = new SegmentAware(10);
+
+        aware.checkCanReadArchiveOrReserveWorkSegment(5);
+        try {
+
+            aware.releaseWorkSegment(7);
+        }
+        catch (AssertionError e) {
+            return;
+        }
+
+        fail("Should fail with AssertError because this segment have not 
reserved");
+    }
+
+    /**
+     * Assert that future is still not finished.
+     *
+     * @param future Future to check.
+     */
+    private void assertFutureIsNotFinish(IgniteInternalFuture future) throws 
IgniteCheckedException {
+        try {
+            future.get(20);
+
+            fail("Timeout should be appear because thread should be still 
work");
+        }
+        catch (IgniteFutureTimeoutCheckedException ignore) {
+
+        }
+    }
+
+    /**
+     * Create thread for execute waiter.
+     *
+     * @param waiter Waiter for execute in new thread.
+     * @return Future of thread.
+     */
+    private IgniteInternalFuture awaitThread(Waiter waiter) throws 
IgniteCheckedException, InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        IgniteInternalFuture<Object> future = GridTestUtils.runAsync(
+            () -> {
+                latch.countDown();
+                try {
+                    waiter.await();
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    return e;
+                }
+
+                return null;
+            }
+        );
+
+        latch.await();
+
+        assertFutureIsNotFinish(future);
+
+        return future;
+    }
+
+    /**
+     * Represent of command for waiting.
+     */
+    interface Waiter {
+        /**
+         * Some waiting operation.
+         */
+        void await() throws IgniteInterruptedCheckedException;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
index b6a04d0..cf660c8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
@@ -123,7 +123,7 @@ public class StandaloneWalRecordsIteratorTest extends 
GridCommonAbstractTest {
 
         assertTrue("At least one WAL file must be opened!", 
CountedFileIO.getCountOpenedWalFiles() > 0);
 
-        assertEquals("All WAL files must be closed!", 
CountedFileIO.getCountOpenedWalFiles(), CountedFileIO.getCountClosedWalFiles());
+        assertTrue("All WAL files must be closed at least ones!", 
CountedFileIO.getCountOpenedWalFiles() <= 
CountedFileIO.getCountClosedWalFiles());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index e4ea4a9..3c4bc4f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1313,25 +1313,7 @@ public final class GridTestUtils {
         assert fieldName != null;
 
         try {
-            // Resolve inner field.
-            Field field = cls.getDeclaredField(fieldName);
-
-            synchronized (field) {
-                // Backup accessible field state.
-                boolean accessible = field.isAccessible();
-
-                try {
-                    if (!accessible)
-                        field.setAccessible(true);
-
-                    obj = field.get(obj);
-                }
-                finally {
-                    // Recover accessible field state.
-                    if (!accessible)
-                        field.setAccessible(false);
-                }
-            }
+            obj = findField(cls, obj, fieldName);
 
             return (T)obj;
         }
@@ -1361,25 +1343,7 @@ public final class GridTestUtils {
                 Class<?> cls = obj instanceof Class ? (Class)obj : 
obj.getClass();
 
                 try {
-                    // Resolve inner field.
-                    Field field = cls.getDeclaredField(fieldName);
-
-                    synchronized (field) {
-                        // Backup accessible field state.
-                        boolean accessible = field.isAccessible();
-
-                        try {
-                            if (!accessible)
-                                field.setAccessible(true);
-
-                            obj = field.get(obj);
-                        }
-                        finally {
-                            // Recover accessible field state.
-                            if (!accessible)
-                                field.setAccessible(false);
-                        }
-                    }
+                    obj = findField(cls, obj, fieldName);
                 }
                 catch (NoSuchFieldException e) {
                     // Resolve inner class, if not an inner field.
@@ -1402,6 +1366,74 @@ public final class GridTestUtils {
     }
 
     /**
+     * Get object field value via reflection(including superclass).
+     *
+     * @param obj Object or class to get field value from.
+     * @param fieldNames Field names to get value for: 
obj->field1->field2->...->fieldN.
+     * @param <T> Expected field class.
+     * @return Field value.
+     * @throws IgniteException In case of error.
+     */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    public static <T> T getFieldValueHierarchy(Object obj, String... 
fieldNames) throws IgniteException {
+        assert obj != null;
+        assert fieldNames != null;
+        assert fieldNames.length >= 1;
+
+        try {
+            for (String fieldName : fieldNames) {
+                Class<?> cls = obj instanceof Class ? (Class)obj : 
obj.getClass();
+
+                while (cls != null) {
+                    try {
+                        obj = findField(cls, obj, fieldName);
+
+                        break;
+                    }
+                    catch (NoSuchFieldException e) {
+                        cls = cls.getSuperclass();
+                    }
+                }
+            }
+
+            return (T)obj;
+        }
+        catch (IllegalAccessException e) {
+            throw new IgniteException("Failed to get object field [obj=" + obj 
+
+                ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
+        }
+    }
+
+    /**
+     * @param cls Class for searching.
+     * @param obj Target object.
+     * @param fieldName Field name for search.
+     * @return Field from object if it was found.
+     */
+    private static Object findField(Class<?> cls, Object obj,
+        String fieldName) throws NoSuchFieldException, IllegalAccessException {
+        // Resolve inner field.
+        Field field = cls.getDeclaredField(fieldName);
+
+        synchronized (field) {
+            // Backup accessible field state.
+            boolean accessible = field.isAccessible();
+
+            try {
+                if (!accessible)
+                    field.setAccessible(true);
+
+                return field.get(obj);
+            }
+            finally {
+                // Recover accessible field state.
+                if (!accessible)
+                    field.setAccessible(false);
+            }
+        }
+    }
+
+    /**
      * Get inner class by its name from the enclosing class.
      *
      * @param parentCls Parent class to resolve inner class for.

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 9f50b32..2b57223 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -50,6 +50,8 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWa
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDeltaConsistencyTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest;
@@ -99,6 +101,8 @@ public class IgnitePdsTestSuite extends TestSuite {
         // Binary meta tests.
         
suite.addTestSuite(IgnitePdsCacheObjectBinaryProcessorOnDiscoveryTest.class);
 
+        suite.addTestSuite(SegmentAwareTest.class);
+
         return suite;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/82df9236/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index fcbad9b..c2b5dc2 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -30,6 +30,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
@@ -74,6 +75,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.PAX;
@@ -123,6 +125,9 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
     /** */
     private int walSegmentSize;
 
+    /** */
+    private int walSegments = 10;
+
     /** Log only. */
     private boolean logOnly;
 
@@ -176,6 +181,8 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
         if (walSegmentSize != 0)
             dbCfg.setWalSegmentSize(walSegmentSize);
 
+        dbCfg.setWalSegments(walSegments);
+
         cfg.setDataStorageConfiguration(dbCfg);
 
         BinaryConfiguration binCfg = new BinaryConfiguration();
@@ -446,7 +453,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
 
                 Arrays.fill(data, (byte)i);
 
-                final byte[] loaded = (byte[]) cache.get(i);
+                final byte[] loaded = (byte[])cache.get(i);
 
                 Assert.assertArrayEquals(data, loaded);
 
@@ -913,7 +920,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage0.putData(String.valueOf(i), new byte[]{(byte)(i % 
256), 2, 3});
+                    storage0.putData(String.valueOf(i), new byte[] {(byte)(i % 
256), 2, 3});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -973,7 +980,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
             for (int i = 0; i < cnt; i++) {
                 byte[] b1 = new byte[arraySize];
                 for (int k = 0; k < arraySize; k++) {
-                    b1[k] = (byte) (k % 100);
+                    b1[k] = (byte)(k % 100);
                 }
 
                 sharedCtx.database().checkpointReadLock();
@@ -991,7 +998,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                 assertEquals(arraySize, d2.length);
 
                 for (int k = 0; k < arraySize; k++) {
-                    assertEquals((byte) (k % 100), d2[k]);
+                    assertEquals((byte)(k % 100), d2[k]);
                 }
             }
 
@@ -1022,7 +1029,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage.putData(String.valueOf(i), new byte[]{1, 2, 3});
+                    storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -1075,7 +1082,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage.putData(String.valueOf(i), new byte[]{1, 2, 3});
+                    storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -1086,7 +1093,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage.putData(String.valueOf(i), new byte[]{2, 2, 3, 4});
+                    storage.putData(String.valueOf(i), new byte[] {2, 2, 3, 
4});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -1127,7 +1134,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                 sharedCtx0.database().checkpointReadLock();
 
                 try {
-                    storage.putData(String.valueOf(i), new byte[]{1, 2, 3});
+                    storage.putData(String.valueOf(i), new byte[] {1, 2, 3});
                 }
                 finally {
                     sharedCtx0.database().checkpointReadUnlock();
@@ -1165,6 +1172,74 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception if failed.
      */
+    public void testAbsentDeadlock_Iterator_RollOver_Archivation() throws 
Exception {
+        try {
+            walSegments = 2;
+
+            walSegmentSize = 512 * 1024;
+
+            IgniteEx ignite0 = (IgniteEx)startGrid("node0");
+
+            ignite0.active(true);
+
+            IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);
+
+            for (int i = 0; i < 100; i++)
+                cache0.put(i, new IndexedObject(i));
+
+            GridCacheSharedContext<Object, Object> sharedCtx = 
ignite0.context().cache().context();
+
+            GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)sharedCtx.database();
+
+            db.waitForCheckpoint("test");
+            db.enableCheckpoints(false).get();
+
+            // Log something to know where to start.
+            WALPointer ptr = sharedCtx.wal().log(new 
MemoryRecoveryRecord(U.currentTimeMillis()));
+
+            info("Replay marker: " + ptr);
+
+            for (int i = 100; i < 200; i++)
+                cache0.put(i, new IndexedObject(i));
+
+            CountDownLatch insertFinished = new CountDownLatch(1);
+            GridTestUtils.runAsync(
+                () -> {
+                    try (WALIterator it = sharedCtx.wal().replay(ptr)) {
+                        if (it.hasNext()) {
+                            it.next();
+
+                            insertFinished.await();
+                        }
+                    }
+
+                    return null;
+                }
+            );
+
+            IgniteInternalFuture<Object> future = GridTestUtils.runAsync(
+                () -> {
+                    for (int i = 0; i < 10000; i++)
+                        cache0.put(i, new IndexedObject(i));
+
+                    return null;
+                }
+            );
+
+            future.get();
+
+            insertFinished.countDown();
+
+            ignite0.close();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testApplyDeltaRecords() throws Exception {
         try {
             IgniteEx ignite0 = (IgniteEx)startGrid("node0");
@@ -1310,12 +1385,12 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
             final String cacheName = "transactional";
 
             CacheConfiguration<Object, Object> cacheConfiguration = new 
CacheConfiguration<>(cacheName)
-                    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
-                    .setAffinity(new RendezvousAffinityFunction(false, 32))
-                    .setCacheMode(CacheMode.PARTITIONED)
-                    .setRebalanceMode(CacheRebalanceMode.SYNC)
-                    
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
-                    .setBackups(2);
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setAffinity(new RendezvousAffinityFunction(false, 32))
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setRebalanceMode(CacheRebalanceMode.SYNC)
+                
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setBackups(2);
 
             ignite.createCache(cacheConfiguration);
 
@@ -1329,7 +1404,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
 
             for (int t = 1; t <= transactions; t++) {
                 Transaction tx = ignite.transactions().txStart(
-                        TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED);
+                    TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED);
 
                 Map<Object, Object> changesInTransaction = new HashMap<>();
 
@@ -1388,12 +1463,12 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
             final String cacheName = "transactional";
 
             CacheConfiguration<Object, Object> cacheConfiguration = new 
CacheConfiguration<>(cacheName)
-                    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
-                    .setAffinity(new RendezvousAffinityFunction(false, 32))
-                    .setCacheMode(CacheMode.PARTITIONED)
-                    .setRebalanceMode(CacheRebalanceMode.SYNC)
-                    
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
-                    .setBackups(0);
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setAffinity(new RendezvousAffinityFunction(false, 32))
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setRebalanceMode(CacheRebalanceMode.SYNC)
+                
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setBackups(0);
 
             ignite.createCache(cacheConfiguration);
 
@@ -1416,7 +1491,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
 
             for (int t = 1; t <= transactions; t++) {
                 Transaction tx = ignite.transactions().txStart(
-                        TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED);
+                    TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.READ_COMMITTED);
 
                 for (int op = 0; op < operationsPerTransaction; op++) {
                     int key = random.nextInt(1000) + 1;
@@ -1447,7 +1522,7 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                     WALRecord rec = tup.get2();
 
                     if (rec instanceof TxRecord) {
-                        TxRecord txRecord = (TxRecord) rec;
+                        TxRecord txRecord = (TxRecord)rec;
                         GridCacheVersion txId = txRecord.nearXidVersion();
 
                         switch (txRecord.state()) {
@@ -1470,8 +1545,9 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
                             default:
                                 throw new IllegalStateException("Unknown Tx 
state of record " + txRecord);
                         }
-                    } else if (rec instanceof DataRecord) {
-                        DataRecord dataRecord = (DataRecord) rec;
+                    }
+                    else if (rec instanceof DataRecord) {
+                        DataRecord dataRecord = (DataRecord)rec;
 
                         for (DataEntry entry : dataRecord.writeEntries()) {
                             GridCacheVersion txId = entry.nearXidVersion();
@@ -1514,16 +1590,18 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
             // Create pseudo-random array.
             for (int i = 0; i < payload.length; i++)
                 if (i % index == 0)
-                    payload[i] = (byte) index;
+                    payload[i] = (byte)index;
         }
 
         @Override
         public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            BigObject bigObject = (BigObject) o;
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            BigObject bigObject = (BigObject)o;
             return index == bigObject.index &&
-                    Arrays.equals(payload, bigObject.payload);
+                Arrays.equals(payload, bigObject.payload);
         }
 
         @Override
@@ -1867,7 +1945,6 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
         }
     }
 
-
     /**
      *
      */

Reply via email to