IGNITE-9909 Merge FsyncWalManager and WalManager - Fixes #5013.

Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/889ce79b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/889ce79b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/889ce79b

Branch: refs/heads/master
Commit: 889ce79bba0187891d334fedbc63235ff85fabe6
Parents: cf81e5a
Author: Anton Kalashnikov <kaa....@yandex.ru>
Authored: Fri Nov 9 15:05:18 2018 +0300
Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Committed: Fri Nov 9 15:05:18 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |   17 +-
 .../wal/FileWriteAheadLogManager.java           | 1017 +----
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 3482 ------------------
 .../wal/filehandle/AbstractFileHandle.java      |   47 +
 .../wal/filehandle/FileHandleManager.java       |   81 +
 .../filehandle/FileHandleManagerFactory.java    |   90 +
 .../wal/filehandle/FileHandleManagerImpl.java   |  603 +++
 .../wal/filehandle/FileWriteHandle.java         |  113 +
 .../wal/filehandle/FileWriteHandleImpl.java     |  601 +++
 .../filehandle/FsyncFileHandleManagerImpl.java  |  157 +
 .../wal/filehandle/FsyncFileWriteHandle.java    |  845 +++++
 .../wal/serializer/RecordSerializerFactory.java |    2 +
 .../resources/META-INF/classnames.properties    |    5 -
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |    9 +-
 .../wal/IgniteWalIteratorSwitchSegmentTest.java |   65 +-
 .../wal/memtracker/PageMemoryTracker.java       |   45 +-
 .../yardstick/IgniteAbstractBenchmark.java      |    7 +-
 17 files changed, 2615 insertions(+), 4571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 317378b..940e4a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -62,7 +62,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
-import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -111,7 +110,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCa
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -214,9 +212,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     private final boolean startClientCaches =
         
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN,
 false);
 
-    private final boolean walFsyncWithDedicatedWorker =
-        
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER,
 false);
-
     /** Enables start caches in parallel. */
     private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
         
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL,
 true);
@@ -3041,13 +3036,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
             walMgr = 
ctx.plugins().createComponent(IgniteWriteAheadLogManager.class);
 
