[FLINK-5812] [core] Cleanups in FileSystem (round 1)

  - Makes the FileSystem use the 'WriteMode' (which was previously an unused enumeration)
  - Extends comments
  - Deprecates the method that controls the replication factor and block size
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1bfae95 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1bfae95 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1bfae95 Branch: refs/heads/master Commit: a1bfae95fec8d076ef90d5a36ffa32d3870870d8 Parents: 31c26e3 Author: Stephan Ewen <[email protected]> Authored: Wed Feb 15 17:10:53 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Feb 20 01:01:24 2017 +0100 ---------------------------------------------------------------------- .../flink/api/common/io/FileOutputFormat.java | 4 +- .../org/apache/flink/core/fs/FileSystem.java | 82 +++++++++++++++++--- .../core/fs/SafetyNetWrapperFileSystem.java | 6 +- .../flink/core/fs/local/LocalFileSystem.java | 26 ++++--- .../flink/util/AbstractCloseableRegistry.java | 15 ++-- .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 4 +- .../flink/runtime/fs/maprfs/MapRFileSystem.java | 4 +- 7 files changed, 104 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index 0ab12df..1382f06 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -104,7 +104,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen protected Path outputFilePath; /** - * The write mode of the output. + * The write mode of the output. 
*/ private WriteMode writeMode; @@ -249,7 +249,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) : p; // create output file - this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE); + this.stream = fs.create(this.actualFilePath, writeMode); // at this point, the file creation must have succeeded, or an exception has been thrown this.fileCreated = true; http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index c3828fb..4149d5e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -33,6 +33,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.util.IOUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +60,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * machine-local file system). Other file system types are accessed by an implementation that bridges * to the suite of file systems supported by Hadoop (such as for example HDFS). * - * <h2>Data Persistence</h2> + * <h2>Scope and Purpose</h2> + * + * The purpose of this abstraction is used to expose a common and well defined interface for + * access to files. This abstraction is used both by Flink's fault tolerance mechanism (storing + * state and recovery data) and by reusable built-in connectors (file sources / sinks). 
+ * + * <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with + * extreme flexibility and control across all possible file systems. That mission would be a folly, + * as the differences in characteristics of even the most common file systems are already quite + * large. It is expected that user programs that need specialized functionality of certain file systems + * in their functions, operations, sources, or sinks instantiate the specialized file system adapters + * directly. + * + * <h2>Data Persistence Contract</h2> * * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store data, * both for results of streaming applications and for fault tolerance and recovery. It is therefore @@ -152,6 +166,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * in between read or write operations, because there are no guarantees about the visibility of * operations across threads (many operations do not create memory fences). * + * <h2>Streams Safety Net</h2> + * + * When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via + * {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem. + * The safety net ensures that all streams created from the FileSystem are closed when the + * application task finishes (or is canceled or failed). That way, the task's threads do not + * leak connections. + * * @see FSDataInputStream * @see FSDataOutputStream */ @@ -164,11 +186,13 @@ public abstract class FileSystem { */ public enum WriteMode { - /** Creates the target file if it does not exist. Does not overwrite existing files and directories. */ + /** Creates the target file only if no file exists at that path already. + * Does not overwrite existing files and directories. */ NO_OVERWRITE, - /** Creates a new target file regardless of any existing files or directories. Existing files and - * directories will be removed/overwritten. 
*/ + /** Creates a new target file regardless of any existing files or directories. + * Existing files and directories will be deleted (recursively) automatically before + * creating the new file. */ OVERWRITE } @@ -555,7 +579,6 @@ public abstract class FileSystem { * source file */ public boolean exists(final Path f) throws IOException { - try { return (getFileStatus(f) != null); } catch (FileNotFoundException e) { @@ -590,6 +613,11 @@ public abstract class FileSystem { /** * Opens an FSDataOutputStream at the indicated Path. + * + * <p>This method is deprecated, because most of its parameters are ignored by most file systems. + * To control for example the replication factor and block size in the Hadoop Distributed File system, + * make sure that the respective Hadoop configuration file is either linked from the Flink configuration, + * or in the classpath of either Flink or the user code. * * @param f * the file name to open @@ -602,8 +630,15 @@ public abstract class FileSystem { * required block replication for the file. * @param blockSize * the size of the file blocks - * @throws IOException + * + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because + * a file already exists at that path and the write mode indicates to not + * overwrite the file. + * + * @deprecated Deprecated because not well supported across types of file systems. + * Control the behavior of specific file systems via configurations instead. */ + @Deprecated public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException; @@ -615,9 +650,34 @@ public abstract class FileSystem { * @param overwrite * if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. 
- * @throws IOException + * + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because + * a file already exists at that path and the write mode indicates to not + * overwrite the file. + * + * @deprecated Use {@link #create(Path, WriteMode)} instead. + */ + @Deprecated + public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { + return create(f, overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE); + } + + /** + * Opens an FSDataOutputStream to a new file at the given path. + * + * <p>If the file already exists, the behavior depends on the given {@code WriteMode}. + * If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an + * exception. + * + * @param f The file path to write to + * @param overwriteMode The action to take if a file or directory already exists at the given path. + * @return The stream to the new file at the target path. + * + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because + * a file already exists at that path and the write mode indicates to not + * overwrite the file. */ - public abstract FSDataOutputStream create(Path f, boolean overwrite) throws IOException; + public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException; /** * Renames the file/directory src to dst. @@ -632,7 +692,9 @@ public abstract class FileSystem { public abstract boolean rename(Path src, Path dst) throws IOException; /** - * Returns true if this is a distributed file system, false otherwise. + * Returns true if this is a distributed file system. A distributed file system here means + * that the file system is shared among all Flink processes that participate in a cluster or + * job and that all these processes can see the same files. * * @return True, if this is a distributed file system, false otherwise. 
*/ @@ -911,7 +973,7 @@ public abstract class FileSystem { * An identifier of a file system, via its scheme and its authority. * This class needs to stay public, because it is detected as part of the public API. */ - public static class FSKey { + private static final class FSKey { /** The scheme of the file system. */ private final String scheme; http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index 63e6253..1dacafd 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -30,8 +30,8 @@ import java.net.URI; * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and (ii) registers them to * a {@link SafetyNetCloseableRegistry}. * - * Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to prevent resource leaks - * from unclosed streams. + * <p>Streams obtained by this are therefore managed by the {@link SafetyNetCloseableRegistry} to + * prevent resource leaks from unclosed streams. 
*/ @Internal public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> { @@ -120,7 +120,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr } @Override - public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { + public FSDataOutputStream create(Path f, WriteMode overwrite) throws IOException { FSDataOutputStream innerStream = unsafeFileSystem.create(f, overwrite); return ClosingFSDataOutputStream.wrapSafe(innerStream, registry, String.valueOf(f)); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java index acbf814..12aeb7f 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java @@ -43,9 +43,12 @@ import java.io.IOException; import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; +import java.nio.file.FileAlreadyExistsException; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The class <code>LocalFile</code> provides an implementation of the {@link FileSystem} interface + * The class {@code LocalFileSystem} is an implementation of the {@link FileSystem} interface * for the local file system of the machine where the JVM runs. 
*/ @Internal @@ -231,28 +234,27 @@ public class LocalFileSystem extends FileSystem { return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); } - @Override - public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize, - final short replication, final long blockSize) throws IOException { + public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException { + checkNotNull(filePath, "filePath"); - if (exists(f) && !overwrite) { - throw new IOException("File already exists:" + f); + if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) { + throw new FileAlreadyExistsException("File already exists: " + filePath); } - final Path parent = f.getParent(); + final Path parent = filePath.getParent(); if (parent != null && !mkdirs(parent)) { - throw new IOException("Mkdirs failed to create " + parent.toString()); + throw new IOException("Mkdirs failed to create " + parent); } - final File file = pathToFile(f); + final File file = pathToFile(filePath); return new LocalDataOutputStream(file); } - @Override - public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException { - return create(f, overwrite, 0, (short) 0, 0); + public FSDataOutputStream create( + Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException { + return create(f, overwrite ? 
WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index e165d97..2b7a8c8 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -27,10 +27,10 @@ import java.util.Map; /** * This is the abstract base class for registries that allow to register instances of {@link Closeable}, which are all * closed if this registry is closed. - * <p> - * Registering to an already closed registry will throw an exception and close the provided {@link Closeable} - * <p> - * All methods in this class are thread-safe. + * + * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable} + * + * <p>All methods in this class are thread-safe. * * @param <C> Type of the closeable this registers * @param <T> Type for potential meta data associated with the registering closeables @@ -51,7 +51,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen * {@link IllegalStateException} and closes the passed {@link Closeable}. * * @param closeable Closeable tor register - * @return true if the the Closeable was newly added to the registry + * * @throws IOException exception when the registry was closed before */ public final void registerClosable(C closeable) throws IOException { @@ -74,7 +74,6 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen * Removes a {@link Closeable} from the registry. * * @param closeable instance to remove from the registry. 
- * @return true, if the instance was actually registered and now removed */ public final void unregisterClosable(C closeable) { @@ -109,6 +108,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen return closeableToRef; } + // ------------------------------------------------------------------------ + // + // ------------------------------------------------------------------------ + protected abstract void doUnRegister(C closeable, Map<Closeable, T> closeableMap); protected abstract void doRegister(C closeable, Map<Closeable, T> closeableMap) throws IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 36dfa55..1371d21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -417,9 +417,9 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst @Override - public HadoopDataOutputStream create(final Path f, final boolean overwrite) throws IOException { + public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException { final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs - .create(new org.apache.hadoop.fs.Path(f.toString()), overwrite); + .create(new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE); return new HadoopDataOutputStream(fsDataOutputStream); } http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java index a7ef441..57eea6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java @@ -327,11 +327,11 @@ public final class MapRFileSystem extends FileSystem { } @Override - public FSDataOutputStream create(final Path f, final boolean overwrite) + public FSDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException { final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create( - new org.apache.hadoop.fs.Path(f.toString()), overwrite); + new org.apache.hadoop.fs.Path(f.toString()), overwrite == WriteMode.OVERWRITE); return new HadoopDataOutputStream(fdos); }
