This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c4b1aa2bd906bff076e48a5161ba394007cf657
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Nov 9 14:58:53 2023 +0100

    [FLINK-35767][filesystems] Add PathsCopyingFileSystem
---
 .../java/org/apache/flink/core/fs/FileSystem.java  | 302 ++++-----------
 .../java/org/apache/flink/core/fs/IFileSystem.java | 415 +++++++++++++++++++++
 .../flink/core/fs/PathsCopyingFileSystem.java      |  91 +++++
 .../flink/core/fs/PluginFileSystemFactory.java     |  18 +-
 .../flink/core/fs/SafetyNetWrapperFileSystem.java  |  15 +-
 .../flink/runtime/state/KeyGroupsStateHandle.java  |   5 +
 .../runtime/state/OperatorStreamStateHandle.java   |   5 +
 .../state/RetrievableStreamStateHandle.java        |   5 +
 .../flink/runtime/state/StreamStateHandle.java     |   8 +
 .../runtime/state/filesystem/FileStateHandle.java  |   5 +
 10 files changed, 636 insertions(+), 233 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index cee47823e1f..b6cd7a49276 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -196,8 +196,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @see FSDataOutputStream
  */
 @Public
-public abstract class FileSystem {
-
+public abstract class FileSystem implements IFileSystem {
     /**
      * The possible write modes. The write mode decides what happens if a file 
should be created,
      * but already exists.
@@ -217,8 +216,6 @@ public abstract class FileSystem {
         OVERWRITE
     }
 
-    // ------------------------------------------------------------------------
-
     /** Logger for all FileSystem work. */
     private static final Logger LOG = 
LoggerFactory.getLogger(FileSystem.class);
 
@@ -570,140 +567,9 @@ public abstract class FileSystem {
     }
 
     // ------------------------------------------------------------------------
-    //  File System Methods
+    //  Deprecated File System Methods
     // ------------------------------------------------------------------------
 
-    /**
-     * Returns the path of the file system's current working directory.
-     *
-     * @return the path of the file system's current working directory
-     */
-    public abstract Path getWorkingDirectory();
-
-    /**
-     * Returns the path of the user's home directory in this file system.
-     *
-     * @return the path of the user's home directory in this file system.
-     */
-    public abstract Path getHomeDirectory();
-
-    /**
-     * Returns a URI whose scheme and authority identify this file system.
-     *
-     * @return a URI whose scheme and authority identify this file system
-     */
-    public abstract URI getUri();
-
-    /**
-     * Return a file status object that represents the path.
-     *
-     * @param f The path we want information from
-     * @return a FileStatus object
-     * @throws FileNotFoundException when the path does not exist; IOException 
see specific
-     *     implementation
-     */
-    public abstract FileStatus getFileStatus(Path f) throws IOException;
-
-    /**
-     * Return an array containing hostnames, offset and size of portions of 
the given file. For a
-     * nonexistent file or regions, null will be returned. This call is most 
helpful with DFS, where
-     * it returns hostnames of machines that contain the given file. The 
FileSystem will simply
-     * return an elt containing 'localhost'.
-     */
-    public abstract BlockLocation[] getFileBlockLocations(FileStatus file, 
long start, long len)
-            throws IOException;
-
-    /**
-     * Opens an FSDataInputStream at the indicated Path.
-     *
-     * @param f the file name to open
-     * @param bufferSize the size of the buffer to be used.
-     */
-    public abstract FSDataInputStream open(Path f, int bufferSize) throws 
IOException;
-
-    /**
-     * Opens an FSDataInputStream at the indicated Path.
-     *
-     * @param f the file to open
-     */
-    public abstract FSDataInputStream open(Path f) throws IOException;
-
-    /**
-     * Creates a new {@link RecoverableWriter}. A recoverable writer creates 
streams that can
-     * persist and recover their intermediate state. Persisting and recovering 
intermediate state is
-     * a core building block for writing to files that span multiple 
checkpoints.
-     *
-     * <p>The returned object can act as a shared factory to open and recover 
multiple streams.
-     *
-     * <p>This method is optional on file systems and various file system 
implementations may not
-     * support this method, throwing an {@code UnsupportedOperationException}.
-     *
-     * @return A RecoverableWriter for this file system.
-     * @throws IOException Thrown, if the recoverable writer cannot be 
instantiated.
-     */
-    public RecoverableWriter createRecoverableWriter() throws IOException {
-        throw new UnsupportedOperationException(
-                "This file system does not support recoverable writers.");
-    }
-
-    /**
-     * Return the number of bytes that large input files should be optimally 
be split into to
-     * minimize I/O time.
-     *
-     * @return the number of bytes that large input files should be optimally 
be split into to
-     *     minimize I/O time
-     * @deprecated This value is no longer used and is meaningless.
-     */
-    @Deprecated
-    public long getDefaultBlockSize() {
-        return 32 * 1024 * 1024; // 32 MB;
-    }
-
-    /**
-     * List the statuses of the files/directories in the given path if the 
path is a directory.
-     *
-     * @param f given path
-     * @return the statuses of the files/directories in the given path
-     * @throws IOException
-     */
-    public abstract FileStatus[] listStatus(Path f) throws IOException;
-
-    /**
-     * Check if exists.
-     *
-     * @param f source file
-     */
-    public boolean exists(final Path f) throws IOException {
-        try {
-            return (getFileStatus(f) != null);
-        } catch (FileNotFoundException e) {
-            return false;
-        }
-    }
-
-    /**
-     * Delete a file.
-     *
-     * @param f the path to delete
-     * @param recursive if path is a directory and set to <code>true</code>, 
the directory is
-     *     deleted else throws an exception. In case of a file the recursive 
can be set to either
-     *     <code>true</code> or <code>false</code>
-     * @return <code>true</code> if delete is successful, <code>false</code> 
otherwise
-     * @throws IOException
-     */
-    public abstract boolean delete(Path f, boolean recursive) throws 
IOException;
-
-    /**
-     * Make the given file and all non-existent parents into directories. Has 
the semantics of Unix
-     * 'mkdir -p'. Existence of the directory hierarchy is not an error.
-     *
-     * @param f the directory/directories to be created
-     * @return <code>true</code> if at least one new directory has been 
created, <code>false</code>
-     *     otherwise
-     * @throws IOException thrown if an I/O error occurs while creating the 
directory
-     */
-    public abstract boolean mkdirs(Path f) throws IOException;
-
     /**
      * Opens an FSDataOutputStream at the indicated Path.
      *
@@ -739,7 +605,7 @@ public abstract class FileSystem {
      *     overwritten, and if false an error will be thrown.
      * @throws IOException Thrown, if the stream could not be opened because 
of an I/O, or because a
      *     file already exists at that path and the write mode indicates to 
not overwrite the file.
-     * @deprecated Use {@link #create(Path, WriteMode)} instead.
+     * @deprecated Use {@link #create(Path, FileSystem.WriteMode)} instead.
      */
     @Deprecated
     public FSDataOutputStream create(Path f, boolean overwrite) throws 
IOException {
@@ -747,92 +613,86 @@ public abstract class FileSystem {
     }
 
     /**
-     * Opens an FSDataOutputStream to a new file at the given path.
-     *
-     * <p>If the file already exists, the behavior depends on the given {@code 
WriteMode}. If the
-     * mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails 
with an exception.
+     * Gets a description of the characteristics of this file system.
      *
-     * @param f The file path to write to
-     * @param overwriteMode The action to take if a file or directory already 
exists at the given
-     *     path.
-     * @return The stream to the new file at the target path.
-     * @throws IOException Thrown, if the stream could not be opened because 
of an I/O, or because a
-     *     file already exists at that path and the write mode indicates to 
not overwrite the file.
+     * @deprecated this method is not used anymore.
      */
-    public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) 
throws IOException;
+    @Deprecated
+    public abstract FileSystemKind getKind();
 
     /**
-     * Renames the file/directory src to dst.
+     * Return the number of bytes that large input files should be optimally 
be split into to
+     * minimize I/O time.
      *
-     * @param src the file/directory to rename
-     * @param dst the new name of the file/directory
-     * @return <code>true</code> if the renaming was successful, 
<code>false</code> otherwise
-     * @throws IOException
+     * @return the number of bytes that large input files should be optimally 
be split into to
+     *     minimize I/O time
+     * @deprecated This value is no longer used and is meaningless.
      */
+    @Deprecated
+    public long getDefaultBlockSize() {
+        return 32 * 1024 * 1024; // 32 MB;
+    }
+
+    // ------------------------------------------------------------------------
+    // File System Methods kept for binary compatibility. Please check 
IFileSystem for their
+    // documentation.
+    // ------------------------------------------------------------------------
+
+    @Override
+    public abstract Path getWorkingDirectory();
+
+    @Override
+    public abstract Path getHomeDirectory();
+
+    @Override
+    public abstract URI getUri();
+
+    @Override
+    public abstract FileStatus getFileStatus(Path f) throws IOException;
+
+    @Override
+    public abstract BlockLocation[] getFileBlockLocations(FileStatus file, 
long start, long len)
+            throws IOException;
+
+    @Override
+    public abstract FSDataInputStream open(Path f, int bufferSize) throws 
IOException;
+
+    @Override
+    public abstract FSDataInputStream open(Path f) throws IOException;
+
+    @Override
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+        return IFileSystem.super.createRecoverableWriter();
+    }
+
+    @Override
+    public abstract FileStatus[] listStatus(Path f) throws IOException;
+
+    @Override
+    public boolean exists(final Path f) throws IOException {
+        return IFileSystem.super.exists(f);
+    }
+
+    @Override
+    public abstract boolean delete(Path f, boolean recursive) throws 
IOException;
+
+    @Override
+    public abstract boolean mkdirs(Path f) throws IOException;
+
+    @Override
+    public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) 
throws IOException;
+
+    @Override
     public abstract boolean rename(Path src, Path dst) throws IOException;
 
