Izeren commented on code in PR #28136:
URL: https://github.com/apache/flink/pull/28136#discussion_r3258967623
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -292,6 +292,27 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
+ "When not set, the default chain is used: delegation tokens -> "
+ "static credentials (if configured) -> DefaultCredentialsProvider.");
+ public static final ConfigOption<Boolean> CRT_ENABLED =
+ ConfigOptions.key("s3.crt.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Enable AWS Common Runtime (CRT) HTTP transport. "
+ + "When true, uses AwsCrtHttpClient for sync S3 operations and "
+ + "S3AsyncClient.crtBuilder() for async/transfer operations, "
+ + "providing higher throughput for large S3 transfers. "
+ + "Requires aws-crt-client and aws-crt JARs in the plugin directory "
+ + "(they are not bundled in the fat JAR due to JNI shading constraints).");
+
+ public static final ConfigOption<Double> CRT_TARGET_THROUGHPUT_GBPS =
+ ConfigOptions.key("s3.crt.target-throughput-gbps")
+ .doubleType()
+ .defaultValue(10.0)
+ .withDescription(
+ "Target throughput in Gbps for the CRT-based S3 async client. "
+ + "Only used when s3.crt.enabled is true. "
+ + "Higher values allow more concurrent transfers.");
Review Comment:
It sounds like this establishes a link between target throughput and the number of writing threads. Is that how the CRT implementation abstracts it away, or should we allow more granular configuration in Flink itself?
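Here is a minimal sketch of what more granular configuration could look like. It assumes a hypothetical `s3.crt.max-concurrency` key, which is not an existing Flink option. It also uses a plain `Map<String, String>` in place of Flink's `Configuration`. When the key is absent, concurrency stays empty so the CRT client can derive it from the throughput target. The names `CrtTuning` and `fromConfig` are illustrative only.

```java
import java.util.Map;
import java.util.OptionalInt;

/**
 * Hypothetical resolution of CRT tuning knobs from a flat key/value config.
 * "s3.crt.max-concurrency" is NOT an existing Flink option; it only illustrates
 * exposing an explicit concurrency cap next to the throughput target.
 */
final class CrtTuning {
    static final String TARGET_THROUGHPUT_KEY = "s3.crt.target-throughput-gbps";
    static final String MAX_CONCURRENCY_KEY = "s3.crt.max-concurrency"; // hypothetical key
    static final double DEFAULT_TARGET_THROUGHPUT_GBPS = 10.0;

    final double targetThroughputGbps;
    // Empty means "let CRT derive concurrency from the throughput target".
    final OptionalInt maxConcurrency;

    private CrtTuning(double targetThroughputGbps, OptionalInt maxConcurrency) {
        this.targetThroughputGbps = targetThroughputGbps;
        this.maxConcurrency = maxConcurrency;
    }

    static CrtTuning fromConfig(Map<String, String> conf) {
        double gbps = conf.containsKey(TARGET_THROUGHPUT_KEY)
                ? Double.parseDouble(conf.get(TARGET_THROUGHPUT_KEY))
                : DEFAULT_TARGET_THROUGHPUT_GBPS;
        // Written as !(gbps > 0) so that NaN is rejected as well.
        if (!(gbps > 0)) {
            throw new IllegalArgumentException(TARGET_THROUGHPUT_KEY + " must be positive");
        }
        OptionalInt concurrency = OptionalInt.empty();
        String raw = conf.get(MAX_CONCURRENCY_KEY);
        if (raw != null) {
            int n = Integer.parseInt(raw);
            if (n <= 0) {
                throw new IllegalArgumentException(MAX_CONCURRENCY_KEY + " must be positive");
            }
            concurrency = OptionalInt.of(n);
        }
        return new CrtTuning(gbps, concurrency);
    }
}
```

The resolved values would be passed to the async client builder:

- `targetThroughputGbps` goes to `S3CrtAsyncClientBuilder.targetThroughputInGbps(...)`.
- `maxConcurrency` goes to `S3CrtAsyncClientBuilder.maxConcurrency(...)`, and only when present.

As far as I know, the SDK documents `maxConcurrency` as being calculated from the throughput target when unset. That would answer the question above, but it is worth confirming against the SDK version in use.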
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -326,6 +345,11 @@ public static class Builder {
// Custom credentials provider class names (comma-separated)
@Nullable private String credentialsProviderClasses;
+ // CRT configuration
+ private boolean useCrt = false;
+ private double crtTargetThroughputGbps = 10.0;
+ private long crtMinPartSizeInBytes = 5L << 20; // 5MB
Review Comment:
+1, we should define the default values in one place.
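Here is one way to keep the defaults in one place, sketched with hypothetical names (`S3CrtDefaults`, `CrtBuilderSketch`). Both the `ConfigOption` declarations and the builder fields would read from the same constants, for example `.defaultValue(S3CrtDefaults.TARGET_THROUGHPUT_GBPS)`.

```java
/**
 * Hypothetical single source of truth for the CRT defaults. The ConfigOption
 * declarations and the builder would both reference these constants.
 */
final class S3CrtDefaults {
    static final boolean CRT_ENABLED = false;
    static final double TARGET_THROUGHPUT_GBPS = 10.0;
    static final long MIN_PART_SIZE_BYTES = 5L << 20; // 5 MiB

    private S3CrtDefaults() {}
}

/** Simplified stand-in for S3ClientProvider.Builder, showing only the CRT fields. */
final class CrtBuilderSketch {
    // Field initializers reuse the shared constants instead of repeating literals.
    private boolean useCrt = S3CrtDefaults.CRT_ENABLED;
    private double crtTargetThroughputGbps = S3CrtDefaults.TARGET_THROUGHPUT_GBPS;
    private long crtMinPartSizeInBytes = S3CrtDefaults.MIN_PART_SIZE_BYTES;

    CrtBuilderSketch useCrt(boolean useCrt) {
        this.useCrt = useCrt;
        return this;
    }

    CrtBuilderSketch crtTargetThroughputGbps(double gbps) {
        this.crtTargetThroughputGbps = gbps;
        return this;
    }

    boolean isUseCrt() {
        return useCrt;
    }

    double getCrtTargetThroughputGbps() {
        return crtTargetThroughputGbps;
    }

    long getCrtMinPartSizeInBytes() {
        return crtMinPartSizeInBytes;
    }
}
```

An alternative avoids the extra class: initialize the builder fields from the options themselves, e.g. `CRT_TARGET_THROUGHPUT_GBPS.defaultValue()`, since Flink's `ConfigOption` exposes its default value.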
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -292,6 +292,27 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
+ "When not set, the default chain is used: delegation tokens -> "
+ "static credentials (if configured) -> DefaultCredentialsProvider.");
+ public static final ConfigOption<Boolean> CRT_ENABLED =
+ ConfigOptions.key("s3.crt.enabled")
+ .booleanType()
+ .defaultValue(false)
Review Comment:
I guess we introduce this option to allow a soft, configuration-based switchover? Sounds good to me.
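Here is a sketch of how the flag-driven switch could fail fast when the CRT JARs are missing from the plugin directory. It uses a plain `Map<String, String>` in place of Flink's `Configuration`. The enum `HttpTransport` and its methods are hypothetical. The checked class name is the one shipped in the `aws-crt-client` artifact. The class-presence check is passed in as a `Predicate` so it can be stubbed in tests.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of the transport switch driven by s3.crt.enabled. */
enum HttpTransport {
    DEFAULT, // existing non-CRT clients
    CRT; // AwsCrtHttpClient (sync) + S3AsyncClient.crtBuilder() (async)

    // Class shipped in the aws-crt-client artifact.
    static final String CRT_HTTP_CLIENT_CLASS = "software.amazon.awssdk.http.crt.AwsCrtHttpClient";

    static HttpTransport select(Map<String, String> conf, Predicate<String> isClassPresent) {
        // Absent key means false, matching CRT_ENABLED's default. Boolean.parseBoolean
        // treats anything other than "true" (case-insensitive) as false, so it is more
        // lenient than a strict parser.
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault("s3.crt.enabled", "false"));
        if (!enabled) {
            return DEFAULT;
        }
        if (!isClassPresent.test(CRT_HTTP_CLIENT_CLASS)) {
            throw new IllegalStateException(
                    "s3.crt.enabled=true but " + CRT_HTTP_CLIENT_CLASS
                            + " is not on the classpath; add the aws-crt-client and aws-crt JARs"
                            + " to the plugin directory");
        }
        return CRT;
    }

    /** Real classpath probe; does not initialize the class. */
    static boolean onClasspath(String className) {
        try {
            Class.forName(className, false, HttpTransport.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

In production this would be called as `HttpTransport.select(conf, HttpTransport::onClasspath)`. Failing at configuration time surfaces a missing JAR immediately. Silently falling back to the default transport would be the softer alternative, at the cost of hiding the misconfiguration.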
##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java:
##########
@@ -203,6 +203,27 @@ void testEmptyProviderStringThrows() {
.hasMessageContaining("no valid provider class names");
}
+ @Test
+ void testCrtDisabledByDefault() {
+ S3ClientProvider provider =
+ S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+ assertThat(provider.isUseCrt()).isFalse();
+ }
+
+ @Test
+ void testCrtFlagIsRecorded() {
+ S3ClientProvider provider =
+ S3ClientProvider.builder()
+ .endpoint(DUMMY_ENDPOINT)
+ .region(DUMMY_REGION)
+ .useCrt(true)
+ .crtTargetThroughputGbps(20.0)
+ .build();
+
+ assertThat(provider.isUseCrt()).isTrue();
+ assertThat(provider.getCrtTargetThroughputGbps()).isEqualTo(20.0);
Review Comment:
+1, this test doesn't check that the HTTP client has actually been configured to use CRT.
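Here is a sketch of how a test could verify the transport choice rather than just the recorded flag. It assumes a hypothetical factory seam through which the provider creates its sync HTTP client. `SyncHttpClientFactory`, `ProviderSketch` and `RecordingFactory` are illustrative names, not part of the PR. The test double records which client kind was requested.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical kinds of sync HTTP client the provider can request. */
enum SyncClientKind { DEFAULT, CRT }

/** Hypothetical seam: the provider obtains its sync HTTP client through this factory. */
interface SyncHttpClientFactory {
    Object create(SyncClientKind kind);
}

/** Simplified stand-in for S3ClientProvider that routes client creation through the seam. */
final class ProviderSketch {
    private final Object syncHttpClient;

    ProviderSketch(boolean useCrt, SyncHttpClientFactory factory) {
        this.syncHttpClient = factory.create(useCrt ? SyncClientKind.CRT : SyncClientKind.DEFAULT);
    }

    Object syncHttpClient() {
        return syncHttpClient;
    }
}

/** Test double that records which client kinds were requested. */
final class RecordingFactory implements SyncHttpClientFactory {
    final List<SyncClientKind> requested = new ArrayList<>();

    @Override
    public Object create(SyncClientKind kind) {
        requested.add(kind);
        return new Object(); // placeholder for a real SdkHttpClient
    }
}
```

If the provider instead exposed the built `SdkHttpClient`, an AssertJ check such as `assertThat(client).isInstanceOf(AwsCrtHttpClient.class)` would serve the same purpose without the extra seam.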