This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 56c81995d3b [FLINK-35772][filesystem] Drop DuplicatingFileSystem in favour of the newer PathsCopyingFileSystem
56c81995d3b is described below

commit 56c81995d3b34ed9066b6771755407b93438f5ab
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Aug 20 14:48:15 2024 +0200

    [FLINK-35772][filesystem] Drop DuplicatingFileSystem in favour of the newer PathsCopyingFileSystem
---
 .../flink/core/fs/DuplicatingFileSystem.java       | 27 +++++-----------------
 .../state/filesystem/FsCheckpointStateToolset.java | 15 ++++++------
 .../filesystem/FsCheckpointStorageAccess.java      |  6 ++---
 .../filesystem/FsCheckpointStreamFactory.java      | 11 +++++----
 .../filesystem/FsCheckpointStateToolsetTest.java   | 12 ++++++----
 .../filesystem/FsCheckpointStorageAccessTest.java  | 10 ++++----
 6 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
index 3313fbc3a41..d0cdc4d8877 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
@@ -22,33 +22,18 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * An extension interface for {@link FileSystem FileSystems} that can perform 
cheap DFS side
- * duplicate operation. Such an operation can improve the time required for 
creating cheaply
- * independent snapshots from incremental snapshots.
+ * This interface is no longer used. Implementing it doesn't have any effect. 
Please migrate to
+ * {@link PathsCopyingFileSystem} which provides the same functionality.
  */