-            if (walMgr == null) {
-                if (ctx.config().getDataStorageConfiguration().getWalMode() == 
WALMode.FSYNC &&
-                    !walFsyncWithDedicatedWorker)
-                    walMgr = new FsyncModeFileWriteAheadLogManager(ctx);
-                else
-                    walMgr = new FileWriteAheadLogManager(ctx);
-            }
+            if (walMgr == null)
+                walMgr = new FileWriteAheadLogManager(ctx);
         }
         else {
             if (CU.isPersistenceEnabled(ctx.config()) && ctx.clientNode()) {
@@ -3531,7 +3521,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param checkThreadTx If {@code true} checks that current thread does 
not have active transactions.
      * @return Future that will be completed when cache is deployed.
      */
-    @SuppressWarnings("IfMayBeConditional")
     public IgniteInternalFuture<Boolean> dynamicStartCache(
         @Nullable CacheConfiguration ccfg,
         String cacheName,
@@ -3555,7 +3544,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      *
      * @param ccfg Cache configuration.
      */
-    @SuppressWarnings("IfMayBeConditional")
     public IgniteInternalFuture<Boolean> dynamicStartSqlCache(
         CacheConfiguration ccfg
     ) {
@@ -3584,7 +3572,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @param checkThreadTx If {@code true} checks that current thread does 
not have active transactions.
      * @return Future that will be completed when cache is deployed.
      */
-    @SuppressWarnings("IfMayBeConditional")
     public IgniteInternalFuture<Boolean> dynamicStartCache(
         @Nullable CacheConfiguration ccfg,
         String cacheName,

http://git-wip-us.apache.org/repos/asf/ignite/blob/889ce79b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index fbc4f6c..0bd20bc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -26,12 +26,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
@@ -46,17 +42,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 import java.util.zip.ZipEntry;
@@ -79,7 +67,6 @@ import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
-import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
@@ -101,6 +88,10 @@ import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.AbstractFileHandle;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManagerFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileHandleManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
@@ -114,7 +105,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -139,48 +129,20 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
-import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
 import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
-import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
-import static 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
-import static org.apache.ignite.internal.util.IgniteUtils.findField;
-import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod;
-import static org.apache.ignite.internal.util.IgniteUtils.sleep;
 
 /**
  * File WAL manager.
  */
 @SuppressWarnings("IfMayBeConditional")
 public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter 
implements IgniteWriteAheadLogManager {
-    /** Dfault wal segment sync timeout. */
-    public static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500L;
-    /** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}. */
-    private static final Method force0 = findNonPublicMethod(
-        MappedByteBuffer.class, "force0",
-        java.io.FileDescriptor.class, long.class, long.class
-    );
-
-    /** {@link MappedByteBuffer#mappingOffset()}. */
-    private static final Method mappingOffset = 
findNonPublicMethod(MappedByteBuffer.class, "mappingOffset");
-
-    /** {@link MappedByteBuffer#mappingAddress(long)}. */
-    private static final Method mappingAddress = findNonPublicMethod(
-        MappedByteBuffer.class, "mappingAddress", long.class
-    );
-
-    /** {@link MappedByteBuffer#fd} */
-    private static final Field fd = findField(MappedByteBuffer.class, "fd");
-
-    /** Page size. */
-    private static final int PAGE_SIZE = GridUnsafe.pageSize();
-
     /** */
     private static final FileDescriptor[] EMPTY_DESCRIPTORS = new 
FileDescriptor[0];
 
@@ -235,19 +197,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
     };
 
-    /** Latest serializer version to use. */
-    private static final int LATEST_SERIALIZER_VERSION = 2;
-
     /** Buffer size. */
     private static final int BUF_SIZE = 1024 * 1024;
 
     /** Use mapped byte buffer. */
     private final boolean mmap = 
IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true);
 
-    /** {@link FileWriteHandle#written} atomic field updater. */
-    private static final AtomicLongFieldUpdater<FileWriteHandle> WRITTEN_UPD =
-        AtomicLongFieldUpdater.newUpdater(FileWriteHandle.class, "written");
-
     /**
      * Percentage of archive size for checkpoint trigger. Need for calculate 
max size of WAL after last checkpoint.
      * Checkpoint should be triggered when max size of WAL after last 
checkpoint more than maxWallArchiveSize * thisValue
@@ -288,9 +243,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} 
log WALMode. */
     private final long flushFreq;
 
-    /** Fsync delay. */
-    private final long fsyncDelay;
-
     /** */
     private final DataStorageConfiguration dsCfg;
 
@@ -347,9 +299,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** */
     private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
 
-    /** Current log segment handle */
+    /** Current log segment handle. */
     private volatile FileWriteHandle currHnd;
 
+    /** File handle manager. */
+    private FileHandleManager fileHandleManager;
+
     /** */
     private volatile WALDisableContext walDisableContext;
 
@@ -378,17 +333,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      */
     @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
 
-    /** WAL writer worker. */
-    private WALWriter walWriter;
-
     /**
      * Listener invoked for each segment file IO initializer.
      */
     @Nullable private volatile IgniteInClosure<FileIO> createWalFileListener;
 
-    /** Wal segment sync worker. */
-    private WalSegmentSyncer walSegmentSyncWorker;
-
     /**
      * Manage of segment location.
      */
@@ -397,6 +346,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** Segment factory with ability locked segment during reading. */
     private SegmentFileInputFactory lockedSegmentFileInputFactory;
 
+    private FileHandleManagerFactory fileHandleManagerFactory;
+
     /**
      * @param ctx Kernal context.
      */
@@ -412,9 +363,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         maxWalSegmentSize = dsCfg.getWalSegmentSize();
         mode = dsCfg.getWalMode();
         flushFreq = dsCfg.getWalFlushFrequency();
-        fsyncDelay = dsCfg.getWalFsyncDelayNanos();
         alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
-        ioFactory = new RandomAccessFileIOFactory();
+        ioFactory = mode == WALMode.FSYNC ? dsCfg.getFileIOFactory() : new 
RandomAccessFileIOFactory();
         segmentFileInputFactory = new SimpleSegmentFileInputFactory();
         walAutoArchiveAfterInactivity = 
dsCfg.getWalAutoArchiveAfterInactivity();
 
@@ -422,6 +372,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         evt = ctx.event();
         failureProcessor = ctx.failure();
+
+        fileHandleManagerFactory = new FileHandleManagerFactory(dsCfg);
     }
 
     /**
@@ -512,16 +464,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             walDisableContext = cctx.walState().walDisableContext();
 
-            if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
-                walSegmentSyncWorker = new 
WalSegmentSyncer(igCfg.getIgniteInstanceName(),
-                    cctx.kernalContext().log(WalSegmentSyncer.class));
+            fileHandleManager = fileHandleManagerFactory.build(
+                cctx, metrics, mmap, lastWALPtr::get, serializer, 
this::currentHandle
+            );
 
-                if (log.isInfoEnabled())
-                    log.info("Started write-ahead log manager [mode=" + mode + 
']');
-            }
-            else
-                U.quietAndWarn(log, "Started write-ahead log manager in NONE 
mode, persisted data may be lost in " +
-                    "a case of unexpected node failure. Make sure to 
deactivate the cluster before shutdown.");
+            fileHandleManager.start();
 
             lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
                 segmentAware,
@@ -541,8 +488,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             new IgniteThread(archiver).start();
         }
 
-        if (walSegmentSyncWorker != null)
-            new IgniteThread(walSegmentSyncWorker).start();
+        fileHandleManager.onActivate();
 
         if (compressor != null)
             compressor.start();
@@ -625,22 +571,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         if (timeoutObj != null)
             cctx.time().removeTimeoutObject(timeoutObj);
 
-        final FileWriteHandle currHnd = currentHandle();
-
         try {
-            if (mode == WALMode.BACKGROUND) {
-                if (currHnd != null)
-                    currHnd.flush(null);
-            }
-
-            if (currHnd != null)
-                currHnd.close(false);
-
-            if (walSegmentSyncWorker != null)
-                walSegmentSyncWorker.shutdown();
-
-            if (walWriter != null)
-                walWriter.shutdown();
+            fileHandleManager.onDeactivate();
 
             segmentAware.interrupt();
 
@@ -660,7 +592,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             }
         }
         catch (Exception e) {
-            U.error(log, "Failed to gracefully close WAL segment: " + 
this.currHnd.fileIO, e);
+            U.error(log, "Failed to gracefully close WAL segment: " + 
this.currHnd, e);
         }
     }
 
@@ -706,10 +638,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         FileWALPointer filePtr = (FileWALPointer)lastPtr;
 
-        walWriter = new WALWriter(log);
-
-        if (!mmap)
-            new IgniteThread(walWriter).start();
+        fileHandleManager.resumeLogging();
 
         currHnd = restoreWriteHandle(filePtr);
 
@@ -717,16 +646,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         if (filePtr == null)
             currHnd.writeHeader();
 
-        if (currHnd.serializer.version() != serializer.version()) {
+        if (currHnd.serializerVersion() != serializer.version()) {
             if (log.isInfoEnabled())
                 log.info("Record serializer version change detected, will 
start logging with a new WAL record " +
                     "serializer to a new WAL segment [curFile=" + currHnd + ", 
newVer=" + serializer.version() +
-                    ", oldVer=" + currHnd.serializer.version() + ']');
+                    ", oldVer=" + currHnd.serializerVersion() + ']');
 
             rollOver(currHnd, null);
         }
 
-        currHnd.resume = false;
+        currHnd.finishResumeLogging();
 
         if (mode == WALMode.BACKGROUND) {
             backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
@@ -803,9 +732,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         final FileWriteHandle handle = currentHandle();
 
         try {
-            handle.buf.close();
-
-            rollOver(handle, null);
+            closeBufAndRollover(handle, null, RolloverType.NONE);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Unable to perform segment rollover: " + 
e.getMessage(), e);
@@ -895,7 +822,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     ) throws IgniteCheckedException {
         long idx = currWriteHandle.getSegmentId();
 
-        currWriteHandle.buf.close();
+        currWriteHandle.closeBuffer();
 
         FileWriteHandle res = rollOver(currWriteHandle, rolloverType == 
RolloverType.NEXT_SEGMENT ? rec : null);
 
@@ -907,28 +834,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public void flush(WALPointer ptr, boolean explicitFsync) throws 
IgniteCheckedException, StorageException {
-        if (serializer == null || mode == WALMode.NONE)
-            return;
-
-        FileWriteHandle cur = currentHandle();
-
-        // WAL manager was not started (client node).
-        if (cur == null)
-            return;
-
-        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? 
lastWALPtr.get() : ptr);
-
-        if (mode == LOG_ONLY)
-            cur.flushOrWait(filePtr);
-
-        if (!explicitFsync && mode != WALMode.FSYNC)
-            return; // No need to sync in LOG_ONLY or BACKGROUND unless 
explicit fsync is required.
-
-        // No need to sync if was rolled over.
-        if (filePtr != null && !cur.needFsync(filePtr))
-            return;
-
-        cur.fsync(filePtr);
+        fileHandleManager.flush(ptr, explicitFsync);
     }
 
     /** {@inheritDoc} */
@@ -1330,25 +1236,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     log.info("Resuming logging to WAL segment [file=" + 
curFile.getAbsolutePath() +
                         ", offset=" + off + ", ver=" + serVer + ']');
 
-                SegmentedRingByteBuffer rbuf;
-
-                if (mmap) {
-                    MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
-
-                    rbuf = new SegmentedRingByteBuffer(buf, metrics);
-                }
-                else
-                    rbuf = new 
SegmentedRingByteBuffer(dsCfg.getWalBufferSize(), maxWalSegmentSize, DIRECT, 
metrics);
-
-                if (lastReadPtr != null)
-                    rbuf.init(lastReadPtr.fileOffset() + lastReadPtr.length());
-
-                FileWriteHandle hnd = new FileWriteHandle(
-                    fileIO,
-                    off + len,
-                    true,
-                    ser,
-                    rbuf);
+                FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off 
+ len, ser);
 
                 if (archiver0 != null)
                     segmentAware.curAbsWalIdx(absIdx);
@@ -1393,8 +1281,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (log.isDebugEnabled())
                 log.debug("Switching to a new WAL segment: " + 
nextFile.getAbsolutePath());
 
-            SegmentedRingByteBuffer rbuf = null;
-
             SegmentIO fileIO = null;
 
             FileWriteHandle hnd;
@@ -1409,20 +1295,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     if (lsnr != null)
                         lsnr.apply(fileIO);
 
-                    if (mmap) {
-                        MappedByteBuffer buf = 
fileIO.map((int)maxWalSegmentSize);
 
-                        rbuf = new SegmentedRingByteBuffer(buf, metrics);
-                    }
-                    else
-                        rbuf = cur.buf.reset();
-
-                    hnd = new FileWriteHandle(
-                        fileIO,
-                        0,
-                        false,
-                        serializer,
-                        rbuf);
+                    hnd = fileHandleManager.nextHandle(fileIO, serializer);
 
                     if (interrupted)
                         Thread.currentThread().interrupt();
@@ -1444,12 +1318,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                         fileIO = null;
                     }
-
-                    if (rbuf != null) {
-                        rbuf.free();
-
-                        rbuf = null;
-                    }
                 }
             }
 
