IGNITE-7380 Implemented pluggable Direct IO - Fixes #3226.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9bba2d52
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9bba2d52
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9bba2d52

Branch: refs/heads/master
Commit: 9bba2d5203b0e67b206f153cac4a60a213ce9a15
Parents: 577e632
Author: dpavlov <[email protected]>
Authored: Thu Jan 18 13:53:29 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Thu Jan 18 13:53:29 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   8 +
 .../cache/GridCacheSharedContext.java           |  21 +-
 .../GridCacheDatabaseSharedManager.java         |  11 +-
 .../cache/persistence/file/FileIO.java          |   7 +-
 .../cache/persistence/file/FilePageStore.java   |   3 +-
 .../persistence/file/FilePageStoreManager.java  |  47 +-
 .../file/FileVersionCheckingFactory.java        |  32 +-
 .../wal/FileWriteAheadLogManager.java           |  34 +-
 .../IgniteClusterActivateDeactivateTest.java    |   2 +-
 .../IgniteDataStorageMetricsSelfTest.java       |   2 +-
 .../IgnitePdsClientNearCachePutGetTest.java     |  33 +-
 .../persistence/IgnitePdsDynamicCacheTest.java  |   2 +-
 ...gnitePdsSingleNodePutGetPersistenceTest.java |  15 -
 .../db/IgnitePdsCacheRestoreTest.java           |   2 +-
 ...gniteCheckpointDirtyPagesForLowLoadTest.java |  29 +-
 .../db/file/IgnitePdsCheckpointSimpleTest.java  | 100 ++++
 .../db/file/IgnitePdsEvictionTest.java          |   2 +-
 .../database/IgniteDbAbstractTest.java          |  29 +-
 .../IgniteDbClientNearCachePutGetTest.java      |  39 --
 .../database/IgniteDbDynamicCacheSelfTest.java  |  11 +-
 .../IgniteDbMemoryLeakAbstractTest.java         |   2 +-
 .../database/IgniteDbPutGetAbstractTest.java    |   1 +
 .../ignite/testsuites/IgnitePdsTestSuite.java   |  34 +-
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |  29 +-
 modules/direct-io/pom.xml                       | 122 +++++
 .../cache/persistence/file/AlignedBuffers.java  |  72 +++
 .../file/AlignedBuffersDirectFileIO.java        | 511 +++++++++++++++++++
 .../file/AlignedBuffersDirectFileIOFactory.java | 177 +++++++
 .../persistence/file/IgniteNativeIoLib.java     | 405 +++++++++++++++
 .../persistence/file/LinuxNativeIoPlugin.java   |  25 +
 .../file/LinuxNativeIoPluginProvider.java       | 238 +++++++++
 .../org.apache.ignite.plugin.PluginProvider     |   1 +
 .../IgniteNativeIoWithNoPersistenceTest.java    |  76 +++
 .../testsuites/IgnitePdsNativeIoTestSuite.java  |  38 ++
 .../testsuites/IgnitePdsNativeIoTestSuite2.java |  36 ++
 pom.xml                                         |   1 +
 36 files changed, 2051 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 9d520fb..833773a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -810,6 +810,14 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_GRID_CLIENT_LOG_ENABLED = 
