IGNITE-7594 Fixed performance drop after WAL optimization for FSYNC mode by 
reverting to the old implementation - Fixes #3521.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7cc1d74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7cc1d74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7cc1d74

Branch: refs/heads/ignite-2.4
Commit: b7cc1d745b44f7a0e32333c2716799e2cfac2988
Parents: 2eec607
Author: Ilya Lantukh <ilant...@gridgain.com>
Authored: Mon Feb 19 18:06:56 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Feb 19 18:06:56 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |    6 +
 .../configuration/DataStorageConfiguration.java |   11 +-
 .../PersistentStoreConfiguration.java           |    7 +-
 .../apache/ignite/configuration/WALMode.java    |   15 +-
 .../internal/pagemem/wal/record/WALRecord.java  |   19 +-
 .../processors/cache/GridCacheProcessor.java    |   10 +-
 .../wal/AbstractWalRecordsIterator.java         |   83 +-
 .../wal/FileWriteAheadLogManager.java           |  107 +-
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 3142 ++++++++++++++++++
 .../SingleSegmentLogicalRecordsIterator.java    |   12 +-
 .../reader/StandaloneWalRecordsIterator.java    |   13 +-
 .../db/wal/IgniteWalFlushDefaultSelfTest.java   |    2 +-
 .../db/wal/reader/IgniteWalReaderTest.java      |    4 +-
 13 files changed, 3354 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 43b718b..383d700 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -792,6 +792,12 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_DIRECT_IO_ENABLED = 
"IGNITE_DIRECT_IO_ENABLED";
 
+    /**
+     * When set to {@code true}, WAL implementation with dedicated worker will 
be used even in FSYNC mode.
+     * Default is {@code false}.
+     */
+    public static final String IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER = 
"IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER";
+
 
     /**
      * Enforces singleton.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 8d91503..4a3dbbf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -120,7 +120,7 @@ public class DataStorageConfiguration implements 
Serializable {
     public static final int DFLT_WAL_SEGMENT_SIZE = 64 * 1024 * 1024;
 
     /** Default wal mode. */
-    public static final WALMode DFLT_WAL_MODE = WALMode.DEFAULT;
+    public static final WALMode DFLT_WAL_MODE = WALMode.LOG_ONLY;
 
     /** Default thread local buffer size. */
     public static final int DFLT_TLB_SIZE = 128 * 1024;
