[FLINK-5812] [core] Cleanups in FileSystem (round 1)

  - This makes the FileSystem use the 'WriteMode' (otherwise it was an unused 
enumeration)
  - Extends comments
  - Deprecates the method that controls the replication factor and block size


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1bfae95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1bfae95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1bfae95

Branch: refs/heads/master
Commit: a1bfae95fec8d076ef90d5a36ffa32d3870870d8
Parents: 31c26e3
Author: Stephan Ewen <[email protected]>
Authored: Wed Feb 15 17:10:53 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Feb 20 01:01:24 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileOutputFormat.java   |  4 +-
 .../org/apache/flink/core/fs/FileSystem.java    | 82 +++++++++++++++++---
 .../core/fs/SafetyNetWrapperFileSystem.java     |  6 +-
 .../flink/core/fs/local/LocalFileSystem.java    | 26 ++++---
 .../flink/util/AbstractCloseableRegistry.java   | 15 ++--
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  4 +-
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |  4 +-
 7 files changed, 104 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 0ab12df..1382f06 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -104,7 +104,7 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
        protected Path outputFilePath;
        
        /**
-        * The write mode of the output.        
+        * The write mode of the output.
         */
        private WriteMode writeMode;
        
@@ -249,7 +249,7 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
                this.actualFilePath = (numTasks > 1 || outputDirectoryMode == 
OutputDirectoryMode.ALWAYS) ? p.suffix("/" + getDirectoryFileName(taskNumber)) 
: p;
 
                // create output file
-               this.stream = fs.create(this.actualFilePath, writeMode == 
WriteMode.OVERWRITE);
+               this.stream = fs.create(this.actualFilePath, writeMode);
                
                // at this point, the file creation must have succeeded, or an 
exception has been thrown
                this.fileCreated = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index c3828fb..4149d5e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +60,20 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * machine-local file system). Other file system types are accessed by an 
implementation that bridges
  * to the suite of file systems supported by Hadoop (such as for example HDFS).
  * 
- * <h2>Data Persistence</h2>
+ * <h2>Scope and Purpose</h2>
+ * 
+ * The purpose of this abstraction is to expose a common and well-defined 
interface for
+ * access to files. This abstraction is used both by Flink's fault tolerance 
mechanism (storing
+ * state and recovery data) and by reusable built-in connectors (file sources 
/ sinks).
+ * 
+ * <p>The purpose of this abstraction is <b>not</b> to give user programs an 
abstraction with
+ * extreme flexibility and control across all possible file systems. That 
mission would be a folly,
+ * as the differences in characteristics of even the most common file systems 
are already quite
+ * large. It is expected that user programs that need specialized 
functionality of certain file systems
+ * in their functions, operations, sources, or sinks instantiate the 
specialized file system adapters
+ * directly.
+ * 
+ * <h2>Data Persistence Contract</h2>
  * 
  * The FileSystem's {@link FSDataOutputStream output streams} are used to 
persistently store data,
  * both for results of streaming applications and for fault tolerance and 
recovery. It is therefore
@@ -152,6 +166,14 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * in between read or write operations, because there are no guarantees about 
the visibility of
  * operations across threads (many operations do not create memory fences).
  * 