"IGNITE_GRID_CLIENT_LOG_ENABLED";
 
     /**
+     * When set to {@code true}, direct IO may be enabled. Direct IO is enabled only if the JAR file with the
+     * corresponding feature is available in the classpath and the OS and filesystem settings allow enabling this mode.
+     * Default is {@code true}.
+     */
+    public static final String IGNITE_DIRECT_IO_ENABLED = 
"IGNITE_DIRECT_IO_ENABLED";
+
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5bf1343..306723b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -104,8 +104,8 @@ public class GridCacheSharedContext<K, V> {
     /** Deployment manager. */
     private GridCacheDeploymentManager<K, V> depMgr;
 
-    /** Write ahead log manager. */
-    private IgniteWriteAheadLogManager walMgr;
+    /** Write ahead log manager. {@code Null} if persistence is not enabled. */
+    @Nullable private IgniteWriteAheadLogManager walMgr;
 
     /** Database manager. */
     private IgniteCacheDatabaseSharedManager dbMgr;
@@ -113,8 +113,8 @@ public class GridCacheSharedContext<K, V> {
     /** Snp manager. */
     private IgniteCacheSnapshotManager snpMgr;
 
-    /** Page store manager. */
-    private IgnitePageStoreManager pageStoreMgr;
+    /** Page store manager. {@code Null} if persistence is not enabled. */
+    @Nullable private IgnitePageStoreManager pageStoreMgr;
 
     /** Affinity manager. */
     private CacheAffinitySharedManager affMgr;
@@ -169,6 +169,8 @@ public class GridCacheSharedContext<K, V> {
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
+     * @param pageStoreMgr Page store manager. {@code Null} if persistence is 
not enabled.
+     * @param walMgr WAL manager. {@code Null} if persistence is not enabled.
      * @param depMgr Deployment manager.
      * @param exchMgr Exchange manager.
      * @param affMgr Affinity manager.
@@ -182,8 +184,8 @@ public class GridCacheSharedContext<K, V> {
         IgniteTxManager txMgr,
         GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
-        IgnitePageStoreManager pageStoreMgr,
-        IgniteWriteAheadLogManager walMgr,
+        @Nullable IgnitePageStoreManager pageStoreMgr,
+        @Nullable IgniteWriteAheadLogManager walMgr,
         IgniteCacheDatabaseSharedManager dbMgr,
         IgniteCacheSnapshotManager snpMgr,
         GridCacheDeploymentManager<K, V> depMgr,
@@ -398,6 +400,7 @@ public class GridCacheSharedContext<K, V> {
      * @param jtaMgr JTA manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
+     * @param pageStoreMgr Page store manager. {@code Null} if persistence is 
not enabled.
      * @param depMgr Deployment manager.
      * @param exchMgr Exchange manager.
      * @param affMgr Affinity manager.
@@ -409,7 +412,7 @@ public class GridCacheSharedContext<K, V> {
         CacheJtaManagerAdapter jtaMgr,
         GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
-        IgnitePageStoreManager pageStoreMgr,
+        @Nullable IgnitePageStoreManager pageStoreMgr,
         IgniteWriteAheadLogManager walMgr,
         IgniteCacheDatabaseSharedManager dbMgr,
         IgniteCacheSnapshotManager snpMgr,
@@ -667,9 +670,9 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
-     * @return Page store manager.
+     * @return Page store manager. {@code Null} if persistence is not enabled.
      */
-    public IgnitePageStoreManager pageStore() {
+    @Nullable public IgnitePageStoreManager pageStore() {
         return pageStoreMgr;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 3599594..1b0eb6d 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -304,7 +304,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Checkpoint runner thread pool. If null tasks are to be run in single 
thread */
     @Nullable private ExecutorService asyncRunner;
 
-    /** Buffer for the checkpoint threads. */
+    /** Thread local with buffers for the checkpoint threads. Each buffer represents one page of durable memory. */
     private ThreadLocal<ByteBuffer> threadBuf;
 
     /** */
@@ -2436,6 +2436,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Replaces the thread local with buffers. The thread local should provide a direct buffer one page in length.
+     *
+     * @param threadBuf New thread local with buffers for the checkpoint threads.
+     */
+    public void setThreadBuf(final ThreadLocal<ByteBuffer> threadBuf) {
+        this.threadBuf = threadBuf;
+    }
+
+    /**
      *
      */
     @SuppressWarnings("NakedNotify")

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 849f03a..73e44b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -52,7 +52,8 @@ public interface FileIO extends AutoCloseable {
      *
      * @param destBuf Destination byte buffer.
      *
-     * @return Number of read bytes.
+     * @return Number of read bytes, or <tt>-1</tt> if the
+     *          current position is greater than or equal to the file's current size
      *
      * @throws IOException If some I/O error occurs.
      */
@@ -65,7 +66,9 @@ public interface FileIO extends AutoCloseable {
      * @param destBuf Destination byte buffer.
      * @param position Starting position of file.
      *
-     * @return Number of read bytes.
+     * @return Number of read bytes, possibly zero, or <tt>-1</tt> if the
+     *          given position is greater than or equal to the file's current
+     *          size
      *
      * @throws IOException If some I/O error occurs.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index 053ab2b..0ed9167 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -451,7 +451,8 @@ public class FilePageStore implements PageStore {
 
             assert pageBuf.capacity() == pageSize;
             assert pageBuf.position() == 0;
-            assert pageBuf.order() == ByteOrder.nativeOrder();
+            assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer 
order " + pageBuf.order()
+                + " should be same with " + ByteOrder.nativeOrder();
             assert PageIO.getType(pageBuf) != 0 : "Invalid state. Type is 0! 
pageId = " + U.hexLong(pageId);
             assert PageIO.getVersion(pageBuf) != 0 : "Invalid state. Version 
is 0! pageId = " + U.hexLong(pageId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index d406df6..deacf79 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -36,8 +36,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
@@ -94,8 +94,20 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
     /** */
     private final IgniteConfiguration igniteCfg;
 
+    /**
+     * File IO factory for the page store; taken from {@link #dsCfg} by default.
+     * May be overridden for block read/write.
+     */
+    private FileIOFactory pageStoreFileIoFactory;
+
+    /**
+     * File IO factory for page store V1 and for fast checking of the page store version (non-block read).
+     * Taken from {@link #dsCfg} by default.
+     */
+    private FileIOFactory pageStoreV1FileIoFactory;
+
     /** */
-    private DataStorageConfiguration dsCfg;
+    private final DataStorageConfiguration dsCfg;
 
     /** Absolute directory for file page store. Includes consistent id based 
folder. */
     private File storeWorkDir;
@@ -117,6 +129,8 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
         assert dsCfg != null;
 
         this.dsCfg = dsCfg;
+
+        pageStoreV1FileIoFactory = pageStoreFileIoFactory = 
dsCfg.getFileIOFactory();
     }
 
     /** {@inheritDoc} */
@@ -373,8 +387,8 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
         if (dirExisted && !idxFile.exists())
             grpsWithoutIdx.add(grpId);
 
-        FileVersionCheckingFactory pageStoreFactory = new 
FileVersionCheckingFactory(
-            dsCfg.getFileIOFactory(), igniteCfg.getDataStorageConfiguration());
+        FilePageStoreFactory pageStoreFactory = new FileVersionCheckingFactory(
+            pageStoreFileIoFactory, pageStoreV1FileIoFactory, 
igniteCfg.getDataStorageConfiguration());
 
         FilePageStore idxStore = 
pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile);
 
@@ -687,6 +701,31 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
     }
 
     /**
+     * @param pageStoreFileIoFactory File IO factory to override the default; may be used for block read/write.
+     * @param pageStoreV1FileIoFactory File IO factory for reading V1 page store and for fast touching of page files
+     *      (non-block read/write).
+     */
+    public void setPageStoreFileIOFactories(final FileIOFactory 
pageStoreFileIoFactory,
+        final FileIOFactory pageStoreV1FileIoFactory) {
+        this.pageStoreFileIoFactory = pageStoreFileIoFactory;
+        this.pageStoreV1FileIoFactory = pageStoreV1FileIoFactory;
+    }
+
+    /**
+     * @return File IO factory currently selected for page store.
+     */
+    public FileIOFactory getPageStoreFileIoFactory() {
+        return pageStoreFileIoFactory;
+    }
+
+    /**
+     * @return Durable memory page size in bytes.
+     */
+    public int pageSize() {
+        return dsCfg.getPageSize();
+    }
+
+    /**
      *
      */
     private static class CacheStoreHolder {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
index bab2cf0..bb25ab0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java
@@ -32,30 +32,46 @@ public class FileVersionCheckingFactory implements 
FilePageStoreFactory {
     public static final String LATEST_VERSION_OVERRIDE_PROPERTY = 
"file.page.store.latest.version.override";
 
     /** Latest page store version. */
-    public final static int LATEST_VERSION = 2;
+    public static final int LATEST_VERSION = 2;
 
     /** Factory to provide I/O interfaces for read/write operations with 
files. */
     private final FileIOFactory fileIOFactory;
 
+    /**
+     * Factory to provide I/O interfaces for read/write operations with files.
+     * This is backup factory for V1 page store.
+     */
+    private FileIOFactory fileIOFactoryStoreV1;
+
     /** Memory configuration. */
     private final DataStorageConfiguration memCfg;
 
     /**
-     * @param fileIOFactory File io factory.
+     * @param fileIOFactory File IO factory.
+     * @param fileIOFactoryStoreV1 File IO factory for V1 page store and for 
version checking.
      * @param memCfg Memory configuration.
      */
-    public FileVersionCheckingFactory(
-        FileIOFactory fileIOFactory, DataStorageConfiguration memCfg) {
+    public FileVersionCheckingFactory(FileIOFactory fileIOFactory, 
FileIOFactory fileIOFactoryStoreV1,
+        DataStorageConfiguration memCfg) {
         this.fileIOFactory = fileIOFactory;
+        this.fileIOFactoryStoreV1 = fileIOFactoryStoreV1;
         this.memCfg = memCfg;
     }
 
+    /**
+     * @param fileIOFactory File IO factory for V1 & V2 page store and for 
version checking.
+     * @param memCfg Memory configuration.
+     */
+    public FileVersionCheckingFactory(FileIOFactory fileIOFactory, 
DataStorageConfiguration memCfg) {
+        this(fileIOFactory, fileIOFactory, memCfg);
+    }
+
     /** {@inheritDoc} */
     @Override public FilePageStore createPageStore(byte type, File file) 
throws IgniteCheckedException {
         if (!file.exists())
             return createPageStore(type, file, latestVersion());
 
-        try (FileIO fileIO = fileIOFactory.create(file)) {
+        try (FileIO fileIO = fileIOFactoryStoreV1.create(file)) {
             int minHdr = FilePageStore.HEADER_SIZE;
 
             if (fileIO.size() < minHdr)
@@ -101,16 +117,16 @@ public class FileVersionCheckingFactory implements 
FilePageStoreFactory {
      * @param file File.
      * @param ver Version.
      */
-    public FilePageStore createPageStore(byte type, File file, int ver) throws 
IgniteCheckedException {
+    public FilePageStore createPageStore(byte type, File file, int ver) {
         switch (ver) {
             case FilePageStore.VERSION:
-                return new FilePageStore(type, file, fileIOFactory, memCfg);
+                return new FilePageStore(type, file, fileIOFactoryStoreV1, 
memCfg);
 
             case FilePageStoreV2.VERSION:
                 return new FilePageStoreV2(type, file, fileIOFactory, memCfg);
 
             default:
-                throw new IllegalArgumentException("Unknown version of file 
page store: " + ver);
+                throw new IllegalArgumentException("Unknown version of file 
page store: " + ver + " for file [" + file.getAbsolutePath() + "]");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 5ae0226..444a207 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -227,7 +227,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** */
     private final boolean alwaysWriteFullPages;
 
-    /** WAL segment size in bytes */
+    /** WAL segment size in bytes. This is the maximum value; actual segments may be shorter. */
     private final long maxWalSegmentSize;
 
     /** */
@@ -309,7 +309,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private AtomicLong lastRecordLoggedMs = new AtomicLong();
 
     /**
-     * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at 
shutdown Null for non background modes
+     * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at 
shutdown.
+     * Null for non background modes.
      */
     @Nullable private volatile GridTimeoutProcessor.CancelableTask 
backgroundFlushSchedule;
 
@@ -323,6 +324,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private WALWriter walWriter;
 
     /**
+     * Listener invoked for each newly created WAL segment file IO.
+     */
+    @Nullable private volatile IgniteInClosure<FileIO> createWalFileListener;
+
+    /**
      * @param ctx Kernal context.
      */
     public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
@@ -1048,6 +1054,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         try {
             FileIO fileIO = ioFactory.create(curFile);
 
+            IgniteInClosure<FileIO> lsnr = createWalFileListener;
+
+            if (lsnr != null)
+                lsnr.apply(fileIO);
+
             try {
                 int serVer = serializerVer;
 
@@ -1137,6 +1148,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 try {
                     fileIO = ioFactory.create(nextFile);
 
+                    IgniteInClosure<FileIO> lsnr = createWalFileListener;
+                    if (lsnr != null)
+                        lsnr.apply(fileIO);
+
                     if (mmap) {
                         try {
                             MappedByteBuffer buf = 
fileIO.map((int)maxWalSegmentSize);
@@ -1341,6 +1356,21 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Sets up a listener for WAL segment write File IO creation.
+     *
+     * @param createWalFileListener Listener to be invoked on each new segment file IO creation.
+     */
+    public void setCreateWalFileListener(@Nullable IgniteInClosure<FileIO> 
createWalFileListener) {
+        this.createWalFileListener = createWalFileListener;
+    }
+
+    /**
+     * @return {@link #maxWalSegmentSize}.
+     */
+    public long maxWalSegmentSize() {
+        return maxWalSegmentSize;
+    }
+
+    /**
      * File archiver operates on absolute segment indexes. For any given 
absolute segment index N we can calculate the
      * work WAL segment: S(N) = N % dsCfg.walSegments. When a work segment is 
finished, it is given to the archiver. If
      * the absolute index of last archived segment is denoted by A and the 
absolute index of next segment we want to

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index 267cb18..27739a2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -122,7 +122,7 @@ public class IgniteClusterActivateDeactivateTest extends 
GridCommonAbstractTest
         }
 
         DataStorageConfiguration memCfg = new DataStorageConfiguration();
-        memCfg.setPageSize(1024);
+        memCfg.setPageSize(4 * 1024);
         memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
             .setMaxSize(10 * 1024 * 1024)
             .setPersistenceEnabled(persistenceEnabled()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
index 93fa24e..5609995 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
@@ -85,7 +85,7 @@ public class IgniteDataStorageMetricsSelfTest extends 
GridCommonAbstractTest {
                 .setMetricsEnabled(true)
                 .setName("no-persistence"))
             .setWalMode(WALMode.LOG_ONLY)
-            .setPageSize(1024)
+            .setPageSize(4 * 1024)
             .setMetricsEnabled(true);
 
         cfg.setDataStorageConfiguration(memCfg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java
index 130a91c..8bc9155 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java
@@ -17,41 +17,24 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
-import 
org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+import 
org.apache.ignite.internal.processors.database.IgniteDbPutGetAbstractTest;
 
 /**
  *
  */
-public class IgnitePdsClientNearCachePutGetTest extends 
IgniteDbClientNearCachePutGetTest {
+public class IgnitePdsClientNearCachePutGetTest extends 
IgniteDbPutGetAbstractTest {
     /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setDataStorageConfiguration(
-            new DataStorageConfiguration()
-                .setWalMode(WALMode.LOG_ONLY)
-        );
-
-        return cfg;
+    @Override protected int gridCount() {
+        return 1;
     }
 
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_STORE_DIR, false));
-
-        super.beforeTest();
+    @Override protected boolean indexingEnabled() {
+        return false;
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_STORE_DIR, false));
+    @Override protected boolean withClientNearCache() {
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
index 7e0cf82..ba1933f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java
@@ -49,7 +49,7 @@ public class IgnitePdsDynamicCacheTest extends 
IgniteDbDynamicCacheSelfTest {
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration().setMaxSize(200 * 1024 * 
1024).setPersistenceEnabled(true))
             .setWalMode(WALMode.LOG_ONLY)
-            .setPageSize(1024);
+            .setPageSize(4 * 1024);
 
         cfg.setDataStorageConfiguration(memCfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java
index 18e31fc..59a6f84 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.persistence;
 
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.WALMode;
 import 
org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -30,18 +27,6 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.file.FileP
  */
 public class IgnitePdsSingleNodePutGetPersistenceTest extends 
IgniteDbSingleNodePutGetTest {
     /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setDataStorageConfiguration(
-            new DataStorageConfiguration()
-            .setWalMode(WALMode.LOG_ONLY)
-        );
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_STORE_DIR, false));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
index 577cf9a..7da765b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java
@@ -61,7 +61,7 @@ public class IgnitePdsCacheRestoreTest extends 
GridCommonAbstractTest {
         DataStorageConfiguration memCfg = new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration().setMaxSize(10 * 1024 * 
1024).setPersistenceEnabled(true))
-            .setPageSize(1024)
+            .setPageSize(4 * 1024)
             .setWalMode(WALMode.LOG_ONLY);
 
         memCfg.setDataRegionConfigurations(new DataRegionConfiguration()

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
index 34662a7..782949f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -130,11 +131,21 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest 
extends GridCommonAbstract
 
                 ignite.cache(fullname).put(d, d);
 
-                log.info("Put to " + fullname + " value " + d);
+                if (log.isInfoEnabled())
+                    log.info("Put to cache [" + fullname + "] value " + d);
 
-                db.wakeupForCheckpoint("").get();
+                final int timeout = 5000;
+                try {
+                    db.wakeupForCheckpoint("").get(timeout, 
TimeUnit.MILLISECONDS);
+                }
+                catch (IgniteFutureTimeoutCheckedException e) {
+                    continue;
+                }
 
-                int currCpPages = 
waitForCurrentCheckpointPagesCounterUpdated(db);
+                int currCpPages = 
waitForCurrentCheckpointPagesCounterUpdated(db, timeout);
+
+                if (currCpPages < 0)
+                    continue;
 
                 pageCntObserved.add(currCpPages);
 
@@ -158,15 +169,23 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest 
extends GridCommonAbstract
     }
 
     /**
+     * Waits counter of pages will be set up. If it is not changed for timeout 
milliseconds, method returns negative
+     * value.
+     *
      * @param db DB shared manager.
-     * @return counter when it becomes non-zero.
+     * @param timeout milliseconds to wait.
+     * @return counter when it becomes non-zero, negative value indicates 
timeout during wait for update.
      */
-    private int 
waitForCurrentCheckpointPagesCounterUpdated(GridCacheDatabaseSharedManager db) {
+    private int 
waitForCurrentCheckpointPagesCounterUpdated(GridCacheDatabaseSharedManager db, 
int timeout) {
         int currCpPages = 0;
+        long start = System.currentTimeMillis();
 
         while (currCpPages == 0) {
             LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
             currCpPages = db.currentCheckpointPagesCount();
+
+            if (currCpPages == 0 && ((System.currentTimeMillis() - start) > 
timeout))
+                return -1;
         }
 
         return currCpPages;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
new file mode 100644
index 0000000..4655fcc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.file;
+
+import com.google.common.base.Strings;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Puts data into grid, triggers a checkpoint via {@code waitForCheckpoint}, restarts the node and then verifies
+ * that all data is read back from the persistent store.
+ */
+public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
+    /** Name of the cache used by the test. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Number of entries written before the checkpoint and verified after the restart. */
+    private static final int ENTRIES_CNT = 10000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataRegionConfiguration regCfg = new DataRegionConfiguration().setPersistenceEnabled(true);
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setPageSize(4 * 1024)
+            .setDefaultDataRegionConfiguration(regCfg)
+            .setCheckpointFrequency(TimeUnit.SECONDS.toMillis(10));
+
+        return cfg.setDataStorageConfiguration(dsCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Remove persistence files possibly left by a previous run.
+        GridTestUtils.deleteDbFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        // Do not leave persistence files behind for the next tests in the suite.
+        GridTestUtils.deleteDbFiles();
+    }
+
+    /**
+     * Checks that the same data can be loaded after a checkpoint and a node restart.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterCpEnd() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            cache.put(i, valueWithRedundancyForKey(i));
+
+        ignite.context().cache().context().database().waitForCheckpoint("test");
+
+        stopAllGrids();
+
+        IgniteEx igniteRestart = startGrid(0);
+
+        igniteRestart.cluster().active(true);
+
+        IgniteCache<Object, Object> cacheRestart = igniteRestart.getOrCreateCache(CACHE_NAME);
+
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            assertEquals(valueWithRedundancyForKey(i), cacheRestart.get(i));
+    }
+
+    /**
+     * Builds a value that repeats the key's decimal representation, so a truncated or corrupted value is detected.
+     *
+     * @param i Key.
+     * @return {@code Integer.toString(i)} repeated 10 times.
+     */
+    @NotNull private String valueWithRedundancyForKey(int i) {
+        return Strings.repeat(Integer.toString(i), 10);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
index 1b86e3d..117be9b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java
@@ -52,7 +52,7 @@ public class IgnitePdsEvictionTest extends 
GridCommonAbstractTest {
     private static final int NUMBER_OF_SEGMENTS = 64;
 
     /** */
-    private static final int PAGE_SIZE = 1024;
+    private static final int PAGE_SIZE = 4 * 1024;
 
     /** */
     private static final long CHUNK_SIZE = 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
index 1d5b624..9ce3077 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java
@@ -22,21 +22,23 @@ import java.util.Arrays;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 
 /**
  *
@@ -73,7 +75,12 @@ public abstract class IgniteDbAbstractTest extends 
GridCommonAbstractTest {
         if (isLargePage())
             dbCfg.setPageSize(16 * 1024);
         else
-            dbCfg.setPageSize(1024);
+            dbCfg.setPageSize(4 * 1024);
+
+        dbCfg.setWalMode(WALMode.LOG_ONLY);
+
+        dbCfg.setDefaultDataRegionConfiguration(
+            new 
DataRegionConfiguration().setPersistenceEnabled(true).setName("default"));
 
         configure(dbCfg);
 
@@ -151,7 +158,7 @@ public abstract class IgniteDbAbstractTest extends 
GridCommonAbstractTest {
     /**
      * @param mCfg DataStorageConfiguration.
      */
-    protected void configure(DataStorageConfiguration mCfg){
+    protected void configure(DataStorageConfiguration mCfg) {
         // No-op.
     }
 
@@ -164,7 +171,7 @@ public abstract class IgniteDbAbstractTest extends 
GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_STORE_DIR, false));
+        GridTestUtils.deleteDbFiles();
 
         startGrids(gridCount());
 
@@ -178,7 +185,15 @@ public abstract class IgniteDbAbstractTest extends 
GridCommonAbstractTest {
 
         assert gridCount() > 0;
 
-        grid(0).active(true);
+        final IgniteClusterEx cluster = grid(0).cluster();
+
+        if (log.isInfoEnabled())
+            log.info("BTL before activation: " + 
cluster.currentBaselineTopology());
+
+        cluster.active(true);
+
+        if (log.isInfoEnabled())
+            log.info("BTL after activation: " + 
cluster.currentBaselineTopology());
 
         awaitPartitionMapExchange();
     }
@@ -189,7 +204,7 @@ public abstract class IgniteDbAbstractTest extends 
GridCommonAbstractTest {
 
         stopAllGrids();
 
-        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_STORE_DIR, false));
+        GridTestUtils.deleteDbFiles();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java
deleted file mode 100644
index aa08190..0000000
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.ignite.internal.processors.database;
-
-/**
- *
- */
-public class IgniteDbClientNearCachePutGetTest extends 
IgniteDbPutGetAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean indexingEnabled() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected boolean withClientNearCache() {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java
index e5c0e8a..00c2240 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java
@@ -31,6 +31,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
 /**
  *
  */
@@ -40,7 +42,7 @@ public class IgniteDbDynamicCacheSelfTest extends 
GridCommonAbstractTest {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         DataStorageConfiguration memCfg = new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
-            new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024));
+            new DataRegionConfiguration().setMaxSize(200 * 1024 * 
1024).setPersistenceEnabled(true));
 
         cfg.setDataStorageConfiguration(memCfg);
 
@@ -58,6 +60,13 @@ public class IgniteDbDynamicCacheSelfTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), 
DFLT_STORE_DIR, false));
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java
index c4e8bee..81b5515 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java
@@ -79,7 +79,7 @@ public abstract class IgniteDbMemoryLeakAbstractTest extends 
IgniteDbAbstractTes
     @Override protected void configure(DataStorageConfiguration mCfg) {
         mCfg.setConcurrencyLevel(CONCURRENCY_LEVEL);
 
-        long size = (1024 * (isLargePage() ? 16 : 1) + 24) * pagesMax();
+        long size = (1024 * (isLargePage() ? 16 : 4) + 24) * pagesMax();
 
         mCfg.setDefaultDataRegionConfiguration(
             new DataRegionConfiguration().setMaxSize(Math.max(size, 
MIN_PAGE_CACHE_SIZE)).setName("default"));

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
index 0b0c763..84455ba 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 59425c8..8e02279 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -33,7 +33,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest;
-import 
org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
 import 
org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest;
@@ -52,25 +51,43 @@ public class IgnitePdsTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite");
 
+        addRealPageStoreTests(suite);
+
         // Basic PageMemory tests.
         suite.addTestSuite(PageMemoryImplNoLoadTest.class);
         suite.addTestSuite(IndexStoragePageMemoryImplTest.class);
         suite.addTestSuite(IgnitePdsEvictionTest.class);
         suite.addTestSuite(PageMemoryImplTest.class);
 
-        // Checkpointing smoke-test.
-        
suite.addTestSuite(IgnitePdsCheckpointSimulationWithRealCpDisabledTest.class);
-
         // BTree tests with store page memory.
         suite.addTestSuite(BPlusTreePageMemoryImplTest.class);
         suite.addTestSuite(BPlusTreeReuseListPageMemoryImplTest.class);
 
+        suite.addTestSuite(SegmentedRingByteBufferTest.class);
+
+        // Write throttling
+        suite.addTestSuite(PagesWriteThrottleSmokeTest.class);
+
+        return suite;
+    }
+
+    /**
+     * Fills {@code suite} with PDS test subset, which operates with real page 
store and does actual disk operations.
+     *
+     * @param suite suite to add tests into.
+     */
+    public static void addRealPageStoreTests(TestSuite suite) {
+        // Basic PageMemory tests.
+        suite.addTestSuite(IgnitePdsEvictionTest.class);
+
+        // Checkpointing smoke-test.
+        
suite.addTestSuite(IgnitePdsCheckpointSimulationWithRealCpDisabledTest.class);
+
         // Basic API tests.
         suite.addTestSuite(IgniteDbSingleNodePutGetTest.class);
         suite.addTestSuite(IgniteDbMultiNodePutGetTest.class);
         suite.addTestSuite(IgniteDbSingleNodeTinyPutGetTest.class);
         suite.addTestSuite(IgniteDbDynamicCacheSelfTest.class);
-        suite.addTestSuite(IgniteDbClientNearCachePutGetTest.class);
 
         // Persistence-enabled.
         suite.addTestSuite(IgnitePdsSingleNodePutGetPersistenceTest.class);
@@ -83,12 +100,5 @@ public class IgnitePdsTestSuite extends TestSuite {
         suite.addTestSuite(IgnitePdsCacheRestoreTest.class);
 
         suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class);
-
-        suite.addTestSuite(SegmentedRingByteBufferTest.class);
-
-        // Write throttling
-        suite.addTestSuite(PagesWriteThrottleSmokeTest.class);
-
-        return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 8dc318a..3852a16 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -50,13 +50,30 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.Ign
 public class IgnitePdsTestSuite2 extends TestSuite {
     /**
      * @return Suite.
-     * @throws Exception If failed.
      */
-    public static TestSuite suite() throws Exception {
+    public static TestSuite suite() {
         TestSuite suite = new TestSuite("Ignite persistent Store Test Suite 
2");
 
         // Integrity test.
         suite.addTestSuite(IgniteDataIntegrityTests.class);
+
+        addRealPageStoreTests(suite);
+
+        // BaselineTopology tests
+        suite.addTestSuite(IgniteAllBaselineNodesOnlineFullApiSelfTest.class);
+        suite.addTestSuite(IgniteOfflineBaselineNodeFullApiSelfTest.class);
+        suite.addTestSuite(IgniteOnlineNodeOutOfBaselineFullApiSelfTest.class);
+
+        return suite;
+    }
+
+    /**
+     * Fills {@code suite} with PDS test subset, which operates with real page 
store and does actual disk operations.
+     *
+     * @param suite suite to add tests into.
+     */
+    public static void addRealPageStoreTests(TestSuite suite) {
+        // Integrity test.
         suite.addTestSuite(IgnitePdsRecoveryAfterFileCorruptionTest.class);
         suite.addTestSuite(IgnitePdsPageSizesTest.class);
 
@@ -89,6 +106,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgniteWalFlushLogOnlySelfTest.class);
 
+        // Test suite uses Standalone WAL iterator to verify PDS content.
         suite.addTestSuite(IgniteWalReaderTest.class);
 
         suite.addTestSuite(IgnitePdsExchangeDuringCheckpointTest.class);
@@ -98,15 +116,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgniteWalSerializerVersionTest.class);
 
-        // BaselineTopology tests
-        suite.addTestSuite(IgniteAllBaselineNodesOnlineFullApiSelfTest.class);
-        suite.addTestSuite(IgniteOfflineBaselineNodeFullApiSelfTest.class);
-        suite.addTestSuite(IgniteOnlineNodeOutOfBaselineFullApiSelfTest.class);
-
         suite.addTestSuite(WalCompactionTest.class);
 
         suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class);
-
-        return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/pom.xml
----------------------------------------------------------------------
diff --git a/modules/direct-io/pom.xml b/modules/direct-io/pom.xml
new file mode 100644
index 0000000..1d24672
--- /dev/null
+++ b/modules/direct-io/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-direct-io</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+            <version>4.5.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thoughtworks.xstream</groupId>
+            <artifactId>xstream</artifactId>
+            <version>1.4.8</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>2.8.2</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java
----------------------------------------------------------------------
diff --git 
a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java
 
b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java
new file mode 100644
index 0000000..4bc7b08
--- /dev/null
+++ 
b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import com.sun.jna.ptr.PointerByReference;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Helpers for allocating and releasing native memory aligned to a file system block boundary, as needed for
+ * {@code O_DIRECT} I/O.
+ */
+@SuppressWarnings("WeakerAccess")
+public class AlignedBuffers {
+    /**
+     * Allocates native memory aligned to {@code fsBlockSize} using {@code posix_memalign} and wraps it into a
+     * direct byte buffer suitable for {@code O_DIRECT} reads and writes.
+     *
+     * @param fsBlockSize Alignment: file system and OS block size.
+     * @param size Buffer capacity in bytes.
+     * @return Byte buffer; must be released with {@link #free(ByteBuffer)}.
+     */
+    public static ByteBuffer allocate(int fsBlockSize, int size) {
+        assert fsBlockSize > 0;
+        assert size > 0;
+
+        PointerByReference outPtr = new PointerByReference();
+
+        NativeLong alignment = new NativeLong(fsBlockSize);
+        NativeLong bytes = new NativeLong(size);
+
+        // posix_memalign() reports failure through its return value, so that value is what gets described.
+        int err = IgniteNativeIoLib.posix_memalign(outPtr, alignment, bytes);
+
+        if (err == 0) {
+            long addr = Pointer.nativeValue(outPtr.getValue());
+
+            return GridUnsafe.wrapPointer(addr, size);
+        }
+
+        throw new IgniteOutOfMemoryException("Failed to allocate memory: " + IgniteNativeIoLib.strerror(err));
+    }
+
+    /**
+     * Releases a direct buffer previously obtained from {@link #allocate(int, int)}.
+     *
+     * @param buf Direct buffer to free.
+     */
+    public static void free(ByteBuffer buf) {
+        long addr = GridUnsafe.bufferAddress(buf);
+
+        free(addr);
+    }
+
+    /**
+     * Releases native memory at {@code addr}, which must be the address of a buffer previously obtained from
+     * {@link #allocate(int, int)}.
+     *
+     * @param addr Address of the buffer to free.
+     */
+    public static void free(long addr) {
+        Pointer ptr = new Pointer(addr);
+
+        IgniteNativeIoLib.free(ptr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
----------------------------------------------------------------------
diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
new file mode 100644
index 0000000..62e1db3
--- /dev/null
+++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.file;
+
+import com.sun.jna.Native;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Limited capabilities Direct IO, which enables file write and read using aligned buffers and O_DIRECT mode.
+ *
+ * Works only for Linux
+ */
+public class AlignedBuffersDirectFileIO implements FileIO {
+    /** Negative value for file offset: read/write starting from current file position */
+    private static final int FILE_POS_USE_CURRENT = -1;
+
+    /** File system & linux block size. Minimal amount of data can be written using DirectIO. */
+    private final int fsBlockSize;
+
+    /** Durable memory Page size. Can have greater value than {@link #fsBlockSize}. */
+    private final int pageSize;
+
+    /** File. */
+    private final File file;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Thread local with buffers with capacity = one page {@link #pageSize} and aligned using {@link #fsBlockSize}. */
+    private final ThreadLocal<ByteBuffer> tlbOnePageAligned;
+
+    /**
+     * Managed aligned buffers, keyed by buffer address. Used to check whether a buffer may be used for direct IO as is
+     * or its data should be copied to a temporary aligned buffer first.
+     */
+    private final ConcurrentHashMap8<Long, Thread> managedAlignedBuffers;
+
+    /** File descriptor, {@code -1} if the file is not opened. */
+    private int fd = -1;
+
+    /** Number of instances to cache */
+    private static final int CACHED_LONGS = 512;
+
+    /** Value used as divisor in {@link #nativeLongCache}. Native longs divisible by this value will be cached. */
+    private static final int NL_CACHE_DIVISOR = 4096;
+
+    /** Native long instance cache. */
+    private static final AtomicReferenceArray<NativeLong> nativeLongCache = new AtomicReferenceArray<>(CACHED_LONGS);
+
+    /**
+     * Creates Direct File IO.
+     *
+     * If opening with {@code O_DIRECT} fails with {@code EINVAL} (for example, on tmpfs), the file is re-opened
+     * without {@code O_DIRECT} and a warning is logged.
+     *
+     * @param fsBlockSize FS/OS block size.
+     * @param pageSize Durable memory Page size.
+     * @param file File to open.
+     * @param modes Open options (flags).
+     * @param tlbOnePageAligned Thread local with buffers with capacity = one page {@code pageSize} and aligned using
+     * {@code fsBlockSize}.
+     * @param managedAlignedBuffers Managed aligned buffers map, used to check if buffer is known.
+     * @param log Logger.
+     * @throws IOException if file open failed.
+     */
+    AlignedBuffersDirectFileIO(
+        int fsBlockSize,
+        int pageSize,
+        File file,
+        OpenOption[] modes,
+        ThreadLocal<ByteBuffer> tlbOnePageAligned,
+        ConcurrentHashMap8<Long, Thread> managedAlignedBuffers,
+        IgniteLogger log)
+        throws IOException {
+        this.log = log;
+        this.fsBlockSize = fsBlockSize;
+        this.pageSize = pageSize;
+        this.file = file;
+        this.tlbOnePageAligned = tlbOnePageAligned;
+        this.managedAlignedBuffers = managedAlignedBuffers;
+
+        String pathname = file.getAbsolutePath();
+
+        int openFlags = setupOpenFlags(modes, log, true);
+        int mode = IgniteNativeIoLib.DEFAULT_OPEN_MODE;
+        int fd = IgniteNativeIoLib.open(pathname, openFlags, mode);
+
+        if (fd < 0) {
+            int error = Native.getLastError();
+            String msg = "Error opening file [" + pathname + "] with flags [0x"
+                + String.format("%2X", openFlags) + ": DIRECT & " + Arrays.asList(modes)
+                + "], got error [" + error + ": " + getLastError() + "]";
+
+            if (error == IgniteNativeIoLib.E_INVAL) {
+                openFlags = setupOpenFlags(modes, log, false);
+                fd = IgniteNativeIoLib.open(pathname, openFlags, mode);
+
+                // open(2) returns -1 on failure; any non-negative value, including 0, is a valid descriptor.
+                if (fd >= 0) {
+                    U.warn(log, "Disable Direct IO mode for path " + file.getParentFile() +
+                        "(probably incompatible file system selected, for example, tmpfs): " + msg);
+
+                    this.fd = fd;
+
+                    return;
+                }
+            }
+
+            throw new IOException(msg);
+        }
+
+        this.fd = fd;
+    }
+
+    /**
+     * Convert Java open options to native flags.
+     *
+     * Options other than READ, WRITE, CREATE, TRUNCATE_EXISTING and SYNC are logged as errors and ignored.
+     *
+     * @param modes java options.
+     * @param log logger.
+     * @param enableDirect flag for enabling option {@link IgniteNativeIoLib#O_DIRECT}.
+     * @return native flags for open method.
+     */
+    private static int setupOpenFlags(OpenOption[] modes, IgniteLogger log, boolean enableDirect) {
+        int flags = enableDirect ? IgniteNativeIoLib.O_DIRECT : 0;
+        List<OpenOption> openOptionList = Arrays.asList(modes);
+
+        for (OpenOption mode : openOptionList) {
+            if (mode == StandardOpenOption.READ) {
+                flags |= openOptionList.contains(StandardOpenOption.WRITE)
+                    ? IgniteNativeIoLib.O_RDWR
+                    : IgniteNativeIoLib.O_RDONLY;
+            }
+            else if (mode == StandardOpenOption.WRITE) {
+                flags |= openOptionList.contains(StandardOpenOption.READ)
+                    ? IgniteNativeIoLib.O_RDWR
+                    : IgniteNativeIoLib.O_WRONLY;
+            }
+            else if (mode == StandardOpenOption.CREATE)
+                flags |= IgniteNativeIoLib.O_CREAT;
+            else if (mode == StandardOpenOption.TRUNCATE_EXISTING)
+                flags |= IgniteNativeIoLib.O_TRUNC;
+            else if (mode == StandardOpenOption.SYNC)
+                flags |= IgniteNativeIoLib.O_SYNC;
+            else
+                log.error("Unsupported open option [" + mode + "]");
+        }
+
+        return flags;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long position() throws IOException {
+        long position = IgniteNativeIoLib.lseek(fdCheckOpened(), 0, IgniteNativeIoLib.SEEK_CUR);
+
+        if (position < 0) {
+            throw new IOException(String.format("Error checking file [%s] position: %s",
+                file, getLastError()));
+        }
+
+        return position;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(long newPosition) throws IOException {
+        if (IgniteNativeIoLib.lseek(fdCheckOpened(), newPosition, IgniteNativeIoLib.SEEK_SET) < 0) {
+            throw new IOException(String.format("Error setting file [%s] position to [%s]: %s",
+                file, Long.toString(newPosition), getLastError()));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destBuf) throws IOException {
+        return read(destBuf, FILE_POS_USE_CURRENT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(ByteBuffer destBuf, long filePosition) throws IOException {
+        int size = checkSizeIsPadded(destBuf.remaining());
+
+        // Nothing to read; also avoids requesting a zero-sized buffer, which AlignedBuffers.allocate() asserts against.
+        if (size == 0)
+            return 0;
+
+        if (isKnownAligned(destBuf))
+            return readIntoAlignedBuffer(destBuf, filePosition);
+
+        boolean useTlb = size == pageSize;
+        ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size);
+
+        try {
+            assert alignedBuf.position() == 0 : "Temporary aligned buffer is in incorrect state: position is set incorrectly";
+            assert alignedBuf.limit() == size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly";
+
+            int loaded = readIntoAlignedBuffer(alignedBuf, filePosition);
+
+            alignedBuf.flip();
+
+            if (loaded > 0)
+                destBuf.put(alignedBuf);
+
+            return loaded;
+        }
+        finally {
+            alignedBuf.clear();
+
+            if (!useTlb)
+                AlignedBuffers.free(alignedBuf);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] buf, int off, int len) throws IOException {
+        return read(ByteBuffer.wrap(buf, off, len));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer srcBuf) throws IOException {
+        return write(srcBuf, FILE_POS_USE_CURRENT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int write(ByteBuffer srcBuf, long filePosition) throws IOException {
+        int size = checkSizeIsPadded(srcBuf.remaining());
+
+        // Nothing to write; also avoids requesting a zero-sized buffer, which AlignedBuffers.allocate() asserts against.
+        if (size == 0)
+            return 0;
+
+        if (isKnownAligned(srcBuf))
+            return writeFromAlignedBuffer(srcBuf, filePosition);
+
+        boolean useTlb = size == pageSize;
+        ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size);
+
+        try {
+            assert alignedBuf.position() == 0 : "Temporary aligned buffer is in incorrect state: position is set incorrectly";
+            assert alignedBuf.limit() == size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly";
+
+            int initPos = srcBuf.position();
+
+            alignedBuf.put(srcBuf);
+            alignedBuf.flip();
+
+            srcBuf.position(initPos); // will update later from write results
+
+            int written = writeFromAlignedBuffer(alignedBuf, filePosition);
+
+            if (written > 0)
+                srcBuf.position(initPos + written);
+
+            return written;
+        }
+        finally {
+            alignedBuf.clear();
+
+            if (!useTlb)
+                AlignedBuffers.free(alignedBuf);
+        }
+    }
+
+    /**
+     * Checks if we can run fast path: the buffer is a well known one, already aligned.
+     *
+     * @param srcBuf buffer to check if it is known buffer.
+     * @return {@code true} if this buffer was allocated with alignment, may be used directly.
+     */
+    private boolean isKnownAligned(ByteBuffer srcBuf) {
+        return srcBuf.isDirect()
+            && managedAlignedBuffers != null
+            && managedAlignedBuffers.containsKey(GridUnsafe.bufferAddress(srcBuf));
+    }
+
+    /**
+     * Check if size is appropriate for aligned/direct IO.
+     *
+     * @param size buffer size to write, should be divisible by {@link #fsBlockSize}.
+     * @return size from parameter.
+     * @throws IOException if provided size can't be written using direct IO.
+     */
+    private int checkSizeIsPadded(int size) throws IOException {
+        if (size % fsBlockSize != 0) {
+            throw new IOException(
+                String.format("Unable to apply DirectIO for read/write buffer [%d] bytes on file system " +
+                    "block size [%d]. Consider setting %s.setPageSize(%d) or disable Direct IO.",
+                    size, fsBlockSize, DataStorageConfiguration.class.getSimpleName(), fsBlockSize));
+        }
+
+        return size;
+    }
+
+    /**
+     * Checks if file is opened and returns descriptor.
+     *
+     * @return file descriptor.
+     * @throws IOException if file not opened.
+     */
+    private int fdCheckOpened() throws IOException {
+        if (fd < 0)
+            throw new IOException(String.format("Error %s not opened", file));
+
+        return fd;
+    }
+
+    /**
+     * Read bytes from file using Native IO and aligned buffers.
+     *
+     * @param destBuf Destination aligned byte buffer.
+     * @param filePos Starting position of file. Providing {@link #FILE_POS_USE_CURRENT} means it is required
+     *     to read from current file position.
+     * @return number of bytes read from file, or <tt>-1</tt> if tried to read past EOF for file.
+     * @throws IOException if reading failed.
+     */
+    private int readIntoAlignedBuffer(ByteBuffer destBuf, long filePos) throws IOException {
+        int pos = destBuf.position();
+        int limit = destBuf.limit();
+        int toRead = pos <= limit ? limit - pos : 0;
+
+        if (toRead == 0)
+            return 0;
+
+        if ((pos + toRead) > destBuf.capacity())
+            throw new BufferOverflowException();
+
+        int rd;
+        Pointer ptr = bufferPtrAtPosition(destBuf, pos);
+
+        if (filePos == FILE_POS_USE_CURRENT)
+            rd = IgniteNativeIoLib.read(fdCheckOpened(), ptr, nl(toRead)).intValue();
+        else
+            rd = IgniteNativeIoLib.pread(fdCheckOpened(), ptr, nl(toRead), nl(filePos)).intValue();
+
+        if (rd == 0)
+            return -1; //Tried to read past EOF for file
+
+        if (rd < 0)
+            throw new IOException(String.format("Error during reading file [%s] from position [%s] : %s",
+                file, filePos == FILE_POS_USE_CURRENT ? "current" : Long.toString(filePos), getLastError()));
+
+        destBuf.position(pos + rd);
+
+        return rd;
+    }
+
+    /**
+     * Writes the aligned native buffer {@code srcBuf}, from its position to its limit, to the file at offset
+     * {@code filePos}. With an explicit {@code filePos} ({@code pwrite}) the file offset is not changed; with
+     * {@link #FILE_POS_USE_CURRENT} ({@code write}) data is written at, and advances, the current file offset.
+     *
+     * @param srcBuf Source aligned byte buffer; its position is advanced by the number of bytes written.
+     * @param filePos position in file to write data. Providing {@link #FILE_POS_USE_CURRENT} means it is required
+     *     to write to current file position.
+     * @return the number of bytes written.
+     * @throws IOException if writing failed.
+     */
+    private int writeFromAlignedBuffer(ByteBuffer srcBuf, long filePos) throws IOException {
+        int pos = srcBuf.position();
+        int limit = srcBuf.limit();
+        int toWrite = pos <= limit ? limit - pos : 0;
+
+        if (toWrite == 0)
+            return 0;
+
+        int wr;
+        Pointer ptr = bufferPtrAtPosition(srcBuf, pos);
+
+        if (filePos == FILE_POS_USE_CURRENT)
+            wr = IgniteNativeIoLib.write(fdCheckOpened(), ptr, nl(toWrite)).intValue();
+        else
+            wr = IgniteNativeIoLib.pwrite(fdCheckOpened(), ptr, nl(toWrite), nl(filePos)).intValue();
+
+        if (wr < 0) {
+            throw new IOException(String.format("Error during writing file [%s] to position [%s]: %s",
+                file, filePos == FILE_POS_USE_CURRENT ? "current" : Long.toString(filePos), getLastError()));
+        }
+
+        if ((pos + wr) > limit) {
+            throw new IllegalStateException(String.format("Write illegal state for file [%s]: pos=%d, wr=%d, limit=%d",
+                file, pos, wr, limit));
+        }
+
+        srcBuf.position(pos + wr);
+
+        return wr;
+    }
+
+    /**
+     * Boxes a value to a native long, reusing cached instances for small non-negative multiples of
+     * {@link #NL_CACHE_DIVISOR}.
+     *
+     * @param val value to box to native long.
+     * @return native long.
+     */
+    @NotNull private static NativeLong nl(long val) {
+        // Negative values must bypass the cache: they would map to a negative (out of bounds) cache index.
+        if (val >= 0 && val % NL_CACHE_DIVISOR == 0 && val < CACHED_LONGS * NL_CACHE_DIVISOR) {
+            int cacheIdx = (int)(val / NL_CACHE_DIVISOR);
+
+            NativeLong curCached = nativeLongCache.get(cacheIdx);
+
+            if (curCached != null)
+                return curCached;
+
+            NativeLong nl = new NativeLong(val);
+
+            nativeLongCache.compareAndSet(cacheIdx, null, nl);
+
+            return nl;
+        }
+
+        return new NativeLong(val);
+    }
+
+    /**
+     * Retrieve last error set by the OS as string. This corresponds to <code>errno</code> on
+     * *nix platforms.
+     *
+     * @return displayable string with OS error info.
+     */
+    private static String getLastError() {
+        return IgniteNativeIoLib.strerror(Native.getLastError());
+    }
+
+    /**
+     * Gets address in memory for direct aligned buffer taking into account the given position as offset.
+     * Produces warnings if data or offset seems to be not aligned.
+     *
+     * @param buf Direct aligned buffer.
+     * @param pos position, used as offset for resulting pointer.
+     * @return Buffer memory address.
+     */
+    @NotNull private Pointer bufferPtrAtPosition(ByteBuffer buf, int pos) {
+        long alignedPointer = GridUnsafe.bufferAddress(buf);
+
+        if (pos < 0)
+            throw new IllegalArgumentException();
+
+        if (pos > buf.capacity())
+            throw new BufferOverflowException();
+
+        if ((alignedPointer + pos) % fsBlockSize != 0) {
+            U.warn(log, String.format("IO Buffer Pointer [%d] and/or offset [%d] seems to be not aligned " +
+                "for FS block size [%d]. Direct IO may fail.", alignedPointer, pos, fsBlockSize));
+        }
+
+        return new Pointer(alignedPointer + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] buf, int off, int len) throws IOException {
+        write(ByteBuffer.wrap(buf, off, len));
+    }
+
+    /** {@inheritDoc} */
+    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
+        throw new UnsupportedOperationException("AlignedBuffersDirectFileIO doesn't support mmap.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        if (IgniteNativeIoLib.fsync(fdCheckOpened()) < 0)
+            throw new IOException(String.format("Error fsync()'ing %s, got %s", file, getLastError()));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Delegates to {@link File#length()}, which returns {@code 0} rather than failing if the file does not exist.
+     */
+    @Override public long size() throws IOException {
+        return file.length();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() throws IOException {
+        truncate(0);
+    }
+
+    /**
+     * Sets this file's size using {@code ftruncate(2)}.
+     *
+     * <p> Unlike {@link java.nio.channels.FileChannel#truncate(long)}, a size greater than the current file size
+     * extends the file rather than leaving it untouched. If the file position is greater than the given size,
+     * it is set to that size.
+     * </p>
+     *
+     * @param size The new size, a non-negative byte count.
+     * @throws IOException if truncation or position adjustment failed.
+     */
+    private void truncate(long size) throws IOException {
+        if (IgniteNativeIoLib.ftruncate(fdCheckOpened(), size) < 0)
+            throw new IOException(String.format("Error truncating file %s, got %s", file, getLastError()));
+
+        if (position() > size)
+            position(size);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * The descriptor is marked as closed before the native call: per close(2), retrying close() after a failure
+     * is unsafe because the descriptor may already have been released and reused.
+     */
+    @Override public void close() throws IOException {
+        int fdToClose = fdCheckOpened();
+
+        fd = -1;
+
+        if (IgniteNativeIoLib.close(fdToClose) < 0)
+            throw new IOException(String.format("Error closing %s, got %s", file, getLastError()));
+    }
+}

Reply via email to