This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf1d8c Facilitate lazy initialization of connections to mitigate
overwhelming of Coordinator (#12298)
7bf1d8c is described below
commit 7bf1d8c5c0adda27b94000f29113ad8915a23a4a
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Mar 9 23:17:43 2022 +0530
Facilitate lazy initialization of connections to mitigate overwhelming of
Coordinator (#12298)
Add config for eager / lazy connection initialization in ResourcePool
Description
Currently, when multiple tasks are launched, each of them eagerly
initializes a full pool's worth of connections to the coordinator.
While this is acceptable when the parameter for number of eagerConnections
(== maxSize) is small, this can be problematic in environments where it's a
large value (say 1000) and multiple tasks are launched simultaneously, which
can cause a large number of connections to be created to the coordinator,
thereby overwhelming it.
Patch
Nodes like the broker may require eager initialization of resources and do
not create connections with the Coordinator.
It is unnecessary to do this with other types of nodes.
A config parameter eagerInitialization is added, which when set to true,
initializes the max permissible connections when ResourcePool is initialized.
If set to false, lazy initialization of connection resources takes place.
NOTE: All nodes except the broker have this new parameter set to false in
the quickstart as part of this PR
Algorithm
The current implementation relies on the creation of maxSize resources
eagerly.
The new implementation's behaviour is as follows:
If a resource has been previously created and is available, lend it.
Else if the number of created resources is less than the allowed parameter,
create and lend it.
Else, wait for one of the lent resources to be returned.
---
.../java/util/http/client/HttpClientConfig.java | 16 ++
.../java/util/http/client/HttpClientInit.java | 3 +-
.../java/util/http/client/pool/ResourcePool.java | 140 ++++++++++------
.../util/http/client/pool/ResourcePoolTest.java | 183 ++++++++++++++++++++-
docs/configuration/index.md | 3 +
.../cluster/_common/common.runtime.properties | 5 +
.../large/_common/common.runtime.properties | 5 +
.../medium/_common/common.runtime.properties | 5 +
.../_common/common.runtime.properties | 5 +
.../_common/common.runtime.properties | 5 +
.../small/_common/common.runtime.properties | 5 +
.../xlarge/_common/common.runtime.properties | 5 +
.../druid/guice/http/DruidHttpClientConfig.java | 8 +
.../apache/druid/guice/http/HttpClientModule.java | 1 +
.../druid/server/initialization/BaseJettyTest.java | 7 +-
15 files changed, 344 insertions(+), 52 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
index 08e5dac..b652c0a 100644
---
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
+++
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
@@ -81,6 +81,7 @@ public class HttpClientConfig
}
private final int numConnections;
+ private final boolean eagerInitialization;
private final SSLContext sslContext;
private final HttpClientProxyConfig proxyConfig;
private final Duration readTimeout;
@@ -92,6 +93,7 @@ public class HttpClientConfig
private HttpClientConfig(
int numConnections,
+ boolean eagerInitialization,
SSLContext sslContext,
HttpClientProxyConfig proxyConfig,
Duration readTimeout,
@@ -103,6 +105,7 @@ public class HttpClientConfig
)
{
this.numConnections = numConnections;
+ this.eagerInitialization = eagerInitialization;
this.sslContext = sslContext;
this.proxyConfig = proxyConfig;
this.readTimeout = readTimeout;
@@ -118,6 +121,11 @@ public class HttpClientConfig
return numConnections;
}
+ public boolean isEagerInitialization()
+ {
+ return eagerInitialization;
+ }
+
public SSLContext getSslContext()
{
return sslContext;
@@ -161,6 +169,7 @@ public class HttpClientConfig
public static class Builder
{
private int numConnections = 1;
+ private boolean eagerInitialization = true;
private SSLContext sslContext = null;
private HttpClientProxyConfig proxyConfig = null;
private Duration readTimeout = null;
@@ -180,6 +189,12 @@ public class HttpClientConfig
return this;
}
+ public Builder withEagerInitialization(boolean eagerInitialization)
+ {
+ this.eagerInitialization = eagerInitialization;
+ return this;
+ }
+
public Builder withSslContext(SSLContext sslContext)
{
this.sslContext = sslContext;
@@ -226,6 +241,7 @@ public class HttpClientConfig
{
return new HttpClientConfig(
numConnections,
+ eagerInitialization,
sslContext,
proxyConfig,
readTimeout,
diff --git
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
index fd3ee80..602a8a6 100644
---
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
+++
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
@@ -91,7 +91,8 @@ public class HttpClientInit
new ResourcePoolConfig(
config.getNumConnections(),
config.getUnusedConnectionTimeoutDuration().getMillis()
- )
+ ),
+ config.isEagerInitialization()
),
config.getReadTimeout(),
config.getCompressionCodec(),
diff --git
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
index 8129796..7727bac 100644
---
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
+++
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
@@ -40,36 +40,49 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* A resource pool based on {@link LoadingCache}. When a resource is first
requested for a new key,
- * all {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and
cached in the {@link #pool}.
- * The individual resource in {@link ImmediateCreationResourceHolder} is valid
while (current time - last access time)
+ * If the flag: eagerInitialization is true: use {@link
EagerCreationResourceHolder}
+ * {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and
cached in the {@link #pool}.
+ * Else:
+ * Initialize a single resource and further lazily using {@link
LazyCreationResourceHolder}
+ * The individual resource in {@link ResourceHolderPerKey} is valid while
(current time - last access time)
* <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* A resource is closed and reinitialized if {@link ResourceFactory#isGood}
returns false or it's expired based on
* {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number
of resources per cache entry. The total
- * number of resources in {@link ImmediateCreationResourceHolder} cannot be
larger than the limit in any case.
+ * number of resources in {@link ResourceHolderPerKey} cannot be larger than
the limit in any case.
*/
public class ResourcePool<K, V> implements Closeable
{
private static final Logger log = new Logger(ResourcePool.class);
- private final LoadingCache<K, ImmediateCreationResourceHolder<K, V>> pool;
+ private final LoadingCache<K, ResourceHolderPerKey<K, V>> pool;
private final AtomicBoolean closed = new AtomicBoolean(false);
- public ResourcePool(final ResourceFactory<K, V> factory, final
ResourcePoolConfig config)
+ public ResourcePool(final ResourceFactory<K, V> factory, final
ResourcePoolConfig config,
+ final boolean eagerInitialization)
{
this.pool = CacheBuilder.newBuilder().build(
- new CacheLoader<K, ImmediateCreationResourceHolder<K, V>>()
+ new CacheLoader<K, ResourceHolderPerKey<K, V>>()
{
@Override
- public ImmediateCreationResourceHolder<K, V> load(K input)
+ public ResourceHolderPerKey<K, V> load(K input)
{
- return new ImmediateCreationResourceHolder<>(
- config.getMaxPerKey(),
- config.getUnusedConnectionTimeoutMillis(),
- input,
- factory
- );
+ if (eagerInitialization) {
+ return new EagerCreationResourceHolder<>(
+ config.getMaxPerKey(),
+ config.getUnusedConnectionTimeoutMillis(),
+ input,
+ factory
+ );
+ } else {
+ return new LazyCreationResourceHolder<>(
+ config.getMaxPerKey(),
+ config.getUnusedConnectionTimeoutMillis(),
+ input,
+ factory
+ );
+ }
}
}
);
@@ -86,7 +99,7 @@ public class ResourcePool<K, V> implements Closeable
return null;
}
- final ImmediateCreationResourceHolder<K, V> holder;
+ final ResourceHolderPerKey<K, V> holder;
try {
holder = pool.get(key);
}
@@ -138,11 +151,11 @@ public class ResourcePool<K, V> implements Closeable
public void close()
{
closed.set(true);
- final ConcurrentMap<K, ImmediateCreationResourceHolder<K, V>> mapView =
pool.asMap();
+ final ConcurrentMap<K, ResourceHolderPerKey<K, V>> mapView = pool.asMap();
Closer closer = Closer.create();
- for (Iterator<Map.Entry<K, ImmediateCreationResourceHolder<K, V>>>
iterator =
+ for (Iterator<Map.Entry<K, ResourceHolderPerKey<K, V>>> iterator =
mapView.entrySet().iterator(); iterator.hasNext(); ) {
- Map.Entry<K, ImmediateCreationResourceHolder<K, V>> e = iterator.next();
+ Map.Entry<K, ResourceHolderPerKey<K, V>> e = iterator.next();
iterator.remove();
closer.register(e.getValue());
}
@@ -154,30 +167,18 @@ public class ResourcePool<K, V> implements Closeable
}
}
- private static class ImmediateCreationResourceHolder<K, V> implements
Closeable
+ private static class EagerCreationResourceHolder<K, V> extends
LazyCreationResourceHolder<K, V>
{
- private final int maxSize;
- private final K key;
- private final ResourceFactory<K, V> factory;
- private final ArrayDeque<ResourceHolder<V>> resourceHolderList;
- private int deficit = 0;
- private boolean closed = false;
- private final long unusedResourceTimeoutMillis;
-
- private ImmediateCreationResourceHolder(
+ private EagerCreationResourceHolder(
int maxSize,
long unusedResourceTimeoutMillis,
K key,
ResourceFactory<K, V> factory
)
{
- this.maxSize = maxSize;
- this.key = key;
- this.factory = factory;
- this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis;
- this.resourceHolderList = new ArrayDeque<>();
-
- for (int i = 0; i < maxSize; ++i) {
+ super(maxSize, unusedResourceTimeoutMillis, key, factory);
+ // Eagerly Instantiate
+ for (int i = 0; i < maxSize; i++) {
resourceHolderList.add(
new ResourceHolder<>(
System.currentTimeMillis(),
@@ -189,17 +190,60 @@ public class ResourcePool<K, V> implements Closeable
);
}
}
+ }
+
+ private static class LazyCreationResourceHolder<K, V> extends
ResourceHolderPerKey<K, V>
+ {
+ private LazyCreationResourceHolder(
+ int maxSize,
+ long unusedResourceTimeoutMillis,
+ K key,
+ ResourceFactory<K, V> factory
+ )
+ {
+ super(maxSize, unusedResourceTimeoutMillis, key, factory);
+ }
+ }
+
+ private static class ResourceHolderPerKey<K, V> implements Closeable
+ {
+ protected final int maxSize;
+ private final K key;
+ private final ResourceFactory<K, V> factory;
+ private final long unusedResourceTimeoutMillis;
+ // Hold previously created / returned resources
+ protected final ArrayDeque<ResourceHolder<V>> resourceHolderList;
+ // To keep track of resources that have been successfully returned to
caller.
+ private int numLentResources = 0;
+ private boolean closed = false;
+
+ protected ResourceHolderPerKey(
+ int maxSize,
+ long unusedResourceTimeoutMillis,
+ K key,
+ ResourceFactory<K, V> factory
+ )
+ {
+ this.maxSize = maxSize;
+ this.key = key;
+ this.factory = factory;
+ this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis;
+ this.resourceHolderList = new ArrayDeque<>();
+ }
/**
* Returns a resource or null if this holder is already closed or the
current thread is interrupted.
+ *
+ * Try to return a previously created resource if it isGood(). Else,
generate a new resource
*/
@Nullable
V get()
{
- // resourceHolderList can't have nulls, so we'll use a null to signal
that we need to create a new resource.
final V poolVal;
+ // resourceHolderList can't have nulls, so we'll use a null to signal
that we need to create a new resource.
+ boolean expired = false;
synchronized (this) {
- while (!closed && resourceHolderList.size() == 0 && deficit == 0) {
+ while (!closed && (numLentResources == maxSize)) {
try {
this.wait();
}
@@ -212,26 +256,27 @@ public class ResourcePool<K, V> implements Closeable
if (closed) {
log.info(StringUtils.format("get() called even though I'm closed.
key[%s]", key));
return null;
- } else if (!resourceHolderList.isEmpty()) {
- ResourceHolder<V> holder = resourceHolderList.removeFirst();
- if (System.currentTimeMillis() - holder.getLastAccessedTime() >
unusedResourceTimeoutMillis) {
- factory.close(holder.getResource());
+ } else if (numLentResources < maxSize) {
+ // Attempt to take an existing resource or create one if list is
empty, and increment numLentResources
+ if (resourceHolderList.isEmpty()) {
poolVal = factory.generate(key);
} else {
+ ResourceHolder<V> holder = resourceHolderList.removeFirst();
poolVal = holder.getResource();
+ if (System.currentTimeMillis() - holder.getLastAccessedTime() >
unusedResourceTimeoutMillis) {
+ expired = true;
+ }
}
- } else if (deficit > 0) {
- deficit--;
- poolVal = null;
+ numLentResources++;
} else {
- throw new IllegalStateException("Unexpected state: No objects left,
and no object deficit");
+ throw new IllegalStateException("Unexpected state: More objects lent
than permissible");
}
}
- // At this point, we must either return a valid resource or increment
"deficit".
final V retVal;
+ // At this point, we must either return a valid resource. Or throw and
exception decrement "numLentResources"
try {
- if (poolVal != null && factory.isGood(poolVal)) {
+ if (poolVal != null && !expired && factory.isGood(poolVal)) {
retVal = poolVal;
} else {
if (poolVal != null) {
@@ -242,7 +287,7 @@ public class ResourcePool<K, V> implements Closeable
}
catch (Throwable e) {
synchronized (this) {
- deficit++;
+ numLentResources--;
this.notifyAll();
}
Throwables.propagateIfPossible(e);
@@ -288,6 +333,7 @@ public class ResourcePool<K, V> implements Closeable
}
resourceHolderList.addLast(new
ResourceHolder<>(System.currentTimeMillis(), object));
+ numLentResources--;
this.notifyAll();
}
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
b/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
index b7e074c..2961f65 100644
---
a/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
+++
b/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
@@ -39,12 +39,23 @@ public class ResourcePoolTest
@Before
public void setUp()
{
+ setUpPool(true);
+ }
+
+ public void setUpPoolWithoutEagerInitialization()
+ {
+ setUpPool(false);
+ }
+
+ public void setUpPool(boolean eagerInitialization)
+ {
resourceFactory = (ResourceFactory<String, String>)
EasyMock.createMock(ResourceFactory.class);
EasyMock.replay(resourceFactory);
pool = new ResourcePool<String, String>(
resourceFactory,
- new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4))
+ new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4)),
+ eagerInitialization
);
EasyMock.verify(resourceFactory);
@@ -58,6 +69,171 @@ public class ResourcePoolTest
EasyMock.replay(resourceFactory);
}
+ @Test
+ public void testTakeOnce_lazy()
+ {
+ setUpPoolWithoutEagerInitialization();
+
+ EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new
StringIncrementingAnswer("billy")).times(1);
+ EasyMock.expect(resourceFactory.isGood("billy0")).andReturn(true).times(1);
+ EasyMock.replay(resourceFactory);
+
+ ResourceContainer<String> billyString = pool.take("billy");
+ Assert.assertEquals("billy0", billyString.get());
+
+ billyString.returnResource();
+ }
+
+ @Test
+ public void testTakeAfterReturn_lazy()
+ {
+ setUpPoolWithoutEagerInitialization();
+
+ // Generate and check before return
+ EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new
StringIncrementingAnswer("billy")).times(1);
+ EasyMock.expect(resourceFactory.isGood("billy0")).andReturn(true).times(1);
+ // Only check since there's no need to generate after return
+ EasyMock.expect(resourceFactory.isGood("billy0")).andReturn(true).times(1);
+ EasyMock.replay(resourceFactory);
+
+ ResourceContainer<String> billyString = pool.take("billy");
+ Assert.assertEquals("billy0", billyString.get());
+
+ billyString.returnResource();
+
+ billyString = pool.take("billy");
+ Assert.assertEquals("billy0", billyString.get());
+
+ billyString.returnResource();
+ }
+
+ @Test
+ public void testTakeAfterFailure()
+ {
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy0");
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy1");
+
+ EasyMock.expect(resourceFactory.isGood("billy0")).andThrow(new
RuntimeException("blah"));
+
+ EasyMock.expect(resourceFactory.isGood("billy1")).andThrow(new
RuntimeException("blah"));
+
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy2");
+ EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy3");
+ EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true);
+
+ EasyMock.replay(resourceFactory);
+ // numLentResources == 0, resourceHolderList.size() == 2
+
+ try {
+ pool.take("billy");
+ }
+ catch (Exception e) {
+ }
+ // numLentResources == 0, resourceHolderList.size() == 1
+
+ try {
+ pool.take("billy");
+ }
+ catch (Exception e) {
+ }
+ // numLentResources == 0, resourceHolderList.size() == 0
+
+ ResourceContainer<String> a = pool.take("billy");
+ // numLentResources == 1, resourceHolderList.size() == 0
+
+ ResourceContainer<String> b = pool.take("billy");
+ // numLentResources == 2, resourceHolderList.size() == 0
+
+ a.returnResource();
+ // numLentResources = 1, resourceHolderList.size() == 1
+
+ a = pool.take("billy");
+ // numLentResources = 2, resourceHolderList.size() == 0
+
+ b.returnResource();
+ // numLentResources = 1, resourceHolderList.size() == 1
+
+ a.returnResource();
+ // numLentResources = 0, resourceHolderList.size() == 2
+ }
+
+ @Test
+ public void testTakeAfterFailure_lazy()
+ {
+ setUpPoolWithoutEagerInitialization();
+
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy0");
+ EasyMock.expect(resourceFactory.isGood("billy0")).andThrow(new
RuntimeException("blah"));
+
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy1");
+ EasyMock.expect(resourceFactory.isGood("billy1")).andThrow(new
RuntimeException("blah"));
+
+ EasyMock.expect(resourceFactory.generate("billy")).andThrow(new
RuntimeException("blah"));
+
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy3");
+ EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy4");
+ EasyMock.expect(resourceFactory.isGood("billy4")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.isGood("billy4")).andReturn(true);
+
+ EasyMock.expect(resourceFactory.isGood("billy3")).andThrow(new
RuntimeException("blah"));
+
+ EasyMock.replay(resourceFactory);
+ // numLentResources == 0, resourceHolderList.size() == 0
+
+ try {
+ pool.take("billy");
+ }
+ catch (Exception e) {
+ }
+ // numLentResources == 0, resourceHolderList.size() == 0
+
+ try {
+ pool.take("billy");
+ }
+ catch (Exception e) {
+ }
+ // numLentResources == 0, resourceHolderList.size() == 0
+
+ try {
+ pool.take("billy");
+ }
+ catch (Exception e) {
+ }
+ // numLentResources == 0, resourceHolderList.size() == 0
+
+ ResourceContainer<String> a = pool.take("billy");
+ // numLentResources == 1, resourceHolderList.size() == 0
+
+ ResourceContainer<String> b = pool.take("billy");
+ // numLentResources == 2, resourceHolderList.size() == 0
+
+ a.returnResource();
+ // numLentResources == 1, resourceHolderList.size() == 1
+
+ try {
+ pool.take("billy");
+ }
+ catch (Exception e) {
+ }
+ // numLentResources = 1, resourceHolderList.size() == 0
+
+ b.returnResource();
+ // numLentResources = 0, resourceHolderList.size() == 1
+ }
+
private void primePool()
{
EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new
StringIncrementingAnswer("billy")).times(2);
@@ -111,6 +287,7 @@ public class ResourcePoolTest
EasyMock.expectLastCall();
EasyMock.expect(resourceFactory.generate("billy")).andThrow(new
ISE("where's billy?")).times(1);
EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy2").times(1);
+ EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true).times(1);
EasyMock.replay(resourceFactory);
IllegalStateException e1 = null;
@@ -243,7 +420,8 @@ public class ResourcePoolTest
pool = new ResourcePool<String, String>(
resourceFactory,
- new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10))
+ new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10)),
+ true
);
EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new
StringIncrementingAnswer("billy")).times(2);
@@ -263,7 +441,6 @@ public class ResourcePoolTest
EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy1").times(1);
resourceFactory.close("billy1");
- EasyMock.expect(resourceFactory.isGood("billy1")).andReturn(true).times(1);
EasyMock.replay(resourceFactory);
ResourceContainer<String> billy = pool.take("billy");
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 0148b97..db00258 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -760,6 +760,7 @@ All Druid components can communicate with each other over
HTTP.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.global.http.numConnections`|Size of connection pool per destination
URL. If there are more HTTP requests than this number that all need to speak to
the same URL, then they will queue up.|`20`|
+|`druid.global.http.eagerInitialization`|Indicates that http connections
should be eagerly initialized. If set to true, `numConnections` connections are
created upon initialization|`true`|
|`druid.global.http.compressionCodec`|Compression codec to communicate with
others. May be "gzip" or "identity".|`gzip`|
|`druid.global.http.readTimeout`|The timeout for data reads.|`PT15M`|
|`druid.global.http.unusedConnectionTimeout`|The timeout for idle connections
in connection pool. The connection in the pool will be closed after this
timeout and a new one will be established. This timeout should be less than
`druid.global.http.readTimeout`. Set this timeout = ~90% of
`druid.global.http.readTimeout`|`PT4M`|
@@ -1761,6 +1762,7 @@ client has the following configuration options.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to
connect to Historical and real-time processes. If there are more queries than
this number that all need to speak to the same process, then they will queue
up.|`20`|
+|`druid.broker.http.eagerInitialization`|Indicates that http connections from
Broker to Historical and Real-time processes should be eagerly initialized. If
set to true, `numConnections` connections are created upon
initialization|`true`|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to
communicate with Historical and real-time processes. May be "gzip" or
"identity".|`gzip`|
|`druid.broker.http.readTimeout`|The timeout for data reads from Historical
servers and real-time tasks.|`PT15M`|
|`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections
in connection pool. The connection in the pool will be closed after this
timeout and a new one will be established. This timeout should be less than
`druid.broker.http.readTimeout`. Set this timeout = ~90% of
`druid.broker.http.readTimeout`|`PT4M`|
@@ -2152,6 +2154,7 @@ Supported query contexts:
|`druid.router.avatica.balancer.type`|Class to use for balancing Avatica
queries across Brokers. Please see [Avatica Query
Balancing](../design/router.md#avatica-query-balancing).|rendezvousHash|
|`druid.router.managementProxy.enabled`|Enables the Router's [management
proxy](../design/router.md#router-as-management-proxy) functionality.|false|
|`druid.router.http.numConnections`|Size of connection pool for the Router to
connect to Broker processes. If there are more queries than this number that
all need to speak to the same process, then they will queue up.|`20`|
+|`druid.router.http.eagerInitialization`|Indicates that http connections from
Router to Broker should be eagerly initialized. If set to true,
`numConnections` connections are created upon initialization|`true`|
|`druid.router.http.readTimeout`|The timeout for data reads from Broker
processes.|`PT15M`|
|`druid.router.http.numMaxThreads`|Maximum number of worker threads to handle
HTTP requests and responses|`max(10, ((number of cores * 17) / 16 + 2) + 30)`|
|`druid.router.http.numRequestsQueued`|Maximum number of requests that may be
queued to a destination|`1024`|
diff --git a/examples/conf/druid/cluster/_common/common.runtime.properties
b/examples/conf/druid/cluster/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/cluster/_common/common.runtime.properties
+++ b/examples/conf/druid/cluster/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/examples/conf/druid/single-server/large/_common/common.runtime.properties
b/examples/conf/druid/single-server/large/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/large/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/large/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/examples/conf/druid/single-server/medium/_common/common.runtime.properties
b/examples/conf/druid/single-server/medium/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/medium/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/medium/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
b/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
---
a/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
+++
b/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
b/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
---
a/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
+++
b/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/examples/conf/druid/single-server/small/_common/common.runtime.properties
b/examples/conf/druid/single-server/small/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/small/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/small/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
b/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
# Expression processing config
#
druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git
a/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
b/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
index b475f64..abc5ced 100644
---
a/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
+++
b/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
@@ -67,6 +67,9 @@ public class DruidHttpClientConfig
@JsonProperty
private HumanReadableBytes maxQueuedBytes = HumanReadableBytes.ZERO;
+ @JsonProperty
+ private boolean eagerInitialization = true;
+
public int getNumConnections()
{
return numConnections;
@@ -115,4 +118,9 @@ public class DruidHttpClientConfig
{
return maxQueuedBytes.getBytes();
}
+
+ public boolean isEagerInitialization()
+ {
+ return eagerInitialization;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
b/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
index 813b86d..d475324 100644
--- a/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
+++ b/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
@@ -105,6 +105,7 @@ public class HttpClientModule implements Module
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(config.getNumConnections())
+ .withEagerInitialization(config.isEagerInitialization())
.withReadTimeout(config.getReadTimeout())
.withWorkerCount(config.getNumMaxThreads())
.withCompressionCodec(
diff --git
a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
index caf0900..8029f44 100644
---
a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
+++
b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
@@ -122,7 +122,12 @@ public abstract class BaseJettyTest
try {
this.client = HttpClientInit.createClient(
-
HttpClientConfig.builder().withNumConnections(maxClientConnections).withSslContext(SSLContext.getDefault()).withReadTimeout(Duration.ZERO).build(),
+ HttpClientConfig.builder()
+ .withNumConnections(maxClientConnections)
+ .withSslContext(SSLContext.getDefault())
+ .withReadTimeout(Duration.ZERO)
+ .withEagerInitialization(true)
+ .build(),
druidLifecycle
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]