This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 56c81995d3b [FLINK-35772][filesystem] Drop DuplicatingFileSystem in favour of the newer PathsCopyingFileSystem
56c81995d3b is described below
commit 56c81995d3b34ed9066b6771755407b93438f5ab
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Aug 20 14:48:15 2024 +0200
[FLINK-35772][filesystem] Drop DuplicatingFileSystem in favour of the newer PathsCopyingFileSystem
---
.../flink/core/fs/DuplicatingFileSystem.java | 27 +++++-----------------
.../state/filesystem/FsCheckpointStateToolset.java | 15 ++++++------
.../filesystem/FsCheckpointStorageAccess.java | 6 ++---
.../filesystem/FsCheckpointStreamFactory.java | 11 +++++----
.../filesystem/FsCheckpointStateToolsetTest.java | 12 ++++++----
.../filesystem/FsCheckpointStorageAccessTest.java | 10 ++++----
6 files changed, 37 insertions(+), 44 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
index 3313fbc3a41..d0cdc4d8877 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/DuplicatingFileSystem.java
@@ -22,33 +22,18 @@ import java.io.IOException;
import java.util.List;
/**
- * An extension interface for {@link FileSystem FileSystems} that can perform cheap DFS side
- * duplicate operation. Such an operation can improve the time required for creating cheaply
- * independent snapshots from incremental snapshots.
+ * This interface is no longer used. Implementing it doesn't have any effect. Please migrate to
+ * {@link PathsCopyingFileSystem} which provides the same functionality.
*/
+@Deprecated
public interface DuplicatingFileSystem {
- /**
- * Tells if we can perform duplicate/copy between given paths.
- *
- * <p>This should be a rather cheap operation, preferably not involving any remote accesses. You
- * can check e.g. if both paths are on the same host.
- *
- * @param source The path of the source file to duplicate
- * @param destination The path where to duplicate the source file
- * @return true, if we can perform the duplication
- */
+ /** Please use {@link PathsCopyingFileSystem#canCopyPaths(Path, Path)}. */
boolean canFastDuplicate(Path source, Path destination) throws IOException;
- /**
- * Duplicates the source path into the destination path.
- *
- * <p>You should first check if you can duplicate with {@link #canFastDuplicate(Path, Path)}.
- *
- * @param requests Pairs of src/dst to copy.
- */
+ /** Please use {@link PathsCopyingFileSystem#copyFiles(List, ICloseableRegistry)}. */
void duplicate(List<CopyRequest> requests) throws IOException;
- /** A pair of source and destination to duplicate a file. */
+ /** Please use {@link PathsCopyingFileSystem.CopyRequest}. */
interface CopyRequest {
/** The path of the source file to duplicate. */
Path getSource();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
index 1f8423c1646..eee0d7ba60d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -18,10 +18,11 @@
package org.apache.flink.runtime.state.filesystem;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
-import org.apache.flink.core.fs.DuplicatingFileSystem.CopyRequest;
+import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -33,14 +34,14 @@ import java.util.stream.IntStream;
/**
* An implementation of {@link CheckpointStateToolset} that does file based duplicating with as
- * {@link DuplicatingFileSystem}.
+ * {@link PathsCopyingFileSystem}.
*/
public class FsCheckpointStateToolset implements CheckpointStateToolset {
private final Path basePath;
- private final DuplicatingFileSystem fs;
+ private final PathsCopyingFileSystem fs;
- public FsCheckpointStateToolset(Path basePath, DuplicatingFileSystem fs) {
+ public FsCheckpointStateToolset(Path basePath, PathsCopyingFileSystem fs) {
this.basePath = basePath;
this.fs = fs;
}
@@ -52,7 +53,7 @@ public class FsCheckpointStateToolset implements CheckpointStateToolset {
}
final Path srcPath = ((FileStateHandle) stateHandle).getFilePath();
final Path dst = getNewDstPath(srcPath.getName());
- return fs.canFastDuplicate(srcPath, dst);
+ return fs.canCopyPaths(srcPath, dst);
}
@Override
@@ -67,7 +68,7 @@ public class FsCheckpointStateToolset implements CheckpointStateToolset {
final Path srcPath = ((FileStateHandle) handle).getFilePath();
requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
}
- fs.duplicate(requests);
+ fs.copyFiles(requests, new CloseableRegistry());
return IntStream.range(0, stateHandles.size())
.mapToObj(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index c32ff6c3892..56f4205d0b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
@@ -213,9 +213,9 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess
@Override
public CheckpointStateToolset createTaskOwnedCheckpointStateToolset() {
- if (fileSystem instanceof DuplicatingFileSystem) {
+ if (fileSystem instanceof PathsCopyingFileSystem) {
return new FsCheckpointStateToolset(
- taskOwnedStateDirectory, (DuplicatingFileSystem) fileSystem);
+ taskOwnedStateDirectory, (PathsCopyingFileSystem) fileSystem);
} else {
return new NotDuplicatingCheckpointStateToolset();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index e1167775b92..07d872766ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.state.filesystem;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -132,12 +132,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
this.fileStateThreshold = fileStateSizeThreshold;
this.writeBufferSize = writeBufferSize;
- if (fileSystem instanceof DuplicatingFileSystem) {
- final DuplicatingFileSystem duplicatingFileSystem = (DuplicatingFileSystem) fileSystem;
+ if (fileSystem instanceof PathsCopyingFileSystem) {
+ final PathsCopyingFileSystem pathsCopyingFileSystem =
+ (PathsCopyingFileSystem) fileSystem;
this.privateStateToolset =
- new FsCheckpointStateToolset(checkpointDirectory, duplicatingFileSystem);
+ new FsCheckpointStateToolset(checkpointDirectory, pathsCopyingFileSystem);
this.sharedStateToolset =
- new FsCheckpointStateToolset(sharedStateDirectory, duplicatingFileSystem);
+ new FsCheckpointStateToolset(sharedStateDirectory, pathsCopyingFileSystem);
} else {
this.privateStateToolset = null;
this.sharedStateToolset = null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
index d9dafe55540..d547fbb8fee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolsetTest.java
@@ -18,10 +18,12 @@
package org.apache.flink.runtime.state.filesystem;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
+import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.testutils.TestFileSystem;
import org.junit.jupiter.api.Test;
@@ -90,14 +92,16 @@ class FsCheckpointStateToolsetTest {
new Path("test-path", "test-file3"),
"test-file3", 0));
}
- private static final class TestDuplicatingFileSystem implements DuplicatingFileSystem {
+ private static final class TestDuplicatingFileSystem extends TestFileSystem
+ implements PathsCopyingFileSystem {
@Override
- public boolean canFastDuplicate(Path source, Path destination) throws IOException {
+ public boolean canCopyPaths(Path source, Path destination) throws IOException {
return !source.equals(destination);
}
@Override
- public void duplicate(List<CopyRequest> requests) throws IOException {}
+ public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+ throws IOException {}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 75d42dcbfe2..306fed51c6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -19,9 +19,10 @@
package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
@@ -324,15 +325,16 @@ class FsCheckpointStorageAccessTest extends AbstractFileCheckpointStorageAccessT
}
private static final class TestDuplicatingFileSystem extends TestFileSystem
- implements DuplicatingFileSystem {
+ implements PathsCopyingFileSystem {
@Override
- public boolean canFastDuplicate(Path source, Path destination) throws IOException {
+ public boolean canCopyPaths(Path source, Path destination) throws IOException {
return !source.equals(destination);
}
@Override
- public void duplicate(List<CopyRequest> requests) throws IOException {}
+ public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+ throws IOException {}
}
// ------------------------------------------------------------------------