IGNITE-7380 Implemented pluggable Direct IO - Fixes #3226. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9bba2d52 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9bba2d52 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9bba2d52 Branch: refs/heads/master Commit: 9bba2d5203b0e67b206f153cac4a60a213ce9a15 Parents: 577e632 Author: dpavlov <[email protected]> Authored: Thu Jan 18 13:53:29 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Jan 18 13:53:29 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 8 + .../cache/GridCacheSharedContext.java | 21 +- .../GridCacheDatabaseSharedManager.java | 11 +- .../cache/persistence/file/FileIO.java | 7 +- .../cache/persistence/file/FilePageStore.java | 3 +- .../persistence/file/FilePageStoreManager.java | 47 +- .../file/FileVersionCheckingFactory.java | 32 +- .../wal/FileWriteAheadLogManager.java | 34 +- .../IgniteClusterActivateDeactivateTest.java | 2 +- .../IgniteDataStorageMetricsSelfTest.java | 2 +- .../IgnitePdsClientNearCachePutGetTest.java | 33 +- .../persistence/IgnitePdsDynamicCacheTest.java | 2 +- ...gnitePdsSingleNodePutGetPersistenceTest.java | 15 - .../db/IgnitePdsCacheRestoreTest.java | 2 +- ...gniteCheckpointDirtyPagesForLowLoadTest.java | 29 +- .../db/file/IgnitePdsCheckpointSimpleTest.java | 100 ++++ .../db/file/IgnitePdsEvictionTest.java | 2 +- .../database/IgniteDbAbstractTest.java | 29 +- .../IgniteDbClientNearCachePutGetTest.java | 39 -- .../database/IgniteDbDynamicCacheSelfTest.java | 11 +- .../IgniteDbMemoryLeakAbstractTest.java | 2 +- .../database/IgniteDbPutGetAbstractTest.java | 1 + .../ignite/testsuites/IgnitePdsTestSuite.java | 34 +- .../ignite/testsuites/IgnitePdsTestSuite2.java | 29 +- modules/direct-io/pom.xml | 122 +++++ .../cache/persistence/file/AlignedBuffers.java | 72 +++ .../file/AlignedBuffersDirectFileIO.java | 511 +++++++++++++++++++ 
.../file/AlignedBuffersDirectFileIOFactory.java | 177 +++++++ .../persistence/file/IgniteNativeIoLib.java | 405 +++++++++++++++ .../persistence/file/LinuxNativeIoPlugin.java | 25 + .../file/LinuxNativeIoPluginProvider.java | 238 +++++++++ .../org.apache.ignite.plugin.PluginProvider | 1 + .../IgniteNativeIoWithNoPersistenceTest.java | 76 +++ .../testsuites/IgnitePdsNativeIoTestSuite.java | 38 ++ .../testsuites/IgnitePdsNativeIoTestSuite2.java | 36 ++ pom.xml | 1 + 36 files changed, 2051 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 9d520fb..833773a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -810,6 +810,14 @@ public final class IgniteSystemProperties { public static final String IGNITE_GRID_CLIENT_LOG_ENABLED = "IGNITE_GRID_CLIENT_LOG_ENABLED"; /** + * When set to {@code true}, direct IO may be enabled. Direct IO enabled only if JAR file with corresponding + * feature is available in classpath and OS and filesystem settings allows to enable this mode. + * Default is {@code true}. + */ + public static final String IGNITE_DIRECT_IO_ENABLED = "IGNITE_DIRECT_IO_ENABLED"; + + + /** * Enforces singleton. 
*/ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 5bf1343..306723b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -104,8 +104,8 @@ public class GridCacheSharedContext<K, V> { /** Deployment manager. */ private GridCacheDeploymentManager<K, V> depMgr; - /** Write ahead log manager. */ - private IgniteWriteAheadLogManager walMgr; + /** Write ahead log manager. {@code Null} if persistence is not enabled. */ + @Nullable private IgniteWriteAheadLogManager walMgr; /** Database manager. */ private IgniteCacheDatabaseSharedManager dbMgr; @@ -113,8 +113,8 @@ public class GridCacheSharedContext<K, V> { /** Snp manager. */ private IgniteCacheSnapshotManager snpMgr; - /** Page store manager. */ - private IgnitePageStoreManager pageStoreMgr; + /** Page store manager. {@code Null} if persistence is not enabled. */ + @Nullable private IgnitePageStoreManager pageStoreMgr; /** Affinity manager. */ private CacheAffinitySharedManager affMgr; @@ -169,6 +169,8 @@ public class GridCacheSharedContext<K, V> { * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. + * @param pageStoreMgr Page store manager. {@code Null} if persistence is not enabled. + * @param walMgr WAL manager. {@code Null} if persistence is not enabled. * @param depMgr Deployment manager. * @param exchMgr Exchange manager. * @param affMgr Affinity manager. 
@@ -182,8 +184,8 @@ public class GridCacheSharedContext<K, V> { IgniteTxManager txMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, - IgnitePageStoreManager pageStoreMgr, - IgniteWriteAheadLogManager walMgr, + @Nullable IgnitePageStoreManager pageStoreMgr, + @Nullable IgniteWriteAheadLogManager walMgr, IgniteCacheDatabaseSharedManager dbMgr, IgniteCacheSnapshotManager snpMgr, GridCacheDeploymentManager<K, V> depMgr, @@ -398,6 +400,7 @@ public class GridCacheSharedContext<K, V> { * @param jtaMgr JTA manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. + * @param pageStoreMgr Page store manager. {@code Null} if persistence is not enabled. * @param depMgr Deployment manager. * @param exchMgr Exchange manager. * @param affMgr Affinity manager. @@ -409,7 +412,7 @@ public class GridCacheSharedContext<K, V> { CacheJtaManagerAdapter jtaMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, - IgnitePageStoreManager pageStoreMgr, + @Nullable IgnitePageStoreManager pageStoreMgr, IgniteWriteAheadLogManager walMgr, IgniteCacheDatabaseSharedManager dbMgr, IgniteCacheSnapshotManager snpMgr, @@ -667,9 +670,9 @@ public class GridCacheSharedContext<K, V> { } /** - * @return Page store manager. + * @return Page store manager. {@code Null} if persistence is not enabled. 
*/ - public IgnitePageStoreManager pageStore() { + @Nullable public IgnitePageStoreManager pageStore() { return pageStoreMgr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 3599594..1b0eb6d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -304,7 +304,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Checkpoint runner thread pool. If null tasks are to be run in single thread */ @Nullable private ExecutorService asyncRunner; - /** Buffer for the checkpoint threads. */ + /** Thread local with buffers for the checkpoint threads. Each buffer represents one page for durable memory. */ private ThreadLocal<ByteBuffer> threadBuf; /** */ @@ -2436,6 +2436,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Replace thread local with buffers. Thread local should provide direct buffer with one page in length. + * + * @param threadBuf new thread-local with buffers for the checkpoint threads. 
+ */ + public void setThreadBuf(final ThreadLocal<ByteBuffer> threadBuf) { + this.threadBuf = threadBuf; + } + + /** * */ @SuppressWarnings("NakedNotify") http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java index 849f03a..73e44b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java @@ -52,7 +52,8 @@ public interface FileIO extends AutoCloseable { * * @param destBuf Destination byte buffer. * - * @return Number of read bytes. + * @return Number of read bytes, or <tt>-1</tt> if the + * given position is greater than or equal to the file's current size * * @throws IOException If some I/O error occurs. */ @@ -65,7 +66,9 @@ public interface FileIO extends AutoCloseable { * @param destBuf Destination byte buffer. * @param position Starting position of file. * - * @return Number of read bytes. + * @return Number of read bytes, possibly zero, or <tt>-1</tt> if the + * given position is greater than or equal to the file's current + * size * * @throws IOException If some I/O error occurs. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 053ab2b..0ed9167 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -451,7 +451,8 @@ public class FilePageStore implements PageStore { assert pageBuf.capacity() == pageSize; assert pageBuf.position() == 0; - assert pageBuf.order() == ByteOrder.nativeOrder(); + assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer order " + pageBuf.order() + + " should be same with " + ByteOrder.nativeOrder(); assert PageIO.getType(pageBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(pageId); assert PageIO.getVersion(pageBuf) != 0 : "Invalid state. Version is 0! 
pageId = " + U.hexLong(pageId); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index d406df6..deacf79 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -36,8 +36,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -94,8 +94,20 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** */ private final IgniteConfiguration igniteCfg; + /** + * File IO factory for page store, by default is taken from {@link #dsCfg}. + * May be overridden by block read/write. + */ + private FileIOFactory pageStoreFileIoFactory; + + /** + * File IO factory for page store V1 and for fast checking page store (non block read). + * By default is taken from {@link #dsCfg}. 
+ */ + private FileIOFactory pageStoreV1FileIoFactory; + /** */ - private DataStorageConfiguration dsCfg; + private final DataStorageConfiguration dsCfg; /** Absolute directory for file page store. Includes consistent id based folder. */ private File storeWorkDir; @@ -117,6 +129,8 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen assert dsCfg != null; this.dsCfg = dsCfg; + + pageStoreV1FileIoFactory = pageStoreFileIoFactory = dsCfg.getFileIOFactory(); } /** {@inheritDoc} */ @@ -373,8 +387,8 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen if (dirExisted && !idxFile.exists()) grpsWithoutIdx.add(grpId); - FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory( - dsCfg.getFileIOFactory(), igniteCfg.getDataStorageConfiguration()); + FilePageStoreFactory pageStoreFactory = new FileVersionCheckingFactory( + pageStoreFileIoFactory, pageStoreV1FileIoFactory, igniteCfg.getDataStorageConfiguration()); FilePageStore idxStore = pageStoreFactory.createPageStore(PageMemory.FLAG_IDX, idxFile); @@ -687,6 +701,31 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** + * @param pageStoreFileIoFactory File IO factory to override default, may be used for blocked read-write. + * @param pageStoreV1FileIoFactory File IO factory for reading V1 page store and for fast touching page files + * (non blocking). + */ + public void setPageStoreFileIOFactories(final FileIOFactory pageStoreFileIoFactory, + final FileIOFactory pageStoreV1FileIoFactory) { + this.pageStoreFileIoFactory = pageStoreFileIoFactory; + this.pageStoreV1FileIoFactory = pageStoreV1FileIoFactory; + } + + /** + * @return File IO factory currently selected for page store. + */ + public FileIOFactory getPageStoreFileIoFactory() { + return pageStoreFileIoFactory; + } + + /** + * @return Durable memory page size in bytes. 
+ */ + public int pageSize() { + return dsCfg.getPageSize(); + } + + /** * */ private static class CacheStoreHolder { http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java index bab2cf0..bb25ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileVersionCheckingFactory.java @@ -32,30 +32,46 @@ public class FileVersionCheckingFactory implements FilePageStoreFactory { public static final String LATEST_VERSION_OVERRIDE_PROPERTY = "file.page.store.latest.version.override"; /** Latest page store version. */ - public final static int LATEST_VERSION = 2; + public static final int LATEST_VERSION = 2; /** Factory to provide I/O interfaces for read/write operations with files. */ private final FileIOFactory fileIOFactory; + /** + * Factory to provide I/O interfaces for read/write operations with files. + * This is backup factory for V1 page store. + */ + private FileIOFactory fileIOFactoryStoreV1; + /** Memory configuration. */ private final DataStorageConfiguration memCfg; /** - * @param fileIOFactory File io factory. + * @param fileIOFactory File IO factory. + * @param fileIOFactoryStoreV1 File IO factory for V1 page store and for version checking. * @param memCfg Memory configuration. 
*/ - public FileVersionCheckingFactory( - FileIOFactory fileIOFactory, DataStorageConfiguration memCfg) { + public FileVersionCheckingFactory(FileIOFactory fileIOFactory, FileIOFactory fileIOFactoryStoreV1, + DataStorageConfiguration memCfg) { this.fileIOFactory = fileIOFactory; + this.fileIOFactoryStoreV1 = fileIOFactoryStoreV1; this.memCfg = memCfg; } + /** + * @param fileIOFactory File IO factory for V1 & V2 page store and for version checking. + * @param memCfg Memory configuration. + */ + public FileVersionCheckingFactory(FileIOFactory fileIOFactory, DataStorageConfiguration memCfg) { + this(fileIOFactory, fileIOFactory, memCfg); + } + /** {@inheritDoc} */ @Override public FilePageStore createPageStore(byte type, File file) throws IgniteCheckedException { if (!file.exists()) return createPageStore(type, file, latestVersion()); - try (FileIO fileIO = fileIOFactory.create(file)) { + try (FileIO fileIO = fileIOFactoryStoreV1.create(file)) { int minHdr = FilePageStore.HEADER_SIZE; if (fileIO.size() < minHdr) @@ -101,16 +117,16 @@ public class FileVersionCheckingFactory implements FilePageStoreFactory { * @param file File. * @param ver Version. 
+ */ - public FilePageStore createPageStore(byte type, File file, int ver) throws IgniteCheckedException { + public FilePageStore createPageStore(byte type, File file, int ver) { switch (ver) { case FilePageStore.VERSION: - return new FilePageStore(type, file, fileIOFactory, memCfg); + return new FilePageStore(type, file, fileIOFactoryStoreV1, memCfg); case FilePageStoreV2.VERSION: return new FilePageStoreV2(type, file, fileIOFactory, memCfg); default: - throw new IllegalArgumentException("Unknown version of file page store: " + ver); + throw new IllegalArgumentException("Unknown version of file page store: " + ver + " for file [" + file.getAbsolutePath() + "]"); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5ae0226..444a207 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -227,7 +227,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final boolean alwaysWriteFullPages; - /** WAL segment size in bytes */ + /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. 
*/ private final long maxWalSegmentSize; /** */ @@ -309,7 +309,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private AtomicLong lastRecordLoggedMs = new AtomicLong(); /** - * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown Null for non background modes + * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown. + * Null for non background modes. */ @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule; @@ -323,6 +324,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private WALWriter walWriter; /** + * Listener invoked for each segment file IO initializer. + */ + @Nullable private volatile IgniteInClosure<FileIO> createWalFileListener; + + /** * @param ctx Kernal context. */ public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { @@ -1048,6 +1054,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { FileIO fileIO = ioFactory.create(curFile); + IgniteInClosure<FileIO> lsnr = createWalFileListener; + + if (lsnr != null) + lsnr.apply(fileIO); + try { int serVer = serializerVer; @@ -1137,6 +1148,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { fileIO = ioFactory.create(nextFile); + IgniteInClosure<FileIO> lsnr = createWalFileListener; + if (lsnr != null) + lsnr.apply(fileIO); + if (mmap) { try { MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); @@ -1341,6 +1356,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Setup listener for WAL segment write File IO creation. + * @param createWalFileListener Listener to be invoked for new segment file IO creation. 
+ */ + public void setCreateWalFileListener(@Nullable IgniteInClosure<FileIO> createWalFileListener) { + this.createWalFileListener = createWalFileListener; + } + + /** + * @return {@link #maxWalSegmentSize}. + */ + public long maxWalSegmentSize() { + return maxWalSegmentSize; + } + + /** * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate the * work WAL segment: S(N) = N % dsCfg.walSegments. When a work segment is finished, it is given to the archiver. If * the absolute index of last archived segment is denoted by A and the absolute index of next segment we want to http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java index 267cb18..27739a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java @@ -122,7 +122,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest } DataStorageConfiguration memCfg = new DataStorageConfiguration(); - memCfg.setPageSize(1024); + memCfg.setPageSize(4 * 1024); memCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setMaxSize(10 * 1024 * 1024) .setPersistenceEnabled(persistenceEnabled())); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java index 93fa24e..5609995 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java @@ -85,7 +85,7 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { .setMetricsEnabled(true) .setName("no-persistence")) .setWalMode(WALMode.LOG_ONLY) - .setPageSize(1024) + .setPageSize(4 * 1024) .setMetricsEnabled(true); cfg.setDataStorageConfiguration(memCfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java index 130a91c..8bc9155 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsClientNearCachePutGetTest.java @@ -17,41 +17,24 @@ package org.apache.ignite.internal.processors.cache.persistence; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import 
org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; +import org.apache.ignite.internal.processors.database.IgniteDbPutGetAbstractTest; /** * */ -public class IgnitePdsClientNearCachePutGetTest extends IgniteDbClientNearCachePutGetTest { +public class IgnitePdsClientNearCachePutGetTest extends IgniteDbPutGetAbstractTest { /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setDataStorageConfiguration( - new DataStorageConfiguration() - .setWalMode(WALMode.LOG_ONLY) - ); - - return cfg; + @Override protected int gridCount() { + return 1; } /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); - - super.beforeTest(); + @Override protected boolean indexingEnabled() { + return false; } /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + @Override protected boolean withClientNearCache() { + return true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java index 7e0cf82..ba1933f 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDynamicCacheTest.java @@ -49,7 +49,7 @@ public class IgnitePdsDynamicCacheTest extends IgniteDbDynamicCacheSelfTest { .setDefaultDataRegionConfiguration( new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024).setPersistenceEnabled(true)) .setWalMode(WALMode.LOG_ONLY) - .setPageSize(1024); + .setPageSize(4 * 1024); cfg.setDataStorageConfiguration(memCfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java index 18e31fc..59a6f84 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsSingleNodePutGetPersistenceTest.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.processors.cache.persistence; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest; import org.apache.ignite.internal.util.typedef.internal.U; @@ -30,18 +27,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP */ public class IgnitePdsSingleNodePutGetPersistenceTest extends 
IgniteDbSingleNodePutGetTest { /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setDataStorageConfiguration( - new DataStorageConfiguration() - .setWalMode(WALMode.LOG_ONLY) - ); - - return cfg; - } - - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java index 577cf9a..7da765b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java @@ -61,7 +61,7 @@ public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest { DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration().setMaxSize(10 * 1024 * 1024).setPersistenceEnabled(true)) - .setPageSize(1024) + .setPageSize(4 * 1024) .setWalMode(WALMode.LOG_ONLY); memCfg.setDataRegionConfigurations(new DataRegionConfiguration() http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java index 34662a7..782949f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -130,11 +131,21 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract ignite.cache(fullname).put(d, d); - log.info("Put to " + fullname + " value " + d); + if (log.isInfoEnabled()) + log.info("Put to cache [" + fullname + "] value " + d); - db.wakeupForCheckpoint("").get(); + final int timeout = 5000; + try { + db.wakeupForCheckpoint("").get(timeout, TimeUnit.MILLISECONDS); + } + catch (IgniteFutureTimeoutCheckedException e) { + continue; + } - int currCpPages = waitForCurrentCheckpointPagesCounterUpdated(db); + int currCpPages = waitForCurrentCheckpointPagesCounterUpdated(db, timeout); + + if (currCpPages < 0) + continue; pageCntObserved.add(currCpPages); @@ -158,15 +169,23 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract } /** + * Waits counter of pages will be 
set up. If it is not changed for timeout milliseconds, method returns negative + * value. + * * @param db DB shared manager. - * @return counter when it becomes non-zero. + * @param timeout milliseconds to wait. + * @return counter when it becomes non-zero, negative value indicates timeout during wait for update. */ - private int waitForCurrentCheckpointPagesCounterUpdated(GridCacheDatabaseSharedManager db) { + private int waitForCurrentCheckpointPagesCounterUpdated(GridCacheDatabaseSharedManager db, int timeout) { int currCpPages = 0; + long start = System.currentTimeMillis(); while (currCpPages == 0) { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); currCpPages = db.currentCheckpointPagesCount(); + + if (currCpPages == 0 && ((System.currentTimeMillis() - start) > timeout)) + return -1; } return currCpPages; http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java new file mode 100644 index 0000000..4655fcc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.file; + +import com.google.common.base.Strings; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +/** + * Puts data into grid, waits for checkpoint to start and then verifies data + */ +public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataRegionConfiguration regCfg = new DataRegionConfiguration().setPersistenceEnabled(true); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setPageSize(4 * 1024) + .setDefaultDataRegionConfiguration(regCfg) + .setCheckpointFrequency(TimeUnit.SECONDS.toMillis(10)); + + return cfg.setDataStorageConfiguration(dsCfg); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + GridTestUtils.deleteDbFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + 
super.afterTest(); + + stopAllGrids(); + } + + /** + * Checks if same data can be loaded after checkpoint. + * @throws Exception if failed. + */ + public void testRecoveryAfterCpEnd() throws Exception { + IgniteEx ignite = startGrid(0); + + ignite.active(true); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache"); + + for (int i = 0; i < 10000; i++) + cache.put(i, valueWithRedundancyForKey(i)); + + ignite.context().cache().context().database().waitForCheckpoint("test"); + + stopAllGrids(); + + IgniteEx igniteRestart = startGrid(0); + igniteRestart.active(true); + + IgniteCache<Object, Object> cacheRestart = igniteRestart.getOrCreateCache("cache"); + + for (int i = 0; i < 10000; i++) + assertEquals(valueWithRedundancyForKey(i), cacheRestart.get(i)); + + stopAllGrids(); + } + + /** + * @param i key. + * @return value with extra data, which allows to verify + */ + @NotNull private String valueWithRedundancyForKey(int i) { + return Strings.repeat(Integer.toString(i), 10); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java index 1b86e3d..117be9b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java @@ -52,7 +52,7 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest { private static final int NUMBER_OF_SEGMENTS = 64; /** */ - private static final int PAGE_SIZE = 1024; + private static final 
int PAGE_SIZE = 4 * 1024; /** */ private static final long CHUNK_SIZE = 1024 * 1024; http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java index 1d5b624..9ce3077 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbAbstractTest.java @@ -22,21 +22,23 @@ import java.util.Arrays; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; 
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; /** * @@ -73,7 +75,12 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { if (isLargePage()) dbCfg.setPageSize(16 * 1024); else - dbCfg.setPageSize(1024); + dbCfg.setPageSize(4 * 1024); + + dbCfg.setWalMode(WALMode.LOG_ONLY); + + dbCfg.setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true).setName("default")); configure(dbCfg); @@ -151,7 +158,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { /** * @param mCfg DataStorageConfiguration. */ - protected void configure(DataStorageConfiguration mCfg){ + protected void configure(DataStorageConfiguration mCfg) { // No-op. } @@ -164,7 +171,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + GridTestUtils.deleteDbFiles(); startGrids(gridCount()); @@ -178,7 +185,15 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { assert gridCount() > 0; - grid(0).active(true); + final IgniteClusterEx cluster = grid(0).cluster(); + + if (log.isInfoEnabled()) + log.info("BTL before activation: " + cluster.currentBaselineTopology()); + + cluster.active(true); + + if (log.isInfoEnabled()) + log.info("BTL after activation: " + cluster.currentBaselineTopology()); awaitPartitionMapExchange(); } @@ -189,7 +204,7 @@ public abstract class IgniteDbAbstractTest extends GridCommonAbstractTest { stopAllGrids(); - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + GridTestUtils.deleteDbFiles(); } /** 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java deleted file mode 100644 index aa08190..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbClientNearCachePutGetTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.ignite.internal.processors.database; - -/** - * - */ -public class IgniteDbClientNearCachePutGetTest extends IgniteDbPutGetAbstractTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override protected boolean indexingEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override protected boolean withClientNearCache() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java index e5c0e8a..00c2240 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbDynamicCacheSelfTest.java @@ -31,6 +31,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + /** * */ @@ -40,7 +42,7 @@ public class IgniteDbDynamicCacheSelfTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); DataStorageConfiguration memCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024)); + new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024).setPersistenceEnabled(true)); cfg.setDataStorageConfiguration(memCfg); @@ -58,6 +60,13 @@ public class 
IgniteDbDynamicCacheSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); + } + + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java index c4e8bee..81b5515 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMemoryLeakAbstractTest.java @@ -79,7 +79,7 @@ public abstract class IgniteDbMemoryLeakAbstractTest extends IgniteDbAbstractTes @Override protected void configure(DataStorageConfiguration mCfg) { mCfg.setConcurrencyLevel(CONCURRENCY_LEVEL); - long size = (1024 * (isLargePage() ? 16 : 1) + 24) * pagesMax(); + long size = (1024 * (isLargePage() ? 
16 : 4) + 24) * pagesMax(); mCfg.setDefaultDataRegionConfiguration( new DataRegionConfiguration().setMaxSize(Math.max(size, MIN_PAGE_CACHE_SIZE)).setName("default")); http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java index 0b0c763..84455ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbPutGetAbstractTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index 59425c8..8e02279 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -33,7 +33,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImplTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteThrottleSmokeTest; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest; -import org.apache.ignite.internal.processors.database.IgniteDbClientNearCachePutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest; import org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest; import org.apache.ignite.internal.processors.database.IgniteDbPutGetWithCacheStoreTest; @@ -52,25 +51,43 @@ public class IgnitePdsTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite"); + addRealPageStoreTests(suite); + // Basic PageMemory tests. suite.addTestSuite(PageMemoryImplNoLoadTest.class); suite.addTestSuite(IndexStoragePageMemoryImplTest.class); suite.addTestSuite(IgnitePdsEvictionTest.class); suite.addTestSuite(PageMemoryImplTest.class); - // Checkpointing smoke-test. - suite.addTestSuite(IgnitePdsCheckpointSimulationWithRealCpDisabledTest.class); - // BTree tests with store page memory. suite.addTestSuite(BPlusTreePageMemoryImplTest.class); suite.addTestSuite(BPlusTreeReuseListPageMemoryImplTest.class); + suite.addTestSuite(SegmentedRingByteBufferTest.class); + + // Write throttling + suite.addTestSuite(PagesWriteThrottleSmokeTest.class); + + return suite; + } + + /** + * Fills {@code suite} with PDS test subset, which operates with real page store and does actual disk operations. + * + * @param suite suite to add tests into. + */ + public static void addRealPageStoreTests(TestSuite suite) { + // Basic PageMemory tests. + suite.addTestSuite(IgnitePdsEvictionTest.class); + + // Checkpointing smoke-test. 
+ suite.addTestSuite(IgnitePdsCheckpointSimulationWithRealCpDisabledTest.class); + // Basic API tests. suite.addTestSuite(IgniteDbSingleNodePutGetTest.class); suite.addTestSuite(IgniteDbMultiNodePutGetTest.class); suite.addTestSuite(IgniteDbSingleNodeTinyPutGetTest.class); suite.addTestSuite(IgniteDbDynamicCacheSelfTest.class); - suite.addTestSuite(IgniteDbClientNearCachePutGetTest.class); // Persistence-enabled. suite.addTestSuite(IgnitePdsSingleNodePutGetPersistenceTest.class); @@ -83,12 +100,5 @@ public class IgnitePdsTestSuite extends TestSuite { suite.addTestSuite(IgnitePdsCacheRestoreTest.class); suite.addTestSuite(DefaultPageSizeBackwardsCompatibilityTest.class); - - suite.addTestSuite(SegmentedRingByteBufferTest.class); - - // Write throttling - suite.addTestSuite(PagesWriteThrottleSmokeTest.class); - - return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 8dc318a..3852a16 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -50,13 +50,30 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.Ign public class IgnitePdsTestSuite2 extends TestSuite { /** * @return Suite. - * @throws Exception If failed. */ - public static TestSuite suite() throws Exception { + public static TestSuite suite() { TestSuite suite = new TestSuite("Ignite persistent Store Test Suite 2"); // Integrity test. 
suite.addTestSuite(IgniteDataIntegrityTests.class); + + addRealPageStoreTests(suite); + + // BaselineTopology tests + suite.addTestSuite(IgniteAllBaselineNodesOnlineFullApiSelfTest.class); + suite.addTestSuite(IgniteOfflineBaselineNodeFullApiSelfTest.class); + suite.addTestSuite(IgniteOnlineNodeOutOfBaselineFullApiSelfTest.class); + + return suite; + } + + /** + * Fills {@code suite} with PDS test subset, which operates with real page store and does actual disk operations. + * + * @param suite suite to add tests into. + */ + public static void addRealPageStoreTests(TestSuite suite) { + // Integrity test. suite.addTestSuite(IgnitePdsRecoveryAfterFileCorruptionTest.class); suite.addTestSuite(IgnitePdsPageSizesTest.class); @@ -89,6 +106,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalFlushLogOnlySelfTest.class); + // Test suite uses Standalone WAL iterator to verify PDS content. suite.addTestSuite(IgniteWalReaderTest.class); suite.addTestSuite(IgnitePdsExchangeDuringCheckpointTest.class); @@ -98,15 +116,8 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalSerializerVersionTest.class); - // BaselineTopology tests - suite.addTestSuite(IgniteAllBaselineNodesOnlineFullApiSelfTest.class); - suite.addTestSuite(IgniteOfflineBaselineNodeFullApiSelfTest.class); - suite.addTestSuite(IgniteOnlineNodeOutOfBaselineFullApiSelfTest.class); - suite.addTestSuite(WalCompactionTest.class); suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class); - - return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/pom.xml ---------------------------------------------------------------------- diff --git a/modules/direct-io/pom.xml b/modules/direct-io/pom.xml new file mode 100644 index 0000000..1d24672 --- /dev/null +++ b/modules/direct-io/pom.xml @@ -0,0 +1,122 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or 
more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-direct-io</artifactId> + <version>2.4.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <dependencies> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + 
<groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>net.java.dev.jna</groupId> + <artifactId>jna</artifactId> + <version>4.5.0</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.thoughtworks.xstream</groupId> + <artifactId>xstream</artifactId> + <version>1.4.8</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.8.2</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java new file mode 100644 index 0000000..4bc7b08 --- /dev/null +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffers.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import com.sun.jna.NativeLong; +import com.sun.jna.Pointer; +import com.sun.jna.ptr.PointerByReference; +import java.nio.ByteBuffer; +import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; +import org.apache.ignite.internal.util.GridUnsafe; + +/** + * Utility class for working with aligned buffers. + */ +@SuppressWarnings("WeakerAccess") +public class AlignedBuffers { + /** + * Allocates aligned memory for use with O_DIRECT and returns native byte buffer. + * @param fsBlockSize alignment, FS and OS block size. + * @param size capacity. + * @return byte buffer, to be released by {@link #free(ByteBuffer)}. 
+ */ + public static ByteBuffer allocate(int fsBlockSize, int size) { + assert fsBlockSize > 0; + assert size > 0; + + PointerByReference refToPtr = new PointerByReference(); + + int retVal = IgniteNativeIoLib.posix_memalign(refToPtr, new NativeLong(fsBlockSize), + new NativeLong(size)); + + if (retVal != 0) + throw new IgniteOutOfMemoryException("Failed to allocate memory: " + IgniteNativeIoLib.strerror(retVal)); + + return GridUnsafe.wrapPointer(Pointer.nativeValue(refToPtr.getValue()), size); + } + + /** + * Frees the memory space used by direct buffer, which must have been returned by a previous call + * {@link #allocate(int, int)}. + * + * @param buf direct buffer to free. + */ + public static void free(ByteBuffer buf) { + free(GridUnsafe.bufferAddress(buf)); + } + + /** + * Frees the memory space pointed to by {@code addr} - address of buffer, which must have been returned by a + * previous call {@link #allocate(int, int)}. + * + * @param addr direct buffer address to free. + */ + public static void free(long addr) { + IgniteNativeIoLib.free(new Pointer(addr)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java new file mode 100644 index 0000000..62e1db3 --- /dev/null +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import com.sun.jna.Native; +import com.sun.jna.NativeLong; +import com.sun.jna.Pointer; +import java.io.File; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.OpenOption; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jsr166.ConcurrentHashMap8; + +/** + * Limited capabilities Direct IO, which enables file write and read using aligned buffers and O_DIRECT mode. + * + * Works only for Linux + */ +public class AlignedBuffersDirectFileIO implements FileIO { + /** Negative value for file offset: read/write starting from current file position */ + private static final int FILE_POS_USE_CURRENT = -1; + + /** File system & linux block size. Minimal amount of data can be written using DirectIO. 
*/ + private final int fsBlockSize; + + /** Durable memory Page size. Can have greater value than {@link #fsBlockSize}. */ + private final int pageSize; + + /** File. */ + private final File file; + + /** Logger. */ + private final IgniteLogger log; + + /** Thread local with buffers with capacity = one page {@link #pageSize} and aligned using {@link #fsBlockSize}. */ + private ThreadLocal<ByteBuffer> tlbOnePageAligned; + + /** Managed aligned buffers. Used to check if buffer is applicable for direct IO or its data should be copied. */ + private ConcurrentHashMap8<Long, Thread> managedAlignedBuffers; + + /** File descriptor. */ + private int fd = -1; + + /** Number of instances to cache */ + private static final int CACHED_LONGS = 512; + + /** Value used as divisor in {@link #nativeLongCache}. Native longs divisible by this value will be cached. */ + private static final int NL_CACHE_DIVISOR = 4096; + + /** Native long instance cache. */ + private static final AtomicReferenceArray<NativeLong> nativeLongCache = new AtomicReferenceArray<>(CACHED_LONGS); + + /** + * Creates Direct File IO. + * + * @param fsBlockSize FS/OS block size. + * @param pageSize Durable memory Page size. + * @param file File to open. + * @param modes Open options (flags). + * @param tlbOnePageAligned Thread local with buffers with capacity = one page {@code pageSize} and aligned using + * {@code fsBlockSize}. + * @param managedAlignedBuffers Managed aligned buffers map, used to check if buffer is known. + * @param log Logger. + * @throws IOException if file open failed. 
+ */ + AlignedBuffersDirectFileIO( + int fsBlockSize, + int pageSize, + File file, + OpenOption[] modes, + ThreadLocal<ByteBuffer> tlbOnePageAligned, + ConcurrentHashMap8<Long, Thread> managedAlignedBuffers, + IgniteLogger log) + throws IOException { + this.log = log; + this.fsBlockSize = fsBlockSize; + this.pageSize = pageSize; + this.file = file; + this.tlbOnePageAligned = tlbOnePageAligned; + this.managedAlignedBuffers = managedAlignedBuffers; + + String pathname = file.getAbsolutePath(); + + int openFlags = setupOpenFlags(modes, log, true); + int mode = IgniteNativeIoLib.DEFAULT_OPEN_MODE; + int fd = IgniteNativeIoLib.open(pathname, openFlags, mode); + + if (fd < 0) { + int error = Native.getLastError(); + String msg = "Error opening file [" + pathname + "] with flags [0x" + + String.format("%2X", openFlags) + ": DIRECT & " + Arrays.asList(modes) + + "], got error [" + error + ": " + getLastError() + "]"; + + if (error == IgniteNativeIoLib.E_INVAL) { + openFlags = setupOpenFlags(modes, log, false); + fd = IgniteNativeIoLib.open(pathname, openFlags, mode); + + if (fd > 0) { + U.warn(log, "Disable Direct IO mode for path " + file.getParentFile() + + "(probably incompatible file system selected, for example, tmpfs): " + msg); + + this.fd = fd; + + return; + } + } + + throw new IOException(msg); + } + + this.fd = fd; + } + + /** + * Convert Java open options to native flags. + * + * @param modes java options. + * @param log logger. + * @param enableDirect flag for enabling option {@link IgniteNativeIoLib#O_DIRECT} . + * @return native flags for open method. + */ + private static int setupOpenFlags(OpenOption[] modes, IgniteLogger log, boolean enableDirect) { + int flags = enableDirect ? IgniteNativeIoLib.O_DIRECT : 0; + List<OpenOption> openOptionList = Arrays.asList(modes); + + for (OpenOption mode : openOptionList) { + if (mode == StandardOpenOption.READ) { + flags |= openOptionList.contains(StandardOpenOption.WRITE) + ? 
IgniteNativeIoLib.O_RDWR + : IgniteNativeIoLib.O_RDONLY; + } + else if (mode == StandardOpenOption.WRITE) { + flags |= openOptionList.contains(StandardOpenOption.READ) + ? IgniteNativeIoLib.O_RDWR + : IgniteNativeIoLib.O_WRONLY; + } + else if (mode == StandardOpenOption.CREATE) + flags |= IgniteNativeIoLib.O_CREAT; + else if (mode == StandardOpenOption.TRUNCATE_EXISTING) + flags |= IgniteNativeIoLib.O_TRUNC; + else if (mode == StandardOpenOption.SYNC) + flags |= IgniteNativeIoLib.O_SYNC; + else + log.error("Unsupported open option [" + mode + "]"); + } + + return flags; + } + + /** {@inheritDoc} */ + @Override public long position() throws IOException { + long position = IgniteNativeIoLib.lseek(fdCheckOpened(), 0, IgniteNativeIoLib.SEEK_CUR); + + if (position < 0) { + throw new IOException(String.format("Error checking file [%s] position: %s", + file, getLastError())); + } + + return position; + } + + /** {@inheritDoc} */ + @Override public void position(long newPosition) throws IOException { + if (IgniteNativeIoLib.lseek(fdCheckOpened(), newPosition, IgniteNativeIoLib.SEEK_SET) < 0) { + throw new IOException(String.format("Error setting file [%s] position to [%s]: %s", + file, Long.toString(newPosition), getLastError())); + } + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destBuf) throws IOException { + return read(destBuf, FILE_POS_USE_CURRENT); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destBuf, long filePosition) throws IOException { + int size = checkSizeIsPadded(destBuf.remaining()); + + if (isKnownAligned(destBuf)) + return readIntoAlignedBuffer(destBuf, filePosition); + + boolean useTlb = size == pageSize; + ByteBuffer alignedBuf = useTlb ? 
tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size); + + try { + assert alignedBuf.position() == 0: "Temporary aligned buffer is in incorrect state: position is set incorrectly"; + assert alignedBuf.limit() == size: "Temporary aligned buffer is in incorrect state: limit is set incorrectly"; + + int loaded = readIntoAlignedBuffer(alignedBuf, filePosition); + + alignedBuf.flip(); + + if (loaded > 0) + destBuf.put(alignedBuf); + + return loaded; + } + finally { + alignedBuf.clear(); + + if (!useTlb) + AlignedBuffers.free(alignedBuf); + } + } + + /** {@inheritDoc} */ + @Override public int read(byte[] buf, int off, int len) throws IOException { + return read(ByteBuffer.wrap(buf, off, len)); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf) throws IOException { + return write(srcBuf, FILE_POS_USE_CURRENT); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf, long filePosition) throws IOException { + int size = checkSizeIsPadded(srcBuf.remaining()); + + if (isKnownAligned(srcBuf)) + return writeFromAlignedBuffer(srcBuf, filePosition); + + boolean useTlb = size == pageSize; + ByteBuffer alignedBuf = useTlb ? tlbOnePageAligned.get() : AlignedBuffers.allocate(fsBlockSize, size); + + try { + assert alignedBuf.position() == 0 : "Temporary aligned buffer is in incorrect state: position is set incorrectly"; + assert alignedBuf.limit() == size : "Temporary aligned buffer is in incorrect state: limit is set incorrectly"; + + int initPos = srcBuf.position(); + + alignedBuf.put(srcBuf); + alignedBuf.flip(); + + srcBuf.position(initPos); // will update later from write results + + int written = writeFromAlignedBuffer(alignedBuf, filePosition); + + if (written > 0) + srcBuf.position(initPos + written); + + return written; + } + finally { + alignedBuf.clear(); + + if (!useTlb) + AlignedBuffers.free(alignedBuf); + } + } + + /** + * Checks if we can run fast path: we got well known buffer is already aligned. 
+ * + * @param srcBuf buffer to check if it is known buffer. + * @return {@code true} if this buffer was allocated with alignment, may be used directly. + */ + private boolean isKnownAligned(ByteBuffer srcBuf) { + return srcBuf.isDirect() + && managedAlignedBuffers != null + && managedAlignedBuffers.containsKey(GridUnsafe.bufferAddress(srcBuf)); + } + + /** + * Check if size is appropriate for aligned/direct IO. + * + * @param size buffer size to write, should be divisible by {@link #fsBlockSize}. + * @return size from parameter. + * @throws IOException if provided size can't be written using direct IO. + */ + private int checkSizeIsPadded(int size) throws IOException { + if (size % fsBlockSize != 0) { + throw new IOException( + String.format("Unable to apply DirectIO for read/write buffer [%d] bytes on file system " + + "block size [%d]. Consider setting %s.setPageSize(%d) or disable Direct IO.", + size, fsBlockSize, DataStorageConfiguration.class.getSimpleName(), fsBlockSize)); + } + + return size; + } + + /** + * Checks if file is opened and returns descriptor. + * + * @return file descriptor. + * @throws IOException if file not opened. + */ + private int fdCheckOpened() throws IOException { + if (fd < 0) + throw new IOException(String.format("Error %s not opened", file)); + + return fd; + } + + /** + * Read bytes from file using Native IO and aligned buffers. + * + * @param destBuf Destination aligned byte buffer. + * @param filePos Starting position of file. Providing {@link #FILE_POS_USE_CURRENT} means it is required + * to read from current file position. + * @return number of bytes read from file, or <tt>-1</tt> if tried to read past EOF for file. + * @throws IOException if reading failed. + */ + private int readIntoAlignedBuffer(ByteBuffer destBuf, long filePos) throws IOException { + int pos = destBuf.position(); + int limit = destBuf.limit(); + int toRead = pos <= limit ? 
limit - pos : 0; + + if (toRead == 0) + return 0; + + if ((pos + toRead) > destBuf.capacity()) + throw new BufferOverflowException(); + + int rd; + Pointer ptr = bufferPtrAtPosition(destBuf, pos); + + if (filePos == FILE_POS_USE_CURRENT) + rd = IgniteNativeIoLib.read(fdCheckOpened(), ptr, nl(toRead)).intValue(); + else + rd = IgniteNativeIoLib.pread(fdCheckOpened(), ptr, nl(toRead), nl(filePos)).intValue(); + + if (rd == 0) + return -1; //Tried to read past EOF for file + + if (rd < 0) + throw new IOException(String.format("Error during reading file [%s] from position [%s] : %s", + file, filePos == FILE_POS_USE_CURRENT ? "current" : Long.toString(filePos), getLastError())); + + destBuf.position(pos + rd); + + return rd; + } + + /** + * Writes the aligned native buffer starting at {@code buf} to the file at offset + * {@code filePos}. The file offset is not changed. + * + * @param srcBuf pointer to buffer. + * @param filePos position in file to write data. Providing {@link #FILE_POS_USE_CURRENT} means it is required + * to read from current file position. + * @return the number of bytes written. + */ + private int writeFromAlignedBuffer(ByteBuffer srcBuf, long filePos) throws IOException { + int pos = srcBuf.position(); + int limit = srcBuf.limit(); + int toWrite = pos <= limit ? limit - pos : 0; + + if (toWrite == 0) + return 0; + + int wr; + Pointer ptr = bufferPtrAtPosition(srcBuf, pos); + + if (filePos == FILE_POS_USE_CURRENT) + wr = IgniteNativeIoLib.write(fdCheckOpened(), ptr, nl(toWrite)).intValue(); + else + wr = IgniteNativeIoLib.pwrite(fdCheckOpened(), ptr, nl(toWrite), nl(filePos)).intValue(); + + if (wr < 0) { + throw new IOException(String.format("Error during writing file [%s] to position [%s]: %s", + file, filePos == FILE_POS_USE_CURRENT ? 
"current" : Long.toString(filePos), getLastError())); + } + + if ((pos + wr) > limit) { + throw new IllegalStateException(String.format("Write illegal state for file [%s]: pos=%d, wr=%d, limit=%d", + file, pos, wr, limit)); + } + + srcBuf.position(pos + wr); + + return wr; + } + + /** + * @param val value to box to native long. + * @return native long. + */ + @NotNull private static NativeLong nl(long val) { + if (val % NL_CACHE_DIVISOR == 0 && val < CACHED_LONGS * NL_CACHE_DIVISOR) { + int cacheIdx = (int)(val / NL_CACHE_DIVISOR); + + NativeLong curCached = nativeLongCache.get(cacheIdx); + + if (curCached != null) + return curCached; + + NativeLong nl = new NativeLong(val); + + nativeLongCache.compareAndSet(cacheIdx, null, nl); + + return nl; + } + return new NativeLong(val); + } + + /** + * Retrieve last error set by the OS as string. This corresponds to and <code>errno</code> on + * *nix platforms. + * @return displayable string with OS error info. + */ + private static String getLastError() { + return IgniteNativeIoLib.strerror(Native.getLastError()); + } + + /** + * Gets address in memory for direct aligned buffer taking into account its current {@code position()} as offset. + * Produces warnings if data or offset seems to be not aligned. + * + * @param buf Direct aligned buffer. + * @param pos position, used as offset for resulting pointer. + * @return Buffer memory address. + */ + @NotNull private Pointer bufferPtrAtPosition(ByteBuffer buf, int pos) { + long alignedPointer = GridUnsafe.bufferAddress(buf); + + if (pos < 0) + throw new IllegalArgumentException(); + + if (pos > buf.capacity()) + throw new BufferOverflowException(); + + if ((alignedPointer + pos) % fsBlockSize != 0) { + U.warn(log, String.format("IO Buffer Pointer [%d] and/or offset [%d] seems to be not aligned " + + "for FS block size [%d]. 
Direct IO may fail.", alignedPointer, buf.position(), fsBlockSize)); + } + + return new Pointer(alignedPointer + pos); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] buf, int off, int len) throws IOException { + write(ByteBuffer.wrap(buf, off, len)); + } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap."); + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + if (IgniteNativeIoLib.fsync(fdCheckOpened()) < 0) + throw new IOException(String.format("Error fsync()'ing %s, got %s", file, getLastError())); + } + + /** {@inheritDoc} */ + @Override public long size() throws IOException { + return file.length(); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + truncate(0); + } + + /** + * Truncates this channel's file to the given size. + * + * <p> If the given size is less than the file's current size then the file + * is truncated, discarding any bytes beyond the new end of the file. If + * the given size is greater than or equal to the file's current size then + * the file is not modified. In either case, if this channel's file + * position is greater than the given size then it is set to that size. + * </p> + * + * @param size The new size, a non-negative byte count + * + */ + private void truncate(long size) throws IOException { + if (IgniteNativeIoLib.ftruncate(fdCheckOpened(), size) < 0) + throw new IOException(String.format("Error truncating file %s, got %s", file, getLastError())); + + if (position() > size) + position(size); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (IgniteNativeIoLib.close(fdCheckOpened()) < 0) + throw new IOException(String.format("Error closing %s, got %s", file, getLastError())); + + fd = -1; + } +}