-    /**
-     * Returns true if this is a distributed file system. A distributed file 
system here means that
-     * the file system is shared among all Flink processes that participate in 
a cluster or job and
-     * that all these processes can see the same files.
-     *
-     * @return True, if this is a distributed file system, false otherwise.
-     */
+    @Override
     public abstract boolean isDistributedFS();
 
-    /**
-     * Gets a description of the characteristics of this file system.
-     *
-     * @deprecated this method is not used anymore.
-     */
-    @Deprecated
-    public abstract FileSystemKind getKind();
-
     // ------------------------------------------------------------------------
     //  output directory initialization
     // ------------------------------------------------------------------------
 
-    /**
-     * Initializes output directories on local file systems according to the 
given write mode.
-     *
-     * <ul>
-     *   <li>WriteMode.NO_OVERWRITE &amp; parallel output:
-     *       <ul>
-     *         <li>A directory is created if the output path does not exist.
-     *         <li>An existing directory is reused, files contained in the 
directory are NOT
-     *             deleted.
-     *         <li>An existing file raises an exception.
-     *       </ul>
-     *   <li>WriteMode.NO_OVERWRITE &amp; NONE parallel output:
-     *       <ul>
-     *         <li>An existing file or directory raises an exception.
-     *       </ul>
-     *   <li>WriteMode.OVERWRITE &amp; parallel output:
-     *       <ul>
-     *         <li>A directory is created if the output path does not exist.
-     *         <li>An existing directory is reused, files contained in the 
directory are NOT
-     *             deleted.
-     *         <li>An existing file is deleted and replaced by a new directory.
-     *       </ul>
-     *   <li>WriteMode.OVERWRITE &amp; NONE parallel output:
-     *       <ul>
-     *         <li>An existing file or directory (and all its content) is 
deleted
-     *       </ul>
-     * </ul>
-     *
-     * <p>Files contained in an existing directory are not deleted, because 
multiple instances of a
-     * DataSinkTask might call this function at the same time and hence might 
perform concurrent
-     * delete operations on the file system (possibly deleting output files of 
concurrently running
-     * tasks). Since concurrent DataSinkTasks are not aware of each other, 
coordination of delete
-     * and create operations would be difficult.
-     *
-     * @param outPath Output path that should be prepared.
-     * @param writeMode Write mode to consider.
-     * @param createDirectory True, to initialize a directory at the given 
path, false to prepare
-     *     space for a file.
-     * @return True, if the path was successfully prepared, false otherwise.
-     * @throws IOException Thrown, if any of the file system access operations 
failed.
-     */
+    @Override
     public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, 
