rkhachatryan commented on code in PR #25028:
URL: https://github.com/apache/flink/pull/25028#discussion_r1668622658
##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java:
##########
@@ -41,6 +42,41 @@
/** Base class for file system factories that create S3 file systems. */
public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
+    public static final ConfigOption<String> ACCESS_KEY =
+            ConfigOptions.key("s3.access-key")
Review Comment:
I couldn't find generated docs for these options (this applies to all of the s3 options).
Should we start generating them in this hotfix commit?
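For reference, a rough sketch of what a docs-friendly declaration could look like (the `@Documentation` section constant and the description text are assumptions on my side, not taken from this PR):
```
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch only: the docs generator needs a description to produce a table
// entry, and a @Documentation.Section to place the option in the docs.
// The section constant below is an assumption.
@Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS)
public static final ConfigOption<String> ACCESS_KEY =
        ConfigOptions.key("s3.access-key")
                .stringType()
                .noDefaultValue()
                .withDescription("The AWS access key to use when authenticating against S3.");
```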
##########
flink-core/src/main/java/org/apache/flink/util/FileUtils.java:
##########
@@ -138,17 +140,27 @@ public static String readFile(File file, String charsetName) throws IOException
        return new String(bytes, charsetName);
    }
+    public static String readFile(File file, Charset charset) throws IOException {
+        byte[] bytes = readAllBytes(file.toPath());
+        return new String(bytes, charset);
+    }
+
    public static String readFileUtf8(File file) throws IOException {
-        return readFile(file, "UTF-8");
+        return readFile(file, StandardCharsets.UTF_8);
    }
    public static void writeFile(File file, String contents, String encoding)
            throws IOException {
        byte[] bytes = contents.getBytes(encoding);
        Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
    }
Review Comment:
This method seems to be unused now.
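If we keep it instead, I'd expect a Charset-based counterpart to the new `readFile(File, Charset)` overload, roughly like this sketch:
```
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

// Sketch: mirrors the existing String-encoding variant, but takes a Charset.
public static void writeFile(File file, String contents, Charset charset) throws IOException {
    byte[] bytes = contents.getBytes(charset);
    Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
}
```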
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java:
##########
@@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject {
    /** @return Content of this handle as bytes array if it is already in memory. */
    Optional<byte[]> asBytesIfInMemory();
+    /**
+     * @return Path to an underlying file represented by this {@link StreamStateHandle} or {@link
+     *     Optional#empty()} if there is no such file.
+     */
+    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return Optional.empty();
+    }
Review Comment:
Should this also be implemented by `DirectoryStreamStateHandle` and
`SegmentFileStateHandle`?
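For illustration, the override in a file-backed handle could be as simple as this sketch (the `filePath` field is hypothetical, standing in for wherever the handle keeps its location):
```
// Hypothetical sketch for a file-backed handle such as SegmentFileStateHandle;
// assumes a "filePath" field of type org.apache.flink.core.fs.Path.
@Override
public Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
    return Optional.of(filePath);
}
```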
##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########
@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
        this.s3AccessHelper = s3UploadHelper;
        this.uploadThreadPool = Executors.newCachedThreadPool();
-        Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
        this.s3uploadPartSize = s3uploadPartSize;
        this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", s5CmdConfiguration);
    }
    // ------------------------------------------------------------------------
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }
Review Comment:
1. Should we also check that one path is remote and one is local? (IIRC, this is an s5cmd requirement.)
2. Use the public method everywhere and inline the private one?
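For point 1, I imagine something like this sketch (the `isLocal` helper is hypothetical, and the exactly-one-side-local rule is my recollection of the s5cmd requirement, worth verifying):
```
@Override
public boolean canCopyPaths(Path source, Path destination) {
    if (s5CmdConfiguration == null) {
        return false;
    }
    // s5cmd copies between a local path and a remote one, so require that
    // exactly one of the two sides is local (assumption, please verify).
    return isLocal(source) != isLocal(destination);
}

// Hypothetical helper: classify a path as local by its URI scheme.
private static boolean isLocal(Path path) {
    String scheme = path.toUri().getScheme();
    return scheme == null || "file".equals(scheme);
}
```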
##########
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java:
##########
@@ -113,16 +114,16 @@ private String getHttpEndpoint() {
     * relevant parameter to access the {@code Minio} instance.
     */
    public void setS3ConfigOptions(Configuration config) {
-        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());
Review Comment:
nit: re-order the commits so that the use of these options comes after their introduction?
Currently, I see it as
```
[hotfix] Use newly defined ConfigOptions in MinioTestContainer
[hotfix] Move CompressionUtils to flink-core
[hotfix] Create ConfigOptions for s3 access/secret keys and endpoint
```
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##########
@@ -94,46 +116,103 @@ public void transferAllStateDataToDirectory(
        }
    }
-    /** Asynchronously runs the specified download requests on executorService. */
-    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
-            Collection<StateHandleDownloadSpec> handleWithPaths,
+    private Collection<Runnable> createDownloadRunnables(
+            Collection<StateHandleDownloadSpec> downloadRequests,
+            CloseableRegistry closeableRegistry)
+            throws IOException {
+        // We need to support recovery from multiple FileSystems. At least one scenario in which
+        // this can happen is:
+        // 1. A checkpoint/savepoint is created on FileSystem_1
+        // 2. Job terminates
+        // 3. Configuration is changed to use a checkpoint directory on FileSystem_2
+        // 4. Job is restarted from the checkpoint (1.) using claim mode
+        // 5. A new incremental checkpoint is created that can refer to files both from
+        //    FileSystem_1 and FileSystem_2.
+        Map<FileSystem.FSKey, List<CopyRequest>> filesSystemsFilesToDownload = new HashMap<>();
+        List<Runnable> runnables = new ArrayList<>();
+
+        for (StateHandleDownloadSpec downloadSpec : downloadRequests) {
+            for (HandleAndLocalPath handleAndLocalPath : getAllHandles(downloadSpec)) {
+                Path downloadDestination =
+                        downloadSpec
+                                .getDownloadDestination()
+                                .resolve(handleAndLocalPath.getLocalPath());
+                if (canCopyPaths(handleAndLocalPath)) {
+                    org.apache.flink.core.fs.Path remotePath =
+                            handleAndLocalPath.getHandle().maybeGetPath().get();
+                    FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
+                    List<CopyRequest> filesToDownload =
+                            filesSystemsFilesToDownload.computeIfAbsent(
+                                    newFSKey, fsKey -> new ArrayList<>());
+                    filesToDownload.add(
+                            CopyRequest.of(
+                                    remotePath,
+                                    new org.apache.flink.core.fs.Path(
+                                            downloadDestination.toUri())));
Review Comment:
nit: inlining `filesToDownload` makes this code more readable for me
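i.e. something like:
```
// Inlined variant of the snippet above (sketch).
filesSystemsFilesToDownload
        .computeIfAbsent(newFSKey, fsKey -> new ArrayList<>())
        .add(
                CopyRequest.of(
                        remotePath,
                        new org.apache.flink.core.fs.Path(downloadDestination.toUri())));
```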
##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########
@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
        this.s3AccessHelper = s3UploadHelper;
        this.uploadThreadPool = Executors.newCachedThreadPool();
-        Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
        this.s3uploadPartSize = s3uploadPartSize;
        this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", s5CmdConfiguration);
    }
    // ------------------------------------------------------------------------
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }
+
+    @Override
+    public void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
+            throws IOException {
+        checkState(canCopyPaths(), "#downloadFiles has been called illegally");
+        List<String> artefacts = new ArrayList<>();
+        artefacts.add(s5CmdConfiguration.path);
+        artefacts.addAll(s5CmdConfiguration.args);
+        artefacts.add("run");
+        castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));
+    }
+
+    private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
+        List<String> spells = new ArrayList<>();
+        for (CopyRequest request : requests) {
+            Files.createDirectories(Paths.get(request.getDestination().toUri()).getParent());
+            spells.add(
+                    String.format(
+                            "cp %s %s",
+                            request.getSource().toUri().toString(),
+                            request.getDestination().getPath()));
+        }
+        return spells;
+    }
+
+    private void castSpell(
+            List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
+            throws IOException {
+        LOG.info("Casting spell: {}", Arrays.toString(artefacts));
+        int exitCode = 0;
+        final AtomicReference<IOException> maybeCloseableRegistryException =
+                new AtomicReference<>();
+
+        // Setup temporary working directory for the process
+        File tmpWorkingDir = new File(localTmpDir, "s5cmd_" + UUID.randomUUID());
+        java.nio.file.Path tmpWorkingPath = Files.createDirectories(tmpWorkingDir.toPath());
+
+        try {
+            // Redirect the process input/output to files. Communicating directly through a
+            // stream can lead to blocking and undefined behavior if the underlying process is
+            // killed (known Java problem).
+            ProcessBuilder hogwart = new ProcessBuilder(artefacts).directory(tmpWorkingDir);
+            s5CmdConfiguration.configureEnvironment(hogwart.environment());
+            File inScrolls = new File(tmpWorkingDir, "s5cmd_input");
+            Preconditions.checkState(inScrolls.createNewFile());
+            File outScrolls = new File(tmpWorkingDir, "s5cmd_output");
+            Preconditions.checkState(outScrolls.createNewFile());
+
+            FileUtils.writeFileUtf8(inScrolls, String.join(System.lineSeparator(), spells));
Review Comment:
I think,
a line separator after the last string is necessary
because the file content serves as input to a process
and similar to input from a terminal it needs a newline to take
effect
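Something like:
```
// Sketch of the fix: append a trailing line separator so the last command
// is also consumed by the process reading this file as its input.
FileUtils.writeFileUtf8(
        inScrolls, String.join(System.lineSeparator(), spells) + System.lineSeparator());
```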