This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b210c99149c minor: scale AWSClientConfig default maxConnections with
available processors (#19536)
b210c99149c is described below
commit b210c99149c6378e585fb76af51b7a6798fc9c6b
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 2 01:23:56 2026 -0700
minor: scale AWSClientConfig default maxConnections with available
processors (#19536)
changes:
* `AWSClientConfig` now scales its default `maxConnections` with the number of
available processors (`max(50, 4 * cores)`) so that it stays in sync with the
download thread pool size of historicals running in virtual storage mode
* added tests that inject an artificial `RuntimeInfo` to cover the config scaling
---
.../apache/druid/common/aws/AWSClientConfig.java | 28 ++++++++++--
.../druid/common/aws/AWSClientConfigTest.java | 53 +++++++++++++++++++++-
.../druid/storage/s3/S3StorageDruidModuleTest.java | 17 +++----
3 files changed, 85 insertions(+), 13 deletions(-)
diff --git
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
index e8d299cbd85..9fdf9ba592f 100644
---
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
+++
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
@@ -19,7 +19,9 @@
package org.apache.druid.common.aws;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.utils.RuntimeInfo;
import javax.annotation.Nullable;
@@ -31,7 +33,17 @@ public class AWSClientConfig
private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 10_000;
private static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 50_000;
- private static final int DEFAULT_MAX_CONNECTIONS = 50;
+ /** AWS SDK v2's own default. */
+ private static final int DEFAULT_MAX_CONNECTIONS_FLOOR = 50;
+
+ /**
+ * Used by {@link #getMaxConnections} to scale the default connection pool
with host size so hosts large enough to
+ * do a lot of concurrent deep-storage I/O (e.g. virtual-storage historicals
fanning out on-demand loads to S3)
+ * aren't bottlenecked at the SDK's connection pool. The field initializer
covers direct construction (no Jackson);
+ * Jackson overwrites with the injected {@link RuntimeInfo} during
deserialization.
+ */
+ @JacksonInject
+ private final RuntimeInfo runtimeInfo = new RuntimeInfo();
@JsonProperty
private String protocol = "https"; // The default of aws-java-sdk
@@ -60,8 +72,13 @@ public class AWSClientConfig
@JsonProperty
private int socketTimeout = DEFAULT_SOCKET_TIMEOUT_MILLIS;
+ /**
+ * Null means use the dynamic default in {@link #getMaxConnections} ({@code
max(50, 4 × availableProcessors)});
+ * any explicit value set in JSON wins.
+ */
@JsonProperty
- private int maxConnections = DEFAULT_MAX_CONNECTIONS;
+ @Nullable
+ private Integer maxConnections = null;
public String getProtocol()
{
@@ -123,7 +140,10 @@ public class AWSClientConfig
public int getMaxConnections()
{
- return maxConnections;
+ if (maxConnections != null) {
+ return maxConnections;
+ }
+ return Math.max(DEFAULT_MAX_CONNECTIONS_FLOOR, 4 *
runtimeInfo.getAvailableProcessors());
}
@Override
@@ -136,7 +156,7 @@ public class AWSClientConfig
", crossRegionAccessEnabled=" + isCrossRegionAccessEnabled() +
", connectionTimeout=" + connectionTimeout +
", socketTimeout=" + socketTimeout +
- ", maxConnections=" + maxConnections +
+ ", maxConnections=" + getMaxConnections() +
'}';
}
}
diff --git
a/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientConfigTest.java
b/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientConfigTest.java
index 4e8837566ca..99b927efb2b 100644
---
a/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientConfigTest.java
+++
b/cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientConfigTest.java
@@ -19,13 +19,24 @@
package org.apache.druid.common.aws;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.utils.RuntimeInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class AWSClientConfigTest
{
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = new
ObjectMapper().setInjectableValues(
+ new InjectableValues.Std().addValue(RuntimeInfo.class, new RuntimeInfo())
+ );
+
+ private static ObjectMapper mapperWithRuntimeInfo(RuntimeInfo runtimeInfo)
+ {
+ return new ObjectMapper().setInjectableValues(
+ new InjectableValues.Std().addValue(RuntimeInfo.class, runtimeInfo)
+ );
+ }
@Test
public void testDefaultCrossRegionAccessEnabled() throws Exception
@@ -83,4 +94,44 @@ public class AWSClientConfigTest
Assertions.assertNull(config.isForceGlobalBucketAccessEnabled());
Assertions.assertTrue(config.isCrossRegionAccessEnabled());
}
+
+ @Test
+ public void testDefaultMaxConnectionsKeepsAwsSdkFloorOnSmallHost() throws
Exception
+ {
+ AWSClientConfig config = mapperWithRuntimeInfo(new
FixedProcessorsRuntimeInfo(8))
+ .readValue("{}", AWSClientConfig.class);
+ Assertions.assertEquals(50, config.getMaxConnections());
+ }
+
+ @Test
+ public void testDefaultMaxConnectionsScalesWithCoresOnLargeHost() throws
Exception
+ {
+ AWSClientConfig config = mapperWithRuntimeInfo(new
FixedProcessorsRuntimeInfo(32))
+ .readValue("{}", AWSClientConfig.class);
+ Assertions.assertEquals(128, config.getMaxConnections());
+ }
+
+ @Test
+ public void testExplicitMaxConnectionsOverridesDefault() throws Exception
+ {
+ AWSClientConfig config = mapperWithRuntimeInfo(new
FixedProcessorsRuntimeInfo(64))
+ .readValue("{\"maxConnections\": 200}", AWSClientConfig.class);
+ Assertions.assertEquals(200, config.getMaxConnections());
+ }
+
+ private static final class FixedProcessorsRuntimeInfo extends RuntimeInfo
+ {
+ private final int availableProcessors;
+
+ private FixedProcessorsRuntimeInfo(int availableProcessors)
+ {
+ this.availableProcessors = availableProcessors;
+ }
+
+ @Override
+ public int getAvailableProcessors()
+ {
+ return availableProcessors;
+ }
+ }
}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java
index 3932c147695..5d4fc4f188f 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java
@@ -19,11 +19,12 @@
package org.apache.druid.storage.s3;
-import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.common.aws.AWSModule;
-import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.ServerModule;
+import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.apache.druid.segment.loading.OmniDataSegmentMover;
@@ -70,12 +71,12 @@ public class S3StorageDruidModuleTest
private static Injector createInjector()
{
- return GuiceInjectors.makeStartupInjectorWithModules(
- ImmutableList.of(
- new AWSModule(),
- new S3StorageDruidModule(),
- new ServerModule()
- )
+ final Injector startupInjector = new
StartupInjectorBuilder().forServer().build();
+ return Guice.createInjector(
+ startupInjector.getInstance(DruidSecondaryModule.class),
+ new AWSModule(),
+ new S3StorageDruidModule(),
+ new ServerModule()
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]