This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dd853492a970efde6fdbb98230da6f17000253ce Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Jan 17 16:27:02 2024 +0100 [FLINK-35768][state,rocksdb] Use fast files download in RocksDBStateDownloader --- .../java/org/apache/flink/core/fs/FileSystem.java | 7 +- .../streaming/state/RocksDBStateDownloader.java | 112 +++++++++++++++++---- 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index b6cd7a49276..3beab0e05df 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -995,7 +995,8 @@ public abstract class FileSystem implements IFileSystem { // ------------------------------------------------------------------------ /** An identifier of a file system, via its scheme and its authority. */ - private static final class FSKey { + @Internal + public static final class FSKey { /** The scheme of the file system.
*/ private final String scheme; @@ -1014,6 +1015,10 @@ public abstract class FileSystem implements IFileSystem { this.authority = authority; } + public FSKey(URI uri) { + this(uri.getScheme(), uri.getAuthority()); + } + @Override public boolean equals(final Object obj) { if (obj == this) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index ec7cd945bc7..02476e98bb4 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -20,7 +20,10 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.PathsCopyingFileSystem; +import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; @@ -29,6 +32,9 @@ import org.apache.flink.util.IOUtils; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; @@ -36,14 +42,21 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import 
java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static org.apache.flink.util.Preconditions.checkState; + /** Help class for downloading RocksDB state files. */ public class RocksDBStateDownloader implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateDownloader.class); + private final RocksDBStateDataTransferHelper transfer; @VisibleForTesting @@ -103,27 +116,90 @@ public class RocksDBStateDownloader implements Closeable { } } - private List<Runnable> createDownloadRunnables( + private Collection<Runnable> createDownloadRunnables( Collection<StateHandleDownloadSpec> downloadRequests, - CloseableRegistry closeableRegistry) { + CloseableRegistry closeableRegistry) + throws IOException { + // We need to support recovery from multiple FileSystems. At least one scenario where this can + // happen is: + // 1. A checkpoint/savepoint is created on FileSystem_1 + // 2. Job terminates + // 3. Configuration is changed to use a checkpoint directory on FileSystem_2 + // 4. Job is restarted from checkpoint (1.) using claim mode + // 5. New incremental checkpoint is created, that can refer to files both from FileSystem_1 + // and FileSystem_2.
+ Map<FileSystem.FSKey, List<CopyRequest>> filesSystemsFilesToDownload = new HashMap<>(); List<Runnable> runnables = new ArrayList<>(); - for (StateHandleDownloadSpec downloadRequest : downloadRequests) { - Stream.concat( - downloadRequest.getStateHandle().getSharedState().stream(), - downloadRequest.getStateHandle().getPrivateState().stream()) - .map( - handleAndLocalPath -> - runnables.add( - createDownloadRunnableUsingStreams( - handleAndLocalPath.getHandle(), - downloadRequest - .getDownloadDestination() - .resolve(handleAndLocalPath.getLocalPath()), - closeableRegistry))); + + for (StateHandleDownloadSpec downloadSpec : downloadRequests) { + for (HandleAndLocalPath handleAndLocalPath : getAllHandles(downloadSpec)) { + Path downloadDestination = + downloadSpec + .getDownloadDestination() + .resolve(handleAndLocalPath.getLocalPath()); + if (canCopyPaths(handleAndLocalPath)) { + org.apache.flink.core.fs.Path remotePath = + handleAndLocalPath.getHandle().maybeGetPath().get(); + FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri()); + filesSystemsFilesToDownload + .computeIfAbsent(newFSKey, fsKey -> new ArrayList<>()) + .add( + CopyRequest.of( + remotePath, + new org.apache.flink.core.fs.Path( + downloadDestination.toUri()))); + } else { + runnables.add( + createDownloadRunnableUsingStreams( + handleAndLocalPath.getHandle(), + downloadDestination, + closeableRegistry)); + } + } + } + + for (List<CopyRequest> filesToDownload : filesSystemsFilesToDownload.values()) { + checkState(!filesToDownload.isEmpty()); + FileSystem srcFileSystem = FileSystem.get(filesToDownload.get(0).getSource().toUri()); + runnables.add( + createDownloadRunnableUsingCopyFiles( + (PathsCopyingFileSystem) srcFileSystem, + filesToDownload, + closeableRegistry)); } + return runnables; } + private boolean canCopyPaths(HandleAndLocalPath handleAndLocalPath) throws IOException { + Optional<org.apache.flink.core.fs.Path> remotePath = + handleAndLocalPath.getHandle().maybeGetPath(); + 
if (!remotePath.isPresent()) { + return false; + } + return FileSystem.get(remotePath.get().toUri()) + .canCopyPaths( + remotePath.get(), + new org.apache.flink.core.fs.Path(handleAndLocalPath.getLocalPath())); + } + + private Iterable<? extends HandleAndLocalPath> getAllHandles( + StateHandleDownloadSpec downloadSpec) { + return Stream.concat( + downloadSpec.getStateHandle().getSharedState().stream(), + downloadSpec.getStateHandle().getPrivateState().stream()) + .collect(Collectors.toList()); + } + + private Runnable createDownloadRunnableUsingCopyFiles( + PathsCopyingFileSystem fileSystem, + List<CopyRequest> copyRequests, + CloseableRegistry closeableRegistry) { + LOG.debug("Using copy paths for {} of file system [{}]", copyRequests, fileSystem); + return ThrowingRunnable.unchecked( + () -> fileSystem.copyFiles(copyRequests, closeableRegistry)); + } + private Runnable createDownloadRunnableUsingStreams( StreamStateHandle remoteFileHandle, Path destination, @@ -148,7 +224,7 @@ public class RocksDBStateDownloader implements Closeable { closeableRegistry.registerCloseable(inputStream); Files.createDirectories(restoreFilePath.getParent()); - OutputStream outputStream = Files.newOutputStream(restoreFilePath); + OutputStream outputStream = Files.newOutputStream(restoreFilePath, CREATE_NEW); closeableRegistry.registerCloseable(outputStream); byte[] buffer = new byte[8 * 1024];
