This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd853492a970efde6fdbb98230da6f17000253ce
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Jan 17 16:27:02 2024 +0100

    [FLINK-35768][state,rocksdb] Use fast files download in RocksDBStateDownloader
---
 .../java/org/apache/flink/core/fs/FileSystem.java  |   7 +-
 .../streaming/state/RocksDBStateDownloader.java    | 112 +++++++++++++++++----
 2 files changed, 100 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index b6cd7a49276..3beab0e05df 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -995,7 +995,8 @@ public abstract class FileSystem implements IFileSystem {
     // ------------------------------------------------------------------------
 
     /** An identifier of a file system, via its scheme and its authority. */
-    private static final class FSKey {
+    @Internal
+    public static final class FSKey {
 
         /** The scheme of the file system. */
         private final String scheme;
@@ -1014,6 +1015,10 @@ public abstract class FileSystem implements IFileSystem {
             this.authority = authority;
         }
 
+        /**
+         * Creates a key from the scheme and authority components of the given URI.
+         *
+         * @param uri URI identifying the file system; only its scheme and authority are used,
+         *     any path component is ignored
+         */
+        public FSKey(URI uri) {
+            this(uri.getScheme(), uri.getAuthority());
+        }
+
         @Override
         public boolean equals(final Object obj) {
             if (obj == this) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index ec7cd945bc7..02476e98bb4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -20,7 +20,10 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
@@ -29,6 +32,9 @@ import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -36,14 +42,21 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** Help class for downloading RocksDB state files. */
 public class RocksDBStateDownloader implements Closeable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateDownloader.class);
+
     private final RocksDBStateDataTransferHelper transfer;
 
     @VisibleForTesting
@@ -103,27 +116,90 @@ public class RocksDBStateDownloader implements Closeable {
         }
     }
 
-    private List<Runnable> createDownloadRunnables(
+    private Collection<Runnable> createDownloadRunnables(
             Collection<StateHandleDownloadSpec> downloadRequests,
-            CloseableRegistry closeableRegistry) {
+            CloseableRegistry closeableRegistry)
+            throws IOException {
+        // We need to support recovery from multiple FileSystems. At least one 
scenario that it can
+        // happen is when:
+        // 1. A checkpoint/savepoint is created on FileSystem_1
+        // 2. Job terminates
+        // 3. Configuration is changed use checkpoint directory using 
FileSystem_2
+        // 4. Job is restarted from checkpoint (1.) using claim mode
+        // 5. New incremental checkpoint is created, that can refer to files 
both from FileSystem_1
+        // and FileSystem_2.
+        Map<FileSystem.FSKey, List<CopyRequest>> filesSystemsFilesToDownload = 
new HashMap<>();
         List<Runnable> runnables = new ArrayList<>();
-        for (StateHandleDownloadSpec downloadRequest : downloadRequests) {
-            Stream.concat(
-                            
downloadRequest.getStateHandle().getSharedState().stream(),
-                            
downloadRequest.getStateHandle().getPrivateState().stream())
-                    .map(
-                            handleAndLocalPath ->
-                                    runnables.add(
-                                            createDownloadRunnableUsingStreams(
-                                                    
handleAndLocalPath.getHandle(),
-                                                    downloadRequest
-                                                            
.getDownloadDestination()
-                                                            
.resolve(handleAndLocalPath.getLocalPath()),
-                                                    closeableRegistry)));
+
+        for (StateHandleDownloadSpec downloadSpec : downloadRequests) {
+            for (HandleAndLocalPath handleAndLocalPath : 
getAllHandles(downloadSpec)) {
+                Path downloadDestination =
+                        downloadSpec
+                                .getDownloadDestination()
+                                .resolve(handleAndLocalPath.getLocalPath());
+                if (canCopyPaths(handleAndLocalPath)) {
+                    org.apache.flink.core.fs.Path remotePath =
+                            
handleAndLocalPath.getHandle().maybeGetPath().get();
+                    FileSystem.FSKey newFSKey = new 
FileSystem.FSKey(remotePath.toUri());
+                    filesSystemsFilesToDownload
+                            .computeIfAbsent(newFSKey, fsKey -> new 
ArrayList<>())
+                            .add(
+                                    CopyRequest.of(
+                                            remotePath,
+                                            new org.apache.flink.core.fs.Path(
+                                                    
downloadDestination.toUri())));
+                } else {
+                    runnables.add(
+                            createDownloadRunnableUsingStreams(
+                                    handleAndLocalPath.getHandle(),
+                                    downloadDestination,
+                                    closeableRegistry));
+                }
+            }
+        }
+
+        for (List<CopyRequest> filesToDownload : 
filesSystemsFilesToDownload.values()) {
+            checkState(!filesToDownload.isEmpty());
+            FileSystem srcFileSystem = 
FileSystem.get(filesToDownload.get(0).getSource().toUri());
+            runnables.add(
+                    createDownloadRunnableUsingCopyFiles(
+                            (PathsCopyingFileSystem) srcFileSystem,
+                            filesToDownload,
+                            closeableRegistry));
         }
+
         return runnables;
     }
 
+    private boolean canCopyPaths(HandleAndLocalPath handleAndLocalPath) throws 
IOException {
+        Optional<org.apache.flink.core.fs.Path> remotePath =
+                handleAndLocalPath.getHandle().maybeGetPath();
+        if (!remotePath.isPresent()) {
+            return false;
+        }
+        return FileSystem.get(remotePath.get().toUri())
+                .canCopyPaths(
+                        remotePath.get(),
+                        new 
org.apache.flink.core.fs.Path(handleAndLocalPath.getLocalPath()));
+    }
+
+    private Iterable<? extends HandleAndLocalPath> getAllHandles(
+            StateHandleDownloadSpec downloadSpec) {
+        return Stream.concat(
+                        
downloadSpec.getStateHandle().getSharedState().stream(),
+                        
downloadSpec.getStateHandle().getPrivateState().stream())
+                .collect(Collectors.toList());
+    }
+
+    private Runnable createDownloadRunnableUsingCopyFiles(
+            PathsCopyingFileSystem fileSystem,
+            List<CopyRequest> copyRequests,
+            CloseableRegistry closeableRegistry) {
+        LOG.debug("Using copy paths for {} of file system [{}]", copyRequests, 
fileSystem);
+        return ThrowingRunnable.unchecked(
+                () -> fileSystem.copyFiles(copyRequests, closeableRegistry));
+    }
+
     private Runnable createDownloadRunnableUsingStreams(
             StreamStateHandle remoteFileHandle,
             Path destination,
@@ -148,7 +224,7 @@ public class RocksDBStateDownloader implements Closeable {
             closeableRegistry.registerCloseable(inputStream);
 
             Files.createDirectories(restoreFilePath.getParent());
-            OutputStream outputStream = Files.newOutputStream(restoreFilePath);
+            OutputStream outputStream = Files.newOutputStream(restoreFilePath, 
CREATE_NEW);
             closeableRegistry.registerCloseable(outputStream);
 
             byte[] buffer = new byte[8 * 1024];

Reply via email to