pnowojski commented on code in PR #25028:
URL: https://github.com/apache/flink/pull/25028#discussion_r1668797419


##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java:
##########
@@ -41,6 +42,41 @@
 /** Base class for file system factories that create S3 file systems. */
 public abstract class AbstractS3FileSystemFactory implements FileSystemFactory 
{
 
+    public static final ConfigOption<String> ACCESS_KEY =
+            ConfigOptions.key("s3.access-key")

Review Comment:
   > Should we start generating in this hotfix commit?
   
   Probably a good idea, but I guess that would require a larger rework of the docs. I presume the docs, including the list of configuration parameters, are currently crafted manually, without any automatically generated parts.
   
   My motivation was to have those `ConfigOptions` in one place, to avoid copy/pasting the keys in various places in the code, including tests.
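   For illustration, the centralized option the diff above starts to define could look like the sketch below. The `.stringType().noDefaultValue()` chain and the description text are my completion of the truncated snippet, not necessarily the exact code in the PR:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical completion of the option from the diff above; only the
// key "s3.access-key" comes from the PR, the rest is illustrative.
public static final ConfigOption<String> ACCESS_KEY =
        ConfigOptions.key("s3.access-key")
                .stringType()
                .noDefaultValue()
                .withDescription("The access key to use for S3 authentication.");
```

   Tests could then reference `AbstractS3FileSystemFactory.ACCESS_KEY` instead of repeating the `"s3.access-key"` string literal.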



##########
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java:
##########
@@ -101,13 +237,154 @@ public FlinkS3FileSystem(
         this.s3AccessHelper = s3UploadHelper;
         this.uploadThreadPool = Executors.newCachedThreadPool();
 
-        Preconditions.checkArgument(s3uploadPartSize >= 
S3_MULTIPART_MIN_PART_SIZE);
+        checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
         this.s3uploadPartSize = s3uploadPartSize;
         this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        LOG.info("Created Flink S3 FS, s5Cmd configuration: {}", 
s5CmdConfiguration);
     }
 
     // ------------------------------------------------------------------------
 
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return canCopyPaths();
+    }
+
+    private boolean canCopyPaths() {
+        return s5CmdConfiguration != null;
+    }

Review Comment:
   1. It seems to be (mostly) supported
   https://github.com/peak/s5cmd?tab=readme-ov-file#copy-objects-from-s3-to-s3
   
   2. The `checkState` below wouldn't work as nicely. Currently I `checkState` once per `copyFiles(...)` call, not once per `CopyRequest`.
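   The "once per `copyFiles(...)` call" point can be sketched in plain Java. The class and field names below are hypothetical stand-ins, not Flink's actual types:

```java
import java.util.List;

// Hypothetical sketch: validate the s5cmd precondition once per
// copyFiles(...) call rather than once per CopyRequest.
public class BatchCopySketch {
    /** Stand-in for Flink's CopyRequest: a source/destination pair. */
    public record CopyRequest(String source, String destination) {}

    private final boolean s5CmdConfigured;

    public BatchCopySketch(boolean s5CmdConfigured) {
        this.s5CmdConfigured = s5CmdConfigured;
    }

    public int copyFiles(List<CopyRequest> requests) {
        // Single state check for the whole batch, not per request.
        if (!s5CmdConfigured) {
            throw new IllegalStateException("s5cmd is not configured; cannot copy paths");
        }
        int copied = 0;
        for (CopyRequest r : requests) {
            // ... issue the actual copy here ...
            copied++;
        }
        return copied;
    }
}
```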



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java:
##########
@@ -38,6 +38,14 @@ public interface StreamStateHandle extends StateObject {
     /** @return Content of this handle as bytes array if it is already in 
memory. */
     Optional<byte[]> asBytesIfInMemory();
 
+    /**
+     * @return Path to an underlying file represented by this {@link 
StreamStateHandle} or {@link
+     *     Optional#empty()} if there is no such file.
+     */
+    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
+        return Optional.empty();
+    }

Review Comment:
   It looks like `DirectoryStreamStateHandle` cannot be used for recovery anyway, due to:
   ```
       public FSDataInputStream openInputStream() {
           throw new UnsupportedOperationException();
       }
   ```
   Also, in that case I think we would have to add special handling for directories (the `s5cmd` [docs](https://github.com/peak/s5cmd?tab=readme-ov-file#download-multiple-s3-objects) do not state this clearly, but it looks like for download, and only for download, you need to add a wildcard `*`?)
   
   Direct copying for `SegmentFileStateHandle` might be less efficient if, roughly speaking, the desired segment is < 50% of the underlying file, so I'm not sure I would use it. Maybe in the future we should add some code letting the file system decide whether copying a given state handle is worth it? 🤔 That is, moving more of the logic for deciding how to download handles from the `RocksDBDownloader` to the `FileSystem`?
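   The heuristic hinted at above could be as simple as the sketch below. The method and parameter names are illustrative, and the 50% threshold is just the rough estimate from the comment, not a measured value:

```java
// Hypothetical sketch: let the file system decide whether a direct copy
// of a segment-backed state handle is worthwhile.
public class CopyDecisionSketch {
    /**
     * Direct copy pays off only if the segment covers enough of the
     * underlying file; otherwise streaming just the segment is cheaper.
     */
    public static boolean worthDirectCopy(long segmentLength, long fileLength) {
        // 50% threshold from the rough estimate in the comment above.
        return segmentLength * 2 >= fileLength;
    }
}
```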



##########
flink-core/src/main/java/org/apache/flink/util/FileUtils.java:
##########
@@ -138,17 +140,27 @@ public static String readFile(File file, String 
charsetName) throws IOException
         return new String(bytes, charsetName);
     }
 
+    public static String readFile(File file, Charset charset) throws 
IOException {
+        byte[] bytes = readAllBytes(file.toPath());
+        return new String(bytes, charset);
+    }
+
     public static String readFileUtf8(File file) throws IOException {
-        return readFile(file, "UTF-8");
+        return readFile(file, StandardCharsets.UTF_8);
     }
 
     public static void writeFile(File file, String contents, String encoding) 
throws IOException {
         byte[] bytes = contents.getBytes(encoding);
         Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
     }

Review Comment:
   I presume it was kept for the sake of completeness? Or should we deprecate the `String` versions?
   
   I also see that the only remaining `readFile(File, String)` call could be removed/replaced with `readFileUtf8`.
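   A possible deprecation path, sketched with plain JDK types (this is what "deprecate the `String` versions" could look like, not the actual patch):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch: the String-based overload delegates to the
// Charset-based one and is marked @Deprecated.
public class FileUtilsSketch {
    public static String readFile(File file, Charset charset) throws IOException {
        byte[] bytes = Files.readAllBytes(file.toPath());
        return new String(bytes, charset);
    }

    /** @deprecated use {@link #readFile(File, Charset)} instead. */
    @Deprecated
    public static String readFile(File file, String charsetName) throws IOException {
        return readFile(file, Charset.forName(charsetName));
    }

    public static String readFileUtf8(File file) throws IOException {
        return readFile(file, StandardCharsets.UTF_8);
    }
}
```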



##########
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java:
##########
@@ -113,16 +114,16 @@ private String getHttpEndpoint() {
      * relevant parameter to access the {@code Minio} instance.
      */
     public void setS3ConfigOptions(Configuration config) {
-        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint());

Review Comment:
   Huh? For me this ("[hotfix] Use newly defined ConfigOptions in 
MinioTestContainer") is the 3rd commit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
