http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
new file mode 100644
index 0000000..a4e98cf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -0,0 +1,3142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.sql.Time;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeInvalidator;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.StorageException;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
+import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
+
+/**
+ * File WAL manager for the {@link WALMode#FSYNC} WAL mode.
+ */
+public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager {
+    /** */
+    public static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0];
+
+    /** */
+    public static final String WAL_SEGMENT_FILE_EXT = ".wal";
+
+    /** */
+    private static final byte[] FILL_BUF = new byte[1024 * 1024];
+
+    /** Pattern for segment file names */
+    private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
+
+    /** */
+    private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
+
+    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
+    public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
+        }
+    };
+
+    /** */
+    private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
+        }
+    };
+
+    /** */
+    private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
+
+    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
+    public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
+                WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
+        }
+    };
+
+    /** */
+    private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
+
+    /** */
+    private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
+        }
+    };
+
+    /** */
+    private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
+        @Override public boolean accept(File file) {
+            return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
+        }
+    };
+
+    /** Latest serializer version to use. */
+    private static final int LATEST_SERIALIZER_VERSION = 2;
+
+    /** */
+    private final boolean alwaysWriteFullPages;
+
+    /** WAL segment size in bytes */
+    private final long maxWalSegmentSize;
+
+    /** */
+    private final WALMode mode;
+
+    /** Thread local byte buffer size, see {@link #tlb} */
+    private final int tlbSize;
+
+    /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} WAL mode. */
+    private final long flushFreq;
+
+    /** Fsync delay. */
+    private final long fsyncDelay;
+
+    /** */
+    private final DataStorageConfiguration dsCfg;
+
+    /** Events service */
+    private final GridEventStorageManager evt;
+
+    /** */
+    private IgniteConfiguration igCfg;
+
+    /** Persistence metrics tracker. */
+    private DataStorageMetricsImpl metrics;
+
+    /** */
+    private File walWorkDir;
+
+    /** WAL archive directory (including consistent ID as subfolder) */
+    private File walArchiveDir;
+
+    /** Serializer of latest version, used to read the header record and to write records. */
+    private RecordSerializer serializer;
+
+    /** Serializer latest version to use. */
+    private final int serializerVersion =
+        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
+
+    /** Latest segment cleared by {@link #truncate(WALPointer, WALPointer)}. */
+    private volatile long lastTruncatedArchiveIdx = -1L;
+
+    /** Factory to provide I/O interfaces for read/write operations with files */
+    private final FileIOFactory ioFactory;
+
+    /** Updater for {@link #currentHnd}, used to verify there are no concurrent updates for the current log segment handle. */
+    private static final AtomicReferenceFieldUpdater<FsyncModeFileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
+        AtomicReferenceFieldUpdater.newUpdater(FsyncModeFileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");
+
+    /**
+     * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}.
+     * Introduced to decrease number of buffers allocation.
+     * Used only if the record itself is shorter than {@link #tlbSize}.
+     */
+    private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
+        @Override protected ByteBuffer initialValue() {
+            ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
+
+            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
+
+            return buf;
+        }
+    };
+
+    /** */
+    private volatile FileArchiver archiver;
+
+    /** Compressor. */
+    private volatile FileCompressor compressor;
+
+    /** Decompressor. */
+    private volatile FileDecompressor decompressor;
+
+    /** */
+    private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
+
+    /** Current log segment handle */
+    private volatile FileWriteHandle currentHnd;
+
+    /** Environment failure. */
+    private volatile Throwable envFailed;
+
+    /**
+     * Positive (non-0) value indicates WAL can be archived even if not complete<br>
+     * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
+     */
+    private final long walAutoArchiveAfterInactivity;
+
+    /**
+     * Container with last WAL record logged timestamp.<br>
+     * Zero value means no records were logged to the current segment, so possible archiving is skipped in this case<br>
+     * Value is filled only for case {@link #walAutoArchiveAfterInactivity} > 0<br>
+     */
+    private AtomicLong lastRecordLoggedMs = new AtomicLong();
+
+    /**
+     * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown
+     * Null for non background modes
+     */
+    @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
+
+    /**
+     * Reference to the last added next archive timeout check object.
+     * Null if mode is not enabled.
+     * Should be cancelled at shutdown
+     */
+    @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public FsyncModeFileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
+        igCfg = ctx.config();
+
+        DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration();
+
+        assert dsCfg != null;
+
+        this.dsCfg = dsCfg;
+
+        maxWalSegmentSize = dsCfg.getWalSegmentSize();
+        mode = dsCfg.getWalMode();
+        tlbSize = dsCfg.getWalThreadLocalBufferSize();
+        flushFreq = dsCfg.getWalFlushFrequency();
+        fsyncDelay = dsCfg.getWalFsyncDelayNanos();
+        alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
+        ioFactory = dsCfg.getFileIOFactory();
+        walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
+        evt = ctx.event();
+
+        assert mode == WALMode.FSYNC : dsCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start0() throws IgniteCheckedException {
+        if (!cctx.kernalContext().clientNode()) {
+            final PdsFolderSettings resolveFolders = cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+            checkWalConfiguration();
+
+            walWorkDir = initDirectory(
+                dsCfg.getWalPath(),
+                DataStorageConfiguration.DFLT_WAL_PATH,
+                resolveFolders.folderName(),
+                "write ahead log work directory"
+            );
+
+            walArchiveDir = initDirectory(
+                dsCfg.getWalArchivePath(),
+                DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
+                resolveFolders.folderName(),
+                "write ahead log archive directory"
+            );
+
+            serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);
+
+            GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
+
+            metrics = dbMgr.persistentStoreMetricsImpl();
+
+            checkOrPrepareFiles();
+
+            IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
+
+            lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
+
+            archiver = new FileArchiver(tup == null ? -1 : tup.get2());
+
+            if (dsCfg.isWalCompactionEnabled()) {
+                compressor = new FileCompressor();
+
+                decompressor = new FileDecompressor();
+            }
+
+            if (mode != WALMode.NONE) {
+                if (log.isInfoEnabled())
+                    log.info("Started write-ahead log manager [mode=" + mode + ']');
+            }
+            else
+                U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
+                    "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa)
+     */
+    private void checkWalConfiguration() throws IgniteCheckedException {
+        if (dsCfg.getWalPath() == null ^ dsCfg.getWalArchivePath() == null) {
+            throw new IgniteCheckedException(
+                "Properties should be either both specified or both null " +
+                    "[walStorePath = " + dsCfg.getWalPath() +
+                    ", walArchivePath = " + dsCfg.getWalArchivePath() + "]"
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
+
+        if (schedule != null)
+            schedule.close();
+
+        final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
+
+        if (timeoutObj != null)
+            cctx.time().removeTimeoutObject(timeoutObj);
+
+        final FileWriteHandle currHnd = currentHandle();
+
+        try {
+            if (mode == WALMode.BACKGROUND) {
+                if (currHnd != null)
+                    currHnd.flush((FileWALPointer)null, true);
+            }
+
+            if (currHnd != null)
+                currHnd.close(false);
+
+            if (archiver != null)
+                archiver.shutdown();
+
+            if (compressor != null)
+                compressor.shutdown();
+
+            if (decompressor != null)
+                decompressor.shutdown();
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
+                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+
+        start0();
+
+        if (!cctx.kernalContext().clientNode()) {
+            assert archiver != null;
+            archiver.start();
+
+            if (compressor != null)
+                compressor.start();
+
+            if (decompressor != null)
+                decompressor.start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDeActivate(GridKernalContext kctx) {
+        if (log.isDebugEnabled())
+            log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() +
+                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+
+        stop0(true);
+
+        currentHnd = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAlwaysWriteFullPages() {
+        return alwaysWriteFullPages;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isFullSync() {
+        return mode == WALMode.FSYNC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
+        try {
+            assert currentHnd == null;
+            assert lastPtr == null || lastPtr instanceof FileWALPointer;
+
+            FileWALPointer filePtr = (FileWALPointer)lastPtr;
+
+            currentHnd = restoreWriteHandle(filePtr);
+
+            if (currentHnd.serializer.version() != serializer.version()) {
+                if (log.isInfoEnabled())
+                    log.info("Record serializer version change detected, will start logging with a new WAL record " +
+                        "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() +
+                        ", oldVer=" + currentHnd.serializer.version() + ']');
+
+                rollOver(currentHnd);
+            }
+
+            if (mode == WALMode.BACKGROUND) {
+                backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
+                    @Override public void run() {
+                        doFlush();
+                    }
+                }, flushFreq, flushFreq);
+            }
+
+            if (walAutoArchiveAfterInactivity > 0)
+                scheduleNextInactivityPeriodElapsedCheck();
+        }
+        catch (StorageException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Schedules the next check for an expired inactivity period, based on the last record update timestamp.
+     * On timeout the method checks the inactivity period and schedules the next check.
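+     * For example, assuming walAutoArchiveAfterInactivity = 60000 ms: if the last record was logged at time T,
+     * the next check fires at T + 60000; if no record has been logged to the current segment yet, the check
+     * fires 60000 ms from now.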
+     */
+    private void scheduleNextInactivityPeriodElapsedCheck() {
+        final long lastRecMs = lastRecordLoggedMs.get();
+        final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
+
+        if (log.isDebugEnabled())
+            log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
+
+        nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
+            private final IgniteUuid id = IgniteUuid.randomUuid();
+
+            @Override public IgniteUuid timeoutId() {
+                return id;
+            }
+
+            @Override public long endTime() {
+                return nextPossibleAutoArchive;
+            }
+
+            @Override public void onTimeout() {
+                if (log.isDebugEnabled())
+                    log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
+
+                checkWalRolloverRequiredDuringInactivityPeriod();
+
+                scheduleNextInactivityPeriodElapsedCheck();
+            }
+        };
+        cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
+    }
+
+    /**
+     * @return Latest serializer version.
+     */
+    public int serializerVersion() {
+        return serializerVersion;
+    }
+
+    /**
+     * Checks whether a significant period of inactivity has elapsed.
+     * If WAL auto-archiving is enabled via {@link #walAutoArchiveAfterInactivity} > 0, this method triggers
+     * rollover by timeout<br>
+     */
+    private void checkWalRolloverRequiredDuringInactivityPeriod() {
+        if (walAutoArchiveAfterInactivity <= 0)
+            return; // feature not configured, nothing to do
+
+        final long lastRecMs = lastRecordLoggedMs.get();
+
+        if (lastRecMs == 0)
+            return; // No records were logged to the current segment, so inactivity is not considered.
+
+        final long elapsedMs = U.currentTimeMillis() - lastRecMs;
+
+        if (elapsedMs <= walAutoArchiveAfterInactivity)
+            return; // not enough time elapsed since last write
+
+        if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
+            return; // record write occurred concurrently
+
+        final FileWriteHandle handle = currentHandle();
+
+        try {
+            rollOver(handle);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
+
+            NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("TooBroadScope")
+    @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException {
+        if (serializer == null || mode == WALMode.NONE)
+            return null;
+
+        FileWriteHandle currWrHandle = currentHandle();
+
+        // Logging was not resumed yet.
+        if (currWrHandle == null)
+            return null;
+
+        // Need to calculate record size first.
+        record.size(serializer.size(record));
+
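+        // addRecord() returns null when the record does not fit into the current segment; in that case the
+        // loop rolls over to a new segment and retries.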
+        for (; ; currWrHandle = rollOver(currWrHandle)) {
+            WALPointer ptr = currWrHandle.addRecord(record);
+
+            if (ptr != null) {
+                metrics.onWalRecordLogged();
+
+                lastWALPtr.set(ptr);
+
+                if (walAutoArchiveAfterInactivity > 0)
+                    lastRecordLoggedMs.set(U.currentTimeMillis());
+
+                return ptr;
+            }
+
+            checkNode();
+
+            if (isStopping())
+                throw new IgniteCheckedException("Stopping.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void fsync(WALPointer ptr) throws IgniteCheckedException, StorageException {
+        if (serializer == null || mode == WALMode.NONE)
+            return;
+
+        FileWriteHandle cur = currentHandle();
+
+        // WAL manager was not started (client node).
+        if (cur == null)
+            return;
+
+        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
+
+        // No need to sync if was rolled over.
+        if (filePtr != null && !cur.needFsync(filePtr))
+            return;
+
+        cur.fsync(filePtr, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public WALIterator replay(WALPointer start)
+        throws IgniteCheckedException, StorageException {
+        assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start;
+
+        FileWriteHandle hnd = currentHandle();
+
+        FileWALPointer end = null;
+
+        if (hnd != null)
+            end = hnd.position();
+
+        return new RecordsIterator(
+            cctx,
+            walWorkDir,
+            walArchiveDir,
+            (FileWALPointer)start,
+            end,
+            dsCfg,
+            new RecordSerializerFactoryImpl(cctx),
+            ioFactory,
+            archiver,
+            decompressor,
+            log
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean reserve(WALPointer start) throws IgniteCheckedException {
+        assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
+
+        if (mode == WALMode.NONE)
+            return false;
+
+        FileArchiver archiver0 = archiver;
+
+        if (archiver0 == null)
+            throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null");
+
+        archiver0.reserve(((FileWALPointer)start).index());
+
+        if (!hasIndex(((FileWALPointer)start).index())) {
+            archiver0.release(((FileWALPointer)start).index());
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release(WALPointer start) throws IgniteCheckedException {
+        assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
+
+        if (mode == WALMode.NONE)
+            return;
+
+        FileArchiver archiver0 = archiver;
+
+        if (archiver0 == null)
+            throw new IgniteCheckedException("Could not release WAL segment: archiver == null");
+
+        archiver0.release(((FileWALPointer)start).index());
+    }
+
+    /**
+     * @param absIdx Absolute index to check.
+     * @return {@code true} if has this index.
+     */
+    private boolean hasIndex(long absIdx) {
+        String segmentName = FileDescriptor.fileName(absIdx);
+
+        String zipSegmentName = FileDescriptor.fileName(absIdx) + ".zip";
+
+        boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
+            new File(walArchiveDir, zipSegmentName).exists();
+
+        if (inArchive)
+            return true;
+
+        if (absIdx <= lastArchivedIndex())
+            return false;
+
+        FileWriteHandle cur = currentHnd;
+
+        return cur != null && cur.idx >= absIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int truncate(WALPointer low, WALPointer high) {
+        if (high == null)
+            return 0;
+
+        assert high instanceof FileWALPointer : high;
+
+        // File pointer bound: older entries will be deleted from archive
+        FileWALPointer lowPtr = (FileWALPointer)low;
+        FileWALPointer highPtr = (FileWALPointer)high;
+
+        FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
+
+        int deleted = 0;
+
+        FileArchiver archiver0 = archiver;
+
+        for (FileDescriptor desc : descs) {
+            if (lowPtr != null && desc.idx < lowPtr.index())
+                continue;
+
+            // Do not delete reserved or locked segment and any segment after it.
+            if (archiver0 != null && archiver0.reserved(desc.idx))
+                return deleted;
+
+            long lastArchived = archiver0 != null ? archiver0.lastArchivedAbsoluteIndex() : lastArchivedIndex();
+
+            // We need to leave at least one archived segment to correctly determine the archive index.
+            if (desc.idx < highPtr.index() && desc.idx < lastArchived) {
+                if (!desc.file.delete())
+                    U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
+                        desc.file.getAbsolutePath());
+                else
+                    deleted++;
+
+                // Bump up the oldest archive segment index.
+                if (lastTruncatedArchiveIdx < desc.idx)
+                    lastTruncatedArchiveIdx = desc.idx;
+            }
+        }
+
+        return deleted;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void allowCompressionUntil(WALPointer ptr) {
+        if (compressor != null)
+            compressor.allowCompressionUntil(((FileWALPointer)ptr).index());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int walArchiveSegments() {
+        long lastTruncated = lastTruncatedArchiveIdx;
+
+        long lastArchived = archiver.lastArchivedAbsoluteIndex();
+
+        if (lastArchived == -1)
+            return 0;
+
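+        // For example, with lastArchived = 10 and lastTruncated = 3, segments 4..10 remain, so 7 is returned.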
+        int res = (int)(lastArchived - lastTruncated);
+
+        return res >= 0 ? res : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean reserved(WALPointer ptr) {
+        FileWALPointer fPtr = (FileWALPointer)ptr;
+
+        FileArchiver archiver0 = archiver;
+
+        return archiver0 != null && archiver0.reserved(fPtr.index());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean disabled(int grpId) {
+        CacheGroupContext ctx = cctx.cache().cacheGroup(grpId);
+
+        return ctx != null && !ctx.walEnabled();
+    }
+
+    /**
+     * Lists files in archive directory and returns the index of last archived file.
+     *
+     * @return The absolute index of last archived file.
+     */
+    private long lastArchivedIndex() {
+        long lastIdx = -1;
+
+        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
+            try {
+                long idx = Long.parseLong(file.getName().substring(0, 16));
+
+                lastIdx = Math.max(lastIdx, idx);
+            }
+            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
+
+            }
+        }
+
+        return lastIdx;
+    }
+
+    /**
+     * Lists files in archive directory and returns the indices of least and last archived files.
+     * In case of holes, first segment after last "hole" is considered as minimum.
+     * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
+     *
+     * @return The absolute indices of min and max archived files.
+     */
+    private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
+        TreeSet<Long> archiveIndices = new TreeSet<>();
+
+        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
+            try {
+                long idx = Long.parseLong(file.getName().substring(0, 16));
+
+                archiveIndices.add(idx);
+            }
+            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
+                // No-op.
+            }
+        }
+
+        if (archiveIndices.isEmpty())
+            return null;
+        else {
+            Long min = archiveIndices.first();
+            Long max = archiveIndices.last();
+
+            if (max - min == archiveIndices.size() - 1)
+                return F.t(min, max); // Short path.
+
+            for (Long idx : archiveIndices.descendingSet()) {
+                if (!archiveIndices.contains(idx - 1))
+                    return F.t(idx, max);
+            }
+
+            throw new IllegalStateException("Should never happen if TreeSet is valid.");
+        }
+    }
+
+    /**
+     * Creates a directory specified by the given arguments.
+     *
+     * @param cfg Configured directory path, may be {@code null}.
+     * @param defDir Default directory path, will be used if cfg is {@code null}.
+     * @param consId Local node consistent ID.
+     * @param msg File description to print out on successful initialization.
+     * @return Initialized directory.
+     * @throws IgniteCheckedException If failed to initialize directory.
+     */
+    private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
+        File dir;
+
+        if (cfg != null) {
+            File workDir0 = new File(cfg);
+
+            dir = workDir0.isAbsolute() ?
+                new File(workDir0, consId) :
+                new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId);
+        }
+        else
+            dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId);
+
+        U.ensureDirectory(dir, msg, log);
+
+        return dir;
+    }
+
+    /**
+     * @return Current log segment handle.
+     */
+    private FileWriteHandle currentHandle() {
+        return currentHnd;
+    }
+
+    /**
+     * @param cur Handle that failed to fit the given entry.
+     * @return Handle that will fit the entry.
+     */
+    private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException {
+        FileWriteHandle hnd = currentHandle();
+
+        if (hnd != cur)
+            return hnd;
+
+        if (hnd.close(true)) {
+            FileWriteHandle next = initNextWriteHandle(cur.idx);
+
+            boolean swapped = currentHndUpd.compareAndSet(this, hnd, next);
+
+            assert swapped : "Concurrent updates on rollover are not allowed";
+
+            if (walAutoArchiveAfterInactivity > 0)
+                lastRecordLoggedMs.set(0);
+
+            // Let other threads to proceed with new segment.
+            hnd.signalNextAvailable();
+        }
+        else
+            hnd.awaitNext();
+
+        return currentHandle();
+    }
+
+    /**
+     * @param lastReadPtr Last read WAL file pointer.
+     * @return Initialized file write handle.
+     * @throws IgniteCheckedException If failed to initialize WAL write handle.
+     */
+    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
+        long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
+
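+        // For example, with 10 configured work segments an absolute index of 25 maps to work segment 25 % 10 = 5.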
+        long segNo = absIdx % dsCfg.getWalSegments();
+
+        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
+
+        int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
+        int len = lastReadPtr == null ? 0 : lastReadPtr.length();
+
+        try {
+            FileIO fileIO = ioFactory.create(curFile);
+
+            try {
+                int serVer = serializerVersion;
+
+                // If we have existing segment, try to read version from it.
+                if (lastReadPtr != null) {
+                    try {
+                        serVer = readSerializerVersionAndCompactedFlag(fileIO).get1();
+                    }
+                    catch (SegmentEofException | EOFException ignore) {
+                        serVer = serializerVersion;
+                    }
+                }
+
+                RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
+
+                if (log.isInfoEnabled())
+                    log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
+                        ", offset=" + offset + ", ver=" + serVer + ']');
+
+                FileWriteHandle hnd = new FileWriteHandle(
+                    fileIO,
+                    absIdx,
+                    offset + len,
+                    maxWalSegmentSize,
+                    ser);
+
+                // For new handle write serializer version to it.
+                if (lastReadPtr == null)
+                    hnd.writeSerializerVersion();
+
+                archiver.currentWalIndex(absIdx);
+
+                return hnd;
+            }
+            catch (IgniteCheckedException | IOException e) {
+                fileIO.close();
+
+                throw e;
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Fills the file header for a new segment.
+     * Calling this method signals we are done with the segment and it can be archived.
+     * If a prepared file is not available yet and the archiver is busy, this method blocks.
+     *
+     * @param curIdx current absolute segment released by WAL writer
+     * @return Initialized file handle.
+     * @throws StorageException If IO exception occurred.
+     * @throws IgniteCheckedException If failed.
+     */
+    private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException {
+        try {
+            File nextFile = pollNextFile(curIdx);
+
+            if (log.isDebugEnabled())
+                log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath());
+
+            FileIO fileIO = ioFactory.create(nextFile);
+
+            FileWriteHandle hnd = new FileWriteHandle(
+                fileIO,
+                curIdx + 1,
+                0,
+                maxWalSegmentSize,
+                serializer);
+
+            hnd.writeSerializerVersion();
+
+            return hnd;
+        }
+        catch (IOException e) {
+            StorageException se = new StorageException("Unable to initialize WAL segment", e);
+
+            NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), se);
+
+            throw se;
+        }
+    }
+
+    /**
+     * Deletes temp files, creates and prepares new ones; creates the first segment if necessary.
+     */
+    private void checkOrPrepareFiles() throws IgniteCheckedException {
+        // Clean temp files.
+        {
+            File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
+
+            if (!F.isEmpty(tmpFiles)) {
+                for (File tmp : tmpFiles) {
+                    boolean deleted = tmp.delete();
+
+                    if (!deleted)
+                        throw new IgniteCheckedException("Failed to delete previously created temp file " +
+                            "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
+                }
+            }
+        }
+
+        File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
+
+        if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
+            throw new IgniteCheckedException("Failed to initialize wal (work directory contains " +
+                "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
+
+        // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
+        if (allFiles.length == 0) {
+            File first = new File(walWorkDir, FileDescriptor.fileName(0));
+
+            createFile(first);
+        }
+        else
+            checkFiles(0, false, null);
+    }
+
+    /**
+     * Clears the file with zeros.
+     *
+     * @param file File to format.
+     */
+    private void formatFile(File file) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
+
+        try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
+            int left = dsCfg.getWalSegmentSize();
+
+            if (mode == WALMode.FSYNC) {
+                while (left > 0) {
+                    int toWrite = Math.min(FILL_BUF.length, left);
+
+                    fileIO.write(FILL_BUF, 0, toWrite);
+
+                    left -= toWrite;
+                }
+
+                fileIO.force();
+            }
+            else
+                fileIO.clear();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Creates a file atomically with temp file.
+     *
+     * @param file File to create.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void createFile(File file) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
+
+        File tmp = new File(file.getParent(), file.getName() + ".tmp");
+
+        formatFile(tmp);
+
+        try {
+            Files.move(tmp.toPath(), file.toPath());
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " +
+                file.getAbsolutePath(), e);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']');
+    }
+
+    /**
+     * Retrieves next available file to write WAL data, waiting
+     * if necessary for a segment to become available.
+     *
+     * @param curIdx Current absolute WAL segment index.
+     * @return File ready for use as new WAL segment.
+     * @throws IgniteCheckedException If failed.
+     */
+    private File pollNextFile(long curIdx) throws IgniteCheckedException {
+        // Signal to archiver that we are done with the segment and it can be archived.
+        long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
+
+        long segmentIdx = absNextIdx % dsCfg.getWalSegments();
+
+        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
+    }
+
+
+    /**
+     * @return Sorted WAL files descriptors.
+     */
+    public static FileDescriptor[] scan(File[] allFiles) {
+        if (allFiles == null)
+            return EMPTY_DESCRIPTORS;
+
+        FileDescriptor[] descs = new FileDescriptor[allFiles.length];
+
+        for (int i = 0; i < allFiles.length; i++) {
+            File f = allFiles[i];
+
+            descs[i] = new FileDescriptor(f);
+        }
+
+        Arrays.sort(descs);
+
+        return descs;
+    }
+
+    /**
+     * @throws StorageException If node is no longer valid and we missed a WAL operation.
+     */
+    private void checkNode() throws StorageException {
+        if (cctx.kernalContext().invalidated())
+            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
+                    "previous error)");
+    }
+
+    /**
+     * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate
+     * the work WAL segment: S(N) = N % dsCfg.walSegments.
+     * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment
+     * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow
+     * write to S(W) if W - A <= walSegments. <br>
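+     * For example, assuming walSegments = 10: with last archived index A = 5 and next write index W = 15,
+     * W - A = 10 <= 10, so writing to S(15) = 5 is allowed; for W = 16 the writer must wait until segment 6
+     * is archived. <br>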
+     *
+     * Monitor of current object is used for notify on:
+     * <ul>
+     * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li>
+     * <li>stopping thread ({@link FileArchiver#stopped}==true)</li>
+     * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li>
+     * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li>
+     * <li>some WAL index was removed from {@link FileArchiver#locked} map</li>
+     * </ul>
+     */
+    private class FileArchiver extends Thread {
+        /** Exception which occurred during initial creation of files or during archiving WAL segment */
+        private IgniteCheckedException cleanException;
+
+        /**
+         * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>.
+         * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
+         */
+        private long curAbsWalIdx = -1;
+
+        /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
+        private volatile long lastAbsArchivedIdx = -1;
+
+        /** current thread stopping advice */
+        private volatile boolean stopped;
+
+        /** */
+        private NavigableMap<Long, Integer> reserved = new TreeMap<>();
+
+        /**
+         * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
+         * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
+         */
+        private Map<Long, Integer> locked = new HashMap<>();
+
+        /**
+         *
+         */
+        private FileArchiver(long lastAbsArchivedIdx) {
+            super("wal-file-archiver%" + cctx.igniteInstanceName());
+
+            this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+        }
+
+        /**
+         * @return Last archived segment absolute index.
+         */
+        private long lastArchivedAbsoluteIndex() {
+            return lastAbsArchivedIdx;
+        }
+
+        /**
+         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+         */
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                stopped = true;
+
+                notifyAll();
+            }
+
+            U.join(this);
+        }
+
+        /**
+         * @param curAbsWalIdx Current absolute WAL segment index.
+         */
+        private void currentWalIndex(long curAbsWalIdx) {
+            synchronized (this) {
+                this.curAbsWalIdx = curAbsWalIdx;
+
+                notifyAll();
+            }
+        }
+
+        /**
+         * @param absIdx Index for reservation.
+         */
+        private synchronized void reserve(long absIdx) {
+            Integer cur = reserved.get(absIdx);
+
+            if (cur == null)
+                reserved.put(absIdx, 1);
+            else
+                reserved.put(absIdx, cur + 1);
+        }
+
+        /**
+         * Check if WAL segment locked or reserved
+         *
+         * @param absIdx Index for check reservation.
+         * @return {@code True} if index is reserved.
+         */
+        private synchronized boolean reserved(long absIdx) {
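+            // A segment is considered reserved if any reservation exists at or below its index (reserving a
+            // segment protects it and all later segments), hence the floorKey() check.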
+            return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
+        }
+
+        /**
+         * @param absIdx Reserved index.
+         */
+        private synchronized void release(long absIdx) {
+            Integer cur = reserved.get(absIdx);
+
+            assert cur != null && cur >= 1 : cur;
+
+            if (cur == 1)
+                reserved.remove(absIdx);
+            else
+                reserved.put(absIdx, cur - 1);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                allocateRemainingFiles();
+            }
+            catch (IgniteCheckedException e) {
+                synchronized (this) {
+                    // Stop the thread and report to starter.
+                    cleanException = e;
+
+                    notifyAll();
+
+                    return;
+                }
+            }
+
+            try {
+                synchronized (this) {
+                    while (curAbsWalIdx == -1 && !stopped)
+                        wait();
+
+                    if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1)
+                        changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1);
+                }
+
+                while (!Thread.currentThread().isInterrupted() && !stopped) {
+                    long toArchive;
+
+                    synchronized (this) {
+                        assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx +
+                            ", current=" + curAbsWalIdx;
+
+                        while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !stopped)
+                            wait();
+
+                        toArchive = lastAbsArchivedIdx + 1;
+                    }
+
+                    if (stopped)
+                        break;
+
+                    try {
+                        final SegmentArchiveResult res = archiveSegment(toArchive);
+
+                        synchronized (this) {
+                            while (locked.containsKey(toArchive) && !stopped)
+                                wait();
+                        }
+
+                        // Firstly, format working file
+                        if (!stopped)
+                            formatFile(res.getOrigWorkFile());
+
+                        synchronized (this) {
+                            // Then increase counter to allow rollover on clean working file
+                            changeLastArchivedIndexAndWakeupCompressor(toArchive);
+
+                            notifyAll();
+                        }
+
+                        if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED))
+                            evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(),
+                                res.getAbsIdx(), res.getDstArchiveFile()));
+                    }
+                    catch (IgniteCheckedException e) {
+                        synchronized (this) {
+                            cleanException = e;
+
+                            notifyAll();
+                        }
+                    }
+                }
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * @param idx Index.
+         */
+        private void changeLastArchivedIndexAndWakeupCompressor(long idx) {
+            lastAbsArchivedIdx = idx;
+
+            if (compressor != null)
+                compressor.onNextSegmentArchived();
+        }
+
+        /**
+         * Gets the absolute index of the next WAL segment available to write.
+         * Blocks until there is an available file to write.
+         *
+         * @param curIdx Current absolute index that we want to increment.
+         * @return Next index (curWalSegmIdx+1) when it is ready to be written.
+         * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
+         */
+        private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException {
+            try {
+                synchronized (this) {
+                    if (cleanException != null)
+                        throw cleanException;
+
+                    assert curIdx == curAbsWalIdx;
+
+                    curAbsWalIdx++;
+
+                    // Notify archiver thread.
+                    notifyAll();
+
+                    while (curAbsWalIdx - lastAbsArchivedIdx > dsCfg.getWalSegments() && cleanException == null)
+                        wait();
+
+                    return curAbsWalIdx;
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        /**
+         * @param absIdx Segment absolute index.
+         * @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need
+         * release segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
+         */
+        @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+        private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
+            synchronized (this) {
+                if (lastAbsArchivedIdx >= absIdx) {
+                    if (log.isDebugEnabled())
+                        log.debug("Not needed to reserve WAL segment: absIdx=" + absIdx + ";" +
+                            " lastAbsArchivedIdx=" + lastAbsArchivedIdx);
+
+                    return true;
+                }
+
+                Integer cur = locked.get(absIdx);
+
+                cur = cur == null ? 1 : cur + 1;
+
+                locked.put(absIdx, cur);
+
+                if (log.isDebugEnabled())
+                    log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']');
+
+                return false;
+            }
+        }
+
+        /**
+         * @param absIdx Segment absolute index.
+         */
+        @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+        private void releaseWorkSegment(long absIdx) {
+            synchronized (this) {
+                Integer cur = locked.get(absIdx);
+
+                assert cur != null && cur > 0 : "WAL Segment with Index " + absIdx + " is not locked;" +
+                    " lastAbsArchivedIdx = " + lastAbsArchivedIdx;
+
+                if (cur == 1) {
+                    locked.remove(absIdx);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']');
+                }
+                else {
+                    locked.put(absIdx, cur - 1);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']');
+                }
+
+                notifyAll();
+            }
+        }
+
+        /**
+         * Moves WAL segment from work folder to archive folder.
+         * Temp file is used to do movement
+         *
+         * @param absIdx Absolute index to archive.
+         */
+        private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException {
+            long segIdx = absIdx % dsCfg.getWalSegments();
+
+            File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
+
+            String name = FileDescriptor.fileName(absIdx);
+
+            File dstTmpFile = new File(walArchiveDir, name + ".tmp");
+
+            File dstFile = new File(walArchiveDir, name);
+
+            if (log.isDebugEnabled())
+                log.debug("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
+                    ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']');
+
+            try {
+                Files.deleteIfExists(dstTmpFile.toPath());
+
+                Files.copy(origFile.toPath(), dstTmpFile.toPath());
+
+                Files.move(dstTmpFile.toPath(), dstFile.toPath());
+
+                if (mode == WALMode.FSYNC) {
+                    try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
+                        f0.force();
+                    }
+                }
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to archive WAL segment [" +
+                    "srcFile=" + origFile.getAbsolutePath() +
+                    ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Copied file [src=" + origFile.getAbsolutePath() +
+                    ", dst=" + dstFile.getAbsolutePath() + ']');
+
+            return new SegmentArchiveResult(absIdx, origFile, dstFile);
+        }
+
+        /**
+         *
+         */
+        private boolean checkStop() {
+            return stopped;
+        }
+
+        /**
+         * Background creation of all segments except the first. The first segment was created in the main thread by
+         * {@link FileWriteAheadLogManager#checkOrPrepareFiles()}
+         */
+        private void allocateRemainingFiles() throws IgniteCheckedException {
+            checkFiles(1, true, new IgnitePredicate<Integer>() {
+                @Override public boolean apply(Integer integer) {
+                    return !checkStop();
+                }
+            });
+        }
+    }
+
+    /**
+     * Responsible for compressing WAL archive segments.
+     * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved.
+     */
+    private class FileCompressor extends Thread {
+        /** Current thread stopping advice. */
+        private volatile boolean stopped;
+
+        /** Last successfully compressed segment. */
+        private volatile long lastCompressedIdx = -1L;
+
+        /** All segments prior to this (inclusive) can be compressed. */
+        private volatile long lastAllowedToCompressIdx = -1L;
+
+        /**
+         *
+         */
+        FileCompressor() {
+            super("wal-file-compressor%" + cctx.igniteInstanceName());
+        }
+
+        /**
+         *
+         */
+        private void init() {
+            File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
+
+            for (File f : toDel) {
+                if (stopped)
+                    return;
+
+                f.delete();
+            }
+
+            FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+
+            if (alreadyCompressed.length > 0)
+                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx();
+        }
+
+        /**
+         * @param lastCpStartIdx Segment index to allow compression until (exclusively).
+         */
+        synchronized void allowCompressionUntil(long lastCpStartIdx) {
+            lastAllowedToCompressIdx = lastCpStartIdx - 1;
+
+            notify();
+        }
+
+        /**
+         * Callback for waking up compressor when new segment is archived.
+         */
+        synchronized void onNextSegmentArchived() {
+            notify();
+        }
+
+        /**
+         * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation.
+         * Waits if there's no segment to archive right now.
+         */
+        private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException {
+            long segmentToCompress = lastCompressedIdx + 1;
+
+            synchronized (this) {
+                while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) {
+                    wait();
+
+                    if (stopped)
+                        return -1;
+                }
+            }
+
+            segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1);
+
+            boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
+
+            return reserved ? segmentToCompress : -1;
+        }
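
A stand-alone sketch of the guarded wait above, with invented field names; it blocks until the candidate index is covered by both the checkpointer and the archiver, mirroring the Math.min bound:

    class CompressorWaitSketch {
        private volatile long lastAllowedToCompressIdx = -1;
        private volatile long lastArchivedIdx = -1;
        private volatile boolean stopped;

        /** Blocks until {@code candidate} may be compressed, or returns -1 on stop. */
        synchronized long waitForSegment(long candidate) throws InterruptedException {
            while (candidate > Math.min(lastAllowedToCompressIdx, lastArchivedIdx)) {
                wait();

                if (stopped)
                    return -1;
            }

            return candidate;
        }

        /** Called when the checkpointer or archiver makes progress. */
        synchronized void advance(long allowed, long archived) {
            lastAllowedToCompressIdx = Math.max(lastAllowedToCompressIdx, allowed);
            lastArchivedIdx = Math.max(lastArchivedIdx, archived);

            notifyAll();
        }
    }
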
+
+        /**
+         * Deletes raw copies of already compressed WAL archive segments, unless a segment is still reserved.
+         */
+        private void deleteObsoleteRawSegments() {
+            FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+            FileArchiver archiver0 = archiver;
+
+            for (FileDescriptor desc : descs) {
+                // Do not delete reserved or locked segment and any segment after it.
+                if (archiver0 != null && archiver0.reserved(desc.idx))
+                    return;
+
+                if (desc.idx < lastCompressedIdx) {
+                    if (!desc.file.delete())
+                        U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
+                            desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            init();
+
+            while (!Thread.currentThread().isInterrupted() && !stopped) {
+                try {
+                    deleteObsoleteRawSegments();
+
+                    long nextSegment = tryReserveNextSegmentOrWait();
+                    if (nextSegment == -1)
+                        continue;
+
+                    File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp");
+
+                    File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip");
+
+                    File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment));
+                    if (!Files.exists(raw.toPath()))
+                        throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
+
+                    compressSegmentToFile(nextSegment, raw, tmpZip);
+
+                    Files.move(tmpZip.toPath(), zip.toPath());
+
+                    if (mode == WALMode.FSYNC) {
+                        try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
+                            f0.force();
+                        }
+                    }
+
+                    lastCompressedIdx = nextSegment;
+                }
+                catch (IgniteCheckedException | IOException e) {
+                    U.error(log, "Unexpected error during WAL compression", e);
+
+                    NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * @param nextSegment Next segment absolute idx.
+         * @param raw Raw file.
+         * @param zip Zip file.
+         */
+        private void compressSegmentToFile(long nextSegment, File raw, File zip)
+            throws IOException, IgniteCheckedException {
+            int segmentSerializerVer;
+
+            try (FileIO fileIO = ioFactory.create(raw)) {
+                IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO);
+
+                segmentSerializerVer = tup.get1();
+            }
+
+            try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) {
+                zos.putNextEntry(new ZipEntry(""));
+
+                zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array());
+
+                final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
+                    @Override public void applyx(WALRecord record) throws IgniteCheckedException {
+                        final MarshalledRecord marshRec = (MarshalledRecord)record;
+
+                        try {
+                            zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining());
+                        }
+                        catch (IOException e) {
+                            throw new IgniteCheckedException(e);
+                        }
+                    }
+                };
+
+                try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator(
+                    log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) {
+
+                    while (iter.hasNextX())
+                        iter.nextX();
+                }
+            }
+            finally {
+                release(new FileWALPointer(nextSegment, 0, 0));
+            }
+        }
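
Assuming the compressed segment really is a single unnamed zip entry (as compressSegmentToFile writes it), reading it back could look roughly like the sketch below; the file name is a placeholder:

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.zip.ZipInputStream;

    class ReadCompressedSegmentSketch {
        public static void main(String[] args) throws IOException {
            try (ZipInputStream zis = new ZipInputStream(
                new BufferedInputStream(new FileInputStream("0000000000000042.wal.zip")))) {
                zis.getNextEntry();            // The segment is stored as one unnamed entry.

                byte[] buf = new byte[8192];
                long total = 0;
                int read;

                while ((read = zis.read(buf)) > 0)
                    total += read;             // Header record followed by marshalled logical records.

                System.out.println("Decompressed bytes: " + total);
            }
        }
    }
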
+
+        /**
+         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+         */
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                stopped = true;
+
+                notifyAll();
+            }
+
+            U.join(this);
+        }
+    }
+
+    /**
+     * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
+     */
+    private class FileDecompressor extends Thread {
+        /** Current thread stopping advice. */
+        private volatile boolean stopped;
+
+        /** Decompression futures. */
+        private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
+
+        /** Segments queue. */
+        private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
+
+        /** Byte array for draining data. */
+        private byte[] arr = new byte[tlbSize];
+
+        /**
+         *
+         */
+        FileDecompressor() {
+            super("wal-file-decompressor%" + cctx.igniteInstanceName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!Thread.currentThread().isInterrupted() && !stopped) {
+                try {
+                    long segmentToDecompress = segmentsQueue.take();
+
+                    if (stopped)
+                        break;
+
+                    File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip");
+                    File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp");
+                    File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
+
+                    try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
+                        FileIO io = ioFactory.create(unzipTmp)) {
+                        zis.getNextEntry();
+
+                        int bytesRead;
+                        while ((bytesRead = zis.read(arr)) > 0)
+                            io.write(arr, 0, bytesRead);
+                    }
+
+                    Files.move(unzipTmp.toPath(), unzip.toPath());
+
+                    synchronized (this) {
+                        decompressionFutures.remove(segmentToDecompress).onDone();
+                    }
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                catch (IOException e) {
+                    U.error(log, "Unexpected error during WAL decompression", e);
+
+                    NodeInvalidator.INSTANCE.invalidate(cctx.kernalContext(), e);
+                }
+            }
+        }
+
+        /**
+         * Asynchronously decompresses WAL segment which is present only in .zip file.
+         *
+         * @return Future which is completed once file is decompressed.
+         */
+        synchronized IgniteInternalFuture<Void> decompressFile(long idx) {
+            if (decompressionFutures.containsKey(idx))
+                return decompressionFutures.get(idx);
+
+            File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
+
+            if (f.exists())
+                return new GridFinishedFuture<>();
+
+            segmentsQueue.put(idx);
+
+            GridFutureAdapter<Void> res = new GridFutureAdapter<>();
+
+            decompressionFutures.put(idx, res);
+
+            return res;
+        }
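
The "one future per requested segment, fulfilled by a queue-driven worker" pattern used by decompressFile, sketched here independently of Ignite classes (CompletableFuture stands in for GridFutureAdapter; all names are invented):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.PriorityBlockingQueue;

    class DecompressRequests {
        private final Map<Long, CompletableFuture<Void>> futs = new HashMap<>();
        private final PriorityBlockingQueue<Long> queue = new PriorityBlockingQueue<>();

        /** Called by readers that need segment {@code idx} unpacked; deduplicates requests. */
        synchronized CompletableFuture<Void> request(long idx) {
            return futs.computeIfAbsent(idx, k -> {
                queue.put(k);                  // Hand the segment to the worker thread.
                return new CompletableFuture<>();
            });
        }

        /** Called by the worker thread once the segment is back on disk. */
        synchronized void complete(long idx) {
            CompletableFuture<Void> f = futs.remove(idx);

            if (f != null)
                f.complete(null);
        }
    }
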
+
+        /**
+         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
+         */
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                stopped = true;
+
+                // Put fake -1 to wake thread from queue.take()
+                segmentsQueue.put(-1L);
+            }
+
+            U.join(this);
+        }
+    }
+
+    /**
+     * Validates existing WAL segment files against {@link DataStorageConfiguration#getWalSegments()} and creates
+     * missing ones if requested. Checking stops when the exit condition returns {@code false} or all files have been
+     * processed.
+     *
+     * @param startWith Segment index to start from.
+     * @param create Whether to create missing segment files.
+     * @param p Exit condition predicate.
+     * @throws IgniteCheckedException If validation or file creation fails.
+     */
+    private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer> p) throws IgniteCheckedException {
+        for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) {
+            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
+
+            if (checkFile.exists()) {
+                if (checkFile.isDirectory())
+                    throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " +
+                        "the same name already exists): " + checkFile.getAbsolutePath());
+                else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC)
+                    throw new IgniteCheckedException("Failed to initialize WAL log segment " +
+                        "(WAL segment size change is not supported): " + checkFile.getAbsolutePath());
+            }
+            else if (create)
+                createFile(checkFile);
+        }
+    }
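
checkFiles compares each existing segment against the configured segment size, which suggests segments are pre-allocated to their full length. A hedged sketch of such pre-allocation (the actual behavior lives in createFile, defined elsewhere in this class, and may differ):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    class PreallocateSketch {
        static void preallocate(File f, long size) throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
                raf.setLength(size);   // Reserve the full segment size up front.
            }
        }
    }
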
+
+    /**
+     * Reads record serializer version from provided {@code io} along with compacted flag.
+     * NOTE: Method mutates position of {@code io}.
+     *
+     * @param io I/O interface for file.
+     * @return Tuple of the serializer version stored in the file and the compacted flag.
+     * @throws IgniteCheckedException If failed to read serializer version.
+     * @throws IOException If an I/O error occurs while reading the header.
+     */
+    public static IgniteBiTuple<Integer, Boolean> readSerializerVersionAndCompactedFlag(FileIO io)
+            throws IgniteCheckedException, IOException {
+        try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) {
+            FileInput in = new FileInput(io, buf);
+
+            in.ensure(RecordV1Serializer.HEADER_RECORD_SIZE);
+
+            int recordType = in.readUnsignedByte();
+
+            if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
+                throw new SegmentEofException("Reached logical end of the segment", null);
+
+            WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1);
+
+            if (type != WALRecord.RecordType.HEADER_RECORD)
+                throw new IOException("Can't read serializer version", null);
+
+            // Read file pointer.
+            FileWALPointer ptr = RecordV1Serializer.readPosition(in);
+
+            assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr;
+
+            long hdrMagicNum = in.readLong();
+
+            boolean compacted;
+            if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC)
+                compacted = false;
+            else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC)
+                compacted = true;
+            else {
+                throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) +
+                    ", actual=" + U.hexLong(hdrMagicNum) + ']');
+            }
+
+            // Read serializer version.
+            int ver = in.readInt();
+
+            // Read and skip CRC.
+            in.readInt();
+
+            return new IgniteBiTuple<>(ver, compacted);
+        }
+    }
+
+    /**
+     * Writes record serializer version to provided {@code io}.
+     * NOTE: Method mutates position of {@code io}.
+     *
+     * @param io I/O interface for file.
+     * @param idx Segment index.
+     * @param version Serializer version.
+     * @param mode WAL mode (the header is fsync'ed when the mode is {@link WALMode#FSYNC}).
+     * @return I/O position after the version has been written.
+     * @throws IOException If failed to write serializer version.
+     */
+    public static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
+        ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false);
+
+        do {
+            io.write(buffer);
+        }
+        while (buffer.hasRemaining());
+
+        // Flush
+        if (mode == WALMode.FSYNC)
+            io.force();
+
+        return io.position();
+    }
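
The write loop above retries until the header buffer is fully drained, because a single write may be partial. The same idiom for a plain java.nio FileChannel (FileIO is Ignite's wrapper around it; this sketch is illustrative only):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    class DrainWriteSketch {
        static long writeFully(FileChannel ch, ByteBuffer buf, boolean fsync) throws IOException {
            do {
                ch.write(buf);       // A single write may consume only part of the buffer.
            }
            while (buf.hasRemaining());

            if (fsync)
                ch.force(false);     // Flush to disk when running in FSYNC mode.

            return ch.position();
        }
    }
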
+
+    /**
+     * @param idx Segment index.
+     * @param ver Serializer version.
+     * @param compacted Compacted flag.
+     * @return Byte buffer containing the prepared header record, positioned at zero.
+     */
+    @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) {
+        ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+        buf.order(ByteOrder.nativeOrder());
+
+        // Write record type.
+        buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
+
+        // Write position.
+        RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0));
+
+        // Place magic number.
+        buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC);
+
+        // Place serializer version.
+        buf.putInt(ver);
+
+        // Place CRC if needed.
+        if (!RecordV1Serializer.skipCrc) {
+            int curPos = buf.position();
+
+            buf.position(0);
+
+            // This call will move buffer position to the end of the record again.
+            int crcVal = PureJavaCrc32.calcCrc32(buf, curPos);
+
+            buf.putInt(crcVal);
+        }
+        else
+            buf.putInt(0);
+
+        // Write header record through io.
+        buf.position(0);
+
+        return buf;
+    }
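
A self-contained round-trip of the header layout produced above. The 12-byte pointer encoding (long segment index plus int offset) and the 29-byte total are assumptions about RecordV1Serializer; every constant below is a placeholder, not the real value:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    class HeaderRoundTripSketch {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(29).order(ByteOrder.nativeOrder());

            buf.put((byte)1);        // Record type (HEADER_RECORD ordinal + 1 in the real format; 1 is a placeholder).
            buf.putLong(42L);        // Pointer: absolute segment index...
            buf.putInt(0);           // ...and file offset (always 0 for the header record).
            buf.putLong(0xABCDEF12L);// Magic (placeholder for REGULAR_MAGIC / COMPACTED_MAGIC).
            buf.putInt(1);           // Serializer version.
            buf.putInt(0);           // CRC (0 when CRC is skipped).

            buf.flip();

            System.out.println("type=" + buf.get() + ", idx=" + buf.getLong() +
                ", off=" + buf.getInt() + ", magic=0x" + Long.toHexString(buf.getLong()) +
                ", ver=" + buf.getInt() + ", crc=" + buf.getInt());
        }
    }
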
+
+    /**
+     * WAL file descriptor.
+     */
+    public static class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRecordsIterator.AbstractFileDescriptor {
+        /** */
+        protected final File file;
+
+        /** Absolute WAL segment file index */
+        protected final long idx;
+
+        /**
+         * Creates a file descriptor. The index is restored from the file name.
+         *
+         * @param file WAL segment file.
+         */
+        public FileDescriptor(@NotNull File file) {
+            this(file, null);
+        }
+
+        /**
+         * @param file WAL segment file.
+         * @param idx Absolute WAL segment file index. If {@code null}, the index is restored from the file name.
+         */
+        public FileDescriptor(@NotNull File file, @Nullable Long idx) {
+            this.file = file;
+
+            String fileName = file.getName();
+
+            assert fileName.contains(WAL_SEGMENT_FILE_EXT);
+
+            this.idx = idx == null ? Long.parseLong(fileName.substring(0, 16)) : idx;
+        }
+
+        /**
+         * @param segment Segment index.
+         * @return Segment file name.
+         */
+        public static String fileName(long segment) {
+            SB b = new SB();
+
+            String segmentStr = Long.toString(segment);
+
+            for (int i = segmentStr.length(); i < 16; i++)
+                b.a('0');
+
+            b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT);
+
+            return b.toString();
+        }
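
For reference, fileName produces a 16-digit zero-padded index plus the segment extension; a one-line equivalent (".wal" is assumed here, the real suffix is WAL_SEGMENT_FILE_EXT):

    class FileNameSketch {
        /** Mirrors fileName(...): 16-digit zero-padded index plus the segment extension. */
        static String fileName(long segment) {
            return String.format("%016d", segment) + ".wal";
        }

        public static void main(String[] args) {
            System.out.println(fileName(23));   // -> 0000000000000023.wal
        }
    }
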
+
+        /**
+         * @param segment Segment number as integer.
+         * @return Segment number as aligned string.
+         */
+        private static String segmentNumber(long segment) {
+            SB b = new SB();
+
+            String segmentStr = Long.toString(segment);
+
+            for (int i = segmentStr.length(); i < 16; i++)
+                b.a('0');
+
+            b.a(segmentStr);
+
+            return b.toString();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(FileDescriptor o) {
+            return Long.compare(idx, o.idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof FileDescriptor))
+                return false;
+
+            FileDescriptor that = (FileDescriptor)o;
+
+            return idx == that.idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (int)(idx ^ (idx >>> 32));
+        }
+
+        /**
+         * @return Absolute WAL segment file index
+         */
+        public long getIdx() {
+            return idx;
+        }
+
+        /**
+         * @return Absolute path of the file backing this descriptor.
+         */
+        public String getAbsolutePath() {
+            return file.getAbsolutePath();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isCompressed() {
+            return file.getName().endsWith(".zip");
+        }
+
+        /** {@inheritDoc} */
+        @Override public File file() {
+            return file;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long idx() {
+            return idx;
+        }
+    }
+
+    /**
+     *
+     */
+    private abstract static class FileHandle {
+        /** I/O interface for read/write operations with file */
+        protected FileIO fileIO;
+
+        /** Absolute WAL segment file index (incremental counter) */
+        protected final long idx;
+
+        /**
+         * @param fileIO I/O interface for read/write operations of FileHandle.
+         * @param idx Absolute WAL segment file index (incremental counter).
+         */
+        private FileHandle(FileIO fileIO, long idx) {
+            this.fileIO = fileIO;
+            this.idx = idx;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class ReadFileHandle extends FileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle {
+        /** Entry serializer. */
+        RecordSerializer ser;
+
+        /** */
+        FileInput in;
+
+        /**
+         * <code>true</code> if this file handle came from work directory.
+         * <code>false</code> if this file handle came from archive directory.
+         */
+        private boolean workDir;
+
+        /**
+         * @param fileIO I/O interface for read/write operations of FileHandle.
+         * @param idx Absolute WAL segment file index (incremental counter).
+         * @param ser Entry serializer.
+         * @param in File input.
+         */
+        ReadFileHandle(
+                FileIO fileIO,
+                long idx,
+                RecordSerializer ser,
+                FileInput in
+        ) {
+            super(fileIO, idx);
+
+            this.ser = ser;
+            this.in = in;
+        }
+
+        /**
+         * @throws IgniteCheckedException If failed to close the WAL segment file.
+         */
+        public void close() throws IgniteCheckedException {
+            try {
+                fileIO.close();
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public long idx() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileInput in() {
+            return in;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordSerializer ser() {
+            return ser;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean workDir() {
+            return workDir;
+        }
+    }
+
+    /**
+     * File handle for one log segment.
+     */
+    @SuppressWarnings("SignalWithoutCorrespondingAwait")
+    private class FileWriteHandle extends FileHandle {
+        /** */
+        private final RecordSerializer serializer;
+
+        /** See {@link FileWriteAheadLogManager#maxWalSegmentSize} */
+        private final long maxSegmentSize;
+
+        /**
+         * Accumulated WAL records chain.
+         * This reference points to the latest WAL record.
+         * When writing, the chain is iterated from the latest record to the oldest (see {@link WALRecord#previous()}),
+         * so records from the chain are saved into the buffer in reverse order.
+         */
+        private final AtomicReference<WALRecord> head = new AtomicReference<>();
+
+        /**
+         * Position in the current file after the end of the last written record (incremented after each file channel
+         * write operation).
+         */
+        private volatile long written;
+
+        /** */
+        private volatile long lastFsyncPos;
+
+        /** Stop guard ensuring that only one thread succeeds in calling {@link #close(boolean)}. */
+        private final AtomicBoolean stop = new AtomicBoolean(false);
+
+        /** */
+        private final Lock lock = new ReentrantLock();
+
+        /** Condition signalled each time writeBuffer() completes. Used to wait for a previously started flush to complete. */
+        private final Condition writeComplete = lock.newCondition();
+
+        /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}. */
+        private final Condition fsync = lock.newCondition();
+
+        /**
+         * Next segment available condition.
+         * Protection from "spurious wakeup" is provided by the predicate {@link #fileIO}=<code>null</code>.
+         */
+        private final Condition nextSegment = lock.newCondition();
+
+        /**
+         * @param fileIO I/O file interface to use
+         * @param idx Absolute WAL segment file index for easy access.
+         * @param pos Position.
+         * @param maxSegmentSize Max segment size.
+         * @param serializer Serializer.
+         * @throws IOException If failed.
+         */
+        private FileWriteHandle(
+            FileIO fileIO,
+            long idx,
+            long pos,
+            long maxSegmentSize,
+            RecordSerializer serializer
+        ) throws IOException {
+            super(fileIO, idx);
+
+            assert serializer != null;
+
+            fileIO.position(pos);
+
+            this.maxSegmentSize = maxSegmentSize;
+            this.serializer = serializer;
+
+            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false));
+            written = pos;
+            lastFsyncPos = pos;
+        }
+
+        /**
+         * Writes serializer version to the current handle.
+         * NOTE: Method mutates {@code fileIO} position, {@code written} and {@code lastFsyncPos} fields.
+         *
+         * @throws IgniteCheckedException If failed to write serializer version.
+         */
+        public void writeSerializerVersion() throws IgniteCheckedException {
+            try {
+                assert fileIO.position() == 0 : "Serializer version can be written only at the beginning of file " +
+                    fileIO.position();
+
+                long updatedPosition = FsyncModeFileWriteAheadLogManager.writeSerializerVersion(fileIO, idx,
+                    serializer.version(), mode);
+
+                written = updatedPosition;
+                lastFsyncPos = updatedPosition;
+                head.set(new FakeRecord(new FileWALPointer(idx, (int)updatedPosition, 0), false));
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Unable to write serializer version for segment " + idx, e);
+            }
+        }
+
+        /**
+         * Checks if current head is a close fake record and returns {@code true} if so.
+         *
+         * @return {@code true} if current head is close record.
+         */
+        private boolean stopped() {
+            return stopped(head.get());
+        }
+
+        /**
+         * @param record Record to check.
+         * @return {@code true} if the record is fake close record.
+         */
+        private boolean stopped(WALRecord record) {
+            return record instanceof FakeRecord && ((FakeRecord)record).stop;
+        }
+
+        /**
+         * @param rec Record to be added to record chain as new {@link #head}
+         * @return Pointer or null if roll over to next segment is required or already started by other thread.
+         * @throws StorageException If failed.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
+            assert rec.size() > 0 || rec.getClass() == FakeRecord.class;
+
+            boolean flushed = false;
+
+            for (; ; ) {
+                WALRecord h = head.get();
+
+                long nextPos = nextPosition(h);
+
+                if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
+                    // Can not write to this segment, need to switch to the next one.
+                    return null;
+                }
+
+                int newChainSize = h.chainSize() + rec.size();
+
+                if (newChainSize > tlbSize && !flushed) {
+                    boolean res = h.previous() == null || flush(h, false);
+
+                    if (rec.size() > tlbSize)
+                        flushed = res;
+
+                    continue;
+                }
+
+                rec.chainSize(newChainSize);
+                rec.previous(h);
+
+                FileWALPointer ptr = new FileWALPointer(
+                    idx,
+                    (int)nextPos,
+                    rec.size());
+
+                rec.position(ptr);
+
+                if (head.compareAndSet(h, rec))
+                    return ptr;
+            }
+        }
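
addRecord publishes each record by CAS-ing it onto the head of a singly linked chain; lost races simply retry. The same idea, stripped of WAL specifics (the Node type and its fields are invented for this sketch):

    import java.util.concurrent.atomic.AtomicReference;

    class ChainSketch {
        static final class Node {
            final Node prev;
            final int size;
            final int chainSize;

            Node(Node prev, int size) {
                this.prev = prev;
                this.size = size;
                this.chainSize = (prev == null ? 0 : prev.chainSize) + size;
            }
        }

        private final AtomicReference<Node> head = new AtomicReference<>(new Node(null, 0));

        /** Appends a record of {@code recSize} bytes and returns the accumulated chain size. */
        int append(int recSize) {
            for (;;) {
                Node h = head.get();
                Node n = new Node(h, recSize);

                if (head.compareAndSet(h, n))   // A lost race just retries against the new head.
                    return n.chainSize;
            }
        }
    }
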
+
+        /**
+         * @param rec Record.
+         * @return Position for the next record.
+         */
+        private long nextPosition(WALRecord rec) {
+            return recordOffset(rec) + rec.size();
+        }
+
+        /**
+         * Flush or wait for concurrent flush completion.
+         *
+         * @param ptr Pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException {
+            long expWritten;
+
+            if (ptr != null) {
+                // If requested obsolete file index, it must be already flushed by close.
+                if (ptr.index() != idx)
+                    return;
+
+                expWritten = ptr.fileOffset();
+            }
+            else // We read head position before the flush because otherwise we can get wrong position.
+                expWritten = recordOffset(head.get());
+
+            if (flush(ptr, stop))
+                return;
+            else if (stop) {
+                FakeRecord fr = (FakeRecord)head.get();
+
+                assert fr.stop : "Invalid fake record on top of the queue: " + fr;
+
+                expWritten = recordOffset(fr);
+            }
+
+            // Spin-wait for a while before acquiring the lock.
+            for (int i = 0; i < 64; i++) {
+                if (written >= expWritten)
+                    return;
+            }
+
+            // If we did not flush ourselves, then wait for the concurrent flush to complete.
+            lock.lock();
+
+            try {
+                while (written < expWritten && envFailed == null)
+                    U.awaitQuiet(writeComplete);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
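
flushOrWait spins briefly before falling back to a Condition await, since most flushes finish almost immediately. A minimal stand-alone version of that wait (field names invented for the sketch):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class SpinThenBlock {
        private final Lock lock = new ReentrantLock();
        private final Condition writeComplete = lock.newCondition();
        private volatile long written;

        void awaitWritten(long expWritten) throws InterruptedException {
            // Cheap spin first: most flushes complete almost immediately.
            for (int i = 0; i < 64; i++) {
                if (written >= expWritten)
                    return;
            }

            lock.lock();
            try {
                while (written < expWritten)
                    writeComplete.await();   // Signalled by the thread that performs the write.
            }
            finally {
                lock.unlock();
            }
        }
    }
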
+
+        /**
+         * @param ptr Pointer.
+         * @return {@code true} If the flush really happened.
+         * @throws IgniteCheckedException If failed.
+         * @throws StorageException If failed.
+         */
+        private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException {
+            if (ptr == null) { // Unconditional flush.
+                for (; ; ) {
+                    WALRecord expHead = head.get();
+
+                    if (expHead.previous() == null) {
+                        FakeRecord frHead = (FakeRecord)expHead;
+
+                        if (frHead.stop == stop || frHead.stop ||
+                            head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop)))
+                            return false;
+                    }
+
+                    if (flush(expHead, stop))
+                        return true;
+                }
+            }
+
+            assert ptr.index() == idx;
+
+            for (; ; ) {
+                WALRecord h = head.get();
+
+                // If current chain begin position is greater than requested, then someone else flushed our changes.
+                if (chainBeginPosition(h) > ptr.fileOffset())
+                    return false;
+
+                if (flush(h, stop))
+                    return true; // We are lucky.
+            }
+        }
+
+        /**
+         * @param h Head of the chain.
+         * @return Chain begin position.
+         */
+        private long chainBeginPosition(WALRecord h) {
+            return recordOffset(h) + h.size() - h.chainSize();
+        }
+
+        /**
+         * @param expHead Expected head of chain. If the head has changed, the flush is not performed by this thread.
+         * @throws IgniteCheckedException If failed.
+         * @throws StorageException If failed.
+         */
+        private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException {
+            if (expHead.previous() == null) {
+                FakeRecord frHead = (FakeRecord)expHead;
+
+                if (!stop || frHead.stop) // Protects from CASing terminal FakeRecord(true) to FakeRecord(false)
+                    return false;
+            }
+
+            // Fail-fast before CAS.
+            checkNode();
+
+            if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop)))
+                return false;
+
+            if (expHead.chainSize() == 0)
+                return false;
+
+            // At this point we grabbed the piece of WAL chain.
+            // Any failure in this code must invalidate the environment.
+            try {
+                // We can safely allow ot

<TRUNCATED>
