This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5c4b1aa2bd906bff076e48a5161ba394007cf657 Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Nov 9 14:58:53 2023 +0100 [FLINK-35767][filesystems] Add PathsCopyingFileSystem --- .../java/org/apache/flink/core/fs/FileSystem.java | 302 ++++----------- .../java/org/apache/flink/core/fs/IFileSystem.java | 415 +++++++++++++++++++++ .../flink/core/fs/PathsCopyingFileSystem.java | 91 +++++ .../flink/core/fs/PluginFileSystemFactory.java | 18 +- .../flink/core/fs/SafetyNetWrapperFileSystem.java | 15 +- .../flink/runtime/state/KeyGroupsStateHandle.java | 5 + .../runtime/state/OperatorStreamStateHandle.java | 5 + .../state/RetrievableStreamStateHandle.java | 5 + .../flink/runtime/state/StreamStateHandle.java | 8 + .../runtime/state/filesystem/FileStateHandle.java | 5 + 10 files changed, 636 insertions(+), 233 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index cee47823e1f..b6cd7a49276 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -196,8 +196,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @see FSDataOutputStream */ @Public -public abstract class FileSystem { - +public abstract class FileSystem implements IFileSystem { /** * The possible write modes. The write mode decides what happens if a file should be created, * but already exists. @@ -217,8 +216,6 @@ public abstract class FileSystem { OVERWRITE } - // ------------------------------------------------------------------------ - /** Logger for all FileSystem work. 
*/ private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class); @@ -570,140 +567,9 @@ public abstract class FileSystem { } // ------------------------------------------------------------------------ - // File System Methods + // File System Methods deprecated // ------------------------------------------------------------------------ - /** - * Returns the path of the file system's current working directory. - * - * @return the path of the file system's current working directory - */ - public abstract Path getWorkingDirectory(); - - /** - * Returns the path of the user's home directory in this file system. - * - * @return the path of the user's home directory in this file system. - */ - public abstract Path getHomeDirectory(); - - /** - * Returns a URI whose scheme and authority identify this file system. - * - * @return a URI whose scheme and authority identify this file system - */ - public abstract URI getUri(); - - /** - * Return a file status object that represents the path. - * - * @param f The path we want information from - * @return a FileStatus object - * @throws FileNotFoundException when the path does not exist; IOException see specific - * implementation - */ - public abstract FileStatus getFileStatus(Path f) throws IOException; - - /** - * Return an array containing hostnames, offset and size of portions of the given file. For a - * nonexistent file or regions, null will be returned. This call is most helpful with DFS, where - * it returns hostnames of machines that contain the given file. The FileSystem will simply - * return an elt containing 'localhost'. - */ - public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) - throws IOException; - - /** - * Opens an FSDataInputStream at the indicated Path. - * - * @param f the file name to open - * @param bufferSize the size of the buffer to be used. 
- */ - public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException; - - /** - * Opens an FSDataInputStream at the indicated Path. - * - * @param f the file to open - */ - public abstract FSDataInputStream open(Path f) throws IOException; - - /** - * Creates a new {@link RecoverableWriter}. A recoverable writer creates streams that can - * persist and recover their intermediate state. Persisting and recovering intermediate state is - * a core building block for writing to files that span multiple checkpoints. - * - * <p>The returned object can act as a shared factory to open and recover multiple streams. - * - * <p>This method is optional on file systems and various file system implementations may not - * support this method, throwing an {@code UnsupportedOperationException}. - * - * @return A RecoverableWriter for this file system. - * @throws IOException Thrown, if the recoverable writer cannot be instantiated. - */ - public RecoverableWriter createRecoverableWriter() throws IOException { - throw new UnsupportedOperationException( - "This file system does not support recoverable writers."); - } - - /** - * Return the number of bytes that large input files should be optimally be split into to - * minimize I/O time. - * - * @return the number of bytes that large input files should be optimally be split into to - * minimize I/O time - * @deprecated This value is no longer used and is meaningless. - */ - @Deprecated - public long getDefaultBlockSize() { - return 32 * 1024 * 1024; // 32 MB; - } - - /** - * List the statuses of the files/directories in the given path if the path is a directory. - * - * @param f given path - * @return the statuses of the files/directories in the given path - * @throws IOException - */ - public abstract FileStatus[] listStatus(Path f) throws IOException; - - /** - * Check if exists. 
- * - * @param f source file - */ - public boolean exists(final Path f) throws IOException { - try { - return (getFileStatus(f) != null); - } catch (FileNotFoundException e) { - return false; - } - } - - /** - * Delete a file. - * - * @param f the path to delete - * @param recursive if path is a directory and set to <code>true</code>, the directory is - * deleted else throws an exception. In case of a file the recursive can be set to either - * <code>true</code> or <code>false</code> - * @return <code>true</code> if delete is successful, <code>false</code> otherwise - * @throws IOException - */ - public abstract boolean delete(Path f, boolean recursive) throws IOException; - - /** - * Make the given file and all non-existent parents into directories. Has the semantics of Unix - * 'mkdir -p'. Existence of the directory hierarchy is not an error. - * - * @param f the directory/directories to be created - * @return <code>true</code> if at least one new directory has been created, <code>false</code> - * otherwise - * @throws IOException thrown if an I/O error occurs while creating the directory - */ - public abstract boolean mkdirs(Path f) throws IOException; - /** * Opens an FSDataOutputStream at the indicated Path. * @@ -739,7 +605,7 @@ public abstract class FileSystem { * overwritten, and if false an error will be thrown. * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a * file already exists at that path and the write mode indicates to not overwrite the file. - * @deprecated Use {@link #create(Path, WriteMode)} instead. + * @deprecated Use {@link #create(Path, FileSystem.WriteMode)} instead. */ @Deprecated public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { @@ -747,92 +613,86 @@ public abstract class FileSystem { } /** - * Opens an FSDataOutputStream to a new file at the given path. - * - * <p>If the file already exists, the behavior depends on the given {@code WriteMode}. 
If the - * mode is set to {@link WriteMode#NO_OVERWRITE}, then this method fails with an exception. + * Gets a description of the characteristics of this file system. * - * @param f The file path to write to - * @param overwriteMode The action to take if a file or directory already exists at the given - * path. - * @return The stream to the new file at the target path. - * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a - * file already exists at that path and the write mode indicates to not overwrite the file. + * @deprecated this method is not used anymore. */ - public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException; + @Deprecated + public abstract FileSystemKind getKind(); /** - * Renames the file/directory src to dst. + * Return the number of bytes that large input files should optimally be split into to minimize + * I/O time. * - * @param src the file/directory to rename - * @param dst the new name of the file/directory - * @return <code>true</code> if the renaming was successful, <code>false</code> otherwise - * @throws IOException + * @return the number of bytes that large input files should optimally be split into to minimize + * I/O time + * @deprecated This value is no longer used and is meaningless. */ + @Deprecated + public long getDefaultBlockSize() { + return 32 * 1024 * 1024; // 32 MB; + } + + // ------------------------------------------------------------------------ + // File System Methods kept for binary compatibility. Please check {@link IFileSystem} for the + // documentation.
+ // ------------------------------------------------------------------------ + + @Override + public abstract Path getWorkingDirectory(); + + @Override + public abstract Path getHomeDirectory(); + + @Override + public abstract URI getUri(); + + @Override + public abstract FileStatus getFileStatus(Path f) throws IOException; + + @Override + public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) + throws IOException; + + @Override + public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException; + + @Override + public abstract FSDataInputStream open(Path f) throws IOException; + + @Override + public RecoverableWriter createRecoverableWriter() throws IOException { + return IFileSystem.super.createRecoverableWriter(); + } + + @Override + public abstract FileStatus[] listStatus(Path f) throws IOException; + + @Override + public boolean exists(final Path f) throws IOException { + return IFileSystem.super.exists(f); + } + + @Override + public abstract boolean delete(Path f, boolean recursive) throws IOException; + + @Override + public abstract boolean mkdirs(Path f) throws IOException; + + @Override + public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException; + + @Override public abstract boolean rename(Path src, Path dst) throws IOException; - /** - * Returns true if this is a distributed file system. A distributed file system here means that - * the file system is shared among all Flink processes that participate in a cluster or job and - * that all these processes can see the same files. - * - * @return True, if this is a distributed file system, false otherwise. - */ + @Override public abstract boolean isDistributedFS(); - /** - * Gets a description of the characteristics of this file system. - * - * @deprecated this method is not used anymore. 
- */ - @Deprecated - public abstract FileSystemKind getKind(); - // ------------------------------------------------------------------------ // output directory initialization // ------------------------------------------------------------------------ - /** - * Initializes output directories on local file systems according to the given write mode. - * - * <ul> - * <li>WriteMode.NO_OVERWRITE & parallel output: - * <ul> - * <li>A directory is created if the output path does not exist. - * <li>An existing directory is reused, files contained in the directory are NOT - * deleted. - * <li>An existing file raises an exception. - * </ul> - * <li>WriteMode.NO_OVERWRITE & NONE parallel output: - * <ul> - * <li>An existing file or directory raises an exception. - * </ul> - * <li>WriteMode.OVERWRITE & parallel output: - * <ul> - * <li>A directory is created if the output path does not exist. - * <li>An existing directory is reused, files contained in the directory are NOT - * deleted. - * <li>An existing file is deleted and replaced by a new directory. - * </ul> - * <li>WriteMode.OVERWRITE & NONE parallel output: - * <ul> - * <li>An existing file or directory (and all its content) is deleted - * </ul> - * </ul> - * - * <p>Files contained in an existing directory are not deleted, because multiple instances of a - * DataSinkTask might call this function at the same time and hence might perform concurrent - * delete operations on the file system (possibly deleting output files of concurrently running - * tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete - * and create operations would be difficult. - * - * @param outPath Output path that should be prepared. - * @param writeMode Write mode to consider. - * @param createDirectory True, to initialize a directory at the given path, false to prepare - * space for a file. - * @return True, if the path was successfully prepared, false otherwise. 
- * @throws IOException Thrown, if any of the file system access operations failed. - */ + @Override public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { if (isDistributedFS()) { @@ -843,6 +703,7 @@ public abstract class FileSystem { // concurrently work in this method (multiple output formats writing locally) might end // up deleting each other's directories and leave non-retrievable files, without necessarily // causing an exception. That results in very subtle issues, like output files looking as if + // they are not getting created. // we acquire the lock interruptibly here, to make sure that concurrent threads waiting @@ -945,28 +806,7 @@ public abstract class FileSystem { } } - /** - * Initializes output directories on distributed file systems according to the given write mode. - * - * <p>WriteMode.NO_OVERWRITE & parallel output: - A directory is created if the output path - * does not exist. - An existing file or directory raises an exception. - * - * <p>WriteMode.NO_OVERWRITE & NONE parallel output: - An existing file or directory raises - * an exception. - * - * <p>WriteMode.OVERWRITE & parallel output: - A directory is created if the output path - * does not exist. - An existing directory and its content is deleted and a new directory is - * created. - An existing file is deleted and replaced by a new directory. - * - * <p>WriteMode.OVERWRITE & NONE parallel output: - An existing file or directory is deleted - * and replaced by a new directory. - * - * @param outPath Output path that should be prepared. - * @param writeMode Write mode to consider. - * @param createDirectory True, to initialize a directory at the given path, false otherwise. - * @return True, if the path was successfully prepared, false otherwise. - * @throws IOException Thrown, if any of the file system access operations failed. 
- */ + @Override public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { if (!isDistributedFS()) { diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java new file mode 100644 index 00000000000..dcbd79aa3f9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/IFileSystem.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.core.fs.local.LocalFileSystem; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; + +/** + * Interface of all file systems used by Flink. This interface may be extended to implement + * distributed file systems, or local file systems. 
The abstraction of this file system is very + * simple, and the set of available operations quite limited, to support the common denominator of a + * wide range of file systems. For example, appending to or mutating existing files is not + * supported. + * + * <p>Flink implements and supports some file system types directly (for example the default + * machine-local file system). Other file system types are accessed by an implementation that + * bridges to the suite of file systems supported by Hadoop (such as HDFS). + * + * <h2>Scope and Purpose</h2> + * + * <p>The purpose of this abstraction is to expose a common and well-defined interface for access to + * files. This abstraction is used both by Flink's fault tolerance mechanism (storing state and + * recovery data) and by reusable built-in connectors (file sources / sinks). + * + * <p>The purpose of this abstraction is <b>not</b> to give user programs an abstraction with + * extreme flexibility and control across all possible file systems. That mission would be a folly, + * as the differences in characteristics of even the most common file systems are already quite + * large. It is expected that user programs that need specialized functionality of certain file + * systems in their functions, operations, sources, or sinks instantiate the specialized file system + * adapters directly. + * + * <h2>Data Persistence Contract</h2> + * + * <p>The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store + * data, both for results of streaming applications and for fault tolerance and recovery. It is + * therefore crucial that the persistence semantics of these streams are well defined.
+ * + * <h3>Definition of Persistence Guarantees</h3> + * + * <p>Data written to an output stream is considered persistent, if two requirements are met: + * + * <ol> + * <li><b>Visibility Requirement:</b> It must be guaranteed that all other processes, machines, + * virtual machines, containers, etc. that are able to access the file see the data + * consistently when given the absolute file path. This requirement is similar to the + * <i>close-to-open</i> semantics defined by POSIX, but restricted to the file itself (by its + * absolute path). + * <li><b>Durability Requirement:</b> The file system's specific durability/persistence + * requirements must be met. These are specific to the particular file system. For example the + * {@link LocalFileSystem} does not provide any durability guarantees for crashes of both + * hardware and operating system, while replicated distributed file systems (like HDFS) + * typically guarantee durability in the presence of at most <i>n</i> concurrent node + * failures, where <i>n</i> is the replication factor. + * </ol> + * + * <p>Updates to the file's parent directory (such that the file shows up when listing the directory + * contents) are not required to be complete for the data in the file stream to be considered + * persistent. This relaxation is important for file systems where updates to directory contents are + * only eventually consistent. + * + * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written bytes once + * the call to {@link FSDataOutputStream#close()} returns. + * + * <h3>Examples</h3> + * + * <h4>Fault-tolerant distributed file systems</h4> + * + * <p>For <b>fault-tolerant distributed file systems</b>, data is considered persistent once it has + * been received and acknowledged by the file system, typically by having been replicated to a + * quorum of machines (<i>durability requirement</i>). 
In addition the absolute file path must be + * visible to all other machines that will potentially access the file (<i>visibility + * requirement</i>). + * + * <p>Whether data has hit non-volatile storage on the storage nodes depends on the specific + * guarantees of the particular file system. + * + * <p>The metadata updates to the file's parent directory are not required to have reached a + * consistent state. It is permissible that some machines see the file when listing the parent + * directory's contents while others do not, as long as access to the file by its absolute path is + * possible on all nodes. + * + * <h4>Local file systems</h4> + * + * <p>A <b>local file system</b> must support the POSIX <i>close-to-open</i> semantics. Because the + * local file system does not have any fault tolerance guarantees, no further requirements exist. + * + * <p>The above implies specifically that data may still be in the OS cache when considered + * persistent from the local file system's perspective. Crashes that cause the OS cache to lose data + * are considered fatal to the local machine and are not covered by the local file system's + * guarantees as defined by Flink. + * + * <p>That means that computed results, checkpoints, and savepoints that are written only to the + * local filesystem are not guaranteed to be recoverable from the local machine's failure, making + * local file systems unsuitable for production setups. + * + * <h2>Updating File Contents</h2> + * + * <p>Many file systems either do not support overwriting contents of existing files at all, or do + * not support consistent visibility of the updated contents in that case. For that reason, Flink's + * FileSystem does not support appending to existing files, or seeking within output streams so that + * previously written data could be overwritten. + * + * <h2>Overwriting Files</h2> + * + * <p>Overwriting files is in general possible. A file is overwritten by deleting it and creating a + * new file. 
However, certain filesystems cannot make that change synchronously visible to all + * parties that have access to the file. For example <a + * href="https://aws.amazon.com/documentation/s3/">Amazon S3</a> guarantees only <i>eventual + * consistency</i> in the visibility of the file replacement: Some machines may see the old file, + * some machines may see the new file. + * + * <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms in Flink + * strictly avoid writing to the same file path more than once. + * + * <h2>Thread Safety</h2> + * + * <p>Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem is + * frequently shared across multiple threads in Flink and must be able to concurrently create + * input/output streams and list file metadata. + * + * <p>The {@link FSDataInputStream} and {@link FSDataOutputStream} implementations are strictly + * <b>not thread-safe</b>. Instances of the streams should also not be passed between threads in + * between read or write operations, because there are no guarantees about the visibility of + * operations across threads (many operations do not create memory fences). + * + * <h2>Streams Safety Net</h2> + * + * <p>When application code obtains a FileSystem (via {@link FileSystem#get(URI)} or via {@link + * Path#getFileSystem()}), the FileSystem instantiates a safety net for that FileSystem. The safety + * net ensures that all streams created from the FileSystem are closed when the application task + * finishes (or is canceled or failed). That way, the task's threads do not leak connections. + * + * <p>Internal runtime code can explicitly obtain a FileSystem that does not use the safety net via + * {@link FileSystem#getUnguardedFileSystem(URI)}. + * + * @see FSDataInputStream + * @see FSDataOutputStream + */ +@Experimental +public interface IFileSystem { + + /** + * Returns the path of the file system's current working directory. 
+ * + * @return the path of the file system's current working directory + */ + Path getWorkingDirectory(); + + /** + * Returns the path of the user's home directory in this file system. + * + * @return the path of the user's home directory in this file system. + */ + Path getHomeDirectory(); + + /** + * Returns a URI whose scheme and authority identify this file system. + * + * @return a URI whose scheme and authority identify this file system + */ + URI getUri(); + + /** + * Return a file status object that represents the path. + * + * @param f The path we want information from + * @return a FileStatus object + * @throws FileNotFoundException when the path does not exist; IOException see specific + * implementation + */ + FileStatus getFileStatus(Path f) throws IOException; + + /** + * Return an array containing hostnames, offset and size of portions of the given file. For a + * nonexistent file or regions, null will be returned. This call is most helpful with DFS, where + * it returns hostnames of machines that contain the given file. The FileSystem will simply + * return an element containing 'localhost'. + */ + BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException; + + /** + * Opens an FSDataInputStream at the indicated Path. + * + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ + FSDataInputStream open(Path f, int bufferSize) throws IOException; + + /** + * Opens an FSDataInputStream at the indicated Path. + * + * @param f the file to open + */ + FSDataInputStream open(Path f) throws IOException; + + /** + * Creates a new {@link RecoverableWriter}. A recoverable writer creates streams that can + * persist and recover their intermediate state. Persisting and recovering intermediate state is + * a core building block for writing to files that span multiple checkpoints. + * + * <p>The returned object can act as a shared factory to open and recover multiple streams.
+ * + * <p>This method is optional on file systems and various file system implementations may not + * support this method, throwing an {@code UnsupportedOperationException}. + * + * @return A RecoverableWriter for this file system. + * @throws IOException Thrown, if the recoverable writer cannot be instantiated. + */ + default RecoverableWriter createRecoverableWriter() throws IOException { + throw new UnsupportedOperationException( + "This file system does not support recoverable writers."); + } + + /** + * List the statuses of the files/directories in the given path if the path is a directory. + * + * @param f given path + * @return the statuses of the files/directories in the given path + * @throws IOException + */ + FileStatus[] listStatus(Path f) throws IOException; + + /** + * Checks if the given path exists. + * + * @param f the path to check + */ + default boolean exists(final Path f) throws IOException { + try { + return (getFileStatus(f) != null); + } catch (FileNotFoundException e) { + return false; + } + } + + /** + * Tells if this {@link FileSystem} supports an optimised way to directly copy between given + * paths. In other words, if it implements {@link PathsCopyingFileSystem}. + * + * <p>At least one of the source and destination belongs to this {@link IFileSystem}. One of + * them can point to the local file system. In other words, this request can correspond to + * either: downloading a file from the remote file system, uploading a file to the remote file + * system or duplicating a file in the remote file system. + * + * @param source The path of the source file to duplicate + * @param destination The path where to duplicate the source file + * @return true, if this {@link IFileSystem} can perform this operation more quickly compared to + * the generic code path of using streams. + */ + default boolean canCopyPaths(Path source, Path destination) throws IOException { + return false; + } + + /** + * Delete a file.
+ * + * @param f the path to delete + * @param recursive if path is a directory and set to <code>true</code>, the directory is + * deleted else throws an exception. In case of a file the recursive can be set to either + * <code>true</code> or <code>false</code> + * @return <code>true</code> if delete is successful, <code>false</code> otherwise + * @throws IOException + */ + boolean delete(Path f, boolean recursive) throws IOException; + + /** + * Make the given file and all non-existent parents into directories. Has the semantics of Unix + * 'mkdir -p'. Existence of the directory hierarchy is not an error. + * + * @param f the directory/directories to be created + * @return <code>true</code> if at least one new directory has been created, <code>false</code> + * otherwise + * @throws IOException thrown if an I/O error occurs while creating the directory + */ + boolean mkdirs(Path f) throws IOException; + + /** + * Opens an FSDataOutputStream to a new file at the given path. + * + * <p>If the file already exists, the behavior depends on the given {@code WriteMode}. If the + * mode is set to {@link FileSystem.WriteMode#NO_OVERWRITE}, then this method fails with an + * exception. + * + * @param f The file path to write to + * @param overwriteMode The action to take if a file or directory already exists at the given + * path. + * @return The stream to the new file at the target path. + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a + * file already exists at that path and the write mode indicates to not overwrite the file. + */ + FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException; + + /** + * Renames the file/directory src to dst. 
+ * + * @param src the file/directory to rename + * @param dst the new name of the file/directory + * @return <code>true</code> if the renaming was successful, <code>false</code> otherwise + * @throws IOException + */ + boolean rename(Path src, Path dst) throws IOException; + + /** + * Returns true if this is a distributed file system. A distributed file system here means that + * the file system is shared among all Flink processes that participate in a cluster or job and + * that all these processes can see the same files. + * + * @return True, if this is a distributed file system, false otherwise. + */ + boolean isDistributedFS(); + + /** + * Gets a description of the characteristics of this file system. + * + * @deprecated this method is not used anymore. + */ + @Deprecated + FileSystemKind getKind(); + + /** + * Initializes output directories on local file systems according to the given write mode. + * + * <ul> + * <li>WriteMode.NO_OVERWRITE & parallel output: + * <ul> + * <li>A directory is created if the output path does not exist. + * <li>An existing directory is reused, files contained in the directory are NOT + * deleted. + * <li>An existing file raises an exception. + * </ul> + * <li>WriteMode.NO_OVERWRITE & NONE parallel output: + * <ul> + * <li>An existing file or directory raises an exception. + * </ul> + * <li>WriteMode.OVERWRITE & parallel output: + * <ul> + * <li>A directory is created if the output path does not exist. + * <li>An existing directory is reused, files contained in the directory are NOT + * deleted. + * <li>An existing file is deleted and replaced by a new directory. 
+ * </ul> + * <li>WriteMode.OVERWRITE & NONE parallel output: + * <ul> + * <li>An existing file or directory (and all its content) is deleted + * </ul> + * </ul> + * + * <p>Files contained in an existing directory are not deleted, because multiple instances of a + * DataSinkTask might call this function at the same time and hence might perform concurrent + * delete operations on the file system (possibly deleting output files of concurrently running + * tasks). Since concurrent DataSinkTasks are not aware of each other, coordination of delete + * and create operations would be difficult. + * + * @param outPath Output path that should be prepared. + * @param writeMode Write mode to consider. + * @param createDirectory True, to initialize a directory at the given path, false to prepare + * space for a file. + * @return True, if the path was successfully prepared, false otherwise. + * @throws IOException Thrown, if any of the file system access operations failed. + */ + boolean initOutPathLocalFS( + Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) + throws IOException; + + /** + * Initializes output directories on distributed file systems according to the given write mode. + * + * <p>WriteMode.NO_OVERWRITE & parallel output: - A directory is created if the output path + * does not exist. - An existing file or directory raises an exception. + * + * <p>WriteMode.NO_OVERWRITE & NONE parallel output: - An existing file or directory raises + * an exception. + * + * <p>WriteMode.OVERWRITE & parallel output: - A directory is created if the output path + * does not exist. - An existing directory and its content is deleted and a new directory is + * created. - An existing file is deleted and replaced by a new directory. + * + * <p>WriteMode.OVERWRITE & NONE parallel output: - An existing file or directory is deleted + * and replaced by a new directory. + * + * @param outPath Output path that should be prepared. + * @param writeMode Write mode to consider. 
+ * @param createDirectory True, to initialize a directory at the given path, false otherwise. + * @return True, if the path was successfully prepared, false otherwise. + * @throws IOException Thrown, if any of the file system access operations failed. + */ + boolean initOutPathDistFS(Path outPath, FileSystem.WriteMode writeMode, boolean createDirectory) + throws IOException; +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java new file mode 100644 index 00000000000..a20f6822b3d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. 
+ */ + + package org.apache.flink.core.fs; + +import org.apache.flink.annotation.Experimental; + +import java.io.IOException; +import java.util.List; + +/** + * An interface marking that a {@link FileSystem} has an optimised path for copying paths + * instead of using {@link FSDataOutputStream} or {@link FSDataInputStream}. + */ +@Experimental +public interface PathsCopyingFileSystem extends IFileSystem { + /** + * A pair of source and destination to duplicate a file. + * + * <p>At least one of the source and the destination belongs to this {@link + * PathsCopyingFileSystem}. One of them can point to the local file system. In other words, this + * request can correspond to downloading a file from the remote file system, uploading a file to + * the remote file system, or duplicating a file within the remote file system. + */ + interface CopyRequest { + /** The path of the source file to duplicate. */ + Path getSource(); + + /** The path to which the source file is duplicated. */ + Path getDestination(); + + /** A factory method for creating a simple pair of source/destination. */ + static CopyRequest of(Path source, Path destination) { + return new CopyRequest() { + @Override + public Path getSource() { + return source; + } + + @Override + public Path getDestination() { + return destination; + } + + @Override + public String toString() { + return "CopyRequest{" + + "source=" + + source + + ", destination=" + + destination + + '}'; + } + }; + } + } + + /** + * Copies the given {@link CopyRequest}s in a batch with this {@link PathsCopyingFileSystem}. + * In case of an exception some files might already have been copied fully or partially; the + * caller should clean them up. Copying can be interrupted via {@code closeableRegistry}.
+ */ + void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry) + throws IOException; + + @Override + default boolean canCopyPaths(Path source, Path destination) throws IOException { + return true; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java index 0d1e889c61b..9e06cc52faf 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.util.WrappingProxy; import java.io.IOException; import java.net.URI; +import java.util.List; /** * A wrapper around {@link FileSystemFactory} that ensures the plugin classloader is used for all @@ -69,7 +70,7 @@ public class PluginFileSystemFactory implements FileSystemFactory { } static class ClassLoaderFixingFileSystem extends FileSystem - implements WrappingProxy<FileSystem> { + implements WrappingProxy<FileSystem>, PathsCopyingFileSystem { private final FileSystem inner; private final ClassLoader loader; @@ -149,6 +150,29 @@ public class PluginFileSystemFactory implements FileSystemFactory { } } + @Override + public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry) + throws IOException { + // This wrapper always implements PathsCopyingFileSystem, but the wrapped file system might + // not. Fail with a descriptive error instead of an opaque ClassCastException. + if (!(inner instanceof PathsCopyingFileSystem)) { + throw new UnsupportedOperationException( + "File system " + + inner.getClass().getName() + + " does not implement PathsCopyingFileSystem."); + } + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + ((PathsCopyingFileSystem) inner).copyFiles(requests, closeableRegistry); + } + } + + @Override + public boolean canCopyPaths(Path source, Path destination) throws IOException { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + return inner.canCopyPaths(source, destination); + } + } + @Override public boolean delete(final Path f, final boolean recursive) throws IOException { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { diff --git
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java index e49d11695a0..13020b33a78 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java @@ -24,6 +24,7 @@ import org.apache.flink.util.WrappingProxy; import java.io.IOException; import java.net.URI; +import java.util.List; /** * This is a {@link WrappingProxy} around {@link FileSystem} which (i) wraps all opened streams as @@ -34,7 +35,8 @@ import java.net.URI; * prevent resource leaks from unclosed streams. */ @Internal -public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingProxy<FileSystem> { +public class SafetyNetWrapperFileSystem extends FileSystem + implements WrappingProxy<FileSystem>, PathsCopyingFileSystem { private final SafetyNetCloseableRegistry registry; private final FileSystem unsafeFileSystem; @@ -45,6 +47,25 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr this.unsafeFileSystem = Preconditions.checkNotNull(unsafeFileSystem); } + @Override + public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry) + throws IOException { + // This wrapper always implements PathsCopyingFileSystem, but the wrapped file system might + // not. Fail with a descriptive error instead of an opaque ClassCastException. + if (!(unsafeFileSystem instanceof PathsCopyingFileSystem)) { + throw new UnsupportedOperationException( + "File system " + + unsafeFileSystem.getClass().getName() + + " does not implement PathsCopyingFileSystem."); + } + ((PathsCopyingFileSystem) unsafeFileSystem).copyFiles(requests, closeableRegistry); + } + + @Override + public boolean canCopyPaths(Path source, Path destination) throws IOException { + return unsafeFileSystem.canCopyPaths(source, destination); + } + @Override public Path getWorkingDirectory() { return unsafeFileSystem.getWorkingDirectory(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 94eedcf9642..3732aa2d30c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -161,6 +161,11 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle return stateHandle.asBytesIfInMemory(); } + @Override + public Optional<org.apache.flink.core.fs.Path> maybeGetPath() { + return stateHandle.maybeGetPath(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java index 90c1f20b28b..8bd40f64184 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java @@ -76,6 +76,11 @@ public class OperatorStreamStateHandle implements OperatorStateHandle { return delegateStateHandle.asBytesIfInMemory(); } + @Override + public Optional<org.apache.flink.core.fs.Path> maybeGetPath() { + return delegateStateHandle.maybeGetPath(); + } + @Override public PhysicalStateHandleID getStreamStateHandleID() { return delegateStateHandle.getStreamStateHandleID(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java index e807b93dd63..fb5d4697173 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java @@ -71,6 +71,11 @@ public class RetrievableStreamStateHandle<T extends Serializable> return wrappedStreamStateHandle.asBytesIfInMemory(); } + @Override + public Optional<Path> maybeGetPath() { + return wrappedStreamStateHandle.maybeGetPath(); + } + @Override public PhysicalStateHandleID getStreamStateHandleID() { return
wrappedStreamStateHandle.getStreamStateHandleID(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java index a2290d47e91..72df5b53356 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java @@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject { /** @return Content of this handle as bytes array if it is already in memory. */ Optional<byte[]> asBytesIfInMemory(); + /** + * @return Path of the file underlying this {@link StreamStateHandle}, or {@link + * Optional#empty()} if it is not backed by a file (this default always returns empty). + */ + default Optional<org.apache.flink.core.fs.Path> maybeGetPath() { + return Optional.empty(); + } + /** @return a unique identifier of this handle. */ PhysicalStateHandleID getStreamStateHandleID(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 4d3fe238fcc..fce6d499ef6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -58,6 +58,11 @@ public class FileStateHandle implements StreamStateHandle { this.stateSize = stateSize; } + @Override + public Optional<Path> maybeGetPath() { + return Optional.of(getFilePath()); + } + /** * Gets the path where this handle's state is stored. *