+@Deprecated
 public interface DuplicatingFileSystem {
-    /**
-     * Tells if we can perform duplicate/copy between given paths.
-     *
-     * <p>This should be a rather cheap operation, preferably not involving 
any remote accesses. You
-     * can check e.g. if both paths are on the same host.
-     *
-     * @param source The path of the source file to duplicate
-     * @param destination The path where to duplicate the source file
-     * @return true, if we can perform the duplication
-     */
+    /** Please use {@link PathsCopyingFileSystem#canCopyPaths(Path, Path)}. */
     boolean canFastDuplicate(Path source, Path destination) throws IOException;
 
-    /**
-     * Duplicates the source path into the destination path.
-     *
-     * <p>You should first check if you can duplicate with {@link 
#canFastDuplicate(Path, Path)}.
-     *
-     * @param requests Pairs of src/dst to copy.
-     */
+    /** Please use {@link PathsCopyingFileSystem#copyFiles(List, 
ICloseableRegistry)}. */
     void duplicate(List<CopyRequest> requests) throws IOException;
 
-    /** A pair of source and destination to duplicate a file. */
+    /** Please use {@link PathsCopyingFileSystem.CopyRequest}. */
     interface CopyRequest {
         /** The path of the source file to duplicate. */
         Path getSource();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
index 1f8423c1646..eee0d7ba60d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.core.fs.DuplicatingFileSystem;
-import org.apache.flink.core.fs.DuplicatingFileSystem.CopyRequest;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
 import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
@@ -33,14 +34,14 @@ import java.util.stream.IntStream;
 
 /**
  * An implementation of {@link CheckpointStateToolset} that does file based 
duplicating with as
- * {@link DuplicatingFileSystem}.
+ * {@link PathsCopyingFileSystem}.
  */
 public class FsCheckpointStateToolset implements CheckpointStateToolset {
 
     private final Path basePath;
-    private final DuplicatingFileSystem fs;
+    private final PathsCopyingFileSystem fs;
 
-    public FsCheckpointStateToolset(Path basePath, DuplicatingFileSystem fs) {
+    public FsCheckpointStateToolset(Path basePath, PathsCopyingFileSystem fs) {
         this.basePath = basePath;
         this.fs = fs;
     }
@@ -52,7 +53,7 @@ public class FsCheckpointStateToolset implements 
CheckpointStateToolset {
         }
         final Path srcPath = ((FileStateHandle) stateHandle).getFilePath();
         final Path dst = getNewDstPath(srcPath.getName());
-        return fs.canFastDuplicate(srcPath, dst);
+        return fs.canCopyPaths(srcPath, dst);
     }
 
     @Override
@@ -67,7 +68,7 @@ public class FsCheckpointStateToolset implements 
CheckpointStateToolset {
             final Path srcPath = ((FileStateHandle) handle).getFilePath();
             requests.add(CopyRequest.of(srcPath, 
getNewDstPath(srcPath.getName())));
         }
-        fs.duplicate(requests);
+        fs.copyFiles(requests, new CloseableRegistry());
 
         return IntStream.range(0, stateHandles.size())
                 .mapToObj(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index c32ff6c3892..56f4205d0b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
@@ -213,9 +213,9 @@ public class FsCheckpointStorageAccess extends 
AbstractFsCheckpointStorageAccess
 
     @Override
     public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
-        if (fileSystem instanceof DuplicatingFileSystem) {
+        if (fileSystem instanceof PathsCopyingFileSystem) {
             return new FsCheckpointStateToolset(
-                    taskOwnedStateDirectory, (DuplicatingFileSystem) 
fileSystem);
+                    taskOwnedStateDirectory, (PathsCopyingFileSystem) 
fileSystem);
         } else {
             return new NotDuplicatingCheckpointStateToolset();
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index e1167775b92..07d872766ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -132,12 +132,13 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
         this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
         this.fileStateThreshold = fileStateSizeThreshold;
         this.writeBufferSize = writeBufferSize;
-        if (fileSystem instanceof DuplicatingFileSystem) {
-            final DuplicatingFileSystem duplicatingFileSystem = 
(DuplicatingFileSystem) fileSystem;
+        if (fileSystem instanceof PathsCopyingFileSystem) {
+            final PathsCopyingFileSystem pathsCopyingFileSystem =
+                    (PathsCopyingFileSystem) fileSystem;
             this.privateStateToolset =
-                    new FsCheckpointStateToolset(checkpointDirectory, 
duplicatingFileSystem);
+                    new FsCheckpointStateToolset(checkpointDirectory, 
pathsCopyingFileSystem);
             this.sharedStateToolset =
-                    new FsCheckpointStateToolset(sharedStateDirectory, 
duplicatingFileSystem);
+                    new FsCheckpointStateToolset(sharedStateDirectory, 
pathsCopyingFileSystem);
         } else {
             this.privateStateToolset = null;
             this.sharedStateToolset = null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
index d9dafe55540..d547fbb8fee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.core.fs.DuplicatingFileSystem;
+import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.testutils.TestFileSystem;
 
 import org.junit.jupiter.api.Test;
 
@@ -90,14 +92,16 @@ class FsCheckpointStateToolsetTest {
                                 new Path("test-path", "test-file3"), 
"test-file3", 0));
     }
 
-    private static final class TestDuplicatingFileSystem implements 
DuplicatingFileSystem {
+    private static final class TestDuplicatingFileSystem extends TestFileSystem
+            implements PathsCopyingFileSystem {
 
         @Override
-        public boolean canFastDuplicate(Path source, Path destination) throws 
IOException {
+        public boolean canCopyPaths(Path source, Path destination) throws 
IOException {
             return !source.equals(destination);
         }
 
         @Override
-        public void duplicate(List<CopyRequest> requests) throws IOException {}
+        public void copyFiles(List<CopyRequest> requests, ICloseableRegistry 
closeableRegistry)
+                throws IOException {}
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 75d42dcbfe2..306fed51c6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -19,9 +19,10 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
@@ -324,15 +325,16 @@ class FsCheckpointStorageAccessTest extends 
AbstractFileCheckpointStorageAccessT
     }
 
     private static final class TestDuplicatingFileSystem extends TestFileSystem
-            implements DuplicatingFileSystem {
+            implements PathsCopyingFileSystem {
 
         @Override
-        public boolean canFastDuplicate(Path source, Path destination) throws 
IOException {
+        public boolean canCopyPaths(Path source, Path destination) throws 
IOException {
             return !source.equals(destination);
         }
 
         @Override
-        public void duplicate(List<CopyRequest> requests) throws IOException {}
+        public void copyFiles(List<CopyRequest> requests, ICloseableRegistry 
closeableRegistry)
+                throws IOException {}
     }
 
     // ------------------------------------------------------------------------

Reply via email to