Repository: ignite Updated Branches: refs/heads/master 577e632e7 -> 9bba2d520
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java new file mode 100644 index 0000000..b9d11cd --- /dev/null +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIOFactory.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.OpenOption; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; +import org.jsr166.ConcurrentHashMap8; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; + +/** + * Direct native IO factory for block IO operations on aligned memory structures.<br> + * This limited functionality is used for page store operations.<br> + * <b>Note: </b> This type of IO not applicable for WAL or other files.<br> <br> + * This IO tries to minimize cache effects of the I/O (page caching by OS). <br> <br> + * In general this will degrade performance, but it is useful in special + * situations, such as when applications do their own caching.<br> + */ +public class AlignedBuffersDirectFileIOFactory implements FileIOFactory { + /** Logger. */ + private final IgniteLogger log; + + /** Page size from durable memory. */ + private final int pageSize; + + /** Backup factory for files in case native is not available or not applicable. */ + private final FileIOFactory backupFactory; + + /** File system/os block size, negative value if library init was failed. */ + private final int fsBlockSize; + + /** Use backup factory, {@code true} if direct IO setup failed. */ + private boolean useBackupFactory; + + /** Thread local with buffers with capacity = one page {@code pageSize} and aligned using {@code fsBlockSize}. */ + private ThreadLocal<ByteBuffer> tlbOnePageAligned; + + /** + * Managed aligned buffers. 
This collection is used to free buffers, an for checking if buffer is known to be + * already aligned. + */ + private final ConcurrentHashMap8<Long, Thread> managedAlignedBuffers = new ConcurrentHashMap8<>(); + + /** + * Creates direct native IO factory. + * + * @param log Logger. + * @param storePath Storage path, used to check FS settings. + * @param pageSize durable memory page size. + * @param backupFactory fallback factory if init failed. + */ + public AlignedBuffersDirectFileIOFactory( + final IgniteLogger log, + final File storePath, + final int pageSize, + final FileIOFactory backupFactory) { + this.log = log; + this.pageSize = pageSize; + this.backupFactory = backupFactory; + + useBackupFactory = true; + fsBlockSize = IgniteNativeIoLib.getFsBlockSize(storePath.getAbsolutePath(), log); + + if(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIRECT_IO_ENABLED, true)) { + if (log.isInfoEnabled()) + log.info("Direct IO is explicitly disabled by system property"); + + return; + } + + if (fsBlockSize > 0) { + int blkSize = fsBlockSize; + + if (pageSize % blkSize != 0) { + U.warn(log, String.format("Unable to setup Direct IO for Ignite [pageSize=%d bytes;" + + " file system block size=%d]. For speeding up Ignite consider setting %s.setPageSize(%d)." + + " Direct IO is disabled", + pageSize, blkSize, DataStorageConfiguration.class.getSimpleName(), blkSize)); + } + else { + useBackupFactory = false; + + tlbOnePageAligned = new ThreadLocal<ByteBuffer>() { + @Override protected ByteBuffer initialValue() { + return createManagedBuffer(pageSize); + } + }; + + if (log.isInfoEnabled()) { + log.info(String.format("Direct IO is enabled for block IO operations on aligned memory structures." + + " [block size = %d, durable memory page size = %d]", blkSize, pageSize)); + } + } + } + else { + if (log.isInfoEnabled()) { + log.info(String.format("Direct IO library is not available on current operating system [%s]." 
+ + " Direct IO is not enabled.", System.getProperty("os.version"))); + } + } + + } + + /** + * <b>Note: </b> Use only if {@link #isDirectIoAvailable()}. + * + * @param size buffer size to allocate. + * @return new byte buffer. + */ + @NotNull ByteBuffer createManagedBuffer(int size) { + assert !useBackupFactory : "Direct IO is disabled, aligned managed buffer creation is disabled now"; + assert managedAlignedBuffers != null : "Direct buffers not available"; + + ByteBuffer allocate = AlignedBuffers.allocate(fsBlockSize, size).order(ByteOrder.nativeOrder()); + + managedAlignedBuffers.put(GridUnsafe.bufferAddress(allocate), Thread.currentThread()); + + return allocate; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, CREATE, READ, WRITE); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + if (useBackupFactory) + return backupFactory.create(file, modes); + + return new AlignedBuffersDirectFileIO(fsBlockSize, pageSize, file, modes, tlbOnePageAligned, managedAlignedBuffers, log); + + } + + /** + * @return {@code true} if Direct IO can be used on current OS and file system settings + */ + boolean isDirectIoAvailable() { + return !useBackupFactory; + } + + /** + * Managed aligned buffers and its associated threads. This collection is used to free buffers, an for checking if + * buffer is known to be already aligned. + * + * @return map address->thread. 
+ */ + ConcurrentHashMap8<Long, Thread> managedAlignedBuffers() { + return managedAlignedBuffers; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java new file mode 100644 index 0000000..47f1e6a --- /dev/null +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoLib.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import com.sun.jna.Native; +import com.sun.jna.NativeLong; +import com.sun.jna.Platform; +import com.sun.jna.Pointer; +import com.sun.jna.ptr.PointerByReference; +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Native IO library based on *nix C library, enabled for Linux, kernel version >= 2.4.10. <br> + * <br> + * Uses JNA library (https://github.com/java-native-access/jna) to access native calls. <br> + * <br> + */ +@SuppressWarnings({"OctalInteger", "WeakerAccess"}) +public class IgniteNativeIoLib { + /** Open for reading only. */ + public static final int O_RDONLY = 00; + + /** Open for writing only. */ + public static final int O_WRONLY = 01; + + /** Open for reading and writing. */ + public static final int O_RDWR = 02; + + /** File shall be created. If the file exists, this flag has no effect. */ + public static final int O_CREAT = 0100; + + /** If the file exists and is a regular file length shall be truncated to 0. */ + public static final int O_TRUNC = 01000; + + /** Try to minimize cache effects of the I/O to and from this file. */ + public static final int O_DIRECT = 040000; + + /** + * Write operations on the file will complete according to the requirements of synchronized I/O file integrity + * completion. By the time write(2) (or similar) returns, the output data and associated file metadata have been + * transferred to the underlying hardware. + */ + public static final int O_SYNC = 04000000; + + /** + * The specified data will not be accessed in the near future. See fadvise.h and "man 2 posix_fadvise". + */ + public static final int POSIX_FADV_DONTNEED = 4; + + /** Flag for newly created files: user has read permission. 
*/ + public static final int S_IRUSR = 00400; + + /** Flag for newly created files: user has write permission. */ + public static final int S_IWUSR = 00200; + + /** Flag for newly created files: group has read permission. */ + public static final int S_IRGRP = 00040; + + /** Flag for newly created files: others have read permission. */ + public static final int S_IROTH = 00004; + + /** Default access mask for newly created files. */ + public static final int DEFAULT_OPEN_MODE = S_IRUSR | S_IWUSR | S_IROTH | S_IRGRP; + + /** Invalid argument. */ + public static final int E_INVAL = 22; + + /** Seek option: set file offset to offset */ + public static final int SEEK_SET = 0; + + /** Seek option: change file position to offset */ + public static final int SEEK_CUR = 1; + + /** JNA library available and initialized. Always {@code false} for non linux systems. */ + private static boolean jnaAvailable; + + /** JNA library initialization exception. To be logged to Ignite logger later. */ + @Nullable private static Exception ex; + + static { + if (Platform.isLinux()) { + try { + if (checkLinuxVersion()) { + Native.register(Platform.C_LIBRARY_NAME); + jnaAvailable = true; + } + else + jnaAvailable = false; + } + catch (Exception e) { + ex = e; + jnaAvailable = false; + } + } + else + jnaAvailable = false; + } + + /** + * O_DIRECT support was added under Linux in kernel version 2.4.10. 
+ * + * @return {@code true} if O_DIRECT is supported, kernel version >= 2.4.10 + */ + private static boolean checkLinuxVersion() { + final String osVer = System.getProperty("os.version"); + + if (osVer == null) + return false; + + List<Integer> verIntComps = new ArrayList<>(); + + for (StringTokenizer tokenizer = new StringTokenizer(osVer, ".-"); tokenizer.hasMoreTokens(); ) { + String verComp = tokenizer.nextToken(); + + if (verComp.matches("\\d*")) + verIntComps.add(Integer.parseInt(verComp)); + } + + if (verIntComps.isEmpty()) + return false; + + final int verIdx = 0; + final int majorRevIdx = 1; + final int minorRevIdx = 2; + + if (verIntComps.get(verIdx) > 2) + return true; + else if (verIntComps.get(verIdx) == 2) { + int compsCnt = verIntComps.size(); + + if (compsCnt > majorRevIdx && verIntComps.get(majorRevIdx) > 4) + return true; + else if (compsCnt > minorRevIdx + && verIntComps.get(majorRevIdx) == 4 + && verIntComps.get(minorRevIdx) >= 10) + return true; + } + return false; + } + + + /** + * Calculate Lowest Common Multiplier. + * @param a first value. + * @param b second value. + */ + private static long lcm(final long a, final long b) { + return (a * b) / gcf(a, b); + } + + /** + * Calculate Greatest Common Factor. + * @param a first value. + * @param b second value. + */ + private static long gcf(final long a, final long b) { + if (b == 0) + return a; + else + return gcf(b, a % b); + } + + /** + * Determines FS and OS block size. Returns file system block size for use with storageDir see "man 3 posix_memalign" + * + * @param storageDir storage path, base path to check (FS) configuration parameters. + * @param log Logger. 
+ * @return <ul><li>FS block size to be used in Direct IO and memory alignments.</li> + * <li>or <tt>-1</tt> Operating System is not applicable for enabling Direct IO.</li> + * <li>and <tt>-1</tt> if failed to determine block size.</li> + * <li>and <tt>-1</tt> if JNA is not available or init failed.</li> </ul> + */ + public static int getFsBlockSize(final String storageDir, final IgniteLogger log) { + if (ex != null) { + U.warn(log, "Failed to initialize O_DIRECT support at current OS: " + ex.getMessage(), ex); + + return -1; + } + + if (!jnaAvailable) + return -1; + + int fsBlockSize = -1; + int _PC_REC_XFER_ALIGN = 0x11; + int pcAlign = pathconf(storageDir, _PC_REC_XFER_ALIGN).intValue(); + + if (pcAlign > 0) + fsBlockSize = pcAlign; + + int pageSize = getpagesize(); + + fsBlockSize = (int)lcm(fsBlockSize, pageSize); + + // just being completely paranoid: (512 is the rule for 2.6+ kernels) + fsBlockSize = (int)lcm(fsBlockSize, 512); + + if (log.isInfoEnabled()) + log.info(String.format("Page size configuration for storage path [%s]: %d;" + + " Linux memory page size: %d;" + + " Selected FS block size : %d.", + storageDir, pcAlign, pageSize, fsBlockSize)); + + // lastly, a sanity check + if (fsBlockSize <= 0 || ((fsBlockSize & (fsBlockSize - 1)) != 0)) { + U.warn(log, "File system block size should be a power of two, was found to be " + fsBlockSize + + " Disabling O_DIRECT support"); + + return -1; + } + + if (log.isInfoEnabled()) + log.info("Selected FS block size : " + fsBlockSize); + + return fsBlockSize; + } + + /** + * @return Flag indicating JNA library available and initialized. Always {@code false} for non linux systems. + */ + public static boolean isJnaAvailable() { + return jnaAvailable; + } + + /** + * Open a file. See "man 3 open". + * + * @param pathname pathname naming the file. + * @param flags flag/open options. Flags are constructed by a bitwise-inclusive OR of flags. + * @param mode create file mode creation mask. + * @return file descriptor. 
+ */ + public static native int open(String pathname, int flags, int mode); + + /** + * See "man 2 close". + * + * @param fd The file descriptor of the file to close. + * @return 0 on success, -1 on error. + */ + public static native int close(int fd); + + /** + * Writes up to {@code cnt} bytes to the buffer starting at {@code buf} to the file descriptor {@code fd} at offset + * {@code offset}. The file offset is not changed. See "man 2 pwrite". + * + * @param fd file descriptor. + * @param buf pointer to buffer with data. + * @param cnt bytes to write. + * @param off position in file to write data. + * @return the number of bytes written. Note that is not an error for a successful call to transfer fewer bytes than + * requested. + */ + public static native NativeLong pwrite(int fd, Pointer buf, NativeLong cnt, NativeLong off); + + /** + * Writes up to {@code cnt} bytes to the buffer starting at {@code buf} to the file descriptor {@code fd}. + * The file offset is changed. See "man 2 write". + * + * @param fd file descriptor. + * @param buf pointer to buffer with data. + * @param cnt bytes to write. + * @return the number of bytes written. Note that is not an error for a successful call to transfer fewer bytes than + * requested. + */ + public static native NativeLong write(int fd, Pointer buf, NativeLong cnt); + + /** + * Reads up to {@code cnt} bytes from file descriptor {@code fd} at offset {@code off} (from the start of the file) + * into the buffer starting at {@code buf}. The file offset is not changed. See "man 2 pread". + * + * @param fd file descriptor. + * @param buf pointer to buffer to place the data. + * @param cnt bytes to read. + * @return On success, the number of bytes read is returned (zero indicates end of file), on error, -1 is returned, + * and errno is set appropriately. 
+ */ + public static native NativeLong pread(int fd, Pointer buf, NativeLong cnt, NativeLong off); + + /** + * Reads up to {@code cnt} bytes from file descriptor {@code fd} into the buffer starting at {@code buf}. The file + * offset is changed. See "man 2 read". + * + * @param fd file descriptor. + * @param buf pointer to buffer to place the data. + * @param cnt bytes to read. + * @return On success, the number of bytes read is returned (zero indicates end of file), on error, -1 is returned, + * and errno is set appropriately. + */ + public static native NativeLong read(int fd, Pointer buf, NativeLong cnt); + + /** + * Synchronize a file's in-core state with storage device. See "man 2 fsync". + * @param fd file descriptor. + * @return On success return zero. On error, -1 is returned, and errno is set appropriately. + */ + public static native int fsync(int fd); + + /** + * Allocates size bytes and places the address of the allocated memory in {@code memptr}. + * The address of the allocated memory will be a multiple of {@code alignment}. + * + * See "man 3 posix_memalign". + * @param memptr out memory pointer. + * @param alignment memory alignment, must be a power of two and a multiple of sizeof(void *). + * @param size size of buffer. + * @return returns zero on success, or one of the error values. + */ + public static native int posix_memalign(PointerByReference memptr, NativeLong alignment, NativeLong size); + + /** + * Frees the memory space pointed to by ptr, which must have been returned by a previous call to native allocation + * methods. POSIX requires that memory obtained from {@link #posix_memalign} can be freed using free. See "man 3 + * free". + * + * @param ptr pointer to free. + */ + public static native void free(Pointer ptr); + + /** + * Function returns a string that describes the error code passed in the argument {@code errnum}. See "man 3 + * strerror". + * + * @param errnum error code. + * @return displayable error information. 
+ */ + public static native String strerror(int errnum); + + /** + * Return path (FS) configuration parameter value. <br> + * Helps to determine alignment restrictions, for example, on buffers used for direct block device I/O. <br> + * POSIX specifies the pathconf(path,_PC_REC_XFER_ALIGN) call that tells what alignment is needed. + * + * @param path base path to check settings. + * @param name variable name to query. + */ + public static native NativeLong pathconf(String path, int name); + + /** + * The function getpagesize() returns the number of bytes in a memory + * page, where "page" is a fixed-length block, the unit for memory + * allocation and file mapping + */ + public static native int getpagesize(); + + /** + * Allows to announce an intention to access file data in a specific pattern in the future, thus allowing the + * kernel to perform appropriate optimizations. + * + * The advice applies to a (not necessarily existent) region starting at + * {@code off} and extending for {@code len} bytes (or until the end of the file if len is 0) + * within the file referred to by fd. + * + * See "man 2 posix_fadvise". + * + * @param fd file descriptor. + * @param off region start. + * @param len region end. + * @param flag advice (option) to apply. + * @return On success, zero is returned. On error, an error number is returned. + */ + public static native int posix_fadvise(int fd, long off, long len, int flag); + + /** + * Causes regular file referenced by fd to be truncated to a size of precisely length bytes. + * + * If the file previously was larger than this size, the extra data is lost. + * If the file previously was shorter, it is extended, and the extended part reads as null bytes ('\0'). + * The file offset is not changed. + * + * @param fd file descriptor. + * @param len required length. + * @return On success, zero is returned. On error, -1 is returned, and errno is set appropriately. 
+ */ + public static native int ftruncate(int fd, long len); + + /** + * Repositions the file offset of the open file description associated with the file descriptor {@code fd} + * to the argument offset according to the directive {@code whence} + * @param fd file descriptor. + * @param off required position offset. + * @param whence position base. + * @return On error, the value -1 is returned and errno is set to indicate the error. + */ + public static native long lseek(int fd, long off, int whence); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java new file mode 100644 index 0000000..32c63df --- /dev/null +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPlugin.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import org.apache.ignite.plugin.IgnitePlugin; + +/** Noop plugin. See {@link IgniteNativeIoLib}. */ +public class LinuxNativeIoPlugin implements IgnitePlugin { + // No-op. +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java new file mode 100644 index 0000000..918ff5c --- /dev/null +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/LinuxNativeIoPluginProvider.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.FileDescriptor; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +/** + * Plugin provider for setting up {@link IgniteNativeIoLib}. + */ +public class LinuxNativeIoPluginProvider implements PluginProvider { + /** Managed buffers map from address to thread requested buffer. */ + @Nullable private ConcurrentHashMap8<Long, Thread> managedBuffers; + + /** Logger. 
*/ + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public String name() { + return "Ignite Native I/O Plugin [Direct I/O]"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return ""; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return "Copyright(C) Apache Software Foundation"; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext ctx) { + final Ignite ignite = ctx.grid(); + + log = ignite.log(); + managedBuffers = setupDirect((IgniteEx)ignite); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + freeDirectBuffers(); + } + + /** + * Free direct thread local buffer allocated for Direct IO user's threads. + */ + private void freeDirectBuffers() { + ConcurrentHashMap8<Long, Thread> buffers = managedBuffers; + + if (buffers == null) + return; + + managedBuffers = null; + + if (log.isDebugEnabled()) + log.debug("Direct IO buffers to be freed: " + buffers.size()); + + for (Map.Entry<Long, Thread> next : buffers.entrySet()) { + Thread th = next.getValue(); + Long addr = next.getKey(); + + if (log.isDebugEnabled()) + log.debug(String.format("Free Direct IO buffer [address=%d; Thread=%s; alive=%s]", + addr, th != null ? th.getName() : "", th != null && th.isAlive())); + + AlignedBuffers.free(addr); + } + + buffers.clear(); + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return null; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Object createComponent(PluginContext ctx, Class cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgnitePlugin plugin() { + return new LinuxNativeIoPlugin(); + } + + /** + * @param ignite Ignite starting up. + * @return Managed aligned buffers and its associated threads. This collection is used to free buffers. May return + * {@code null}. + */ + @Nullable private ConcurrentHashMap8<Long, Thread> setupDirect(IgniteEx ignite) { + GridCacheSharedContext<Object, Object> cacheCtx = ignite.context().cache().context(); + IgnitePageStoreManager ignitePageStoreMgr = cacheCtx.pageStore(); + + if (ignitePageStoreMgr == null) + return null; + + if (!(ignitePageStoreMgr instanceof FilePageStoreManager)) + return null; + + final FilePageStoreManager pageStore = (FilePageStoreManager)ignitePageStoreMgr; + FileIOFactory backupIoFactory = pageStore.getPageStoreFileIoFactory(); + + final AlignedBuffersDirectFileIOFactory factory = new AlignedBuffersDirectFileIOFactory( + ignite.log(), + pageStore.workDir(), + pageStore.pageSize(), + backupIoFactory); + + final FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)cacheCtx.wal(); + + if (walMgr != null && IgniteNativeIoLib.isJnaAvailable()) { + walMgr.setCreateWalFileListener(new IgniteInClosure<FileIO>() { + @Override public void apply(FileIO fileIO) { + adviceFileDontNeed(fileIO, walMgr.maxWalSegmentSize()); + } + }); + } + + if (!factory.isDirectIoAvailable()) + return null; + + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cacheCtx.database(); + + 
db.setThreadBuf(new ThreadLocal<ByteBuffer>() { + @Override protected ByteBuffer initialValue() { + return factory.createManagedBuffer(pageStore.pageSize()); + } + }); + + pageStore.setPageStoreFileIOFactories(factory, backupIoFactory); + + return factory.managedAlignedBuffers(); + } + + /** + * Apply advice: The specified data will not be accessed in the near future. + * + * Useful for WAL segments to indicate file content won't be loaded. + * + * @param fileIO file to advice. + * @param size expected size of file. + */ + private void adviceFileDontNeed(FileIO fileIO, long size) { + try { + if(fileIO instanceof RandomAccessFileIO) { + RandomAccessFileIO chIo = (RandomAccessFileIO)fileIO; + + FileChannel ch = U.field(chIo, "ch"); + + FileDescriptor fd = U.field(ch, "fd"); + + int fdVal = U.field(fd, "fd"); + + int retVal = IgniteNativeIoLib.posix_fadvise(fdVal, 0, size, IgniteNativeIoLib.POSIX_FADV_DONTNEED); + + if (retVal != 0) { + U.warn(log, "Unable to apply fadvice on WAL file descriptor [fd=" + fdVal + "]:" + + IgniteNativeIoLib.strerror(retVal)); + } + } + } + catch (Exception e) { + U.warn(log, "Unable to advice on WAL file descriptor: [" + e.getMessage() + "]", e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider new file mode 100644 index 0000000..ee1ccd8 --- /dev/null +++ b/modules/direct-io/src/main/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider @@ -0,0 +1 @@ +org.apache.ignite.internal.processors.cache.persistence.file.LinuxNativeIoPluginProvider \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java new file mode 100644 index 0000000..981e0d5 --- /dev/null +++ b/modules/direct-io/src/test/java/org/apache/ignite/internal/processors/cache/persistence/file/IgniteNativeIoWithNoPersistenceTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import com.google.common.base.Strings; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +/** + * Checks if Direct IO can be set up if no persistent store is configured. + */ +public class IgniteNativeIoWithNoPersistenceTest extends GridCommonAbstractTest { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration configuration = super.getConfiguration(igniteInstanceName); + + configuration.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration())); + + return configuration; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Checks simple launch with native IO: starts a node, activates it and puts 100 entries into a cache. + * @throws Exception if failed. + */ + public void testDirectIoHandlesNoPersistentGrid() throws Exception { + IgniteEx ignite = startGrid(0); + + ignite.active(true); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache"); + + for (int i = 0; i < 100; i++) + cache.put(i, valueForKey(i)); + + + stopAllGrids(); + } + + /** + * @param i key. 
+ * @return decimal string form of the key repeated 10 times. + */ + @NotNull private String valueForKey(int i) { + return Strings.repeat(Integer.toString(i), 10); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java new file mode 100644 index 0000000..48454ea --- /dev/null +++ b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.persistence.file.IgniteNativeIoWithNoPersistenceTest; + +/** + * Subset of {@link IgnitePdsTestSuite} suite tests, started with direct-io jar in classpath. + */ +public class IgnitePdsNativeIoTestSuite extends TestSuite { + /** + * @return Suite. 
+ */ + public static TestSuite suite() { + TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite (with Direct IO)"); + + IgnitePdsTestSuite.addRealPageStoreTests(suite); + + suite.addTestSuite(IgniteNativeIoWithNoPersistenceTest.class); + + return suite; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java new file mode 100644 index 0000000..54dd7d3 --- /dev/null +++ b/modules/direct-io/src/test/java/org/apache/ignite/testsuites/IgnitePdsNativeIoTestSuite2.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; + +/** + * Same as {@link IgnitePdsTestSuite2} but is started with direct-io jar in classpath. + */ +public class IgnitePdsNativeIoTestSuite2 extends TestSuite { + /** + * @return Suite with the real page store tests of {@link IgnitePdsTestSuite2}. 
+ * @throws Exception If failed while adding the real page store tests to the suite. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite 2 (Native IO)"); + + IgnitePdsTestSuite2.addRealPageStoreTests(suite); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9bba2d52/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1d8637c..eab1ec8 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ <module>modules/tools</module> <module>modules/core</module> <module>modules/dev-utils</module> + <module>modules/direct-io</module> <module>modules/hadoop</module> <module>modules/extdata/p2p</module> <module>modules/extdata/uri</module>