@@ -2416,7 +2284,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @param ver Version.
      * @param compacted Compacted flag.
      */
-    @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long 
idx, int ver, boolean compacted, ByteBuffer buf) {
+    @NotNull public static ByteBuffer prepareSerializerVersionBuffer(long idx, 
int ver, boolean compacted, ByteBuffer buf) {
         // Write record type.
         buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
 
@@ -2452,33 +2320,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /**
      *
      */
-    private abstract static class FileHandle {
-        /** I/O interface for read/write operations with file */
-        SegmentIO fileIO;
-
-        /** Segment idx corresponded to fileIo*/
-        final long segmentIdx;
-
-        /**
-         * @param fileIO I/O interface for read/write operations of FileHandle.
-         */
-        private FileHandle(@NotNull SegmentIO fileIO) {
-            this.fileIO = fileIO;
-            segmentIdx = fileIO.getSegmentId();
-        }
-
-        /**
-         * @return Absolute WAL segment file index (incremental counter).
-         */
-        public long getSegmentId(){
-            return segmentIdx;
-        }
-    }
-
-    /**
-     *
-     */
-    public static class ReadFileHandle extends FileHandle implements 
AbstractWalRecordsIterator.AbstractReadFileHandle {
+    public static class ReadFileHandle extends AbstractFileHandle implements 
AbstractWalRecordsIterator.AbstractReadFileHandle {
         /** Entry serializer. */
         RecordSerializer ser;
 
@@ -2489,7 +2331,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         private final SegmentAware segmentAware;
 
         /**
-         * @param fileIO I/O interface for read/write operations of FileHandle.
+         * @param fileIO I/O interface for read/write operations of 
AbstractFileHandle.
          * @param ser Entry serializer.
          * @param in File input.
          * @param aware Segment aware.
@@ -2542,459 +2384,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * File handle for one log segment.
-     */
-    @SuppressWarnings("SignalWithoutCorrespondingAwait")
-    private class FileWriteHandle extends FileHandle {
-        /** */
-        private final RecordSerializer serializer;
-
-        /** Created on resume logging. */
-        private volatile boolean resume;
-
-        /**
-         * Position in current file after the end of last written record 
(incremented after file channel write
-         * operation)
-         */
-        volatile long written;
-
-        /** */
-        private volatile long lastFsyncPos;
-
-        /** Stop guard to provide warranty that only one thread will be 
successful in calling {@link #close(boolean)} */
-        private final AtomicBoolean stop = new AtomicBoolean(false);
-
-        /** */
-        private final Lock lock = new ReentrantLock();
-
-        /** Condition for timed wait of several threads, see {@link 
DataStorageConfiguration#getWalFsyncDelayNanos()} */
-        private final Condition fsync = lock.newCondition();
-
-        /**
-         * Next segment available condition. Protection from "spurious wakeup" 
is provided by predicate {@link
-         * #fileIO}=<code>null</code>
-         */
-        private final Condition nextSegment = lock.newCondition();
-
-        /** Buffer. */
-        private final SegmentedRingByteBuffer buf;
-
-        /**
-         * @param fileIO I/O file interface to use
-         * @param pos Position.
-         * @param resume Created on resume logging flag.
-         * @param serializer Serializer.
-         * @param buf Buffer.
-         * @throws IOException If failed.
-         */
-        private FileWriteHandle(
-            SegmentIO fileIO,
-            long pos,
-            boolean resume,
-            RecordSerializer serializer,
-            SegmentedRingByteBuffer buf
-        ) throws IOException {
-            super(fileIO);
-
-            assert serializer != null;
-
-            if (!mmap)
-                fileIO.position(pos);
-
-            this.serializer = serializer;
-
-            written = pos;
-            lastFsyncPos = pos;
-            this.resume = resume;
-            this.buf = buf;
-        }
-
-        /**
-         * Write serializer version to current handle.
-         */
-        public void writeHeader() {
-            SegmentedRingByteBuffer.WriteSegment seg = 
buf.offer(HEADER_RECORD_SIZE);
-
-            assert seg != null && seg.position() > 0;
-
-            prepareSerializerVersionBuffer(getSegmentId(), 
serializerVersion(), false, seg.buffer());
-
-            seg.release();
-        }
-
-        /**
-         * @param rec Record to be added to write queue.
-         * @return Pointer or null if roll over to next segment is required or 
already started by other thread.
-         * @throws StorageException If failed.
-         * @throws IgniteCheckedException If failed.
-         */
-        @Nullable private WALPointer addRecord(WALRecord rec) throws 
StorageException, IgniteCheckedException {
-            assert rec.size() > 0 : rec;
-
-            for (;;) {
-                checkNode();
-
-                SegmentedRingByteBuffer.WriteSegment seg;
-
-                // Buffer can be in open state in case of resuming with 
different serializer version.
-                if (rec.type() == SWITCH_SEGMENT_RECORD && !currHnd.resume)
-                    seg = buf.offerSafe(rec.size());
-                else
-                    seg = buf.offer(rec.size());
-
-                FileWALPointer ptr = null;
-
-                if (seg != null) {
-                    try {
-                        int pos = (int)(seg.position() - rec.size());
-
-                        ByteBuffer buf = seg.buffer();
-
-                        if (buf == null)
-                            return null; // Can not write to this segment, 
need to switch to the next one.
-
-                        ptr = new FileWALPointer(getSegmentId(), pos, 
rec.size());
-
-                        rec.position(ptr);
-
-                        fillBuffer(buf, rec);
-
-                        if (mmap) {
-                            // written field must grow only, but segment with 
greater position can be serialized
-                            // earlier than segment with smaller position.
-                            while (true) {
-                                long written0 = written;
-
-                                if (seg.position() > written0) {
-                                    if (WRITTEN_UPD.compareAndSet(this, 
written0, seg.position()))
-                                        break;
-                                }
-                                else
-                                    break;
-                            }
-                        }
-
-                        return ptr;
-                    }
-                    finally {
-                        seg.release();
-
-                        if (mode == WALMode.BACKGROUND && rec instanceof 
CheckpointRecord)
-                            flushOrWait(ptr);
-                    }
-                }
-                else
-                    walWriter.flushAll();
-            }
-        }
-
-        /**
-         * Flush or wait for concurrent flush completion.
-         *
-         * @param ptr Pointer.
-         */
-        private void flushOrWait(FileWALPointer ptr) throws 
IgniteCheckedException {
-            if (ptr != null) {
-                // If requested obsolete file index, it must be already 
flushed by close.
-                if (ptr.index() != getSegmentId())
-                    return;
-            }
-
-            flush(ptr);
-        }
-
-        /**
-         * @param ptr Pointer.
-         */
-        private void flush(FileWALPointer ptr) throws IgniteCheckedException {
-            if (ptr == null) { // Unconditional flush.
-                walWriter.flushAll();
-
-                return;
-            }
-
-            assert ptr.index() == getSegmentId();
-
-            walWriter.flushBuffer(ptr.fileOffset());
-        }
-
-        /**
-         * @param buf Buffer.
-         * @param rec WAL record.
-         * @throws IgniteCheckedException If failed.
-         */
-        private void fillBuffer(ByteBuffer buf, WALRecord rec) throws 
IgniteCheckedException {
-            try {
-                serializer.writeRecord(rec, buf);
-            }
-            catch (RuntimeException e) {
-                throw new IllegalStateException("Failed to write record: " + 
rec, e);
-            }
-        }
-
-        /**
-         * Non-blocking check if this pointer needs to be sync'ed.
-         *
-         * @param ptr WAL pointer to check.
-         * @return {@code False} if this pointer has been already sync'ed.
-         */
-        private boolean needFsync(FileWALPointer ptr) {
-            // If index has changed, it means that the log was rolled over and 
already sync'ed.
-            // If requested position is smaller than last sync'ed, it also 
means all is good.
-            // If position is equal, then our record is the last not synced.
-            return getSegmentId() == ptr.index() && lastFsyncPos <= 
ptr.fileOffset();
-        }
-
-        /**
-         * @return Pointer to the end of the last written record (probably not 
fsync-ed).
-         */
-        private FileWALPointer position() {
-            lock.lock();
-
-            try {
-                return new FileWALPointer(getSegmentId(), (int)written, 0);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param ptr Pointer to sync.
-         * @throws StorageException If failed.
-         */
-        private void fsync(FileWALPointer ptr) throws StorageException, 
IgniteCheckedException {
-            lock.lock();
-
-            try {
-                if (ptr != null) {
-                    if (!needFsync(ptr))
-                        return;
-
-                    if (fsyncDelay > 0 && !stop.get()) {
-                        // Delay fsync to collect as many updates as possible: 
trade latency for throughput.
-                        U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
-
-                        if (!needFsync(ptr))
-                            return;
-                    }
-                }
-
-                flushOrWait(ptr);
-
-                if (stop.get())
-                    return;
-
-                long lastFsyncPos0 = lastFsyncPos;
-                long written0 = written;
-
-                if (lastFsyncPos0 != written0) {
-                    // Fsync position must be behind.
-                    assert lastFsyncPos0 < written0 : "lastFsyncPos=" + 
lastFsyncPos0 + ", written=" + written0;
-
-                    boolean metricsEnabled = metrics.metricsEnabled();
-
-                    long start = metricsEnabled ? System.nanoTime() : 0;
-
-                    if (mmap) {
-                        long pos = ptr == null ? -1 : ptr.fileOffset();
-
-                        List<SegmentedRingByteBuffer.ReadSegment> segs = 
buf.poll(pos);
-
-                        if (segs != null) {
-                            assert segs.size() == 1;
-
-                            SegmentedRingByteBuffer.ReadSegment seg = 
segs.get(0);
-
-                            int off = seg.buffer().position();
-                            int len = seg.buffer().limit() - off;
-
-                            fsync((MappedByteBuffer)buf.buf, off, len);
-
-                            seg.release();
-                        }
-                    }
-                    else
-                        walWriter.force();
-
-                    lastFsyncPos = written;
-
-                    if (fsyncDelay > 0)
-                        fsync.signalAll();
-
-                    long end = metricsEnabled ? System.nanoTime() : 0;
-
-                    if (metricsEnabled)
-                        metrics.onFsync(end - start);
-                }
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @param buf Mapped byte buffer..
-         * @param off Offset.
-         * @param len Length.
-         */
-        private void fsync(MappedByteBuffer buf, int off, int len) throws 
IgniteCheckedException {
-            try {
-                long mappedOff = (Long)mappingOffset.invoke(buf);
-
-                assert mappedOff == 0 : mappedOff;
-
-                long addr = (Long)mappingAddress.invoke(buf, mappedOff);
-
-                long delta = (addr + off) % PAGE_SIZE;
-
-                long alignedAddr = (addr + off) - delta;
-
-                force0.invoke(buf, fd.get(buf), alignedAddr, len + delta);
-            }
-            catch (IllegalAccessException | InvocationTargetException e) {
-                throw new IgniteCheckedException(e);
-            }
-        }
-
-        /**
-         * @return {@code true} If this thread actually closed the segment.
-         * @throws IgniteCheckedException If failed.
-         * @throws StorageException If failed.
-         */
-        private boolean close(boolean rollOver) throws IgniteCheckedException, 
StorageException {
-            if (stop.compareAndSet(false, true)) {
-                lock.lock();
-
-                try {
-                    flushOrWait(null);
-
-                    try {
-                        RecordSerializer backwardSerializer = new 
RecordSerializerFactoryImpl(cctx)
-                            .createSerializer(serializerVer);
-
-                        SwitchSegmentRecord segmentRecord = new 
SwitchSegmentRecord();
-
-                        int switchSegmentRecSize = 
backwardSerializer.size(segmentRecord);
-
-                        if (rollOver && written < (maxWalSegmentSize - 
switchSegmentRecSize)) {
-                            segmentRecord.size(switchSegmentRecSize);
-
-                            WALPointer segRecPtr = addRecord(segmentRecord);
-
-                            if (segRecPtr != null)
-                                fsync((FileWALPointer)segRecPtr);
-                        }
-
-                        if (mmap) {
-                            List<SegmentedRingByteBuffer.ReadSegment> segs = 
buf.poll(maxWalSegmentSize);
-
-                            if (segs != null) {
-                                assert segs.size() == 1;
-
-                                segs.get(0).release();
-                            }
-                        }
-
-                        // Do the final fsync.
-                        if (mode != WALMode.NONE) {
-                            if (mmap)
-                                ((MappedByteBuffer)buf.buf).force();
-                            else
-                                fileIO.force();
-
-                            lastFsyncPos = written;
-                        }
-
-                        if (mmap) {
-                            try {
-                                fileIO.close();
-                            }
-                            catch (IOException ignore) {
-                                // No-op.
-                            }
-                        }
-                        else {
-                            walWriter.close();
-
-                            if (!rollOver)
-                                buf.free();
-                        }
-                    }
-                    catch (IOException e) {
-                        throw new StorageException("Failed to close WAL write 
handle [idx=" + getSegmentId() + "]", e);
-                    }
-
-                    if (log.isDebugEnabled())
-                        log.debug("Closed WAL write handle [idx=" + 
getSegmentId() + "]");
-
-                    return true;
-                }
-                finally {
-                    if (mmap)
-                        buf.free();
-
-                    lock.unlock();
-                }
-            }
-            else
-                return false;
-        }
-
-        /**
-         * Signals next segment available to wake up other worker threads 
waiting for WAL to write
-         */
-        private void signalNextAvailable() {
-            lock.lock();
-
-            try {
-                assert cctx.kernalContext().invalid() ||
-                    written == lastFsyncPos || mode != WALMode.FSYNC :
-                    "fsync [written=" + written + ", lastFsync=" + 
lastFsyncPos + ", idx=" + getSegmentId() + ']';
-
-                fileIO = null;
-
-                nextSegment.signalAll();
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         *
-         */
-        private void awaitNext() {
-            lock.lock();
-
-            try {
-                while (fileIO != null)
-                    U.awaitQuiet(nextSegment);
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        /**
-         * @return Safely reads current position of the file channel as 
String. Will return "null" if channel is null.
-         */
-        private String safePosition() {
-            FileIO io = fileIO;
-
-            if (io == null)
-                return "null";
-
-            try {
-                return String.valueOf(io.position());
-            }
-            catch (IOException e) {
-                return "{Failed to read channel position: " + e.getMessage() + 
'}';
-            }
-        }
-    }
-
-    /**
      * Iterator over WAL-log.
      */
     private static class RecordsIterator extends AbstractWalRecordsIterator {
@@ -3282,7 +2671,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         FileWriteHandle hnd = currentHandle();
 
         try {
-            hnd.flush(null);
+            hnd.flushAll();
         }
         catch (Exception e) {
             U.warn(log, "Failed to flush WAL record queue", e);
@@ -3290,344 +2679,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * WAL writer worker.
-     */
-    private class WALWriter extends GridWorker {
-        /** Unconditional flush. */
-        private static final long UNCONDITIONAL_FLUSH = -1L;
-
-        /** File close. */
-        private static final long FILE_CLOSE = -2L;
-
-        /** File force. */
-        private static final long FILE_FORCE = -3L;
-
-        /** Err. */
-        private volatile Throwable err;
-
-        //TODO: replace with GC free data structure.
-        /** Parked threads. */
-        final Map<Thread, Long> waiters = new ConcurrentHashMap<>();
-
-        /**
-         * Default constructor.
-         *
-         * @param log Logger.
-         */
-        WALWriter(IgniteLogger log) {
-            super(cctx.igniteInstanceName(), "wal-write-worker%" + 
cctx.igniteInstanceName(), log,
-                cctx.kernalContext().workersRegistry());
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() {
-            Throwable err = null;
-
-            try {
-                while (!isCancelled()) {
-                    onIdle();
-
-                    while (waiters.isEmpty()) {
-                        if (!isCancelled()) {
-                            blockingSectionBegin();
-
-                            try {
-                                LockSupport.park();
-                            }
-                            finally {
-                                blockingSectionEnd();
-                            }
-                        }
-                        else {
-                            unparkWaiters(Long.MAX_VALUE);
-
-                            return;
-                        }
-                    }
-
-                    Long pos = null;
-
-                    for (Long val : waiters.values()) {
-                        if (val > Long.MIN_VALUE)
-                            pos = val;
-                    }
-
-                    updateHeartbeat();
-
-                    if (pos == null)
-                        continue;
-                    else if (pos < UNCONDITIONAL_FLUSH) {
-                        try {
-                            assert pos == FILE_CLOSE || pos == FILE_FORCE : 
pos;
-
-                            if (pos == FILE_CLOSE)
-                                currHnd.fileIO.close();
-                            else if (pos == FILE_FORCE)
-                                currHnd.fileIO.force();
-                        }
-                        catch (IOException e) {
-                            log.error("Exception in WAL writer thread: ", e);
-
-                            err = e;
-
-                            unparkWaiters(Long.MAX_VALUE);
-
-                            return;
-                        }
-
-                        unparkWaiters(pos);
-                    }
-
-                    updateHeartbeat();
-
-                    List<SegmentedRingByteBuffer.ReadSegment> segs = 
currentHandle().buf.poll(pos);
-
-                    if (segs == null) {
-                        unparkWaiters(pos);
-
-                        continue;
-                    }
-
-                    for (int i = 0; i < segs.size(); i++) {
-                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
-
-                        updateHeartbeat();
-
-                        try {
-                            writeBuffer(seg.position(), seg.buffer());
-                        }
-                        catch (Throwable e) {
-                            log.error("Exception in WAL writer thread:", e);
-
-                            err = e;
-                        }
-                        finally {
-                            seg.release();
-
-                            long p = pos <= UNCONDITIONAL_FLUSH || err != null 
? Long.MAX_VALUE : currentHandle().written;
-
-                            unparkWaiters(p);
-                        }
-                    }
-                }
-            }
-            catch (Throwable t) {
-                err = t;
-            }
-            finally {
-                unparkWaiters(Long.MAX_VALUE);
-
-                if (err == null && !isCancelled)
-                    err = new IllegalStateException("Worker " + name() + " is 
terminated unexpectedly");
-
-                if (err instanceof OutOfMemoryError)
-                    cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
-                else if (err != null)
-                    cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
-            }
-        }
-
-        /**
-         * Shutdowns thread.
-         */
-        public void shutdown() throws IgniteInterruptedCheckedException {
-            U.cancel(this);
-
-            LockSupport.unpark(runner());
-
-            U.join(runner());
-        }
-
-        /**
-         * Unparks waiting threads.
-         *
-         * @param pos Pos.
-         */
-        private void unparkWaiters(long pos) {
-            assert pos > Long.MIN_VALUE : pos;
-
-            for (Map.Entry<Thread, Long> e : waiters.entrySet()) {
-                Long val = e.getValue();
-
-                if (val <= pos) {
-                    if (val != Long.MIN_VALUE)
-                        waiters.put(e.getKey(), Long.MIN_VALUE);
-
-                    LockSupport.unpark(e.getKey());
-                }
-            }
-        }
-
-        /**
-         * Forces all made changes to the file.
-         */
-        void force() throws IgniteCheckedException {
-            flushBuffer(FILE_FORCE);
-        }
-
-        /**
-         * Closes file.
-         */
-        void close() throws IgniteCheckedException {
-            flushBuffer(FILE_CLOSE);
-        }
-
-        /**
-         * Flushes all data from the buffer.
-         */
-        void flushAll() throws IgniteCheckedException {
-            flushBuffer(UNCONDITIONAL_FLUSH);
-        }
-
-        /**
-         * @param expPos Expected position.
-         */
-        void flushBuffer(long expPos) throws IgniteCheckedException {
-            if (mmap)
-                return;
-
-            Throwable err = walWriter.err;
-
-            if (err != null)
-                cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
-
-            if (expPos == UNCONDITIONAL_FLUSH)
-                expPos = (currentHandle().buf.tail());
-
-            Thread t = Thread.currentThread();
-
-            waiters.put(t, expPos);
-
-            LockSupport.unpark(walWriter.runner());
-
-            while (true) {
-                Long val = waiters.get(t);
-
-                assert val != null : "Only this thread can remove thread from 
waiters";
-
-                if (val == Long.MIN_VALUE) {
-                    waiters.remove(t);
-
-                    Throwable walWriterError = walWriter.err;
-
-                    if (walWriterError != null)
-                        throw new IgniteCheckedException("Flush buffer 
failed.", walWriterError);
-
-                    return;
-                }
-                else
-                    LockSupport.park();
-            }
-        }
-
-        /**
-         * @param pos Position in file to start write from. May be checked 
against actual position to wait previous
-         * writes to complete
-         * @param buf Buffer to write to file
-         * @throws StorageException If failed.
-         * @throws IgniteCheckedException If failed.
-         */
-        @SuppressWarnings("TooBroadScope")
-        private void writeBuffer(long pos, ByteBuffer buf) throws 
StorageException, IgniteCheckedException {
-            FileWriteHandle hdl = currentHandle();
-
-            assert hdl.fileIO != null : "Writing to a closed segment.";
-
-            checkNode();
-
-            long lastLogged = U.currentTimeMillis();
-
-            long logBackoff = 2_000;
-
-            // If we were too fast, need to wait previous writes to complete.
-            while (hdl.written != pos) {
-                assert hdl.written < pos : "written = " + hdl.written + ", pos 
= " + pos; // No one can write further than we are now.
-
-                // Permutation occurred between blocks write operations.
-                // Order of acquiring lock is not the same as order of write.
-                long now = U.currentTimeMillis();
-
-                if (now - lastLogged >= logBackoff) {
-                    if (logBackoff < 60 * 60_000)
-                        logBackoff *= 2;
-
-                    U.warn(log, "Still waiting for a concurrent write to 
complete [written=" + hdl.written +
-                        ", pos=" + pos + ", lastFsyncPos=" + hdl.lastFsyncPos 
+ ", stop=" + hdl.stop.get() +
-                        ", actualPos=" + hdl.safePosition() + ']');
-
-                    lastLogged = now;
-                }
-
-                checkNode();
-            }
-
-            // Do the write.
-            int size = buf.remaining();
-
-            assert size > 0 : size;
-
-            try {
-                assert hdl.written == hdl.fileIO.position();
-
-                hdl.written += hdl.fileIO.writeFully(buf);
-
-                metrics.onWalBytesWritten(size);
-
-                assert hdl.written == hdl.fileIO.position();
-            }
-            catch (IOException e) {
-                StorageException se = new StorageException("Failed to write 
buffer.", e);
-
-                cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, se));
-
-                throw se;
-            }
-        }
-    }
-
-    /**
-     * Syncs WAL segment file.
-     */
-    private class WalSegmentSyncer extends GridWorker {
-        /** Sync timeout. */
-        long syncTimeout;
-
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param log Logger.
-         */
-        public WalSegmentSyncer(String igniteInstanceName, IgniteLogger log) {
-            super(igniteInstanceName, "wal-segment-syncer", log);
-
-            syncTimeout = 
Math.max(IgniteSystemProperties.getLong(IGNITE_WAL_SEGMENT_SYNC_TIMEOUT,
-                DFLT_WAL_SEGMENT_SYNC_TIMEOUT), 100L);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                sleep(syncTimeout);
-
-                try {
-                    flush(null, true);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Exception when flushing WAL.", e);
-                }
-            }
-        }
-
-        /** Shutted down the worker. */
-        private void shutdown() {
-            synchronized (this) {
-                U.cancel(this);
-            }
-
-            U.join(this, log);
-        }
-    }
-
-    /**
      * Scans provided folder for a WAL segment files
      * @param walFilesDir directory to scan
      * @return found WAL file descriptors

Reply via email to