This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b31886d0c06caf2fe62b4687ca3819d2d27d302 Author: Efrat Levitan <[email protected]> AuthorDate: Thu May 28 10:49:12 2026 +0300 [FLINK-39811][s3] Optionally Adjust s5cmd part size to fully utilize s5cmd workers pool during downloads Unlike the upload path (recoverableWriter), which sets an S3_MULTIPART_MIN_PART_SIZE of 5 MiB[1], we don't set any part size for s5cmd, so the download part size falls back to s5cmd's default of 50 MiB[2]. As a result, the effective concurrency is capped at batchSize / 50 MiB, underutilizing the worker pool for downloaded batchSizes below (50*5 MiB) (5 being the s5cmd default concurrency[3]). The S3 SDK source[4] shows that N (= concurrency) channels are initialized but only (batchSize / partSize) of them are assigned a range to read. [1]https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#L93 [2]https://github.com/peak/s5cmd/blob/54d6a8a955688f07e5acc40d61f9c42ceac6c33b/command/cp.go#L30 [3]https://github.com/peak/s5cmd/blob/54d6a8a955688f07e5acc40d61f9c42ceac6c33b/command/cp.go#L29 [4]https://github.com/aws/aws-sdk-go/blob/070853e88d22854d2355c2543d0958a5f76ad407/service/s3/s3manager/download.go#L329-L338 --- .../fs/s3/common/AbstractS3FileSystemFactory.java | 13 ++++ .../flink/fs/s3/common/FlinkS3FileSystem.java | 39 ++++++++++-- .../flink/fs/s3/common/FlinkS3FileSystemTest.java | 73 ++++++++++++++++++++++ 3 files changed, 121 insertions(+), 4 deletions(-) diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java index 4a6846a4df2..c96e7109694 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java 
@@ -90,6 +90,19 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory { .defaultValue(100) .withDescription("Maximum number of files to download per one call to s5cmd"); + public static final ConfigOption<Boolean> S5CMD_ADJUST_PART_SIZE = + ConfigOptions.key("s3.s5cmd.adjust-part-size") + .booleanType() + .defaultValue(true) + .withDescription( + "When set to true, s5cmd will dynamically adjust the part size for copy commands to fully utilize all " + + FlinkS3FileSystem.DEFAULT_S5CMD_CONCURRENCY + + " channels. " + + "A smaller part size will improve operations speed but might incur in increased S3 request costs. " + + "When set to false, part size will default to " + + FlinkS3FileSystem.DEFAULT_S5CMD_PART_SIZE_MB + + " MiB."); + public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE = ConfigOptions.key("s3.upload.min.part.size") .longType() diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java index c8888690c5e..f2a7cca5104 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java @@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT; +import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_ADJUST_PART_SIZE; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS; @@ -92,6 +93,12 @@ public 
class FlinkS3FileSystem extends HadoopFileSystem /** The minimum size of a part in the multipart upload, except for the last part: 5 MIBytes. */ public static final long S3_MULTIPART_MIN_PART_SIZE = 5L << 20; + /** The maximum allowed part size by AWS: 5 GIBytes. */ + public static final long S3_MULTIPART_MAX_PART_SIZE = 5L << 30; + + public static final long DEFAULT_S5CMD_PART_SIZE_MB = 50; + public static final long DEFAULT_S5CMD_CONCURRENCY = 5; + private final String localTmpDir; private final FunctionWithException<File, RefCountedFileWithStream, IOException> tmpFileCreator; @@ -113,6 +120,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem @Nullable private final String accessArtifact; @Nullable private final String secretArtifact; @Nullable private final String endpoint; + private final boolean adjustPartSize; private long maxBatchSizeFiles; private long maxBatchSizeBytes; @@ -124,7 +132,8 @@ public class FlinkS3FileSystem extends HadoopFileSystem @Nullable String secretArtifact, @Nullable String endpoint, int maxBatchSizeFiles, - long maxBatchSizeBytes) { + long maxBatchSizeBytes, + boolean adjustPartSize) { if (!path.isEmpty()) { File s5CmdFile = new File(path); checkArgument(s5CmdFile.isFile(), "Unable to find s5cmd binary under [%s]", path); @@ -138,6 +147,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem this.endpoint = endpoint; this.maxBatchSizeFiles = maxBatchSizeFiles; this.maxBatchSizeBytes = maxBatchSizeBytes; + this.adjustPartSize = adjustPartSize; } public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) { @@ -152,7 +162,8 @@ public class FlinkS3FileSystem extends HadoopFileSystem flinkConfig.get(SECRET_KEY), flinkConfig.get(ENDPOINT), flinkConfig.get(S5CMD_BATCH_MAX_FILES), - flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes())); + flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes(), + flinkConfig.get(S5CMD_ADJUST_PART_SIZE))); } private void configureEnvironment(Map<String, String> environment) { @@ -204,6 +215,12 
@@ public class FlinkS3FileSystem extends HadoopFileSystem + ", endpoint='" + endpoint + '\'' + + ", maxBatchSizeFiles=" + + maxBatchSizeFiles + + ", maxBatchSizeBytes=" + + maxBatchSizeBytes + + ", adjustPartSize=" + + adjustPartSize + '}'; } } @@ -301,13 +318,27 @@ public class FlinkS3FileSystem extends HadoopFileSystem } } + private long partSizeFrom(long fileSizeBytes) { + return Math.max( + S3_MULTIPART_MIN_PART_SIZE, + Math.min( + S3_MULTIPART_MAX_PART_SIZE, + fileSizeBytes / DEFAULT_S5CMD_CONCURRENCY)) + / (1L << 20); + } + private List<String> convertToSpells(List<CopyRequest> requests) throws IOException { List<String> spells = new ArrayList<>(); for (CopyRequest request : requests) { Files.createDirectories(Paths.get(request.getDestination().toUri()).getParent()); + final long partSize = + s5CmdConfiguration.adjustPartSize + ? partSizeFrom(request.getSize()) + : DEFAULT_S5CMD_PART_SIZE_MB; spells.add( String.format( - "cp %s %s", + "cp --part-size %s %s %s", + partSize, request.getSource().toUri().toString(), request.getDestination().getPath())); } @@ -320,7 +351,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem int exitCode = 0; final AtomicReference<IOException> maybeCloseableRegistryException = new AtomicReference<>(); - + LOG.debug("Casting spells: {}", spells); // Setup temporary working directory for the process File tmpWorkingDir = new File(localTmpDir, "s5cmd_" + UUID.randomUUID()); java.nio.file.Path tmpWorkingPath = Files.createDirectories(tmpWorkingDir.toPath()); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java index 05c43f0e78b..4a47e908803 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java +++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java @@ -33,6 +33,8 @@ import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nonnull; @@ -53,6 +55,7 @@ import java.util.stream.IntStream; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT; +import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_ADJUST_PART_SIZE; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS; import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH; @@ -185,4 +188,74 @@ class FlinkS3FileSystemTest { totalSubcommands, tasks) .isEqualTo(numFiles); } + + private static List<Arguments> partSizeTestArguments() { + return List.of( + Arguments.of(true, 123L * (1L << 20), 24L), + Arguments.of( + true, + 25_605L * (1L << 20), + FlinkS3FileSystem.S3_MULTIPART_MAX_PART_SIZE / (1L << 20)), + Arguments.of( + true, + 25_600L * (1L << 20), + FlinkS3FileSystem.S3_MULTIPART_MAX_PART_SIZE / (1L << 20)), + Arguments.of( + true, + 10L * (1L << 20), + FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE / (1L << 20)), + Arguments.of( + false, 123L * (1L << 20), FlinkS3FileSystem.DEFAULT_S5CMD_PART_SIZE_MB)); + } + + @ParameterizedTest + @MethodSource("partSizeTestArguments") + @EnabledOnOs({OS.LINUX, OS.MAC}) // POSIX OS only to run shell script + public void testCopyUsesPartSize( + boolean adjustPartSize, + long copyRequestSize, + long expectedPartSize, + @TempDir 
File temporaryDirectory) + throws Exception { + File cmdFile = new File(temporaryDirectory, "cmd"); + File inputToCmd = new File(temporaryDirectory, "input"); + Preconditions.checkState(inputToCmd.mkdir()); + + String cmd = + String.format( + "file=$(mktemp %s/s5cmd-input-XXX)\n" + + "while read line; do echo $line >> $file; done < /dev/stdin", + inputToCmd.getAbsolutePath()); + + FileUtils.writeStringToFile(cmdFile, cmd); + Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable."); + + final Configuration conf = new Configuration(); + conf.set(S5CMD_PATH, cmdFile.getAbsolutePath()); + conf.set(S5CMD_EXTRA_ARGS, ""); + conf.set(S5CMD_ADJUST_PART_SIZE, adjustPartSize); + conf.set(ACCESS_KEY, "test-access-key"); + conf.set(SECRET_KEY, "test-secret-key"); + conf.set(ENDPOINT, "test-endpoint"); + + TestS3FileSystemFactory factory = new TestS3FileSystemFactory(); + factory.configure(conf); + + FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test")); + List<CopyRequest> tasks = + Collections.singletonList( + CopyRequest.of( + new Path("file:///src-file"), + new Path("file:///dst-file"), + copyRequestSize)); + fs.copyFiles(tasks, ICloseableRegistry.NO_OP); + File[] files = inputToCmd.listFiles(); + Assertions.assertThat(files).isNotNull().hasSize(1); + List<String> subcommands = FileUtils.readLines(files[0], StandardCharsets.UTF_8); + Assertions.assertThat(subcommands).hasSize(1); + String command = subcommands.get(0); + Assertions.assertThat(command) + .describedAs("s5cmd command should contain --part-size %d", expectedPartSize) + .contains("--part-size " + expectedPartSize); + } }