+ * <h2>Streams Safety Net</h2>
+ * 
+ * When application code obtains a FileSystem (via {@link FileSystem#get(URI)} 
or via
+ * {@link Path#getFileSystem()}), the FileSystem instantiates a safety net for 
that FileSystem.
+ * The safety net ensures that all streams created from the FileSystem are 
closed when the
+ * application task finishes (or is canceled or failed). That way, the task's 
threads do not
+ * leak connections.
+ * 
  * @see FSDataInputStream
  * @see FSDataOutputStream
  */
@@ -164,11 +186,13 @@ public abstract class FileSystem {
         */
        public enum WriteMode {
 
-               /** Creates the target file if it does not exist. Does not 
overwrite existing files and directories. */
+               /** Creates the target file only if no file exists at that path 
already.
+                * Does not overwrite existing files and directories. */
                NO_OVERWRITE,
 
-               /** Creates a new target file regardless of any existing files 
or directories. Existing files and
-                * directories will be removed/overwritten. */
+               /** Creates a new target file regardless of any existing files 
or directories.
+                * Existing files and directories will be deleted (recursively) 
automatically before
+                * creating the new file. */
                OVERWRITE
        }
 
@@ -555,7 +579,6 @@ public abstract class FileSystem {
         *        source file
         */
        public boolean exists(final Path f) throws IOException {
-
                try {
                        return (getFileStatus(f) != null);
                } catch (FileNotFoundException e) {
@@ -590,6 +613,11 @@ public abstract class FileSystem {
 
        /**
         * Opens an FSDataOutputStream at the indicated Path.
+        * 
+        * <p>This method is deprecated, because most of its parameters are 
ignored by most file systems.
+        * To control, for example, the replication factor and block size in the 
Hadoop Distributed File System,
+        * make sure that the respective Hadoop configuration file is either 
linked from the Flink configuration,
+        * or in the classpath of either Flink or the user code.
         *
         * @param f
         *        the file name to open
@@ -602,8 +630,15 @@ public abstract class FileSystem {
         *        required block replication for the file.
         * @param blockSize
         *        the size of the file blocks
-        * @throws IOException
+        * 
+        * @throws IOException Thrown, if the stream could not be opened 
because of an I/O error, or because
+        *                     a file already exists at that path and the write 
mode indicates to not
+        *                     overwrite the file.
+        * 
+        * @deprecated Deprecated because not well supported across types of 
file systems.
+        *             Control the behavior of specific file systems via 
configurations instead. 
         */
+       @Deprecated
        public abstract FSDataOutputStream create(Path f, boolean overwrite, 
int bufferSize, short replication,
                        long blockSize) throws IOException;
 
@@ -615,9 +650,34 @@ public abstract class FileSystem {
         * @param overwrite
         *        if a file with this name already exists, then if true,
         *        the file will be overwritten, and if false an error will be 
thrown.
-        * @throws IOException
+        * 
+        * @throws IOException Thrown, if the stream could not be opened 
because of an I/O error, or because
+        *                     a file already exists at that path and the write 
mode indicates to not
+        *                     overwrite the file.
+        * 
+        * @deprecated Use {@link #create(Path, WriteMode)} instead.
+        */
+       @Deprecated
+       public FSDataOutputStream create(Path f, boolean overwrite) throws 
IOException {
+               return create(f, overwrite ? WriteMode.OVERWRITE : 
WriteMode.NO_OVERWRITE);
+       }
+
+       /**
+        * Opens an FSDataOutputStream to a new file at the given path.
+        * 
+        * <p>If the file already exists, the behavior depends on the given 
{@code WriteMode}.
+        * If the mode is set to {@link WriteMode#NO_OVERWRITE}, then this 
method fails with an
+        * exception.
+        *
+        * @param f The file path to write to
+        * @param overwriteMode The action to take if a file or directory 
already exists at the given path.
+        * @return The stream to the new file at the target path.
+        * 
+        * @throws IOException Thrown, if the stream could not be opened 
because of an I/O error, or because
+        *                     a file already exists at that path and the write 
mode indicates to not
+        *                     overwrite the file.
         */
-       public abstract FSDataOutputStream create(Path f, boolean overwrite) 
throws IOException;
+       public abstract FSDataOutputStream create(Path f, WriteMode 
overwriteMode) throws IOException;
 
        /**
         * Renames the file/directory src to dst.
@@ -632,7 +692,9 @@ public abstract class FileSystem {
        public abstract boolean rename(Path src, Path dst) throws IOException;
 
        /**
-        * Returns true if this is a distributed file system, false otherwise.
+        * Returns true if this is a distributed file system. A distributed 
file system here means
+        * that the file system is shared among all Flink processes that 
participate in a cluster or
+        * job and that all these processes can see the same files.
         *
         * @return True, if this is a distributed file system, false otherwise.
         */
@@ -911,7 +973,7 @@ public abstract class FileSystem {
         * An identifier of a file system, via its scheme and its authority.
         * This class needs to stay public, because it is detected as part of 
the public API.
         */
-       public static class FSKey {
+       private static final class FSKey {
 
                /** The scheme of the file system. */
                private final String scheme;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 63e6253..1dacafd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -30,8 +30,8 @@ import java.net.URI;
  * {@link ClosingFSDataInputStream} or {@link ClosingFSDataOutputStream} and 
(ii) registers them to
  * a {@link SafetyNetCloseableRegistry}.
  *
- * Streams obtained by this are therefore managed by the {@link 
SafetyNetCloseableRegistry} to prevent resource leaks
- * from unclosed streams.
+ * <p>Streams obtained by this are therefore managed by the {@link 
SafetyNetCloseableRegistry} to
+ * prevent resource leaks from unclosed streams.
  */
 @Internal
 public class SafetyNetWrapperFileSystem extends FileSystem implements 
WrappingProxy<FileSystem> {
@@ -120,7 +120,7 @@ public class SafetyNetWrapperFileSystem extends FileSystem 
implements WrappingPr
        }
 
        @Override
-       public FSDataOutputStream create(Path f, boolean overwrite) throws 
IOException {
+       public FSDataOutputStream create(Path f, WriteMode overwrite) throws 
IOException {
                FSDataOutputStream innerStream = unsafeFileSystem.create(f, 
overwrite);
                return ClosingFSDataOutputStream.wrapSafe(innerStream, 
registry, String.valueOf(f));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index acbf814..12aeb7f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -43,9 +43,12 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.file.FileAlreadyExistsException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The class <code>LocalFile</code> provides an implementation of the {@link 
FileSystem} interface
+ * The class {@code LocalFileSystem} is an implementation of the {@link 
FileSystem} interface
  * for the local file system of the machine where the JVM runs.
  */
 @Internal
@@ -231,28 +234,27 @@ public class LocalFileSystem extends FileSystem {
                return (parent == null || mkdirs(parent)) && (p2f.mkdir() || 
p2f.isDirectory());
        }
 
-
        @Override
-       public FSDataOutputStream create(final Path f, final boolean overwrite, 
final int bufferSize,
-                       final short replication, final long blockSize) throws 
IOException {
+       public FSDataOutputStream create(final Path filePath, final WriteMode 
overwrite) throws IOException {
+               checkNotNull(filePath, "filePath");
 
-               if (exists(f) && !overwrite) {
-                       throw new IOException("File already exists:" + f);
+               if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
+                       throw new FileAlreadyExistsException("File already 
exists: " + filePath);
                }
 
-               final Path parent = f.getParent();
+               final Path parent = filePath.getParent();
                if (parent != null && !mkdirs(parent)) {
-                       throw new IOException("Mkdirs failed to create " + 
parent.toString());
+                       throw new IOException("Mkdirs failed to create " + 
parent);
                }
 
-               final File file = pathToFile(f);
+               final File file = pathToFile(filePath);
                return new LocalDataOutputStream(file);
        }
 
-
        @Override
-       public FSDataOutputStream create(final Path f, final boolean overwrite) 
throws IOException {
-               return create(f, overwrite, 0, (short) 0, 0);
+       public FSDataOutputStream create(
+                       Path f, boolean overwrite, int bufferSize, short 
replication, long blockSize) throws IOException {
+               return create(f, overwrite ? WriteMode.OVERWRITE : 
WriteMode.NO_OVERWRITE);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index e165d97..2b7a8c8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -27,10 +27,10 @@ import java.util.Map;
 /**
  * This is the abstract base class for registries that allow to register 
instances of {@link Closeable}, which are all
  * closed if this registry is closed.
- * <p>
- * Registering to an already closed registry will throw an exception and close 
the provided {@link Closeable}
- * <p>
- * All methods in this class are thread-safe.
+ * 
+ * <p>Registering to an already closed registry will throw an exception and 
close the provided {@link Closeable}.
+ * 
+ * <p>All methods in this class are thread-safe.
  *
  * @param <C> Type of the closeable this registers
  * @param <T> Type for potential meta data associated with the registering 
closeables
@@ -51,7 +51,7 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
         * {@link IllegalStateException} and closes the passed {@link 
Closeable}.
         *
         * @param closeable Closeable tor register
-        * @return true if the the Closeable was newly added to the registry
+        * 
         * @throws IOException exception when the registry was closed before
         */
        public final void registerClosable(C closeable) throws IOException {
@@ -74,7 +74,6 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
         * Removes a {@link Closeable} from the registry.
         *
         * @param closeable instance to remove from the registry.
-        * @return true, if the instance was actually registered and now removed
         */
        public final void unregisterClosable(C closeable) {
 
@@ -109,6 +108,10 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
                return closeableToRef;
        }
 
+       // 
------------------------------------------------------------------------
+       //  
+       // 
------------------------------------------------------------------------
+
        protected abstract void doUnRegister(C closeable, Map<Closeable, T> 
closeableMap);
 
        protected abstract void doRegister(C closeable, Map<Closeable, T> 
closeableMap) throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 36dfa55..1371d21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -417,9 +417,9 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
 
 
        @Override
-       public HadoopDataOutputStream create(final Path f, final boolean 
overwrite) throws IOException {
+       public HadoopDataOutputStream create(final Path f, final WriteMode 
overwrite) throws IOException {
                final org.apache.hadoop.fs.FSDataOutputStream 
fsDataOutputStream = this.fs
-                       .create(new org.apache.hadoop.fs.Path(f.toString()), 
overwrite);
+                       .create(new org.apache.hadoop.fs.Path(f.toString()), 
overwrite == WriteMode.OVERWRITE);
                return new HadoopDataOutputStream(fsDataOutputStream);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1bfae95/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index a7ef441..57eea6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -327,11 +327,11 @@ public final class MapRFileSystem extends FileSystem {
        }
 
        @Override
-       public FSDataOutputStream create(final Path f, final boolean overwrite)
+       public FSDataOutputStream create(final Path f, final WriteMode 
overwrite)
                        throws IOException {
 
                final org.apache.hadoop.fs.FSDataOutputStream fdos = 
this.fs.create(
-                               new org.apache.hadoop.fs.Path(f.toString()), 
overwrite);
+                               new org.apache.hadoop.fs.Path(f.toString()), 
overwrite == WriteMode.OVERWRITE);
 
                return new HadoopDataOutputStream(fdos);
        }

Reply via email to