This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 964ec586f74 Speed up node boot times by parallelizing buffer 
acquisition (#19025)
964ec586f74 is described below

commit 964ec586f747ef2dfef68c11e9f8008b33721b6c
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Mar 3 01:22:19 2026 -0800

    Speed up node boot times by parallelizing buffer acquisition (#19025)
    
    Brokers/Historicals/Peons currently allocate buffers serially on boot.
    For large quantities of large buffers (100+ buffers at ~2GB each), this
    can mean waiting several minutes (in our case, upwards of 6 minutes for
    Brokers, 5+ seconds for Peons) just to acquire the needed memory. This
    is because the process effectively issues 100 sequential malloc/mmap
    calls, each requiring 2GB of zeroed memory. This change parallelizes
    the acquisition of the buffers proportional to the [...]
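
    The core of the technique can be sketched in isolation as below. This is
    an illustrative standalone class, not Druid's actual DefaultBlockingPool:
    the `limit` sequential `generator.get()` calls are replaced by a parallel
    IntStream submitted to a dedicated ForkJoinPool, so the expensive,
    page-zeroing allocations run concurrently and the pool is torn down once
    boot-time initialization finishes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelPoolInitSketch
{
  // Fill a pool with `limit` objects, either serially or in parallel.
  static <T> Queue<T> initPool(Supplier<T> generator, int limit, boolean parallelInit)
  {
    final ArrayDeque<T> objects = new ArrayDeque<>(limit);
    if (parallelInit) {
      // A dedicated ForkJoinPool: a parallel stream submitted as a task to
      // this pool runs on its workers, not on the shared common pool.
      final ForkJoinPool fjp = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
      try {
        objects.addAll(
            fjp.submit(
                () -> IntStream.range(0, limit)
                               .parallel()
                               .mapToObj(i -> generator.get())
                               .collect(Collectors.toCollection(ArrayList::new))
            ).get()
        );
      }
      catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
      }
      finally {
        fjp.shutdown(); // only needed during boot
      }
    } else {
      for (int i = 0; i < limit; i++) {
        objects.add(generator.get());
      }
    }
    return objects;
  }

  public static void main(String[] args)
  {
    // Stand-in for an expensive off-heap buffer allocation.
    Supplier<byte[]> gen = () -> new byte[1 << 20];
    Queue<byte[]> pool = initPool(gen, 8, true);
    System.out.println(pool.size());
  }
}
```

    Note that the insertion order of the pooled objects is no longer
    deterministic under parallel init, which is acceptable here because the
    buffers are interchangeable.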
---
 docs/configuration/index.md                        |  3 ++
 .../SeekableStreamAppenderatorConfigTest.java      |  1 +
 .../druid/collections/DefaultBlockingPool.java     | 36 ++++++++++++++++++++--
 .../apache/druid/query/DruidProcessingConfig.java  | 15 ++++++++-
 .../apache/druid/collections/BlockingPoolTest.java | 13 ++++++++
 .../druid/query/DruidProcessingConfigTest.java     | 18 +++++++++++
 .../apache/druid/guice/BrokerProcessingModule.java |  3 +-
 .../apache/druid/guice/DruidProcessingModule.java  |  3 +-
 website/.spelling                                  |  1 +
 9 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f1e681d4fff..edee90d2e6b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1381,6 +1381,7 @@ Processing properties set on the Middle Manager are 
passed through to Peons.
 |`druid.processing.fifo`|Enables the processing queue to treat tasks of equal 
priority in a FIFO manner.|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
 |`druid.processing.intermediaryData.storage.type`|Storage type for 
intermediary segments of data shuffle between native parallel index tasks. <br 
/>Set to `local` to store segment files in the local storage of the Middle 
Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for 
better fault tolerance during rolling updates. When the storage type is 
`deepstore`, Druid stores the data in the `shuffle-data` directory under the 
configured deep storage path. Druid does n [...]
+|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all 
merge/processing memory pools to be allocated in parallel on process launch. 
This may significantly speed up Peon launch times if allocating several large 
buffers.|`false`|
 
 The amount of direct memory needed by Druid is at least
 `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + 
druid.processing.numThreads + 1)`. You can
@@ -1526,6 +1527,7 @@ Druid uses Jetty to serve HTTP requests.
 |`druid.processing.numTimeoutThreads`|The number of processing threads to have 
available for handling per-segment query timeouts. Setting this value to `0` 
removes the ability to service per-segment timeouts, irrespective of 
`perSegmentTimeout` query context parameter. As these threads are just 
servicing timers, it's recommended to set this value to some small percent 
(e.g. 5%) of the total query processing cores available to the indexer.|0|
 |`druid.processing.fifo`|If the processing queue should treat tasks of equal 
priority in a FIFO manner|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
+|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all 
merge/processing memory pools to be allocated in parallel on process launch. 
This may significantly speed up Indexer launch times if allocating several 
large buffers.|`false`|
 
 The amount of direct memory needed by Druid is at least
 `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + 
druid.processing.numThreads + 1)`. You can
@@ -1636,6 +1638,7 @@ Druid uses Jetty to serve HTTP requests.
 |`druid.processing.numTimeoutThreads`|The number of processing threads to have 
available for handling per-segment query timeouts. Setting this value to `0` 
removes the ability to service per-segment timeouts, irrespective of 
`perSegmentTimeout` query context parameter. As these threads are just 
servicing timers, it's recommended to set this value to some small percent 
(e.g. 5%) of the total query processing cores available to the historical.|0|
 |`druid.processing.fifo`|If the processing queue should treat tasks of equal 
priority in a FIFO manner|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
+|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all 
merge/processing memory pools to be allocated in parallel on process launch. 
This may significantly speed up Historical/Broker launch times if allocating 
several large buffers.|`false`|
 
 The amount of direct memory needed by Druid is at least
 `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + 
druid.processing.numThreads + 1)`. You can
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
index 85c4f2ea5eb..ac3ddb5fcf1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -198,6 +198,7 @@ public class SeekableStreamAppenderatorConfigTest
           null,
           new 
DruidProcessingBufferConfig(HumanReadableBytes.valueOf(bufferSize), null, null),
           null,
+          null,
           JvmUtils.getRuntimeInfo()
       );
     }
diff --git 
a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
 
b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
index 5b64c1bfc2a..9f3ba2d7b51 100644
--- 
a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
+++ 
b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
@@ -29,11 +29,14 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Pool that pre-generates objects up to a limit, then permits 
possibly-blocking "take" operations.
@@ -55,12 +58,41 @@ public class DefaultBlockingPool<T> implements 
BlockingPool<T>
       Supplier<T> generator,
       int limit
   )
+  {
+    this(generator, limit, false);
+  }
+
+  public DefaultBlockingPool(
+      Supplier<T> generator,
+      int limit,
+      boolean parallelInit
+  )
   {
     this.objects = new ArrayDeque<>(limit);
     this.maxSize = limit;
 
-    for (int i = 0; i < limit; i++) {
-      objects.add(generator.get());
+    // Parallelizing allocations can significantly speed up node boot times
+    if (parallelInit) {
+      int parallelism = Runtime.getRuntime().availableProcessors();
+      ForkJoinPool fjp = new ForkJoinPool(parallelism);
+      try {
+        objects.addAll(fjp.submit(
+            () -> IntStream.range(0, limit)
+                           .parallel()
+                           .mapToObj(i -> generator.get())
+                           .collect(Collectors.toCollection(ArrayList::new))
+        ).get());
+      }
+      catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        fjp.shutdown();
+      }
+    } else {
+      for (int i = 0; i < limit; i++) {
+        objects.add(generator.get());
+      }
     }
 
     this.lock = new ReentrantLock();
diff --git 
a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java 
b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index 7a538910fe5..47127d455df 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -54,6 +54,8 @@ public class DruidProcessingConfig implements ColumnConfig
   private final DruidProcessingIndexesConfig indexes;
   @JsonProperty
   private final int numTimeoutThreads;
+  @JsonProperty
+  private final boolean parallelPoolInit;
 
   private final AtomicReference<Integer> computedBufferSizeBytes = new 
AtomicReference<>();
   private final boolean numThreadsConfigured;
@@ -69,6 +71,7 @@ public class DruidProcessingConfig implements ColumnConfig
       @JsonProperty("tmpDir") @Nullable String tmpDir,
       @JsonProperty("buffer") DruidProcessingBufferConfig buffer,
       @JsonProperty("indexes") DruidProcessingIndexesConfig indexes,
+      @JsonProperty("parallelPoolInit") @Nullable Boolean parallelPoolInit,
       @JacksonInject RuntimeInfo runtimeInfo
   )
   {
@@ -86,6 +89,7 @@ public class DruidProcessingConfig implements ColumnConfig
     this.tmpDir = Configs.valueOrDefault(tmpDir, 
System.getProperty("java.io.tmpdir"));
     this.buffer = Configs.valueOrDefault(buffer, new 
DruidProcessingBufferConfig());
     this.indexes = Configs.valueOrDefault(indexes, new 
DruidProcessingIndexesConfig());
+    this.parallelPoolInit = Configs.valueOrDefault(parallelPoolInit, false);
 
     this.numThreadsConfigured = numThreads != null;
     this.numMergeBuffersConfigured = numMergeBuffers != null;
@@ -95,7 +99,7 @@ public class DruidProcessingConfig implements ColumnConfig
   @VisibleForTesting
   public DruidProcessingConfig()
   {
-    this(null, null, null, null, null, null, null, null, 
JvmUtils.getRuntimeInfo());
+    this(null, null, null, null, null, null, null, null, null, 
JvmUtils.getRuntimeInfo());
   }
 
   private void initializeBufferSize(RuntimeInfo runtimeInfo)
@@ -202,5 +206,14 @@ public class DruidProcessingConfig implements ColumnConfig
   {
     return numMergeBuffersConfigured;
   }
+
+  /**
+   * Whether buffers in this pool are allocated in parallel.
+   * See the configuration property `druid.processing.parallelPoolInit` for 
more information.
+   */
+  public boolean isParallelMemoryPoolInit()
+  {
+    return parallelPoolInit;
+  }
 }
 
diff --git 
a/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java 
b/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
index cc5b82ba26e..43a681b7807 100644
--- 
a/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
+++ 
b/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
@@ -62,6 +62,19 @@ public class BlockingPoolTest
     service.shutdownNow();
   }
 
+  @Test(timeout = 60_000L)
+  public void testParallelInit()
+  {
+    DefaultBlockingPool<Integer> parallelPool = new 
DefaultBlockingPool<>(Suppliers.ofInstance(1), 10, true);
+    Assert.assertEquals(10, parallelPool.getPoolSize());
+    final ReferenceCountingResourceHolder<Integer> holder =
+        Iterables.getOnlyElement(parallelPool.takeBatch(1, 100), null);
+    Assert.assertNotNull(holder);
+    Assert.assertEquals(9, parallelPool.getPoolSize());
+    holder.close();
+    Assert.assertEquals(10, parallelPool.getPoolSize());
+  }
+
   @Test
   public void testTakeFromEmptyPool()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
 
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
index 9b4feb29f46..c5f6be16af6 100644
--- 
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
@@ -117,6 +117,24 @@ public class DruidProcessingConfigTest
     Assert.assertEquals(0, config.getNumInitalBuffersForIntermediatePool());
   }
 
+  @Test
+  public void testParallelPoolInitDefaultIsFalse()
+  {
+    Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE);
+    DruidProcessingConfig config = 
injector.getInstance(DruidProcessingConfig.class);
+    Assert.assertFalse(config.isParallelMemoryPoolInit());
+  }
+
+  @Test
+  public void testParallelPoolInitEnabled()
+  {
+    Properties props = new Properties();
+    props.setProperty("druid.processing.parallelPoolInit", "true");
+    Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE, 
props);
+    DruidProcessingConfig config = 
injector.getInstance(DruidProcessingConfig.class);
+    Assert.assertTrue(config.isParallelMemoryPoolInit());
+  }
+
   @Test
   public void testInvalidSizeBytes()
   {
diff --git 
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index 6362f585333..db84419bfc1 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -109,7 +109,8 @@ public class BrokerProcessingModule implements Module
     verifyDirectMemory(config, runtimeInfo);
     return new DefaultBlockingPool<>(
         new OffheapBufferGenerator("result merging", 
config.intermediateComputeSizeBytes()),
-        config.getNumMergeBuffers()
+        config.getNumMergeBuffers(),
+        config.isParallelMemoryPoolInit()
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index 33bec735d5d..f2615cb882d 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -185,7 +185,8 @@ public class DruidProcessingModule implements Module
     verifyDirectMemory(config, runtimeInfo);
     return new DefaultBlockingPool<>(
         new OffheapBufferGenerator("result merging", 
config.intermediateComputeSizeBytes()),
-        config.getNumMergeBuffers()
+        config.getNumMergeBuffers(),
+        config.isParallelMemoryPoolInit()
     );
   }
 
diff --git a/website/.spelling b/website/.spelling
index 905a4ae8aa0..8bab890e6cf 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1883,6 +1883,7 @@ minTopNThreshold
 parallelMergeInitialYieldRows
 parallelMergeParallelism
 parallelMergeSmallBatchRows
+parallelPoolInit
 populateCache
 populateResultLevelCache
 queryId
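
To opt in on a given node, the new property goes in that service's runtime
configuration alongside the other `druid.processing.*` settings; the property
name comes from the diff above, and the surrounding values are illustrative:

```properties
# Experimental: allocate merge/processing buffer pools in parallel on boot.
druid.processing.parallelPoolInit=true
druid.processing.numMergeBuffers=16
druid.processing.buffer.sizeBytes=2000000000
```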


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
