http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java new file mode 100644 index 0000000..9d20682 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java @@ -0,0 +1,805 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.io.nativeio; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.ratis.protocol.AlreadyExistsException; +import org.apache.ratis.util.NativeCodeLoader; +import org.apache.ratis.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +/** + * JNI wrappers for various native IO-related calls not available in Java. + * These functions should generally be used alongside a fallback to another + * more portable mechanism. + */ +public class NativeIO { + private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class); + + public static class POSIX { + // Flags for open() call from bits/fcntl.h - Set by JNI + public static int O_RDONLY = -1; + public static int O_WRONLY = -1; + public static int O_RDWR = -1; + public static int O_CREAT = -1; + public static int O_EXCL = -1; + public static int O_NOCTTY = -1; + public static int O_TRUNC = -1; + public static int O_APPEND = -1; + public static int O_NONBLOCK = -1; + public static int O_SYNC = -1; + + // Flags for posix_fadvise() from bits/fcntl.h - Set by JNI + /* No further special treatment. */ + public static int POSIX_FADV_NORMAL = -1; + /* Expect random page references. */ + public static int POSIX_FADV_RANDOM = -1; + /* Expect sequential page references. */ + public static int POSIX_FADV_SEQUENTIAL = -1; + /* Will need these pages. */ + public static int POSIX_FADV_WILLNEED = -1; + /* Don't need these pages. */ + public static int POSIX_FADV_DONTNEED = -1; + /* Data will be accessed once. */ + public static int POSIX_FADV_NOREUSE = -1; + + + // Updated by JNI when supported by glibc. Leave defaults in case kernel + // supports sync_file_range, but glibc does not. + /* Wait upon writeout of all pages + in the range before performing the + write. */ + public static int SYNC_FILE_RANGE_WAIT_BEFORE = 1; + /* Initiate writeout of all those + dirty pages in the range which are + not presently under writeback. */ + public static int SYNC_FILE_RANGE_WRITE = 2; + /* Wait upon writeout of all pages in + the range after performing the + write. */ + public static int SYNC_FILE_RANGE_WAIT_AFTER = 4; + + // Set to true via JNI if possible + public static boolean fadvisePossible = false; + + private static boolean nativeLoaded = false; + private static boolean syncFileRangePossible = true; + + private static long cacheTimeout = -1; + + private static CacheManipulator cacheManipulator = new CacheManipulator(); + + public static CacheManipulator getCacheManipulator() { + return cacheManipulator; + } + + public static void setCacheManipulator(CacheManipulator cacheManipulator) { + POSIX.cacheManipulator = cacheManipulator; + } + + /** + * Used to manipulate the operating system cache. + */ + @VisibleForTesting + public static class CacheManipulator { + public void mlock(String identifier, ByteBuffer buffer, + long len) throws IOException { + POSIX.mlock(buffer, len); + } + + public long getMemlockLimit() { + return NativeIO.getMemlockLimit(); + } + + public long getOperatingSystemPageSize() { + return NativeIO.getOperatingSystemPageSize(); + } + + public void posixFadviseIfPossible(String identifier, + FileDescriptor fd, long offset, long len, int flags) + throws NativeIOException { + NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset, + len, flags); + } + + public boolean verifyCanMlock() { + return NativeIO.isAvailable(); + } + } + + /** + * A CacheManipulator used for testing which does not actually call mlock. + * This allows many tests to be run even when the operating system does not + * allow mlock, or only allows limited mlocking. + */ + @VisibleForTesting + public static class NoMlockCacheManipulator extends CacheManipulator { + public void mlock(String identifier, ByteBuffer buffer, + long len) throws IOException { + LOG.info("mlocking " + identifier); + } + + public long getMemlockLimit() { + return 1125899906842624L; + } + + public long getOperatingSystemPageSize() { + return 4096; + } + + public boolean verifyCanMlock() { + return true; + } + } + + static { + initNativeLib(); + } + + /** + * Return true if the JNI-based native IO extensions are available. + */ + public static boolean isAvailable() { + return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded; + } + + private static void assertCodeLoaded() throws IOException { + if (!isAvailable()) { + throw new IOException("NativeIO was not loaded"); + } + } + + /** Wrapper around open(2) */ + public static native FileDescriptor open(String path, int flags, int mode) throws IOException; + /** Wrapper around fstat(2) */ + private static native Stat fstat(FileDescriptor fd) throws IOException; + + /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */ + private static native void chmodImpl(String path, int mode) throws IOException; + + public static void chmod(String path, int mode) throws IOException { + if (!RaftUtils.WINDOWS) { + chmodImpl(path, mode); + } else { + try { + chmodImpl(path, mode); + } catch (NativeIOException nioe) { + if (nioe.getErrorCode() == 3) { + throw new NativeIOException("No such file or directory", + Errno.ENOENT); + } else { + LOG.warn(String.format("NativeIO.chmod error (%d): %s", + nioe.getErrorCode(), nioe.getMessage())); + throw new NativeIOException("Unknown error", Errno.UNKNOWN); + } + } + } + } + + /** Wrapper around posix_fadvise(2) */ + static native void posix_fadvise( + FileDescriptor fd, long offset, long len, int flags) throws NativeIOException; + + /** Wrapper around sync_file_range(2) */ + static native void sync_file_range( + FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException; + + /** + * Call posix_fadvise on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + * + * @throws NativeIOException if there is an error with the syscall + */ + static void posixFadviseIfPossible(String identifier, + FileDescriptor fd, long offset, long len, int flags) + throws NativeIOException { + if (nativeLoaded && fadvisePossible) { + try { + posix_fadvise(fd, offset, len, flags); + } catch (UnsatisfiedLinkError ule) { + fadvisePossible = false; + } + } + } + + /** + * Call sync_file_range on the given file descriptor. See the manpage + * for this syscall for more information. On systems where this + * call is not available, does nothing. + * + * @throws NativeIOException if there is an error with the syscall + */ + public static void syncFileRangeIfPossible( + FileDescriptor fd, long offset, long nbytes, int flags) + throws NativeIOException { + if (nativeLoaded && syncFileRangePossible) { + try { + sync_file_range(fd, offset, nbytes, flags); + } catch (UnsupportedOperationException | UnsatisfiedLinkError uoe) { + syncFileRangePossible = false; + } + } + } + + static native void mlock_native( + ByteBuffer buffer, long len) throws NativeIOException; + + /** + * Locks the provided direct ByteBuffer into memory, preventing it from + * swapping out. After a buffer is locked, future accesses will not incur + * a page fault. + * + * See the mlock(2) man page for more information. + */ + static void mlock(ByteBuffer buffer, long len) + throws IOException { + assertCodeLoaded(); + if (!buffer.isDirect()) { + throw new IOException("Cannot mlock a non-direct ByteBuffer"); + } + mlock_native(buffer, len); + } + + /** + * Unmaps the block from memory. See munmap(2). + * + * There isn't any portable way to unmap a memory region in Java. + * So we use the sun.nio method here. + * Note that unmapping a memory region could cause crashes if code + * continues to reference the unmapped code. However, if we don't + * manually unmap the memory, we are dependent on the finalizer to + * do it, and we have no idea when the finalizer will run. + * + * @param buffer The buffer to unmap. + */ + public static void munmap(MappedByteBuffer buffer) { + if (buffer instanceof sun.nio.ch.DirectBuffer) { + sun.misc.Cleaner cleaner = + ((sun.nio.ch.DirectBuffer)buffer).cleaner(); + cleaner.clean(); + } + } + + /** Linux only methods used for getOwner() implementation */ + private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException; + private static native String getUserName(long uid) throws IOException; + + /** + * Result type of the fstat call + */ + public static class Stat { + private int ownerId, groupId; + private String owner, group; + private int mode; + + // Mode constants - Set by JNI + public static int S_IFMT = -1; /* type of file */ + public static int S_IFIFO = -1; /* named pipe (fifo) */ + public static int S_IFCHR = -1; /* character special */ + public static int S_IFDIR = -1; /* directory */ + public static int S_IFBLK = -1; /* block special */ + public static int S_IFREG = -1; /* regular */ + public static int S_IFLNK = -1; /* symbolic link */ + public static int S_IFSOCK = -1; /* socket */ + public static int S_ISUID = -1; /* set user id on execution */ + public static int S_ISGID = -1; /* set group id on execution */ + public static int S_ISVTX = -1; /* save swapped text even after use */ + public static int S_IRUSR = -1; /* read permission, owner */ + public static int S_IWUSR = -1; /* write permission, owner */ + public static int S_IXUSR = -1; /* execute/search permission, owner */ + + Stat(int ownerId, int groupId, int mode) { + this.ownerId = ownerId; + this.groupId = groupId; + this.mode = mode; + } + + Stat(String owner, String group, int mode) { + if (!RaftUtils.WINDOWS) { + this.owner = owner; + } else { + this.owner = stripDomain(owner); + } + if (!RaftUtils.WINDOWS) { + this.group = group; + } else { + this.group = stripDomain(group); + } + this.mode = mode; + } + + @Override + public String toString() { + return "Stat(owner='" + owner + "', group='" + group + "'" + + ", mode=" + mode + ")"; + } + + public String getOwner() { + return owner; + } + public String getGroup() { + return group; + } + public int getMode() { + return mode; + } + } + + private static class CachedName { + final long timestamp; + final String name; + + public CachedName(String name, long timestamp) { + this.name = name; + this.timestamp = timestamp; + } + } + + public final static int MMAP_PROT_READ = 0x1; + public final static int MMAP_PROT_WRITE = 0x2; + public final static int MMAP_PROT_EXEC = 0x4; + + public static native long mmap(FileDescriptor fd, int prot, + boolean shared, long length) throws IOException; + + public static native void munmap(long addr, long length) + throws IOException; + } + + private static boolean workaroundNonThreadSafePasswdCalls = false; + + + public static class Windows { + // Flags for CreateFile() call on Windows + public static final long GENERIC_READ = 0x80000000L; + public static final long GENERIC_WRITE = 0x40000000L; + + public static final long FILE_SHARE_READ = 0x00000001L; + public static final long FILE_SHARE_WRITE = 0x00000002L; + public static final long FILE_SHARE_DELETE = 0x00000004L; + + public static final long CREATE_NEW = 1; + public static final long CREATE_ALWAYS = 2; + public static final long OPEN_EXISTING = 3; + public static final long OPEN_ALWAYS = 4; + public static final long TRUNCATE_EXISTING = 5; + + public static final long FILE_BEGIN = 0; + public static final long FILE_CURRENT = 1; + public static final long FILE_END = 2; + + public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L; + + /** + * Create a directory with permissions set to the specified mode. By setting + * permissions at creation time, we avoid issues related to the user lacking + * WRITE_DAC rights on subsequent chmod calls. One example where this can + * occur is writing to an SMB share where the user does not have Full Control + * rights, and therefore WRITE_DAC is denied. + * + * @param path directory to create + * @param mode permissions of new directory + * @throws IOException if there is an I/O error + */ + public static void createDirectoryWithMode(File path, int mode) + throws IOException { + createDirectoryWithMode0(path.getAbsolutePath(), mode); + } + + /** Wrapper around CreateDirectory() on Windows */ + private static native void createDirectoryWithMode0(String path, int mode) + throws NativeIOException; + + /** Wrapper around CreateFile() on Windows */ + public static native FileDescriptor createFile(String path, + long desiredAccess, long shareMode, long creationDisposition) + throws IOException; + + /** + * Create a file for write with permissions set to the specified mode. By + * setting permissions at creation time, we avoid issues related to the user + * lacking WRITE_DAC rights on subsequent chmod calls. One example where + * this can occur is writing to an SMB share where the user does not have + * Full Control rights, and therefore WRITE_DAC is denied. + * + * This method mimics the semantics implemented by the JDK in + * {@link FileOutputStream}. The file is opened for truncate or + * append, the sharing mode allows other readers and writers, and paths + * longer than MAX_PATH are supported. (See io_util_md.c in the JDK.) + * + * @param path file to create + * @param append if true, then open file for append + * @param mode permissions of new directory + * @return FileOutputStream of opened file + * @throws IOException if there is an I/O error + */ + public static FileOutputStream createFileOutputStreamWithMode(File path, + boolean append, int mode) throws IOException { + long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE; + long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS; + return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(), + GENERIC_WRITE, shareMode, creationDisposition, mode)); + } + + /** Wrapper around CreateFile() with security descriptor on Windows */ + private static native FileDescriptor createFileWithMode0(String path, + long desiredAccess, long shareMode, long creationDisposition, int mode) + throws NativeIOException; + + /** Wrapper around SetFilePointer() on Windows */ + public static native long setFilePointer(FileDescriptor fd, + long distanceToMove, long moveMethod) throws IOException; + + /** Windows only methods used for getOwner() implementation */ + private static native String getOwner(FileDescriptor fd) throws IOException; + + /** Supported list of Windows access right flags */ + public enum AccessRight { + ACCESS_READ (0x0001), // FILE_READ_DATA + ACCESS_WRITE (0x0002), // FILE_WRITE_DATA + ACCESS_EXECUTE (0x0020); // FILE_EXECUTE + + private final int accessRight; + AccessRight(int access) { + accessRight = access; + } + + public int accessRight() { + return accessRight; + } + } + + /** Windows only method used to check if the current process has requested + * access rights on the given path. */ + private static native boolean access0(String path, int requestedAccess); + + /** + * Checks whether the current process has desired access rights on + * the given path. + * + * Longer term this native function can be substituted with JDK7 + * function Files#isReadable, isWritable, isExecutable. + * + * @param path input path + * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE + * @return true if access is allowed + * @throws IOException I/O exception on error + */ + public static boolean access(String path, AccessRight desiredAccess) + throws IOException { + return access0(path, desiredAccess.accessRight()); + } + + /** + * Extends both the minimum and maximum working set size of the current + * process. This method gets the current minimum and maximum working set + * size, adds the requested amount to each and then sets the minimum and + * maximum working set size to the new values. Controlling the working set + * size of the process also controls the amount of memory it can lock. + * + * @param delta amount to increment minimum and maximum working set size + * @throws IOException for any error + * @see POSIX#mlock(ByteBuffer, long) + */ + public static native void extendWorkingSetSize(long delta) throws IOException; + + static { + initNativeLib(); + } + } + + private static boolean nativeLoaded = false; + + static { + initNativeLib(); + } + + private static void initNativeLib() { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + initNative(); + nativeLoaded = true; + } catch (Throwable t) { + LOG.debug("Unable to initialize NativeIO libraries", t); + } + } + } + + /** + * Return true if the JNI-based native IO extensions are available. + */ + public static boolean isAvailable() { + return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded; + } + + /** Initialize the JNI method ID and class ID cache */ + private static native void initNative(); + + /** + * Get the maximum number of bytes that can be locked into memory at any + * given point. + * + * @return 0 if no bytes can be locked into memory; + * Long.MAX_VALUE if there is no limit; + * The number of bytes that can be locked into memory otherwise. + */ + static long getMemlockLimit() { + return isAvailable() ? getMemlockLimit0() : 0; + } + + private static native long getMemlockLimit0(); + + /** + * @return the operating system's page size. + */ + static long getOperatingSystemPageSize() { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe)f.get(null); + return unsafe.pageSize(); + } catch (Throwable e) { + LOG.warn("Unable to get operating system page size. Guessing 4096.", e); + return 4096; + } + } + + private static class CachedUid { + final long timestamp; + final String username; + public CachedUid(String username, long timestamp) { + this.timestamp = timestamp; + this.username = username; + } + } + + private static boolean initialized = false; + + /** + * The Windows logon name has two part, NetBIOS domain name and + * user account name, of the format DOMAIN\UserName. This method + * will remove the domain part of the full logon name. + * + * @param name full principal name containing the domain + * @return name with domain removed + */ + private static String stripDomain(String name) { + int i = name.indexOf('\\'); + if (i != -1) + name = name.substring(i + 1); + return name; + } + + /** + * Create a FileInputStream that shares delete permission on the + * file opened, i.e. other process can delete the file the + * FileInputStream is reading. Only Windows implementation uses + * the native interface. + */ + public static FileInputStream getShareDeleteFileInputStream(File f) + throws IOException { + if (!RaftUtils.WINDOWS) { + // On Linux the default FileInputStream shares delete permission + // on the file opened. + // + return new FileInputStream(f); + } else { + // Use Windows native interface to create a FileInputStream that + // shares delete permission on the file opened. + // + FileDescriptor fd = Windows.createFile( + f.getAbsolutePath(), + Windows.GENERIC_READ, + Windows.FILE_SHARE_READ | + Windows.FILE_SHARE_WRITE | + Windows.FILE_SHARE_DELETE, + Windows.OPEN_EXISTING); + return new FileInputStream(fd); + } + } + + /** + * Create a FileInputStream that shares delete permission on the + * file opened at a given offset, i.e. other process can delete + * the file the FileInputStream is reading. Only Windows implementation + * uses the native interface. + */ + public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset) + throws IOException { + if (!RaftUtils.WINDOWS) { + RandomAccessFile rf = new RandomAccessFile(f, "r"); + if (seekOffset > 0) { + rf.seek(seekOffset); + } + return new FileInputStream(rf.getFD()); + } else { + // Use Windows native interface to create a FileInputStream that + // shares delete permission on the file opened, and set it to the + // given offset. + // + FileDescriptor fd = NativeIO.Windows.createFile( + f.getAbsolutePath(), + NativeIO.Windows.GENERIC_READ, + NativeIO.Windows.FILE_SHARE_READ | + NativeIO.Windows.FILE_SHARE_WRITE | + NativeIO.Windows.FILE_SHARE_DELETE, + NativeIO.Windows.OPEN_EXISTING); + if (seekOffset > 0) + NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN); + return new FileInputStream(fd); + } + } + + /** + * Create the specified File for write access, ensuring that it does not exist. + * @param f the file that we want to create + * @param permissions we want to have on the file (if security is enabled) + * + * @throws AlreadyExistsException if the file already exists + * @throws IOException if any other error occurred + */ + public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions) + throws IOException { + if (!RaftUtils.WINDOWS) { + // Use the native wrapper around open(2) + try { + FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(), + NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT + | NativeIO.POSIX.O_EXCL, permissions); + return new FileOutputStream(fd); + } catch (NativeIOException nioe) { + if (nioe.getErrno() == Errno.EEXIST) { + throw new AlreadyExistsException(nioe); + } + throw nioe; + } + } else { + // Use the Windows native APIs to create equivalent FileOutputStream + try { + FileDescriptor fd = NativeIO.Windows.createFile(f.getCanonicalPath(), + NativeIO.Windows.GENERIC_WRITE, + NativeIO.Windows.FILE_SHARE_DELETE + | NativeIO.Windows.FILE_SHARE_READ + | NativeIO.Windows.FILE_SHARE_WRITE, + NativeIO.Windows.CREATE_NEW); + NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions); + return new FileOutputStream(fd); + } catch (NativeIOException nioe) { + if (nioe.getErrorCode() == 80) { + // ERROR_FILE_EXISTS + // 80 (0x50) + // The file exists + throw new AlreadyExistsException(nioe); + } + throw nioe; + } + } + } + + /** + * A version of renameTo that throws a descriptive exception when it fails. + * + * @param src The source path + * @param dst The destination path + * + * @throws NativeIOException On failure. + */ + public static void renameTo(File src, File dst) + throws IOException { + if (!nativeLoaded) { + if (!src.renameTo(dst)) { + throw new IOException("renameTo(src=" + src + ", dst=" + + dst + ") failed."); + } + } else { + renameTo0(src.getAbsolutePath(), dst.getAbsolutePath()); + } + } + + /** + * A version of renameTo that throws a descriptive exception when it fails. + * + * @param src The source path + * @param dst The destination path + * + * @throws NativeIOException On failure. + */ + private static native void renameTo0(String src, String dst) + throws NativeIOException; + + private static native void link0(String src, String dst) + throws NativeIOException; + + /** + * Unbuffered file copy from src to dst without tainting OS buffer cache + * + * In POSIX platform: + * It uses FileChannel#transferTo() which internally attempts + * unbuffered IO on OS with native sendfile64() support and falls back to + * buffered IO otherwise. + * + * It minimizes the number of FileChannel#transferTo call by passing the the + * src file size directly instead of a smaller size as the 3rd parameter. + * This saves the number of sendfile64() system call when native sendfile64() + * is supported. In the two fall back cases where sendfile is not supported, + * FileChannle#transferTo already has its own batching of size 8 MB and 8 KB, + * respectively. + * + * In Windows Platform: + * It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING + * flag, which is supported on Windows Server 2008 and above. + * + * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows + * platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0) + * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows. + * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0 + * on Windows simply returns IOS_UNSUPPORTED. + * + * Note: This simple native wrapper does minimal parameter checking before copy and + * consistency check (e.g., size) after copy. + * It is recommended to use wrapper function like + * the Storage#nativeCopyFileUnbuffered() function with pre/post copy checks. + * + * @param src The source path + * @param dst The destination path + */ + public static void copyFileUnbuffered(File src, File dst) throws IOException { + if (nativeLoaded && RaftUtils.WINDOWS) { + copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath()); + } else { + FileInputStream fis = null; + FileOutputStream fos = null; + FileChannel input = null; + FileChannel output = null; + try { + fis = new FileInputStream(src); + fos = new FileOutputStream(dst); + input = fis.getChannel(); + output = fos.getChannel(); + long remaining = input.size(); + long position = 0; + long transferred; + while (remaining > 0) { + transferred = input.transferTo(position, remaining, output); + remaining -= transferred; + position += transferred; + } + } finally { + RaftUtils.cleanup(LOG, output, fos, input, fis); + } + } + } + + private static native void copyFileUnbuffered0(String src, String dst) + throws NativeIOException; +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java new file mode 100644 index 0000000..58b83e7 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.io.nativeio; + +import java.io.IOException; + +import org.apache.ratis.util.RaftUtils; + + +/** + * An exception generated by a call to the native IO code. + * + * These exceptions simply wrap <i>errno</i> result codes on Linux, + * or the System Error Code on Windows. + */ +public class NativeIOException extends IOException { + private static final long serialVersionUID = 1L; + + private Errno errno; + + // Java has no unsigned primitive error code. Use a signed 32-bit + // integer to hold the unsigned 32-bit integer. + private int errorCode; + + public NativeIOException(String msg, Errno errno) { + super(msg); + this.errno = errno; + // Windows error code is always set to ERROR_SUCCESS on Linux, + // i.e. no failure on Windows + this.errorCode = 0; + } + + public NativeIOException(String msg, int errorCode) { + super(msg); + this.errorCode = errorCode; + this.errno = Errno.UNKNOWN; + } + + public long getErrorCode() { + return errorCode; + } + + public Errno getErrno() { + return errno; + } + + @Override + public String toString() { + if (RaftUtils.WINDOWS) + return errorCode + ": " + super.getMessage(); + else + return errno.toString() + ": " + super.getMessage(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java new file mode 100644 index 0000000..cc441f2 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; + +/** + * Signals that an attempt to create a file at a given pathname has failed + * because another file already existed at that path. + */ +public class AlreadyExistsException extends IOException { + private static final long serialVersionUID = 1L; + + public AlreadyExistsException(String msg) { + super(msg); + } + + public AlreadyExistsException(Throwable cause) { + super(cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java new file mode 100644 index 0000000..1742c24 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.protocol; + +import java.io.IOException; + +/** Thrown for checksum errors. */ +public class ChecksumException extends IOException { + private static final long serialVersionUID = 1L; + private long pos; + public ChecksumException(String description, long pos) { + super(description); + this.pos = pos; + } + + public long getPos() { + return pos; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java new file mode 100644 index 0000000..77ef267 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import org.apache.ratis.shaded.com.google.protobuf.ByteString; + +/** + * The information clients append to the raft ring. + */ +public interface Message { + /** + * @return the content of the message + */ + ByteString getContent(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java new file mode 100644 index 0000000..1306290 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class NotLeaderException extends RaftException { + private final RaftPeer suggestedLeader; + /** the client may need to update its RaftPeer list */ + private final RaftPeer[] peers; + + public NotLeaderException(String id, RaftPeer suggestedLeader, + RaftPeer[] peers) { + super("Server " + id + " is not the leader (" + suggestedLeader + + "). Request must be sent to leader."); + this.suggestedLeader = suggestedLeader; + this.peers = peers == null ? RaftPeer.EMPTY_PEERS : peers; + } + + public RaftPeer getSuggestedLeader() { + return suggestedLeader; + } + + public RaftPeer[] getPeers() { + return peers; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java new file mode 100644 index 0000000..3298431 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** Asynchronous version of {@link RaftClientProtocol}. */ +public interface RaftClientAsynchronousProtocol { + CompletableFuture<RaftClientReply> submitClientRequestAsync( + RaftClientRequest request) throws IOException; + + CompletableFuture<RaftClientReply> setConfigurationAsync( + SetConfigurationRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java new file mode 100644 index 0000000..b3cbcc3 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; + +public interface RaftClientProtocol { + RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException; + + RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java new file mode 100644 index 0000000..8c5cd75 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class RaftClientReply extends RaftRpcMessage { + private final String requestorId; + private final String replierId; + private final boolean success; + private final long seqNum; + + /** non-null if the server is not leader */ + private final NotLeaderException notLeaderException; + private final Message message; + + public RaftClientReply(String requestorId, String replierId, long seqNum, + boolean success, Message message, NotLeaderException notLeaderException) { + this.requestorId = requestorId; + this.replierId = replierId; + this.success = success; + this.seqNum = seqNum; + this.message = message; + this.notLeaderException = notLeaderException; + } + + public RaftClientReply(RaftClientRequest request, + NotLeaderException notLeaderException) { + this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(), + false, null, notLeaderException); + } + + public RaftClientReply(RaftClientRequest request, Message message) { + this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(), + true, message, null); + } + + @Override + public final boolean isRequest() { + return false; + } + + @Override + public String getRequestorId() { + return requestorId; + } + + @Override + public String getReplierId() { + return replierId; + } + + public long getSeqNum() { + return seqNum; + } + + @Override + public String toString() { + return super.toString() + ", seqNum: " + getSeqNum() + + ", success: " + isSuccess(); + } + + public boolean isSuccess() { + return success; + } + + public Message getMessage() { + return message; + } + + public NotLeaderException getNotLeaderException() { + return notLeaderException; + } + + public boolean isNotLeader() { + return notLeaderException != null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java new file mode 100644 index 0000000..90b648a --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class RaftClientRequest extends RaftRpcMessage { + private final String requestorId; + private final String replierId; + private final long seqNum; + private final Message message; + private final boolean readOnly; + + public RaftClientRequest(String requestorId, String replierId, long seqNum, + Message message) { + this(requestorId, replierId, seqNum, message, false); + } + + public RaftClientRequest(String requestorId, String replierId, long seqNum, + Message message, boolean readOnly) { + this.requestorId = requestorId; + this.replierId = replierId; + this.seqNum = seqNum; + this.message = message; + this.readOnly = readOnly; + } + + @Override + public final boolean isRequest() { + return true; + } + + @Override + public String getRequestorId() { + return requestorId; + } + + @Override + public String getReplierId() { + return replierId; + } + + public long getSeqNum() { + return seqNum; + } + + public Message getMessage() { + return message; + } + + public boolean isReadOnly() { + return readOnly; + } + + @Override + public String toString() { + return super.toString() + ", seqNum: " + seqNum + ", " + + (isReadOnly()? "RO": "RW"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java new file mode 100644 index 0000000..11aac90 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; + +public class RaftException extends IOException { + private static final long serialVersionUID = 1L; + + public RaftException(String message) { + super(message); + } + + public RaftException(Throwable cause) { + super(cause); + } + + public RaftException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java new file mode 100644 index 0000000..a32aaa0 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import com.google.common.net.HostAndPort; + +import java.net.InetSocketAddress; + +/** + * A {@link RaftPeer} is a server in a Raft cluster. + * + * The objects of this class are immutable. + */ +public class RaftPeer { + public static final RaftPeer[] EMPTY_PEERS = {}; + + /** The id of the peer. */ + private final String id; + /** The address of the peer. */ + private final String address; + + /** Construct a peer with the given id and a null address. */ + public RaftPeer(String id) { + this(id, (String)null); + } + + /** Construct a peer with the given id and address. */ + public RaftPeer(String id, InetSocketAddress address) { + this(id, address == null ? null : + HostAndPort.fromParts(address.getAddress().getHostAddress(), + address.getPort()).toString()); + } + + /** Construct a peer with the given id and address. */ + public RaftPeer(String id, String address) { + this.id = id; + this.address = address; + } + + /** @return The id of the peer. */ + public String getId() { + return id; + } + + /** @return The address of the peer. */ + public String getAddress() { + return address; + } + + @Override + public String toString() { + return id + ":" + address; + } + + @Override + public boolean equals(Object o) { + return (o instanceof RaftPeer) && id.equals(((RaftPeer) o).getId()); + } + + @Override + public int hashCode() { + return id.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java new file mode 100644 index 0000000..82f1ebb --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public abstract class RaftRpcMessage { + + public abstract boolean isRequest(); + + public abstract String getRequestorId(); + + public abstract String getReplierId(); + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + getRequestorId() + + (isRequest()? "->": "<-") + getReplierId() + ")"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java new file mode 100644 index 0000000..3d0f093 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class ReconfigurationInProgressException extends RaftException { + public ReconfigurationInProgressException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java new file mode 100644 index 0000000..69a2e51 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class ReconfigurationTimeoutException extends RaftException { + public ReconfigurationTimeoutException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java new file mode 100644 index 0000000..84449d4 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.util.Arrays; + +public class SetConfigurationRequest extends RaftClientRequest { + private final RaftPeer[] peers; + + public SetConfigurationRequest(String requestorId, String replierId, + long seqNum, RaftPeer[] peers) { + super(requestorId, replierId, seqNum, null); + this.peers = peers; + } + + public RaftPeer[] getPeersInNewConf() { + return peers; + } + + @Override + public String toString() { + return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java new file mode 100644 index 0000000..099133d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class StateMachineException extends RaftException { + public StateMachineException(String serverId, Exception cause) { + super(cause.getClass().getName() + " from Server " + serverId, cause); + } + + public StateMachineException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java new file mode 100644 index 0000000..e8e267e --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.io.nativeio.NativeIO; +import org.apache.ratis.io.nativeio.NativeIOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +/** + * A FileOutputStream that has the property that it will only show + * up at its destination once it has been entirely written and flushed + * to disk. While being written, it will use a .tmp suffix. + * + * When the output stream is closed, it is flushed, fsynced, and + * will be moved into place, overwriting any file that already + * exists at that location. + * + * <b>NOTE</b>: on Windows platforms, it will not atomically + * replace the target file - instead the target file is deleted + * before this one is moved into place. + */ +public class AtomicFileOutputStream extends FilterOutputStream { + + public static final String TMP_EXTENSION = ".tmp"; + + public static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputStream.class); + + private final File origFile; + private final File tmpFile; + + public AtomicFileOutputStream(File f) throws FileNotFoundException { + // Code unfortunately must be duplicated below since we can't assign anything + // before calling super + super(new FileOutputStream(new File(f.getParentFile(), f.getName() + TMP_EXTENSION))); + origFile = f.getAbsoluteFile(); + tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION).getAbsoluteFile(); + } + + @Override + public void close() throws IOException { + boolean triedToClose = false, success = false; + try { + flush(); + ((FileOutputStream)out).getChannel().force(true); + + triedToClose = true; + super.close(); + success = true; + } finally { + if (success) { + boolean renamed = tmpFile.renameTo(origFile); + if (!renamed) { + // On windows, renameTo does not replace. + if (origFile.exists() && !origFile.delete()) { + throw new IOException("Could not delete original file " + origFile); + } + try { + NativeIO.renameTo(tmpFile, origFile); + } catch (NativeIOException e) { + throw new IOException("Could not rename temporary file " + tmpFile + + " to " + origFile + " due to failure in native rename. " + + e.toString()); + } + } + } else { + if (!triedToClose) { + // If we failed when flushing, try to close it to not leak an FD + RaftUtils.cleanup(LOG, out); + } + // close wasn't successful, try to delete the tmp file + if (!tmpFile.delete()) { + LOG.warn("Unable to delete tmp file " + tmpFile); + } + } + } + } + + /** + * Close the atomic file, but do not "commit" the temporary file + * on top of the destination. This should be used if there is a failure + * in writing. + */ + public void abort() { + try { + super.close(); + } catch (IOException ioe) { + LOG.warn("Unable to abort file " + tmpFile, ioe); + } + if (!tmpFile.delete()) { + LOG.warn("Unable to delete tmp file during abort " + tmpFile); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java new file mode 100644 index 0000000..489b5cd --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; + +/** + * Wrap a lock with the {@link AutoCloseable} interface + * so that the {@link #close()} method will unlock the lock. + */ +public class AutoCloseableLock implements AutoCloseable { + /** + * Acquire the given lock and then wrap it with {@link AutoCloseableLock} + * so that the given lock can be released by calling {@link #close()}, + * or by using a {@code try}-with-resources statement as shown below. + * + * <pre> {@code + * try(AutoCloseableLock acl = AutoCloseableLock.acquire(lock)) { + * ... + * }}</pre> + */ + public static AutoCloseableLock acquire(final Lock lock) { + lock.lock(); + return new AutoCloseableLock(lock); + } + + private final Lock underlying; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private AutoCloseableLock(Lock underlying) { + this.underlying = underlying; + } + + /** Unlock the underlying lock. This method is idempotent. */ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + underlying.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java new file mode 100644 index 0000000..4badc66 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.function.Function; + +/** Function with a throws-clause. */ +@FunctionalInterface +public interface CheckedFunction<INPUT, OUTPUT, THROWABLE extends Throwable> { + /** + * The same as {@link Function#apply(Object)} + * except that this method is declared with a throws-clause. + */ + OUTPUT apply(INPUT input) throws THROWABLE; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java new file mode 100644 index 0000000..b6e90b9 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +/** Runnable with a throws-clause. */ +@FunctionalInterface +public interface CheckedRunnable<THROWABLE extends Throwable> { + /** + * The same as {@link Runnable#run()} + * except that this method is declared with a throws-clause. + */ + void run() throws THROWABLE; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java new file mode 100644 index 0000000..60b5daf --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Inject code for testing. */ +public class CodeInjectionForTesting { + public static final Logger LOG = LoggerFactory.getLogger(CodeInjectionForTesting.class); + + /** Code to be injected. */ + public interface Code { + Logger LOG = CodeInjectionForTesting.LOG; + + /** + * Execute the injected code for testing. + * @param localId the id of the local peer + * @param remoteId the id of the remote peer if handling a request + * @param args other possible args + * @return if the injected code is executed + */ + boolean execute(String localId, String remoteId, Object... args); + } + + private static final Map<String, Code> INJECTION_POINTS + = new ConcurrentHashMap<>(); + + /** Put an injection point. */ + public static void put(String injectionPoint, Code code) { + LOG.debug("put: {}, {}", injectionPoint, code); + INJECTION_POINTS.put(injectionPoint, code); + } + + /** Execute the injected code, if there is any. */ + public static boolean execute(String injectionPoint, String localId, + String remoteId, Object... args) { + final Code code = INJECTION_POINTS.get(injectionPoint); + if (code == null) { + return false; + } + if (LOG.isDebugEnabled()) { + LOG.debug("execute: {}, {}, localId={}, remoteId={}, args={}", + injectionPoint, code, localId, remoteId, Arrays.toString(args)); + } + return code.execute(localId, remoteId, args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java new file mode 100644 index 0000000..1ef95ae --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +public class Daemon extends Thread { + { + setDaemon(true); + } + + /** Construct a daemon thread. */ + public Daemon() { + super(); + } + + /** Construct a daemon thread with the given runnable. */ + public Daemon(Runnable runnable) { + super(runnable); + this.setName(runnable.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java new file mode 100644 index 0000000..4404344 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; + +/** Facilitates hooking process termination for tests and debugging. */ +public class ExitUtils { + public static class ExitException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public final int status; + + public ExitException(int status, String message, Throwable throwable) { + super(message, throwable); + this.status = status; + } + } + + private static volatile boolean systemExitDisabled = false; + private static volatile ExitException firstExitException; + + /** + * @return the first {@link ExitException} thrown, or null if none thrown yet. + */ + public static ExitException getFirstExitException() { + return firstExitException; + } + + /** + * Reset the tracking of process termination. + * This is useful when some tests expect an exit but the others do not. + */ + public static void resetFirstExitException() { + firstExitException = null; + } + + /** @return true if {@link #terminate(int, String, Throwable, Logger)} has been invoked. */ + public static boolean isTerminated() { + // Either this member is set or System.exit is actually invoked. + return firstExitException != null; + } + + /** Disable the use of {@link System#exit(int)} for testing. */ + public static void disableSystemExit() { + systemExitDisabled = true; + } + + /** + * Terminate the current process. Note that terminate is the *only* method + * that should be used to terminate the daemon processes. + * + * @param status Exit status + * @param message message used to create the {@code ExitException} + * @throws ExitException if System.exit is disabled for test purposes + */ + public static void terminate( + int status, String message, Throwable throwable, Logger log) + throws ExitException { + if (log != null) { + final String s = "Terminating with exit status " + status + ": " + message; + if (status == 0) { + log.info(s, throwable); + } else { + log.error(s, throwable); + } + } + + if (!systemExitDisabled) { + System.exit(status); + } + + final ExitException ee = new ExitException(status, message, throwable); + if (firstExitException == null) { + firstExitException = ee; + } + throw ee; + } + + public static void terminate(int status, String message, Logger log) { + terminate(status, message, null, log); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java new file mode 100644 index 0000000..5dc509d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.io.nativeio.NativeIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; + +public class FileUtils { + public static final Logger LOG = LoggerFactory.getLogger(FileUtils.class); + + public static void truncateFile(File f, long target) throws IOException { + try (FileOutputStream out = new FileOutputStream(f, true)) { + out.getChannel().truncate(target); + } + } + + public static void deleteFile(File f) throws IOException { + try { + Files.delete(f.toPath()); + } catch (IOException e) { + LOG.warn("Could not delete " + f); + throw e; + } + } + + /** + * Delete a directory and all its contents. If + * we return false, the directory may be partially-deleted. + * (1) If dir is symlink to a file, the symlink is deleted. The file pointed + * to by the symlink is not deleted. + * (2) If dir is symlink to a directory, symlink is deleted. The directory + * pointed to by symlink is not deleted. + * (3) If dir is a normal file, it is deleted. + * (4) If dir is a normal directory, then dir and all its contents recursively + * are deleted. + */ + public static boolean fullyDelete(final File dir) { + if (deleteImpl(dir, false)) { + // dir is (a) normal file, (b) symlink to a file, (c) empty directory or + // (d) symlink to a directory + return true; + } + // handle nonempty directory deletion + return fullyDeleteContents(dir) && deleteImpl(dir, true); + } + + private static boolean deleteImpl(final File f, final boolean doLog) { + if (f == null) { + LOG.warn("null file argument."); + return false; + } + final boolean wasDeleted = f.delete(); + if (wasDeleted) { + return true; + } + final boolean ex = f.exists(); + if (doLog && ex) { + LOG.warn("Failed to delete file or dir [" + + f.getAbsolutePath() + "]: it still exists."); + } + return !ex; + } + + /** + * Delete the contents of a directory, not the directory itself. If + * we return false, the directory may be partially-deleted. + * If dir is a symlink to a directory, all the contents of the actual + * directory pointed to by dir will be deleted. + */ + private static boolean fullyDeleteContents(final File dir) { + boolean deletionSucceeded = true; + final File[] contents = dir.listFiles(); + if (contents != null) { + for (File content : contents) { + if (content.isFile()) { + if (!deleteImpl(content, true)) { + deletionSucceeded = false; + } + } else { + // Either directory or symlink to another directory. + // Try deleting the directory as this might be a symlink + if (deleteImpl(content, false)) { + // this was indeed a symlink or an empty directory + continue; + } + // if not an empty directory or symlink let + // fullyDelete handle it. + if (!fullyDelete(content)) { + deletionSucceeded = false; + // continue deletion of other files/dirs under dir + } + } + } + } + return deletionSucceeded; + } + + /** + * Interprets the passed string as a URI. In case of error it + * assumes the specified string is a file. + * + * @param s the string to interpret + * @return the resulting URI + */ + public static URI stringAsURI(String s) throws IOException { + URI u = null; + // try to make a URI + try { + u = new URI(s); + } catch (URISyntaxException e){ + LOG.error("Syntax error in URI " + s + + ". Please check hdfs configuration.", e); + } + + // if URI is null or scheme is undefined, then assume it's file:// + if(u == null || u.getScheme() == null){ + LOG.warn("Path " + s + " should be specified as a URI " + + "in configuration files. Please update configuration."); + u = fileAsURI(new File(s)); + } + return u; + } + + /** + * Converts the passed File to a URI. This method trims the trailing slash if + * one is appended because the underlying file is in fact a directory that + * exists. + * + * @param f the file to convert + * @return the resulting URI + */ + public static URI fileAsURI(File f) throws IOException { + URI u = f.getCanonicalFile().toURI(); + + // trim the trailing slash, if it's present + if (u.getPath().endsWith("/")) { + String uriAsString = u.toString(); + try { + u = new URI(uriAsString.substring(0, uriAsString.length() - 1)); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + return u; + } + + /** + * A wrapper for {@link File#listFiles()}. This java.io API returns null + * when a dir is not a directory or for any I/O error. Instead of having + * null check everywhere File#listFiles() is used, we will add utility API + * to get around this problem. For the majority of cases where we prefer + * an IOException to be thrown. + * @param dir directory for which listing should be performed + * @return list of files or empty list + * @exception IOException for invalid directory or for a bad disk. + */ + public static File[] listFiles(File dir) throws IOException { + File[] files = dir.listFiles(); + if(files == null) { + throw new IOException("Invalid directory or I/O error occurred for dir: " + + dir.toString()); + } + return files; + } + + /** + * Platform independent implementation for {@link File#canWrite()} + * @param f input file + * @return On Unix, same as {@link File#canWrite()} + * On Windows, true if process has write access on the path + */ + public static boolean canWrite(File f) { + if (RaftUtils.WINDOWS) { + try { + return NativeIO.Windows.access(f.getCanonicalPath(), + NativeIO.Windows.AccessRight.ACCESS_WRITE); + } catch (IOException e) { + return false; + } + } else { + return f.canWrite(); + } + } +}