boolean createDirectory)
             throws IOException {
         if (isDistributedFS()) {
@@ -843,6 +703,7 @@ public abstract class FileSystem {
         // concurrently work in this method (multiple output formats writing 
locally) might end
         // up deleting each other's directories and leave non-retrievable 
files, without necessarily
         // causing an exception. That results in very subtle issues, like 
output files looking as if
+
         // they are not getting created.
 
         // we acquire the lock interruptibly here, to make sure that 
concurrent threads waiting
@@ -945,28 +806,7 @@ public abstract class FileSystem {
         }
     }
 
-    /**
-     * Initializes output directories on distributed file systems according to 
the given write mode.
-     *
-     * <p>WriteMode.NO_OVERWRITE &amp; parallel output: - A directory is 
created if the output path
-     * does not exist. - An existing file or directory raises an exception.
-     *
-     * <p>WriteMode.NO_OVERWRITE &amp; NONE parallel output: - An existing 
file or directory raises
-     * an exception.
-     *
-     * <p>WriteMode.OVERWRITE &amp; parallel output: - A directory is created 
if the output path
-     * does not exist. - An existing directory and its content is deleted and 
a new directory is
-     * created. - An existing file is deleted and replaced by a new directory.
-     *
-     * <p>WriteMode.OVERWRITE &amp; NONE parallel output: - An existing file 
or directory is deleted
-     * and replaced by a new directory.
-     *
-     * @param outPath Output path that should be prepared.
-     * @param writeMode Write mode to consider.
-     * @param createDirectory True, to initialize a directory at the given 
path, false otherwise.
-     * @return True, if the path was successfully prepared, false otherwise.
-     * @throws IOException Thrown, if any of the file system access operations 
failed.
-     */
+    @Override
     public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, 
boolean createDirectory)
             throws IOException {
         if (!isDistributedFS()) {
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java
new file mode 100644
index 00000000000..dcbd79aa3f9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file is based on source code from the Hadoop Project 
(http://hadoop.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Interface of all file systems used by Flink. This interface may be extended 
to implement
+ * distributed file systems, or local file systems. The abstraction by this 
file system is very
+ * simple, and the set of available operations quite limited, to support the 
common denominator of a
+ * wide range of file systems. For example, appending to or mutating existing 
files is not
+ * supported.
+ *
+ * <p>Flink implements and supports some file system types directly (for 
example the default
+ * machine-local file system). Other file system types are accessed by an 
implementation that
+ * bridges to the suite of file systems supported by Hadoop (such as for 
example HDFS).
+ *
+ * <h2>Scope and Purpose</h2>
+ *
+ * <p>The purpose of this abstraction is to expose a common and well-defined 
interface for
+ * access to files. This abstraction is used both by Flink's fault tolerance 
mechanism (storing
+ * state and recovery data) and by reusable built-in connectors (file sources 
/ sinks).
+ *
+ * <p>The purpose of this abstraction is <b>not</b> to give user programs an 
abstraction with
+ * extreme flexibility and control across all possible file systems. That 
mission would be a folly,
+ * as the differences in characteristics of even the most common file systems 
are already quite
+ * large. It is expected that user programs that need specialized 
functionality of certain file
+ * systems in their functions, operations, sources, or sinks instantiate the 
specialized file system
+ * adapters directly.
+ *
+ * <h2>Data Persistence Contract</h2>
+ *
+ * <p>The FileSystem's {@link FSDataOutputStream output streams} are used to 
persistently store
+ * data, both for results of streaming applications and for fault tolerance 
and recovery. It is
+ * therefore crucial that the persistence semantics of these streams are well 
defined.
+ *
+ * <h3>Definition of Persistence Guarantees</h3>
+ *
+ * <p>Data written to an output stream is considered persistent, if two 
requirements are met:
+ *
+ * <ol>
+ *   <li><b>Visibility Requirement:</b> It must be guaranteed that all other 
processes, machines,
+ *       virtual machines, containers, etc. that are able to access the file 
see the data
+ *       consistently when given the absolute file path. This requirement is 
similar to the
+ *       <i>close-to-open</i> semantics defined by POSIX, but restricted to 
the file itself (by its
+ *       absolute path).
+ *   <li><b>Durability Requirement:</b> The file system's specific 
durability/persistence
+ *       requirements must be met. These are specific to the particular file 
system. For example the
+ *       {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ *       hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ *       typically guarantee durability in the presence of at most <i>n</i> 
concurrent node
+ *       failures, where <i>n</i> is the replication factor.
+ * </ol>
+ *
+ * <p>Updates to the file's parent directory (such that the file shows up when 
listing the directory
+ * contents) are not required to be complete for the data in the file stream 
to be considered
+ * persistent. This relaxation is important for file systems where updates to 
directory contents are
+ * only eventually consistent.
+ *
+ * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the 
written bytes once
+ * the call to {@link FSDataOutputStream#close()} returns.
+ *
+ * <h3>Examples</h3>
+ *
+ * <h4>Fault-tolerant distributed file systems</h4>
+ *
+ * <p>For <b>fault-tolerant distributed file systems</b>, data is considered 
persistent once it has
+ * been received and acknowledged by the file system, typically by having been 
replicated to a
+ * quorum of machines (<i>durability requirement</i>). In addition the 
absolute file path must be
+ * visible to all other machines that will potentially access the file 
(<i>visibility
+ * requirement</i>).
+ *
+ * <p>Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+ * guarantees of the particular file system.
+ *
+ * <p>The metadata updates to the file's parent directory are not required to 
have reached a
+ * consistent state. It is permissible that some machines see the file when 
listing the parent
+ * directory's contents while others do not, as long as access to the file by 
its absolute path is
+ * possible on all nodes.
+ *
+ * <h4>Local file systems</h4>
+ *
+ * <p>A <b>local file system</b> must support the POSIX <i>close-to-open</i> 
semantics. Because the
+ * local file system does not have any fault tolerance guarantees, no further 
requirements exist.
+ *
+ * <p>The above implies specifically that data may still be in the OS cache 
when considered
+ * persistent from the local file system's perspective. Crashes that cause the 
OS cache to lose data
+ * are considered fatal to the local machine and are not covered by the local 
file system's
+ * guarantees as defined by Flink.
+ *
+ * <p>That means that computed results, checkpoints, and savepoints that are 
written only to the
+ * local filesystem are not guaranteed to be recoverable from the local 
machine's failure, making
+ * local file systems unsuitable for production setups.
+ *
+ * <h2>Updating File Contents</h2>
+ *
+ * <p>Many file systems either do not support overwriting contents of existing 
files at all, or do
+ * not support consistent visibility of the updated contents in that case. For 
that reason, Flink's
+ * FileSystem does not support appending to existing files, or seeking within 
output streams so that
+ * previously written data could be overwritten.
+ *
+ * <h2>Overwriting Files</h2>
+ *
+ * <p>Overwriting files is in general possible. A file is overwritten by 
deleting it and creating a
+ * new file. However, certain filesystems cannot make that change 
synchronously visible to all
+ * parties that have access to the file. For example <a
+ * href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees 
only <i>eventual
+ * consistency</i> in the visibility of the file replacement: Some machines 
may see the old file,
+ * some machines may see the new file.
+ *
+ * <p>To avoid these consistency issues, the implementations of 
failure/recovery mechanisms in Flink
+ * strictly avoid writing to the same file path more than once.
+ *
+ * <h2>Thread Safety</h2>
+ *
+ * <p>Implementations of {@code FileSystem} must be thread-safe: The same 
instance of FileSystem is
+ * frequently shared across multiple threads in Flink and must be able to 
concurrently create
+ * input/output streams and list file metadata.
+ *
+ * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} 
implementations are strictly
+ * <b>not thread-safe</b>. Instances of the streams should also not be passed 
between threads in
+ * between read or write operations, because there are no guarantees about the 
visibility of
+ * operations across threads (many operations do not create memory fences).
+ *
+ * <h2>Streams Safety Net</h2>
+ *
+ * <p>When application code obtains a FileSystem (via {@link 
FileSystem#get(URI)} or via {@link
+ * Path#getFileSystem()}), the FileSystem instantiates a safety net for that 
FileSystem. The safety
+ * net ensures that all streams created from the FileSystem are closed when 
the application task
+ * finishes (or is canceled or failed). That way, the task's threads do not 
leak connections.
+ *
+ * <p>Internal runtime code can explicitly obtain a FileSystem that does not 
use the safety net via
+ * {@link FileSystem#getUnguardedFileSystem(URI)}.
+ *
+ * @see FSDataInputStream
+ * @see FSDataOutputStream
+ */
+@Experimental
+public interface IFileSystem {
+
+    /**
+     * Returns the path of the file system's current working directory.
+     *
+     * @return the path of the file system's current working directory
+     */
+    Path getWorkingDirectory();
+
+    /**
+     * Returns the path of the user's home directory in this file system.
+     *
+     * @return the path of the user's home directory in this file system.
+     */
+    Path getHomeDirectory();
+
+    /**
+     * Returns a URI whose scheme and authority identify this file system.
+     *
+     * @return a URI whose scheme and authority identify this file system
+     */
+    URI getUri();
+
+    /**
+     * Return a file status object that represents the path.
+     *
+     * @param f The path we want information from
+     * @return a FileStatus object
+     * @throws FileNotFoundException when the path does not exist; IOException 
see specific
+     *     implementation
+     */
+    FileStatus getFileStatus(Path f) throws IOException;
+
+    /**
+     * Returns an array containing hostnames, offset and size of portions of 
the given file. For a
+     * nonexistent file or regions, null will be returned. This call is most 
helpful with DFS, where
+     * it returns hostnames of machines that contain the given file. The 
FileSystem will simply
+     * return an element containing 'localhost'.
+     */
+    BlockLocation[] getFileBlockLocations(FileStatus file, long start, long 
len) throws IOException;
+
+    /**
+     * Opens an FSDataInputStream at the indicated Path.
+     *
+     * @param f the file name to open
+     * @param bufferSize the size of the buffer to be used.
+     */
+    FSDataInputStream open(Path f, int bufferSize) throws IOException;
+
+    /**
+     * Opens an FSDataInputStream at the indicated Path.
+     *
+     * @param f the file to open
+     */
+    FSDataInputStream open(Path f) throws IOException;
+
+    /**
+     * Creates a new {@link RecoverableWriter}. A recoverable writer creates 
streams that can
+     * persist and recover their intermediate state. Persisting and recovering 
intermediate state is
+     * a core building block for writing to files that span multiple 
checkpoints.
+     *
+     * <p>The returned object can act as a shared factory to open and recover 
multiple streams.
+     *
+     * <p>This method is optional on file systems and various file system 
implementations may not
+     * support this method, throwing an {@code UnsupportedOperationException}.
+     *
+     * @return A RecoverableWriter for this file system.
+     * @throws IOException Thrown, if the recoverable writer cannot be 
instantiated.
+     */
+    default RecoverableWriter createRecoverableWriter() throws IOException {
+        throw new UnsupportedOperationException(
+                "This file system does not support recoverable writers.");
+    }
+
+    /**
+     * List the statuses of the files/directories in the given path if the 
path is a directory.
+     *
+     * @param f given path
+     * @return the statuses of the files/directories in the given path
+     * @throws IOException
+     */
+    FileStatus[] listStatus(Path f) throws IOException;
+
+    /**
+     * Checks if the path exists; a {@link FileNotFoundException} is reported 
as {@code false}.
+     *
+     * @param f the path to check
+     */
+    default boolean exists(final Path f) throws IOException {
+        try {
+            return (getFileStatus(f) != null);
+        } catch (FileNotFoundException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Tells if this {@link IFileSystem} supports an optimised way to directly 
copy between given
+     * paths. In other words if it implements {@link PathsCopyingFileSystem}.
+     *
+     * <p>At least one of the source and destination belongs to this {@link 
IFileSystem}. The
+     * other one can point to the local file system. In other words this request 
can correspond to
+     * either: downloading a file from the remote file system, uploading a 
file to the remote file
+     * system or duplicating a file in the remote file system.
+     *
+     * @param source The path of the source file to duplicate
+     * @param destination The path where to duplicate the source file
+     * @return true, if this {@link IFileSystem} can perform this operation 
more quickly compared to
+     *     the generic code path of using streams. Returns false by default.
+     */
+    default boolean canCopyPaths(Path source, Path destination) throws 
IOException {
+        return false;
+    }
+
+    /**
+     * Delete a file.
+     *
+     * @param f the path to delete
+     * @param recursive if path is a directory and set to <code>true</code>, 
the directory is
+     *     deleted else throws an exception. In case of a file the recursive 
can be set to either
+     *     <code>true</code> or <code>false</code>
+     * @return <code>true</code> if delete is successful, <code>false</code> 
otherwise
+     * @throws IOException
+     */
+    boolean delete(Path f, boolean recursive) throws IOException;
+
+    /**
+     * Make the given file and all non-existent parents into directories. Has 
the semantics of Unix
+     * 'mkdir -p'. Existence of the directory hierarchy is not an error.
+     *
+     * @param f the directory/directories to be created
+     * @return <code>true</code> if at least one new directory has been 
created, <code>false</code>
+     *     otherwise
+     * @throws IOException thrown if an I/O error occurs while creating the 
directory
+     */
+    boolean mkdirs(Path f) throws IOException;
+
+    /**
+     * Opens an {@link FSDataOutputStream} to a new file at the given path.
+     *
+     * <p>If the file already exists, the behavior depends on the given {@code WriteMode}. If the
+     * mode is set to {@link FileSystem.WriteMode#NO_OVERWRITE}, then this method fails with an
+     * exception.
+     *
+     * @param f The file path to write to
+     * @param overwriteMode The action to take if a file or directory already exists at the given
+     *     path.
+     * @return The stream to the new file at the target path.
+     * @throws IOException Thrown, if the stream could not be opened because of an I/O error, or
+     *     because a file already exists at that path and the write mode indicates to not overwrite
+     *     the file.
+     */
+    FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException;
+
+    /**
+     * Renames the file/directory src to dst.
+     *
+     * @param src the file/directory to rename
+     * @param dst the new name of the file/directory
+     * @return <code>true</code> if the renaming was successful, <code>false</code> otherwise
+     * @throws IOException thrown if an I/O error occurs while renaming
+     */
+    boolean rename(Path src, Path dst) throws IOException;
+
+    /**
+     * Returns true if this is a distributed file system. A distributed file system here means that
+     * the file system is shared among all Flink processes that participate in a cluster or job and
+     * that all these processes can see the same files.
+     *
+     * @return True, if this is a distributed file system, false otherwise.
+     */
+    boolean isDistributedFS();
+
+    /**
+     * Gets a description of the characteristics of this file system.
+     *
+     * @return the {@link FileSystemKind} of this file system
+     * @deprecated this method is not used anymore.
+     */
+    @Deprecated
+    FileSystemKind getKind();
+
+    /**
+     * Initializes output directories on local file systems according to the given write mode.
+     *
+     * <ul>
+     *   <li>WriteMode.NO_OVERWRITE &amp; parallel output:
+     *       <ul>
+     *         <li>A directory is created if the output path does not exist.
+     *         <li>An existing directory is reused, files contained in the directory are NOT
+     *             deleted.
+     *         <li>An existing file raises an exception.
+     *       </ul>
+     *   <li>WriteMode.NO_OVERWRITE &amp; non-parallel output:
+     *       <ul>
+     *         <li>An existing file or directory raises an exception.
+     *       </ul>
+     *   <li>WriteMode.OVERWRITE &amp; parallel output:
+     *       <ul>
+     *         <li>A directory is created if the output path does not exist.
+     *         <li>An existing directory is reused, files contained in the directory are NOT
+     *             deleted.
+     *         <li>An existing file is deleted and replaced by a new directory.
+     *       </ul>
+     *   <li>WriteMode.OVERWRITE &amp; non-parallel output:
+     *       <ul>
+     *         <li>An existing file or directory (and all its content) is deleted.
+     *       </ul>
+     * </ul>
+     *
+     * <p>Files contained in an existing directory are not deleted, because multiple instances of a
+     * DataSinkTask might call this function at the same time and hence might perform concurrent
+     * delete operations on the file system (possibly deleting output files of concurrently running
+     * tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete
+     * and create operations would be difficult.
+     *
+     * @param outPath Output path that should be prepared.
+     * @param writeMode Write mode to consider.
+     * @param createDirectory True, to initialize a directory at the given path, false to prepare
+     *     space for a file.
+     * @return True, if the path was successfully prepared, false otherwise.
+     * @throws IOException Thrown, if any of the file system access operations failed.
+     */
+    boolean initOutPathLocalFS(
+            Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory)
+            throws IOException;
+
+    /**
+     * Initializes output directories on distributed file systems according to the given write
+     * mode.
+     *
+     * <ul>
+     *   <li>WriteMode.NO_OVERWRITE &amp; parallel output:
+     *       <ul>
+     *         <li>A directory is created if the output path does not exist.
+     *         <li>An existing file or directory raises an exception.
+     *       </ul>
+     *   <li>WriteMode.NO_OVERWRITE &amp; non-parallel output:
+     *       <ul>
+     *         <li>An existing file or directory raises an exception.
+     *       </ul>
+     *   <li>WriteMode.OVERWRITE &amp; parallel output:
+     *       <ul>
+     *         <li>A directory is created if the output path does not exist.
+     *         <li>An existing directory and its content is deleted and a new directory is
+     *             created.
+     *         <li>An existing file is deleted and replaced by a new directory.
+     *       </ul>
+     *   <li>WriteMode.OVERWRITE &amp; non-parallel output:
+     *       <ul>
+     *         <li>An existing file or directory is deleted and replaced by a new directory.
+     *       </ul>
+     * </ul>
+     *
+     * @param outPath Output path that should be prepared.
+     * @param writeMode Write mode to consider.
+     * @param createDirectory True, to initialize a directory at the given path, false otherwise.
+     * @return True, if the path was successfully prepared, false otherwise.
+     * @throws IOException Thrown, if any of the file system access operations failed.
+     */
+    boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory)
+            throws IOException;
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
new file mode 100644
index 00000000000..a20f6822b3d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An interface marking that a given {@link FileSystem} has an optimised path for copying files,
+ * instead of using {@link FSDataOutputStream} or {@link FSDataInputStream}.
+ *
+ * <p>Implementing this interface alone does not guarantee that every pair of paths can be copied:
+ * callers should check {@link #canCopyPaths(Path, Path)} before calling {@link #copyFiles(List,
+ * ICloseableRegistry)}.
+ */
+@Experimental
+public interface PathsCopyingFileSystem extends IFileSystem {
+    /**
+     * A pair of source and destination to duplicate a file.
+     *
+     * <p>At least one of the two paths, source or destination, belongs to this {@link
+     * PathsCopyingFileSystem}, and one of them can point to the local file system. In other words,
+     * this request can correspond to either: downloading a file from the remote file system,
+     * uploading a file to the remote file system, or duplicating a file in the remote file system.
+     */
+    interface CopyRequest {
+        /** The path of the source file to duplicate. */
+        Path getSource();
+
+        /** The path where to duplicate the source file. */
+        Path getDestination();
+
+        /**
+         * A factory method for creating a simple pair of source/destination.
+         *
+         * @param source the path of the source file to duplicate
+         * @param destination the path where to duplicate the source file
+         * @return a {@link CopyRequest} returning exactly the given paths
+         * @throws NullPointerException if {@code source} or {@code destination} is {@code null}
+         */
+        static CopyRequest of(Path source, Path destination) {
+            // Fail fast on invalid input instead of deep inside a copyFiles() implementation.
+            if (source == null) {
+                throw new NullPointerException("source must not be null");
+            }
+            if (destination == null) {
+                throw new NullPointerException("destination must not be null");
+            }
+            return new CopyRequest() {
+                @Override
+                public Path getSource() {
+                    return source;
+                }
+
+                @Override
+                public Path getDestination() {
+                    return destination;
+                }
+
+                @Override
+                public String toString() {
+                    return "CopyRequest{"
+                            + "source="
+                            + source
+                            + ", destination="
+                            + destination
+                            + '}';
+                }
+            };
+        }
+    }
+
+    /**
+     * Copies the given files in a batch using this {@link PathsCopyingFileSystem}.
+     *
+     * <p>In case of an exception some files might have been already copied fully or partially; the
+     * caller should clean them up. The copy can be interrupted via the given {@link
+     * ICloseableRegistry}.
+     *
+     * @param requests the source/destination pairs to copy
+     * @param closeableRegistry registry through which the ongoing copy can be interrupted
+     * @throws IOException if copying any of the files fails
+     */
+    void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+            throws IOException;
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code true} by default, since implementing this interface signals support for
+     * {@link #copyFiles(List, ICloseableRegistry)}. Implementations, for example wrappers around
+     * other file systems, may override it to answer per pair of paths.
+     */
+    @Override
+    default boolean canCopyPaths(Path source, Path destination) throws IOException {
+        return true;
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java
index 0d1e889c61b..9e06cc52faf 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.WrappingProxy;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
 
 /**
  * A wrapper around {@link FileSystemFactory} that ensures the plugin 
classloader is used for all
@@ -69,7 +70,7 @@ public class PluginFileSystemFactory implements FileSystemFactory {
     }
 
     static class ClassLoaderFixingFileSystem extends FileSystem
-            implements WrappingProxy<FileSystem> {
+            implements WrappingProxy<FileSystem>, PathsCopyingFileSystem {
         private final FileSystem inner;
         private final ClassLoader loader;
 
@@ -149,6 +150,21 @@ public class PluginFileSystemFactory implements FileSystemFactory {
             }
         }
 
+        /**
+         * Copies the files via the wrapped file system, inside a {@link
+         * TemporaryClassLoaderContext} for the plugin class loader.
+         *
+         * @throws UnsupportedOperationException if the wrapped file system does not implement
+         *     {@link PathsCopyingFileSystem}
+         */
+        @Override
+        public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+                throws IOException {
+            if (!(inner instanceof PathsCopyingFileSystem)) {
+                // This wrapper always declares PathsCopyingFileSystem, but the wrapped file system
+                // may not support it; fail with a descriptive error instead of a ClassCastException.
+                throw new UnsupportedOperationException(
+                        "The wrapped file system "
+                                + inner.getClass().getName()
+                                + " does not support copying paths.");
+            }
+            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
+                ((PathsCopyingFileSystem) inner).copyFiles(requests, closeableRegistry);
+            }
+        }
+
+        /**
+         * Asks the wrapped file system, inside a {@link TemporaryClassLoaderContext} for the plugin
+         * class loader. Returns {@code false} if the wrapped file system does not implement {@link
+         * PathsCopyingFileSystem}, so that a {@code true} answer guarantees {@link #copyFiles} can
+         * be delegated.
+         */
+        @Override
+        public boolean canCopyPaths(Path source, Path destination) throws IOException {
+            if (!(inner instanceof PathsCopyingFileSystem)) {
+                return false;
+            }
+            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
+                return inner.canCopyPaths(source, destination);
+            }
+        }
+
         @Override
         public boolean delete(final Path f, final boolean recursive) throws 
IOException {
             try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(loader)) {
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index e49d11695a0..13020b33a78 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.WrappingProxy;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
 
 /**
  * This is a {@link WrappingProxy} around {@link FileSystem} which (i) wraps 
all opened streams as
@@ -34,7 +35,8 @@ import java.net.URI;
  * prevent resource leaks from unclosed streams.
  */
 @Internal
-public class SafetyNetWrapperFileSystem extends FileSystem implements 
WrappingProxy<FileSystem> {
+public class SafetyNetWrapperFileSystem extends FileSystem
+        implements WrappingProxy<FileSystem>, PathsCopyingFileSystem {
 
     private final SafetyNetCloseableRegistry registry;
     private final FileSystem unsafeFileSystem;
@@ -45,6 +47,17 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
         this.unsafeFileSystem = Preconditions.checkNotNull(unsafeFileSystem);
     }
 
+    /**
+     * Copies the files via the wrapped file system.
+     *
+     * @throws UnsupportedOperationException if the wrapped file system does not implement {@link
+     *     PathsCopyingFileSystem}
+     */
+    @Override
+    public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+            throws IOException {
+        if (!(unsafeFileSystem instanceof PathsCopyingFileSystem)) {
+            // This wrapper always declares PathsCopyingFileSystem, but the wrapped file system may
+            // not support it; fail with a descriptive error instead of a ClassCastException.
+            throw new UnsupportedOperationException(
+                    "The wrapped file system "
+                            + unsafeFileSystem.getClass().getName()
+                            + " does not support copying paths.");
+        }
+        ((PathsCopyingFileSystem) unsafeFileSystem).copyFiles(requests, closeableRegistry);
+    }
+
+    /**
+     * Delegates to the wrapped file system. Returns {@code false} if the wrapped file system does
+     * not implement {@link PathsCopyingFileSystem}, so that a {@code true} answer guarantees that
+     * {@link #copyFiles} can be delegated.
+     */
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) throws IOException {
+        return unsafeFileSystem instanceof PathsCopyingFileSystem
+                && unsafeFileSystem.canCopyPaths(source, destination);
+    }
+
     @Override
     public Path getWorkingDirectory() {
         return unsafeFileSystem.getWorkingDirectory();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 94eedcf9642..3732aa2d30c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -161,6 +161,11 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
         return stateHandle.asBytesIfInMemory();
     }
 
+    @Override
+    public Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return stateHandle.maybeGetPath(); // same backing file as the wrapped handle, if any
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
index 90c1f20b28b..8bd40f64184 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
@@ -76,6 +76,11 @@ public class OperatorStreamStateHandle implements OperatorStateHandle {
         return delegateStateHandle.asBytesIfInMemory();
     }
 
+    @Override
+    public Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return delegateStateHandle.maybeGetPath(); // backing file of the delegate, if any
+    }
+
     @Override
     public PhysicalStateHandleID getStreamStateHandleID() {
         return delegateStateHandle.getStreamStateHandleID();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index e807b93dd63..fb5d4697173 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -71,6 +71,11 @@ public class RetrievableStreamStateHandle<T extends Serializable>
         return wrappedStreamStateHandle.asBytesIfInMemory();
     }
 
+    @Override
+    public Optional<Path> maybeGetPath() {
+        return wrappedStreamStateHandle.maybeGetPath(); // wrapped handle's file, if any
+    }
+
     @Override
     public PhysicalStateHandleID getStreamStateHandleID() {
         return wrappedStreamStateHandle.getStreamStateHandleID();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
index a2290d47e91..72df5b53356 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject {
     /** @return Content of this handle as bytes array if it is already in 
memory. */
     Optional<byte[]> asBytesIfInMemory();
 
+    /**
+     * Returns the path of the file that backs this handle, if there is one.
+     *
+     * <p>The default implementation returns {@link Optional#empty()}. Implementations that are
+     * backed by a single file, or that wrap another {@link StreamStateHandle}, may override this
+     * to expose that file's path.
+     *
+     * @return Path to an underlying file represented by this {@link StreamStateHandle} or {@link
+     *     Optional#empty()} if there is no such file.
+     */
+    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return Optional.empty();
+    }
+
     /** @return a unique identifier of this handle. */
     PhysicalStateHandleID getStreamStateHandleID();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 4d3fe238fcc..fce6d499ef6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -58,6 +58,11 @@ public class FileStateHandle implements StreamStateHandle {
         this.stateSize = stateSize;
     }
 
+    @Override
+    public Optional<Path> maybeGetPath() {
+        return Optional.of(getFilePath()); // the state is stored in exactly this file
+    }
+
     /**
      * Gets the path where this handle's state is stored.
      *


Reply via email to