Author: szetszwo Date: Wed May 30 19:10:09 2012 New Revision: 1344419 URL: http://svn.apache.org/viewvc?rev=1344419&view=rev Log: HDFS-744. Support hsync in HDFS. Contributed by Lars Hofhansl
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java?rev=1344419&r1=1344418&r2=1344419&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java Wed May 30 19:10:09 2012 @@ -44,6 +44,9 @@ import org.apache.hadoop.classification. * else append to an existing file.</li> * <li> CREATE|OVERWRITE - to create a file if it does not exist, * else overwrite an existing file.</li> + * <li> SYNC_BLOCK - to force closed blocks to the disk device. + * In addition {@link Syncable#hsync()} should be called after each write, + * if true synchronous behavior is required.</li> * </ol> * * Following combination is not valid and will result in @@ -71,7 +74,12 @@ public enum CreateFlag { /** * Append to a file. See javadoc for more description. */ - APPEND((short) 0x04); + APPEND((short) 0x04), + + /** + * Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description. 
+ */ + SYNC_BLOCK((short) 0x08); private final short mode; Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1344419&r1=1344418&r2=1344419&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Wed May 30 19:10:09 2012 @@ -830,6 +830,30 @@ public abstract class FileSystem extends long blockSize, Progressable progress) throws IOException; + /** + * Create an FSDataOutputStream at the indicated Path with write-progress + * reporting. + * @param f the file name to open + * @param permission + * @param flags {@link CreateFlag}s to use for this stream. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ + public FSDataOutputStream create(Path f, + FsPermission permission, + EnumSet<CreateFlag> flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + // only DFS support this + return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress); + } + /*. 
* This create has been added to support the FileContext that processes @@ -954,10 +978,35 @@ public abstract class FileSystem extends public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - throw new IOException("createNonRecursive unsupported for this filesystem " - + this.getClass()); + return createNonRecursive(f, permission, + overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), bufferSize, + replication, blockSize, progress); } + /** + * Opens an FSDataOutputStream at the indicated Path with write-progress + * reporting. Same as create(), except fails if parent directory doesn't + * already exist. + * @param f the file name to open + * @param permission + * @param flags {@link CreateFlag}s to use for this stream. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + * @deprecated API only for 0.20-append + */ + @Deprecated + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + throw new IOException("createNonRecursive unsupported for this filesystem " + + this.getClass()); + } + /** * Creates the given Path as a brand-new zero-length file. If * create fails, or if it already existed, return false. 
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java?rev=1344419&r1=1344418&r2=1344419&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java Wed May 30 19:10:09 2012 @@ -807,7 +807,7 @@ public class SequenceFile { } /** Write key/value pairs to a sequence-format file. */ - public static class Writer implements java.io.Closeable { + public static class Writer implements java.io.Closeable, Syncable { private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; @@ -1193,13 +1193,31 @@ public class SequenceFile { } } - /** flush all currently written data to the file system */ + /** + * flush all currently written data to the file system + * @deprecated Use {@link #hsync()} or {@link #hflush()} instead + */ + @Deprecated public void syncFs() throws IOException { if (out != null) { out.hflush(); // flush contents to file system } } + @Override + public void hsync() throws IOException { + if (out != null) { + out.hsync(); + } + } + + @Override + public void hflush() throws IOException { + if (out != null) { + out.hflush(); + } + } + /** Returns the configuration of this file. 
*/ Configuration getConf() { return conf; } Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java?rev=1344419&r1=1344418&r2=1344419&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java Wed May 30 19:10:09 2012 @@ -74,6 +74,11 @@ public class TestFilterFileSystem { Progressable progress) throws IOException { return null; } + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return null; + } public boolean mkdirs(Path f) { return false; } public FSDataInputStream open(Path f) { return null; } public FSDataOutputStream create(Path f) { return null; } @@ -123,6 +128,15 @@ public class TestFilterFileSystem { Progressable progress) { return null; } + public FSDataOutputStream create(Path f, + FsPermission permission, + EnumSet<CreateFlag> flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + return null; + } public String getName() { return null; } public boolean delete(Path f) { return false; } public short getReplication(Path src) { return 0 ; }