This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf1d8c  Facilitate lazy initialization of connections to mitigate 
overwhelming of Coordinator (#12298)
7bf1d8c is described below

commit 7bf1d8c5c0adda27b94000f29113ad8915a23a4a
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Mar 9 23:17:43 2022 +0530

    Facilitate lazy initialization of connections to mitigate overwhelming of 
Coordinator (#12298)
    
    Add config for eager / lazy connection initialization in ResourcePool
    
    Description
    Currently, when multiple tasks are launched, each of them eagerly 
initializes a full pool's worth of connections to the coordinator.
    
    While this is acceptable when the parameter for number of eagerConnections 
(== maxSize) is small, this can be problematic in environments where it's a 
large value (say 1000) and multiple tasks are launched simultaneously, which 
can cause a large number of connections to be created to the coordinator, 
thereby overwhelming it.
    
    Patch
    Nodes like the broker may require eager initialization of resources and do 
not create connections with the Coordinator.
    Eager initialization is unnecessary for other types of nodes.
    
    A config parameter eagerInitialization is added, which when set to true, 
initializes the max permissible connections when ResourcePool is initialized.
    
    If set to false, lazy initialization of connection resources takes place.
    
    NOTE: All nodes except the broker have this new parameter set to false in 
the quickstart as part of this PR
    
    Algorithm
    The current implementation relies on the creation of maxSize resources 
eagerly.
    
    The new implementation's behaviour is as follows:
    
    If a resource has been previously created and is available, lend it.
    Else if the number of created resources is less than the allowed parameter, 
create and lend it.
    Else, wait for one of the lent resources to be returned.
---
 .../java/util/http/client/HttpClientConfig.java    |  16 ++
 .../java/util/http/client/HttpClientInit.java      |   3 +-
 .../java/util/http/client/pool/ResourcePool.java   | 140 ++++++++++------
 .../util/http/client/pool/ResourcePoolTest.java    | 183 ++++++++++++++++++++-
 docs/configuration/index.md                        |   3 +
 .../cluster/_common/common.runtime.properties      |   5 +
 .../large/_common/common.runtime.properties        |   5 +
 .../medium/_common/common.runtime.properties       |   5 +
 .../_common/common.runtime.properties              |   5 +
 .../_common/common.runtime.properties              |   5 +
 .../small/_common/common.runtime.properties        |   5 +
 .../xlarge/_common/common.runtime.properties       |   5 +
 .../druid/guice/http/DruidHttpClientConfig.java    |   8 +
 .../apache/druid/guice/http/HttpClientModule.java  |   1 +
 .../druid/server/initialization/BaseJettyTest.java |   7 +-
 15 files changed, 344 insertions(+), 52 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
 
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
index 08e5dac..b652c0a 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java
@@ -81,6 +81,7 @@ public class HttpClientConfig
   }
 
   private final int numConnections;
+  private final boolean eagerInitialization;
   private final SSLContext sslContext;
   private final HttpClientProxyConfig proxyConfig;
   private final Duration readTimeout;
@@ -92,6 +93,7 @@ public class HttpClientConfig
 
   private HttpClientConfig(
       int numConnections,
+      boolean eagerInitialization,
       SSLContext sslContext,
       HttpClientProxyConfig proxyConfig,
       Duration readTimeout,
@@ -103,6 +105,7 @@ public class HttpClientConfig
   )
   {
     this.numConnections = numConnections;
+    this.eagerInitialization = eagerInitialization;
     this.sslContext = sslContext;
     this.proxyConfig = proxyConfig;
     this.readTimeout = readTimeout;
@@ -118,6 +121,11 @@ public class HttpClientConfig
     return numConnections;
   }
 
+  public boolean isEagerInitialization()
+  {
+    return eagerInitialization;
+  }
+
   public SSLContext getSslContext()
   {
     return sslContext;
@@ -161,6 +169,7 @@ public class HttpClientConfig
   public static class Builder
   {
     private int numConnections = 1;
+    private boolean eagerInitialization = true;
     private SSLContext sslContext = null;
     private HttpClientProxyConfig proxyConfig = null;
     private Duration readTimeout = null;
@@ -180,6 +189,12 @@ public class HttpClientConfig
       return this;
     }
 
+    public Builder withEagerInitialization(boolean eagerInitialization)
+    {
+      this.eagerInitialization = eagerInitialization;
+      return this;
+    }
+
     public Builder withSslContext(SSLContext sslContext)
     {
       this.sslContext = sslContext;
@@ -226,6 +241,7 @@ public class HttpClientConfig
     {
       return new HttpClientConfig(
           numConnections,
+          eagerInitialization,
           sslContext,
           proxyConfig,
           readTimeout,
diff --git 
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java 
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
index fd3ee80..602a8a6 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/http/client/HttpClientInit.java
@@ -91,7 +91,8 @@ public class HttpClientInit
                   new ResourcePoolConfig(
                       config.getNumConnections(),
                       config.getUnusedConnectionTimeoutDuration().getMillis()
-                  )
+                  ),
+                  config.isEagerInitialization()
               ),
               config.getReadTimeout(),
               config.getCompressionCodec(),
diff --git 
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
 
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
index 8129796..7727bac 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java
@@ -40,36 +40,49 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A resource pool based on {@link LoadingCache}. When a resource is first 
requested for a new key,
- * all {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and 
cached in the {@link #pool}.
- * The individual resource in {@link ImmediateCreationResourceHolder} is valid 
while (current time - last access time)
+ * If the flag: eagerInitialization is true: use {@link 
EagerCreationResourceHolder}
+ *    {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and 
cached in the {@link #pool}.
+ * Else:
+ *    Initialize only the requested resource; further resources are created lazily, on demand, using {@link 
LazyCreationResourceHolder}
+ * The individual resource in {@link ResourceHolderPerKey} is valid while 
(current time - last access time)
  * <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
  *
  * A resource is closed and reinitialized if {@link ResourceFactory#isGood} 
returns false or it's expired based on
  * {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
  *
  * {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number 
of resources per cache entry. The total
- * number of resources in {@link ImmediateCreationResourceHolder} cannot be 
larger than the limit in any case.
+ * number of resources in {@link ResourceHolderPerKey} cannot be larger than 
the limit in any case.
  */
 public class ResourcePool<K, V> implements Closeable
 {
   private static final Logger log = new Logger(ResourcePool.class);
-  private final LoadingCache<K, ImmediateCreationResourceHolder<K, V>> pool;
+  private final LoadingCache<K, ResourceHolderPerKey<K, V>> pool;
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  public ResourcePool(final ResourceFactory<K, V> factory, final 
ResourcePoolConfig config)
+  public ResourcePool(final ResourceFactory<K, V> factory, final 
ResourcePoolConfig config,
+                      final boolean eagerInitialization)
   {
     this.pool = CacheBuilder.newBuilder().build(
-        new CacheLoader<K, ImmediateCreationResourceHolder<K, V>>()
+        new CacheLoader<K, ResourceHolderPerKey<K, V>>()
         {
           @Override
-          public ImmediateCreationResourceHolder<K, V> load(K input)
+          public ResourceHolderPerKey<K, V> load(K input)
           {
-            return new ImmediateCreationResourceHolder<>(
-                config.getMaxPerKey(),
-                config.getUnusedConnectionTimeoutMillis(),
-                input,
-                factory
-            );
+            if (eagerInitialization) {
+              return new EagerCreationResourceHolder<>(
+                  config.getMaxPerKey(),
+                  config.getUnusedConnectionTimeoutMillis(),
+                  input,
+                  factory
+              );
+            } else {
+              return new LazyCreationResourceHolder<>(
+                  config.getMaxPerKey(),
+                  config.getUnusedConnectionTimeoutMillis(),
+                  input,
+                  factory
+              );
+            }
           }
         }
     );
@@ -86,7 +99,7 @@ public class ResourcePool<K, V> implements Closeable
       return null;
     }
 
-    final ImmediateCreationResourceHolder<K, V> holder;
+    final ResourceHolderPerKey<K, V> holder;
     try {
       holder = pool.get(key);
     }
@@ -138,11 +151,11 @@ public class ResourcePool<K, V> implements Closeable
   public void close()
   {
     closed.set(true);
-    final ConcurrentMap<K, ImmediateCreationResourceHolder<K, V>> mapView = 
pool.asMap();
+    final ConcurrentMap<K, ResourceHolderPerKey<K, V>> mapView = pool.asMap();
     Closer closer = Closer.create();
-    for (Iterator<Map.Entry<K, ImmediateCreationResourceHolder<K, V>>> 
iterator =
+    for (Iterator<Map.Entry<K, ResourceHolderPerKey<K, V>>> iterator =
          mapView.entrySet().iterator(); iterator.hasNext(); ) {
-      Map.Entry<K, ImmediateCreationResourceHolder<K, V>> e = iterator.next();
+      Map.Entry<K, ResourceHolderPerKey<K, V>> e = iterator.next();
       iterator.remove();
       closer.register(e.getValue());
     }
@@ -154,30 +167,18 @@ public class ResourcePool<K, V> implements Closeable
     }
   }
 
-  private static class ImmediateCreationResourceHolder<K, V> implements 
Closeable
+  private static class EagerCreationResourceHolder<K, V> extends 
LazyCreationResourceHolder<K, V>
   {
-    private final int maxSize;
-    private final K key;
-    private final ResourceFactory<K, V> factory;
-    private final ArrayDeque<ResourceHolder<V>> resourceHolderList;
-    private int deficit = 0;
-    private boolean closed = false;
-    private final long unusedResourceTimeoutMillis;
-
-    private ImmediateCreationResourceHolder(
+    private EagerCreationResourceHolder(
         int maxSize,
         long unusedResourceTimeoutMillis,
         K key,
         ResourceFactory<K, V> factory
     )
     {
-      this.maxSize = maxSize;
-      this.key = key;
-      this.factory = factory;
-      this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis;
-      this.resourceHolderList = new ArrayDeque<>();
-
-      for (int i = 0; i < maxSize; ++i) {
+      super(maxSize, unusedResourceTimeoutMillis, key, factory);
+      // Eagerly Instantiate
+      for (int i = 0; i < maxSize; i++) {
         resourceHolderList.add(
             new ResourceHolder<>(
                 System.currentTimeMillis(),
@@ -189,17 +190,60 @@ public class ResourcePool<K, V> implements Closeable
         );
       }
     }
+  }
+
+  private static class LazyCreationResourceHolder<K, V> extends 
ResourceHolderPerKey<K, V>
+  {
+    private LazyCreationResourceHolder(
+        int maxSize,
+        long unusedResourceTimeoutMillis,
+        K key,
+        ResourceFactory<K, V> factory
+    )
+    {
+      super(maxSize, unusedResourceTimeoutMillis, key, factory);
+    }
+  }
+
+  private static class ResourceHolderPerKey<K, V> implements Closeable
+  {
+    protected final int maxSize;
+    private final K key;
+    private final ResourceFactory<K, V> factory;
+    private final long unusedResourceTimeoutMillis;
+    // Hold previously created / returned resources
+    protected final ArrayDeque<ResourceHolder<V>> resourceHolderList;
+    // To keep track of resources that have been successfully returned to 
caller.
+    private int numLentResources = 0;
+    private boolean closed = false;
+
+    protected ResourceHolderPerKey(
+        int maxSize,
+        long unusedResourceTimeoutMillis,
+        K key,
+        ResourceFactory<K, V> factory
+    )
+    {
+      this.maxSize = maxSize;
+      this.key = key;
+      this.factory = factory;
+      this.unusedResourceTimeoutMillis = unusedResourceTimeoutMillis;
+      this.resourceHolderList = new ArrayDeque<>();
+    }
 
     /**
      * Returns a resource or null if this holder is already closed or the 
current thread is interrupted.
+     *
+     * Try to return a previously created resource if it isGood(). Else, 
generate a new resource
      */
     @Nullable
     V get()
     {
-      // resourceHolderList can't have nulls, so we'll use a null to signal 
that we need to create a new resource.
       final V poolVal;
+      // resourceHolderList can't have nulls, so we'll use a null to signal 
that we need to create a new resource.
+      boolean expired = false;
       synchronized (this) {
-        while (!closed && resourceHolderList.size() == 0 && deficit == 0) {
+        while (!closed && (numLentResources == maxSize)) {
           try {
             this.wait();
           }
@@ -212,26 +256,27 @@ public class ResourcePool<K, V> implements Closeable
         if (closed) {
           log.info(StringUtils.format("get() called even though I'm closed. 
key[%s]", key));
           return null;
-        } else if (!resourceHolderList.isEmpty()) {
-          ResourceHolder<V> holder = resourceHolderList.removeFirst();
-          if (System.currentTimeMillis() - holder.getLastAccessedTime() > 
unusedResourceTimeoutMillis) {
-            factory.close(holder.getResource());
+        } else if (numLentResources < maxSize) {
+          // Attempt to take an existing resource or create one if list is 
empty, and increment numLentResources
+          if (resourceHolderList.isEmpty()) {
             poolVal = factory.generate(key);
           } else {
+            ResourceHolder<V> holder = resourceHolderList.removeFirst();
             poolVal = holder.getResource();
+            if (System.currentTimeMillis() - holder.getLastAccessedTime() > 
unusedResourceTimeoutMillis) {
+              expired = true;
+            }
           }
-        } else if (deficit > 0) {
-          deficit--;
-          poolVal = null;
+          numLentResources++;
         } else {
-          throw new IllegalStateException("Unexpected state: No objects left, 
and no object deficit");
+          throw new IllegalStateException("Unexpected state: More objects lent 
than permissible");
         }
       }
 
-      // At this point, we must either return a valid resource or increment 
"deficit".
       final V retVal;
+      // At this point, we must either return a valid resource, or throw an 
exception and decrement "numLentResources".
       try {
-        if (poolVal != null && factory.isGood(poolVal)) {
+        if (poolVal != null && !expired && factory.isGood(poolVal)) {
           retVal = poolVal;
         } else {
           if (poolVal != null) {
@@ -242,7 +287,7 @@ public class ResourcePool<K, V> implements Closeable
       }
       catch (Throwable e) {
         synchronized (this) {
-          deficit++;
+          numLentResources--;
           this.notifyAll();
         }
         Throwables.propagateIfPossible(e);
@@ -288,6 +333,7 @@ public class ResourcePool<K, V> implements Closeable
         }
 
         resourceHolderList.addLast(new 
ResourceHolder<>(System.currentTimeMillis(), object));
+        numLentResources--;
         this.notifyAll();
       }
     }
diff --git 
a/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
 
b/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
index b7e074c..2961f65 100644
--- 
a/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
+++ 
b/core/src/test/java/org/apache/druid/java/util/http/client/pool/ResourcePoolTest.java
@@ -39,12 +39,23 @@ public class ResourcePoolTest
   @Before
   public void setUp()
   {
+    setUpPool(true);
+  }
+
+  public void setUpPoolWithoutEagerInitialization()
+  {
+    setUpPool(false);
+  }
+
+  public void setUpPool(boolean eagerInitialization)
+  {
     resourceFactory = (ResourceFactory<String, String>) 
EasyMock.createMock(ResourceFactory.class);
 
     EasyMock.replay(resourceFactory);
     pool = new ResourcePool<String, String>(
         resourceFactory,
-        new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4))
+        new ResourcePoolConfig(2, TimeUnit.MINUTES.toMillis(4)),
+        eagerInitialization
     );
 
     EasyMock.verify(resourceFactory);
@@ -58,6 +69,171 @@ public class ResourcePoolTest
     EasyMock.replay(resourceFactory);
   }
 
+  @Test
+  public void testTakeOnce_lazy()
+  {
+    setUpPoolWithoutEagerInitialization();
+
+    EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new 
StringIncrementingAnswer("billy")).times(1);
+    EasyMock.expect(resourceFactory.isGood("billy0")).andReturn(true).times(1);
+    EasyMock.replay(resourceFactory);
+
+    ResourceContainer<String> billyString = pool.take("billy");
+    Assert.assertEquals("billy0", billyString.get());
+
+    billyString.returnResource();
+  }
+
+  @Test
+  public void testTakeAfterReturn_lazy()
+  {
+    setUpPoolWithoutEagerInitialization();
+
+    // Generate and check before return
+    EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new 
StringIncrementingAnswer("billy")).times(1);
+    EasyMock.expect(resourceFactory.isGood("billy0")).andReturn(true).times(1);
+    // Only check since there's no need to generate after return
+    EasyMock.expect(resourceFactory.isGood("billy0")).andReturn(true).times(1);
+    EasyMock.replay(resourceFactory);
+
+    ResourceContainer<String> billyString = pool.take("billy");
+    Assert.assertEquals("billy0", billyString.get());
+
+    billyString.returnResource();
+
+    billyString = pool.take("billy");
+    Assert.assertEquals("billy0", billyString.get());
+
+    billyString.returnResource();
+  }
+
+  @Test
+  public void testTakeAfterFailure()
+  {
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy0");
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy1");
+
+    EasyMock.expect(resourceFactory.isGood("billy0")).andThrow(new 
RuntimeException("blah"));
+
+    EasyMock.expect(resourceFactory.isGood("billy1")).andThrow(new 
RuntimeException("blah"));
+
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy2");
+    EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy3");
+    EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true);
+
+    EasyMock.replay(resourceFactory);
+    // numLentResources == 0, resourceHolderList.size() == 2
+
+    try {
+      pool.take("billy");
+    }
+    catch (Exception e) {
+    }
+    // numLentResources == 0, resourceHolderList.size() == 1
+
+    try {
+      pool.take("billy");
+    }
+    catch (Exception e) {
+    }
+    // numLentResources == 0, resourceHolderList.size() == 0
+
+    ResourceContainer<String> a = pool.take("billy");
+    // numLentResources == 1, resourceHolderList.size() == 0
+
+    ResourceContainer<String> b = pool.take("billy");
+    // numLentResources == 2, resourceHolderList.size() == 0
+
+    a.returnResource();
+    // numLentResources = 1, resourceHolderList.size() == 1
+
+    a = pool.take("billy");
+    // numLentResources = 2, resourceHolderList.size() == 0
+
+    b.returnResource();
+    // numLentResources = 1, resourceHolderList.size() == 1
+
+    a.returnResource();
+    // numLentResources = 0, resourceHolderList.size() == 2
+  }
+
+  @Test
+  public void testTakeAfterFailure_lazy()
+  {
+    setUpPoolWithoutEagerInitialization();
+
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy0");
+    EasyMock.expect(resourceFactory.isGood("billy0")).andThrow(new 
RuntimeException("blah"));
+
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy1");
+    EasyMock.expect(resourceFactory.isGood("billy1")).andThrow(new 
RuntimeException("blah"));
+
+    EasyMock.expect(resourceFactory.generate("billy")).andThrow(new 
RuntimeException("blah"));
+
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy3");
+    EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy4");
+    EasyMock.expect(resourceFactory.isGood("billy4")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.isGood("billy3")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.isGood("billy4")).andReturn(true);
+
+    EasyMock.expect(resourceFactory.isGood("billy3")).andThrow(new 
RuntimeException("blah"));
+
+    EasyMock.replay(resourceFactory);
+    // numLentResources == 0, resourceHolderList.size() == 0
+
+    try {
+      pool.take("billy");
+    }
+    catch (Exception e) {
+    }
+    // numLentResources == 0, resourceHolderList.size() == 0
+
+    try {
+      pool.take("billy");
+    }
+    catch (Exception e) {
+    }
+    // numLentResources == 0, resourceHolderList.size() == 0
+
+    try {
+      pool.take("billy");
+    }
+    catch (Exception e) {
+    }
+    // numLentResources == 0, resourceHolderList.size() == 0
+
+    ResourceContainer<String> a = pool.take("billy");
+    // numLentResources == 1, resourceHolderList.size() == 0
+
+    ResourceContainer<String> b = pool.take("billy");
+    // numLentResources == 2, resourceHolderList.size() == 0
+
+    a.returnResource();
+    // numLentResources == 1, resourceHolderList.size() == 1
+
+    try {
+      pool.take("billy");
+    }
+    catch (Exception e) {
+    }
+    // numLentResources = 1, resourceHolderList.size() == 0
+
+    b.returnResource();
+    // numLentResources = 0, resourceHolderList.size() == 1
+  }
+
   private void primePool()
   {
     EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new 
StringIncrementingAnswer("billy")).times(2);
@@ -111,6 +287,7 @@ public class ResourcePoolTest
     EasyMock.expectLastCall();
     EasyMock.expect(resourceFactory.generate("billy")).andThrow(new 
ISE("where's billy?")).times(1);
     
EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy2").times(1);
+    EasyMock.expect(resourceFactory.isGood("billy2")).andReturn(true).times(1);
     EasyMock.replay(resourceFactory);
 
     IllegalStateException e1 = null;
@@ -243,7 +420,8 @@ public class ResourcePoolTest
 
     pool = new ResourcePool<String, String>(
         resourceFactory,
-        new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10))
+        new ResourcePoolConfig(2, TimeUnit.MILLISECONDS.toMillis(10)),
+        true
     );
 
     EasyMock.expect(resourceFactory.generate("billy")).andAnswer(new 
StringIncrementingAnswer("billy")).times(2);
@@ -263,7 +441,6 @@ public class ResourcePoolTest
 
     
EasyMock.expect(resourceFactory.generate("billy")).andReturn("billy1").times(1);
     resourceFactory.close("billy1");
-    EasyMock.expect(resourceFactory.isGood("billy1")).andReturn(true).times(1);
     EasyMock.replay(resourceFactory);
 
     ResourceContainer<String> billy = pool.take("billy");
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 0148b97..db00258 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -760,6 +760,7 @@ All Druid components can communicate with each other over 
HTTP.
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.global.http.numConnections`|Size of connection pool per destination 
URL. If there are more HTTP requests than this number that all need to speak to 
the same URL, then they will queue up.|`20`|
+|`druid.global.http.eagerInitialization`|Indicates that http connections 
should be eagerly initialized. If set to true, `numConnections` connections are 
created upon initialization|`true`|
 |`druid.global.http.compressionCodec`|Compression codec to communicate with 
others. May be "gzip" or "identity".|`gzip`|
 |`druid.global.http.readTimeout`|The timeout for data reads.|`PT15M`|
 |`druid.global.http.unusedConnectionTimeout`|The timeout for idle connections 
in connection pool. The connection in the pool will be closed after this 
timeout and a new one will be established. This timeout should be less than 
`druid.global.http.readTimeout`. Set this timeout = ~90% of 
`druid.global.http.readTimeout`|`PT4M`|
@@ -1761,6 +1762,7 @@ client has the following configuration options.
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.broker.http.numConnections`|Size of connection pool for the Broker to 
connect to Historical and real-time processes. If there are more queries than 
this number that all need to speak to the same process, then they will queue 
up.|`20`|
+|`druid.broker.http.eagerInitialization`|Indicates that http connections from 
Broker to Historical and Real-time processes should be eagerly initialized. If 
set to true, `numConnections` connections are created upon 
initialization|`true`|
 |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to 
communicate with Historical and real-time processes. May be "gzip" or 
"identity".|`gzip`|
 |`druid.broker.http.readTimeout`|The timeout for data reads from Historical 
servers and real-time tasks.|`PT15M`|
 |`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections 
in connection pool. The connection in the pool will be closed after this 
timeout and a new one will be established. This timeout should be less than 
`druid.broker.http.readTimeout`. Set this timeout = ~90% of 
`druid.broker.http.readTimeout`|`PT4M`|
@@ -2152,6 +2154,7 @@ Supported query contexts:
 |`druid.router.avatica.balancer.type`|Class to use for balancing Avatica 
queries across Brokers. Please see [Avatica Query 
Balancing](../design/router.md#avatica-query-balancing).|rendezvousHash|
 |`druid.router.managementProxy.enabled`|Enables the Router's [management 
proxy](../design/router.md#router-as-management-proxy) functionality.|false|
 |`druid.router.http.numConnections`|Size of connection pool for the Router to 
connect to Broker processes. If there are more queries than this number that 
all need to speak to the same process, then they will queue up.|`20`|
+|`druid.router.http.eagerInitialization`|Indicates that http connections from 
Router to Broker should be eagerly initialized. If set to true, 
`numConnections` connections are created upon initialization|`true`|
 |`druid.router.http.readTimeout`|The timeout for data reads from Broker 
processes.|`PT15M`|
 |`druid.router.http.numMaxThreads`|Maximum number of worker threads to handle 
HTTP requests and responses|`max(10, ((number of cores * 17) / 16 + 2) + 30)`|
 |`druid.router.http.numRequestsQueued`|Maximum number of requests that may be 
queued to a destination|`1024`|
diff --git a/examples/conf/druid/cluster/_common/common.runtime.properties 
b/examples/conf/druid/cluster/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/cluster/_common/common.runtime.properties
+++ b/examples/conf/druid/cluster/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/examples/conf/druid/single-server/large/_common/common.runtime.properties 
b/examples/conf/druid/single-server/large/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/large/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/large/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/examples/conf/druid/single-server/medium/_common/common.runtime.properties 
b/examples/conf/druid/single-server/medium/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/medium/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/medium/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
 
b/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- 
a/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
+++ 
b/examples/conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
 
b/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- 
a/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
+++ 
b/examples/conf/druid/single-server/nano-quickstart/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/examples/conf/druid/single-server/small/_common/common.runtime.properties 
b/examples/conf/druid/single-server/small/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/small/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/small/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties 
b/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
index 0fa2e40..edcef96 100644
--- a/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
+++ b/examples/conf/druid/single-server/xlarge/_common/common.runtime.properties
@@ -151,3 +151,8 @@ druid.lookup.enableLookupSyncOnStartup=false
 # Expression processing config
 #
 druid.expressions.useStrictBooleans=true
+
+#
+# Http client
+#
+druid.global.http.eagerInitialization=false
diff --git 
a/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java 
b/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
index b475f64..abc5ced 100644
--- 
a/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
+++ 
b/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java
@@ -67,6 +67,9 @@ public class DruidHttpClientConfig
   @JsonProperty
   private HumanReadableBytes maxQueuedBytes = HumanReadableBytes.ZERO;
 
+  @JsonProperty
+  private boolean eagerInitialization = true;
+
   public int getNumConnections()
   {
     return numConnections;
@@ -115,4 +118,9 @@ public class DruidHttpClientConfig
   {
     return maxQueuedBytes.getBytes();
   }
+
+  public boolean isEagerInitialization()
+  {
+    return eagerInitialization;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java 
b/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
index 813b86d..d475324 100644
--- a/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
+++ b/server/src/main/java/org/apache/druid/guice/http/HttpClientModule.java
@@ -105,6 +105,7 @@ public class HttpClientModule implements Module
       final HttpClientConfig.Builder builder = HttpClientConfig
           .builder()
           .withNumConnections(config.getNumConnections())
+          .withEagerInitialization(config.isEagerInitialization())
           .withReadTimeout(config.getReadTimeout())
           .withWorkerCount(config.getNumMaxThreads())
           .withCompressionCodec(
diff --git 
a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
 
b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
index caf0900..8029f44 100644
--- 
a/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/initialization/BaseJettyTest.java
@@ -122,7 +122,12 @@ public abstract class BaseJettyTest
 
       try {
         this.client = HttpClientInit.createClient(
-            
HttpClientConfig.builder().withNumConnections(maxClientConnections).withSslContext(SSLContext.getDefault()).withReadTimeout(Duration.ZERO).build(),
+            HttpClientConfig.builder()
+                            .withNumConnections(maxClientConnections)
+                            .withSslContext(SSLContext.getDefault())
+                            .withReadTimeout(Duration.ZERO)
+                            .withEagerInitialization(true)
+                            .build(),
             druidLifecycle
         );
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to