@@ -686,6 +686,9 @@ public class DataStorageConfiguration implements 
Serializable {
      * @param walMode Wal mode.
      */
     public DataStorageConfiguration setWalMode(WALMode walMode) {
+        if (walMode == WALMode.DEFAULT)
+            walMode = WALMode.FSYNC;
+
         this.walMode = walMode;
 
         return this;
@@ -755,7 +758,7 @@ public class DataStorageConfiguration implements 
Serializable {
     }
 
     /**
-     * Property that allows to trade latency for throughput in {@link 
WALMode#DEFAULT} mode.
+     * Property that allows to trade latency for throughput in {@link 
WALMode#FSYNC} mode.
      * It limits minimum time interval between WAL fsyncs. First thread that 
initiates WAL fsync will wait for
      * this number of nanoseconds, another threads will just wait fsync of 
first thread (similar to CyclicBarrier).
      * Total throughput should increase under load as total WAL fsync rate 
will be limited.
@@ -765,7 +768,7 @@ public class DataStorageConfiguration implements 
Serializable {
     }
 
     /**
-     * Sets property that allows to trade latency for throughput in {@link 
WALMode#DEFAULT} mode.
+     * Sets property that allows to trade latency for throughput in {@link 
WALMode#FSYNC} mode.
      * It limits minimum time interval between WAL fsyncs. First thread that 
initiates WAL fsync will wait for
      * this number of nanoseconds, another threads will just wait fsync of 
first thread (similar to CyclicBarrier).
      * Total throughput should increase under load as total WAL fsync rate 
will be limited.
@@ -845,7 +848,7 @@ public class DataStorageConfiguration implements 
Serializable {
     }
 
     /**
-     * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may 
generate file size overhead for WAL segments in case
+     * <b>Note:</b> setting this value with {@link WALMode#FSYNC} may generate 
file size overhead for WAL segments in case
      * grid is used rarely.
      *
      * @param walAutoArchiveAfterInactivity time in millis to run auto 
archiving segment (even if incomplete) after last

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
index d59d19b..37a2528 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java
@@ -63,7 +63,7 @@ public class PersistentStoreConfiguration implements 
Serializable {
     public static final int DFLT_WAL_SEGMENT_SIZE = 64 * 1024 * 1024;
 
     /** Default wal mode. */
-    public static final WALMode DFLT_WAL_MODE = WALMode.DEFAULT;
+    public static final WALMode DFLT_WAL_MODE = WALMode.LOG_ONLY;
 
     /** Default Wal flush frequency. */
     public static final int DFLT_WAL_FLUSH_FREQ = 2000;
@@ -483,6 +483,9 @@ public class PersistentStoreConfiguration implements 
Serializable {
      * @param walMode Wal mode.
      */
     public PersistentStoreConfiguration setWalMode(WALMode walMode) {
+        if (walMode == WALMode.DEFAULT)
+            walMode = WALMode.FSYNC;
+
         this.walMode = walMode;
 
         return this;
@@ -623,7 +626,7 @@ public class PersistentStoreConfiguration implements 
Serializable {
     }
 
     /**
-     * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may 
generate file size overhead for WAL segments in case
+     * <b>Note:</b> setting this value with {@link WALMode#FSYNC} may generate 
file size overhead for WAL segments in case
      * grid is used rarely.
      *
      * @param walAutoArchiveAfterInactivity time in millis to run auto 
archiving segment (even if incomplete) after last

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/configuration/WALMode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/WALMode.java 
b/modules/core/src/main/java/org/apache/ignite/configuration/WALMode.java
index aa5cfdd..10471cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/WALMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/WALMode.java
@@ -25,11 +25,11 @@ import org.jetbrains.annotations.Nullable;
  */
 public enum WALMode {
     /**
-     * Default mode: full-sync disk writes. These writes survive power loss 
scenarios. When a control is returned
+     * FSYNC mode: full-sync disk writes. These writes survive power loss 
scenarios. When a control is returned
      * from the transaction commit operation, the changes are guaranteed to be 
persisted to disk according to the
      * transaction write synchronization mode.
      */
-    DEFAULT,
+    FSYNC,
 
     /**
      * Log only mode: flushes application buffers. These writes survive 
process crash. When a control is returned
@@ -48,7 +48,16 @@ public enum WALMode {
      * {@link Ignite#active(boolean)} method. If an Ignite node is terminated 
in NONE mode abruptly, it is likely
      * that the data stored on disk is corrupted and work directory will need 
to be cleared for a node restart.
      */
-    NONE;
+    NONE,
+
+    /**
+     * Default mode: full-sync disk writes. These writes survive power loss 
scenarios. When a control is returned
+     * from the transaction commit operation, the changes are guaranteed to be 
persisted to disk according to the
+     * transaction write synchronization mode.
+     * @deprecated This mode is no longer the default and is left here only for API 
compatibility. It is equivalent to the
+     * {@code FSYNC} mode.
+     */
+    @Deprecated DEFAULT;
 
     /**
      * Enumerated values.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 8362a69..4fae179 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -188,7 +188,7 @@ public abstract class WALRecord {
         /**
          * Fake record type, causes stop iterating and indicates segment EOF
          * <b>Note:</b> regular record type is incremented by 1 and minimal 
value written to file is also 1
-         * For {@link WALMode#DEFAULT} this value is at least came from padding
+         * For {@link WALMode#FSYNC} this value is at least came from padding
          */
         public static final int STOP_ITERATION_RECORD_TYPE = 0;
     }
@@ -197,6 +197,9 @@ public abstract class WALRecord {
     private int size;
 
     /** */
+    private int chainSize;
+
+    /** */
     @GridToStringExclude
     private WALRecord prev;
 
@@ -204,6 +207,20 @@ public abstract class WALRecord {
     private WALPointer pos;
 
     /**
+     * @param chainSize Chain size in bytes.
+     */
+    public void chainSize(int chainSize) {
+        this.chainSize = chainSize;
+    }
+
+    /**
+     * @return Chain size in bytes.
+     */
+    public int chainSize() {
+        return chainSize;
+    }
+
+    /**
      * @return Previous record in chain.
      */
     public WALRecord previous() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 1561f25..9007638 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -56,6 +56,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -97,6 +98,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaS
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager;
 import 
org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager;
@@ -181,6 +183,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     private final boolean startClientCaches =
         
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN,
 false);
 
+    private final boolean walFsyncWithDedicatedWorker =
+        
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER,
 false);
+
     /** Shared cache context. */
     private GridCacheSharedContext<?, ?> sharedCtx;
 
@@ -2341,7 +2346,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
             pageStoreMgr = new FilePageStoreManager(ctx);
 
-            walMgr = new FileWriteAheadLogManager(ctx);
+            if (ctx.config().getDataStorageConfiguration().getWalMode() == 
WALMode.FSYNC && !walFsyncWithDedicatedWorker)
+                walMgr = new FsyncModeFileWriteAheadLogManager(ctx);
+            else
+                walMgr = new FileWriteAheadLogManager(ctx);
         }
         else
             dbMgr = new IgniteCacheDatabaseSharedManager();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index bf59c81..65f3a20 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.P2;
@@ -62,7 +63,7 @@ public abstract class AbstractWalRecordsIterator
     /**
      * Current WAL segment read file handle. To be filled by subclass 
advanceSegment
      */
-    private FileWriteAheadLogManager.ReadFileHandle currWalSegment;
+    private AbstractReadFileHandle currWalSegment;
 
     /** Logger */
     @NotNull protected final IgniteLogger log;
@@ -104,21 +105,6 @@ public abstract class AbstractWalRecordsIterator
         buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder());
     }
 
-    /**
-     * Scans provided folder for a WAL segment files
-     * @param walFilesDir directory to scan
-     * @return found WAL file descriptors
-     */
-    protected static FileWriteAheadLogManager.FileDescriptor[] 
loadFileDescriptors(@NotNull final File walFilesDir) throws 
IgniteCheckedException {
-        final File[] files = 
walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
-
-        if (files == null) {
-            throw new IgniteCheckedException("WAL files directory does not not 
denote a " +
-                "directory, or if an I/O error occurs: [" + 
walFilesDir.getAbsolutePath() + "]");
-        }
-        return FileWriteAheadLogManager.scan(files);
-    }
-
     /** {@inheritDoc} */
     @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws 
IgniteCheckedException {
         IgniteBiTuple<WALPointer, WALRecord> ret = curRec;
@@ -188,8 +174,8 @@ public abstract class AbstractWalRecordsIterator
      * @return closed handle
      * @throws IgniteCheckedException if IO failed
      */
-    @Nullable protected FileWriteAheadLogManager.ReadFileHandle 
closeCurrentWalSegment() throws IgniteCheckedException {
-        final FileWriteAheadLogManager.ReadFileHandle walSegmentClosed = 
currWalSegment;
+    @Nullable protected AbstractReadFileHandle closeCurrentWalSegment() throws 
IgniteCheckedException {
+        final AbstractReadFileHandle walSegmentClosed = currWalSegment;
 
         if (walSegmentClosed != null) {
             walSegmentClosed.close();
@@ -206,8 +192,8 @@ public abstract class AbstractWalRecordsIterator
      * @param curWalSegment current open WAL segment or null if there is no 
open segment yet
      * @return new WAL segment to read or null for stop iteration
      */
-    protected abstract FileWriteAheadLogManager.ReadFileHandle advanceSegment(
-        @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) 
throws IgniteCheckedException;
+    protected abstract AbstractReadFileHandle advanceSegment(
+        @Nullable final AbstractReadFileHandle curWalSegment) throws 
IgniteCheckedException;
 
     /**
      * Switches to new record
@@ -215,15 +201,15 @@ public abstract class AbstractWalRecordsIterator
      * @return next advanced record
      */
     private IgniteBiTuple<WALPointer, WALRecord> advanceRecord(
-        @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd
+        @Nullable final AbstractReadFileHandle hnd
     ) throws IgniteCheckedException {
         if (hnd == null)
             return null;
 
-        FileWALPointer actualFilePtr = new FileWALPointer(hnd.idx, 
(int)hnd.in.position(), 0);
+        FileWALPointer actualFilePtr = new FileWALPointer(hnd.idx(), 
(int)hnd.in().position(), 0);
 
         try {
-            WALRecord rec = hnd.ser.readRecord(hnd.in, actualFilePtr);
+            WALRecord rec = hnd.ser().readRecord(hnd.in(), actualFilePtr);
 
             actualFilePtr.length(rec.size());
 
@@ -271,12 +257,12 @@ public abstract class AbstractWalRecordsIterator
      * @throws FileNotFoundException If segment file is missing.
      * @throws IgniteCheckedException If initialized failed due to another 
unexpected error.
      */
-    protected FileWriteAheadLogManager.ReadFileHandle initReadHandle(
-        @NotNull final FileWriteAheadLogManager.FileDescriptor desc,
+    protected AbstractReadFileHandle initReadHandle(
+        @NotNull final AbstractFileDescriptor desc,
         @Nullable final FileWALPointer start)
         throws IgniteCheckedException, FileNotFoundException {
         try {
-            FileIO fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file) : 
ioFactory.create(desc.file);
+            FileIO fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file()) 
: ioFactory.create(desc.file());
 
             try {
                 IgniteBiTuple<Integer, Boolean> tup = 
FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO);
@@ -290,7 +276,7 @@ public abstract class AbstractWalRecordsIterator
 
                 FileInput in = new FileInput(fileIO, buf);
 
-                if (start != null && desc.idx == start.index()) {
+                if (start != null && desc.idx() == start.index()) {
                     if (isCompacted) {
                         if (start.fileOffset() != 0)
                             serializerFactory.recordDeserializeFilter(new 
StartSeekingFilter(start));
@@ -303,8 +289,7 @@ public abstract class AbstractWalRecordsIterator
                     }
                 }
 
-                return new FileWriteAheadLogManager.ReadFileHandle(
-                    fileIO, desc.idx, 
serializerFactory.createSerializer(serVer), in);
+                return createReadFileHandle(fileIO, desc.idx(), 
serializerFactory.createSerializer(serVer), in);
             }
             catch (SegmentEofException | EOFException ignore) {
                 try {
@@ -332,10 +317,18 @@ public abstract class AbstractWalRecordsIterator
         }
         catch (IOException e) {
             throw new IgniteCheckedException(
-                "Failed to initialize WAL segment: " + 
desc.file.getAbsolutePath(), e);
+                "Failed to initialize WAL segment: " + 
desc.file().getAbsolutePath(), e);
         }
     }
 
+    /** */
+    protected abstract AbstractReadFileHandle createReadFileHandle(
+        FileIO fileIO,
+        long idx,
+        RecordSerializer ser,
+        FileInput in
+    );
+
     /**
      * Filter that drops all records until given start pointer is reached.
      */
@@ -364,4 +357,34 @@ public abstract class AbstractWalRecordsIterator
             return startReached;
         }
     }
+
+    /** */
+    protected interface AbstractReadFileHandle {
+        /** */
+        void close() throws IgniteCheckedException;
+
+        /** */
+        long idx();
+
+        /** */
+        FileInput in();
+
+        /** */
+        RecordSerializer ser();
+
+        /** */
+        boolean workDir();
+    }
+
+    /** */
+    protected interface AbstractFileDescriptor {
+        /** */
+        boolean isCompressed();
+
+        /** */
+        File file();
+
+        /** */
+        long idx();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7cc1d74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 73751c6..839dc1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -543,7 +543,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public boolean isFullSync() {
-        return mode == WALMode.DEFAULT;
+        return mode == WALMode.FSYNC;
     }
 
     /** {@inheritDoc} */
@@ -1264,7 +1264,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
             int left = dsCfg.getWalSegmentSize();
 
-            if (mode == WALMode.DEFAULT) {
+            if (mode == WALMode.FSYNC) {
                 while (left > 0) {
                     int toWrite = Math.min(FILL_BUF.length, left);
 
@@ -1708,7 +1708,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                 Files.move(dstTmpFile.toPath(), dstFile.toPath());
 
-                if (mode == WALMode.DEFAULT) {
+                if (mode == WALMode.FSYNC) {
                     try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, 
WRITE)) {
                         f0.force();
                     }
@@ -1884,7 +1884,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                     Files.move(tmpZip.toPath(), zip.toPath());
 
-                    if (mode == WALMode.DEFAULT) {
+                    if (mode == WALMode.FSYNC) {
                         try (FileIO f0 = ioFactory.create(zip, CREATE, READ, 
WRITE)) {
                             f0.force();
                         }
@@ -2112,7 +2112,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 if (checkFile.isDirectory())
                     throw new IgniteCheckedException("Failed to initialize WAL 
log segment (a directory with " +
                         "the same name already exists): " + 
checkFile.getAbsolutePath());
-                else if (checkFile.length() != dsCfg.getWalSegmentSize() && 
mode == WALMode.DEFAULT)
+                else if (checkFile.length() != dsCfg.getWalSegmentSize() && 
mode == WALMode.FSYNC)
                     throw new IgniteCheckedException("Failed to initialize WAL 
log segment " +
                         "(WAL segment size change is not supported in 
'DEFAULT' WAL mode) " +
                         "[filePath=" + checkFile.getAbsolutePath() +
@@ -2223,7 +2223,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /**
      * WAL file descriptor.
      */
-    public static class FileDescriptor implements Comparable<FileDescriptor> {
+    public static class FileDescriptor implements Comparable<FileDescriptor>, 
AbstractWalRecordsIterator.AbstractFileDescriptor {
         /** */
         protected final File file;
 
@@ -2313,6 +2313,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         public boolean isCompressed() {
             return file.getName().endsWith(".zip");
         }
+
+        /** {@inheritDoc} */
+        @Override public File file() {
+            return file;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long idx() {
+            return idx;
+        }
     }
 
     /**
@@ -2338,7 +2348,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /**
      *
      */
-    public static class ReadFileHandle extends FileHandle {
+    public static class ReadFileHandle extends FileHandle implements 
AbstractWalRecordsIterator.AbstractReadFileHandle {
         /** Entry serializer. */
         RecordSerializer ser;
 
@@ -2357,7 +2367,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
          * @param ser Entry serializer.
          * @param in File input.
          */
-        ReadFileHandle(
+        public ReadFileHandle(
             FileIO fileIO,
             long idx,
             RecordSerializer ser,
@@ -2380,6 +2390,26 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 throw new IgniteCheckedException(e);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public long idx() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileInput in() {
+            return in;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordSerializer ser() {
+            return ser;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean workDir() {
+            return workDir;
+        }
     }
 
     /**
@@ -2744,7 +2774,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         }
 
                         // Do the final fsync.
-                        if (mode == WALMode.DEFAULT) {
+                        if (mode == WALMode.FSYNC) {
                             if (mmap)
                                 ((MappedByteBuffer)buf.buf).force();
                             else
@@ -2795,7 +2825,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             lock.lock();
 
             try {
-                assert cctx.kernalContext().invalidated() || written == 
lastFsyncPos || mode != WALMode.DEFAULT :
+                assert cctx.kernalContext().invalidated() || written == 
lastFsyncPos || mode != WALMode.FSYNC :
                     "fsync [written=" + written + ", lastFsync=" + 
lastFsyncPos + ", idx=" + idx + ']';
 
                 fileIO = null;
@@ -2914,22 +2944,22 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override protected ReadFileHandle initReadHandle(
-            @NotNull FileDescriptor desc,
+            @NotNull AbstractFileDescriptor desc,
             @Nullable FileWALPointer start
         ) throws IgniteCheckedException, FileNotFoundException {
-            if (decompressor != null && !desc.file.exists()) {
+            if (decompressor != null && !desc.file().exists()) {
                 FileDescriptor zipFile = new FileDescriptor(
-                    new File(walArchiveDir, 
FileDescriptor.fileName(desc.getIdx()) + ".zip"));
+                    new File(walArchiveDir, 
FileDescriptor.fileName(desc.idx()) + ".zip"));
 
                 if (!zipFile.file.exists()) {
                     throw new FileNotFoundException("Both compressed and raw 
segment files are missing in archive " +
-                        "[segmentIdx=" + desc.idx + "]");
+                        "[segmentIdx=" + desc.idx() + "]");
                 }
 
-                decompressor.decompressFile(desc.idx).get();
+                decompressor.decompressFile(desc.idx()).get();
             }
 
-            return super.initReadHandle(desc, start);
+            return (ReadFileHandle) super.initReadHandle(desc, start);
         }
 
         /** {@inheritDoc} */
@@ -2938,9 +2968,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             curRec = null;
 
-            final ReadFileHandle handle = closeCurrentWalSegment();
+            final AbstractReadFileHandle handle = closeCurrentWalSegment();
 
-            if (handle != null && handle.workDir)
+            if (handle != null && handle.workDir())
                 releaseWorkSegment(curWalSegmIdx);
 
             curWalSegmIdx = Integer.MAX_VALUE;
@@ -2950,16 +2980,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
          * @throws IgniteCheckedException If failed to initialize first file 
handle.
          */
         private void init() throws IgniteCheckedException {
-            FileDescriptor[] descs = loadFileDescriptors(walArchiveDir);
+            AbstractFileDescriptor[] descs = 
loadFileDescriptors(walArchiveDir);
 
             if (start != null) {
                 if (!F.isEmpty(descs)) {
-                    if (descs[0].idx > start.index())
+                    if (descs[0].idx() > start.index())
                         throw new IgniteCheckedException("WAL history is too 
short " +
                             "[descs=" + Arrays.asList(descs) + ", start=" + 
start + ']');
 
-                    for (FileDescriptor desc : descs) {
-                        if (desc.idx == start.index()) {
+                    for (AbstractFileDescriptor desc : descs) {
+                        if (desc.idx() == start.index()) {
                             curWalSegmIdx = start.index();
 
                             break;
@@ -2967,7 +2997,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     }
 
                     if (curWalSegmIdx == -1) {
-                        long lastArchived = descs[descs.length - 1].idx;
+                        long lastArchived = descs[descs.length - 1].idx();
 
                         if (lastArchived > start.index())
                             throw new IgniteCheckedException("WAL history is 
corrupted (segment is missing): " + start);
@@ -2984,7 +3014,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 }
             }
             else
-                curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx : 0;
+                curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx() : 0;
 
             curWalSegmIdx--;
 
@@ -2993,14 +3023,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /** {@inheritDoc} */
-        @Override protected ReadFileHandle advanceSegment(
-            @Nullable final ReadFileHandle curWalSegment
+        @Override protected AbstractReadFileHandle advanceSegment(
+            @Nullable final AbstractReadFileHandle curWalSegment
         ) throws IgniteCheckedException {
             if (curWalSegment != null) {
                 curWalSegment.close();
 
-                if (curWalSegment.workDir)
-                    releaseWorkSegment(curWalSegment.idx);
+                if (curWalSegment.workDir())
+                    releaseWorkSegment(curWalSegment.idx());
 
             }
 
@@ -3068,6 +3098,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (archiver != null)
                 archiver.releaseWorkSegment(absIdx);
         }
+
+        /** {@inheritDoc} */
+        @Override protected AbstractReadFileHandle createReadFileHandle(FileIO 
fileIO, long idx,
+            RecordSerializer ser, FileInput in) {
+            return new ReadFileHandle(fileIO, idx, ser, in);
+        }
     }
 
     /**
@@ -3349,4 +3385,19 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             }
         }
     }
+
+    /**
+     * Scans provided folder for a WAL segment files
+     * @param walFilesDir directory to scan
+     * @return found WAL file descriptors
+     */
+    public static FileDescriptor[] loadFileDescriptors(@NotNull final File 
walFilesDir) throws IgniteCheckedException {
+        final File[] files = 
walFilesDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
+
+        if (files == null) {
+            throw new IgniteCheckedException("WAL files directory does not not 
denote a " +
+                "directory, or if an I/O error occurs: [" + 
walFilesDir.getAbsolutePath() + "]");
+        }
+        return scan(files);
+    }
 }

Reply via email to