This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 964ec586f74 Speed up node boot times by parallelizing buffer
acquisition (#19025)
964ec586f74 is described below
commit 964ec586f747ef2dfef68c11e9f8008b33721b6c
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Mar 3 01:22:19 2026 -0800
Speed up node boot times by parallelizing buffer acquisition (#19025)
Brokers/Historicals/Peons currently allocate buffers serially on boot. For
large numbers of large buffers (100+ buffers at ~2GB per buffer), this can
mean waiting several minutes (in our case, upwards of 6 minutes for Brokers and
5+ seconds for Peons) just to acquire the memory needed. This is because boot
effectively performs 100 sequential malloc/mmap calls, each needing 2GB of
zeroed-out memory. This change parallelizes the acquisition of the buffers
proportional to the [...]
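
For readers skimming the message, the difference between serial and parallel
acquisition can be sketched as a standalone program. This is only an
illustration that mirrors the shape of the DefaultBlockingPool change in the
diff, not the committed code: the class name ParallelAllocSketch, the helper
allocate(), and the 1 MiB buffer size are hypothetical, and unlike the commit
it restores the interrupt flag before rethrowing.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch; names are illustrative and not part of Druid.
public class ParallelAllocSketch
{
  static <T> List<T> allocate(Supplier<T> generator, int limit, boolean parallelInit)
  {
    if (!parallelInit) {
      // Serial path: one generator call after another.
      List<T> out = new ArrayList<>(limit);
      for (int i = 0; i < limit; i++) {
        out.add(generator.get());
      }
      return out;
    }
    // Parallel path: run the stream inside a dedicated pool sized to the
    // number of available processors, then shut the pool down.
    ForkJoinPool fjp = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    try {
      return fjp.submit(
          () -> IntStream.range(0, limit)
                         .parallel()
                         .mapToObj(i -> generator.get())
                         .collect(Collectors.toCollection(ArrayList::new))
      ).get();
    }
    catch (InterruptedException e) {
      // Assumption of this sketch: preserve the interrupt status for callers.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
    finally {
      fjp.shutdown();
    }
  }

  public static void main(String[] args)
  {
    // Small direct buffers stand in for the ~2GB buffers described above.
    Supplier<ByteBuffer> generator = () -> ByteBuffer.allocateDirect(1 << 20);
    List<ByteBuffer> buffers = allocate(generator, 8, true);
    System.out.println(buffers.size());
  }
}
```

Submitting the parallel stream from inside a dedicated ForkJoinPool is a common
way to keep the work off the shared common pool; whether that matters for a
given deployment is a judgment call.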
---
docs/configuration/index.md | 3 ++
.../SeekableStreamAppenderatorConfigTest.java | 1 +
.../druid/collections/DefaultBlockingPool.java | 36 ++++++++++++++++++++--
.../apache/druid/query/DruidProcessingConfig.java | 15 ++++++++-
.../apache/druid/collections/BlockingPoolTest.java | 13 ++++++++
.../druid/query/DruidProcessingConfigTest.java | 18 +++++++++++
.../apache/druid/guice/BrokerProcessingModule.java | 3 +-
.../apache/druid/guice/DruidProcessingModule.java | 3 +-
website/.spelling | 1 +
9 files changed, 88 insertions(+), 5 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f1e681d4fff..edee90d2e6b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1381,6 +1381,7 @@ Processing properties set on the Middle Manager are
passed through to Peons.
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal
priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|`druid.processing.intermediaryData.storage.type`|Storage type for
intermediary segments of data shuffle between native parallel index tasks. <br
/>Set to `local` to store segment files in the local storage of the Middle
Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for
better fault tolerance during rolling updates. When the storage type is
`deepstore`, Druid stores the data in the `shuffle-data` directory under the
configured deep storage path. Druid does n [...]
+|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all
merge/processing memory pools to be allocated in parallel on process launch.
This may significantly speed up Peon launch times if allocating several large
buffers.|`false`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers +
druid.processing.numThreads + 1)`. You can
@@ -1526,6 +1527,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.numTimeoutThreads`|The number of processing threads to have
available for handling per-segment query timeouts. Setting this value to `0`
removes the ability to service per-segment timeouts, irrespective of
`perSegmentTimeout` query context parameter. As these threads are just
servicing timers, it's recommended to set this value to some small percent
(e.g. 5%) of the total query processing cores available to the indexer.|0|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal
priority in a FIFO manner|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
+|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all
merge/processing memory pools to be allocated in parallel on process launch.
This may significantly speed up Indexer launch times if allocating several
large buffers.|`false`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers +
druid.processing.numThreads + 1)`. You can
@@ -1636,6 +1638,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.numTimeoutThreads`|The number of processing threads to have
available for handling per-segment query timeouts. Setting this value to `0`
removes the ability to service per-segment timeouts, irrespective of
`perSegmentTimeout` query context parameter. As these threads are just
servicing timers, it's recommended to set this value to some small percent
(e.g. 5%) of the total query processing cores available to the historical.|0|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal
priority in a FIFO manner|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
+|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all
merge/processing memory pools to be allocated in parallel on process launch.
This may significantly speed up Historical/Broker launch times if allocating
several large buffers.|`false`|
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers +
druid.processing.numThreads + 1)`. You can
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
index 85c4f2ea5eb..ac3ddb5fcf1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -198,6 +198,7 @@ public class SeekableStreamAppenderatorConfigTest
null,
new
DruidProcessingBufferConfig(HumanReadableBytes.valueOf(bufferSize), null, null),
null,
+ null,
JvmUtils.getRuntimeInfo()
);
}
diff --git
a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
index 5b64c1bfc2a..9f3ba2d7b51 100644
---
a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
+++
b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
@@ -29,11 +29,14 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Pool that pre-generates objects up to a limit, then permits
possibly-blocking "take" operations.
@@ -55,12 +58,41 @@ public class DefaultBlockingPool<T> implements
BlockingPool<T>
Supplier<T> generator,
int limit
)
+ {
+ this(generator, limit, false);
+ }
+
+ public DefaultBlockingPool(
+ Supplier<T> generator,
+ int limit,
+ boolean parallelInit
+ )
{
this.objects = new ArrayDeque<>(limit);
this.maxSize = limit;
- for (int i = 0; i < limit; i++) {
- objects.add(generator.get());
+ // Parallelizing allocations can significantly speed up node boot times
+ if (parallelInit) {
+ int parallelism = Runtime.getRuntime().availableProcessors();
+ ForkJoinPool fjp = new ForkJoinPool(parallelism);
+ try {
+ objects.addAll(fjp.submit(
+ () -> IntStream.range(0, limit)
+ .parallel()
+ .mapToObj(i -> generator.get())
+ .collect(Collectors.toCollection(ArrayList::new))
+ ).get());
+ }
+ catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ fjp.shutdown();
+ }
+ } else {
+ for (int i = 0; i < limit; i++) {
+ objects.add(generator.get());
+ }
}
this.lock = new ReentrantLock();
diff --git
a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index 7a538910fe5..47127d455df 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -54,6 +54,8 @@ public class DruidProcessingConfig implements ColumnConfig
private final DruidProcessingIndexesConfig indexes;
@JsonProperty
private final int numTimeoutThreads;
+ @JsonProperty
+ private final boolean parallelPoolInit;
private final AtomicReference<Integer> computedBufferSizeBytes = new
AtomicReference<>();
private final boolean numThreadsConfigured;
@@ -69,6 +71,7 @@ public class DruidProcessingConfig implements ColumnConfig
@JsonProperty("tmpDir") @Nullable String tmpDir,
@JsonProperty("buffer") DruidProcessingBufferConfig buffer,
@JsonProperty("indexes") DruidProcessingIndexesConfig indexes,
+ @JsonProperty("parallelPoolInit") @Nullable Boolean parallelPoolInit,
@JacksonInject RuntimeInfo runtimeInfo
)
{
@@ -86,6 +89,7 @@ public class DruidProcessingConfig implements ColumnConfig
this.tmpDir = Configs.valueOrDefault(tmpDir,
System.getProperty("java.io.tmpdir"));
this.buffer = Configs.valueOrDefault(buffer, new
DruidProcessingBufferConfig());
this.indexes = Configs.valueOrDefault(indexes, new
DruidProcessingIndexesConfig());
+ this.parallelPoolInit = Configs.valueOrDefault(parallelPoolInit, false);
this.numThreadsConfigured = numThreads != null;
this.numMergeBuffersConfigured = numMergeBuffers != null;
@@ -95,7 +99,7 @@ public class DruidProcessingConfig implements ColumnConfig
@VisibleForTesting
public DruidProcessingConfig()
{
- this(null, null, null, null, null, null, null, null,
JvmUtils.getRuntimeInfo());
+ this(null, null, null, null, null, null, null, null, null,
JvmUtils.getRuntimeInfo());
}
private void initializeBufferSize(RuntimeInfo runtimeInfo)
@@ -202,5 +206,14 @@ public class DruidProcessingConfig implements ColumnConfig
{
return numMergeBuffersConfigured;
}
+
+ /**
+ * Whether buffers in this pool are allocated in parallel.
+ * See the configuration property `druid.processing.parallelPoolInit` for
more information.
+ */
+ public boolean isParallelMemoryPoolInit()
+ {
+ return parallelPoolInit;
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
b/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
index cc5b82ba26e..43a681b7807 100644
---
a/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
+++
b/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java
@@ -62,6 +62,19 @@ public class BlockingPoolTest
service.shutdownNow();
}
+ @Test(timeout = 60_000L)
+ public void testParallelInit()
+ {
+ DefaultBlockingPool<Integer> parallelPool = new
DefaultBlockingPool<>(Suppliers.ofInstance(1), 10, true);
+ Assert.assertEquals(10, parallelPool.getPoolSize());
+ final ReferenceCountingResourceHolder<Integer> holder =
+ Iterables.getOnlyElement(parallelPool.takeBatch(1, 100), null);
+ Assert.assertNotNull(holder);
+ Assert.assertEquals(9, parallelPool.getPoolSize());
+ holder.close();
+ Assert.assertEquals(10, parallelPool.getPoolSize());
+ }
+
@Test
public void testTakeFromEmptyPool()
{
diff --git
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
index 9b4feb29f46..c5f6be16af6 100644
---
a/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
+++
b/processing/src/test/java/org/apache/druid/query/DruidProcessingConfigTest.java
@@ -117,6 +117,24 @@ public class DruidProcessingConfigTest
Assert.assertEquals(0, config.getNumInitalBuffersForIntermediatePool());
}
+ @Test
+ public void testParallelPoolInitDefaultIsFalse()
+ {
+ Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE);
+ DruidProcessingConfig config =
injector.getInstance(DruidProcessingConfig.class);
+ Assert.assertFalse(config.isParallelMemoryPoolInit());
+ }
+
+ @Test
+ public void testParallelPoolInitEnabled()
+ {
+ Properties props = new Properties();
+ props.setProperty("druid.processing.parallelPoolInit", "true");
+ Injector injector = makeInjector(NUM_PROCESSORS, DIRECT_SIZE, HEAP_SIZE,
props);
+ DruidProcessingConfig config =
injector.getInstance(DruidProcessingConfig.class);
+ Assert.assertTrue(config.isParallelMemoryPoolInit());
+ }
+
@Test
public void testInvalidSizeBytes()
{
diff --git
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index 6362f585333..db84419bfc1 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -109,7 +109,8 @@ public class BrokerProcessingModule implements Module
verifyDirectMemory(config, runtimeInfo);
return new DefaultBlockingPool<>(
new OffheapBufferGenerator("result merging",
config.intermediateComputeSizeBytes()),
- config.getNumMergeBuffers()
+ config.getNumMergeBuffers(),
+ config.isParallelMemoryPoolInit()
);
}
diff --git
a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index 33bec735d5d..f2615cb882d 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -185,7 +185,8 @@ public class DruidProcessingModule implements Module
verifyDirectMemory(config, runtimeInfo);
return new DefaultBlockingPool<>(
new OffheapBufferGenerator("result merging",
config.intermediateComputeSizeBytes()),
- config.getNumMergeBuffers()
+ config.getNumMergeBuffers(),
+ config.isParallelMemoryPoolInit()
);
}
diff --git a/website/.spelling b/website/.spelling
index 905a4ae8aa0..8bab890e6cf 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1883,6 +1883,7 @@ minTopNThreshold
parallelMergeInitialYieldRows
parallelMergeParallelism
parallelMergeSmallBatchRows
+parallelPoolInit
populateCache
populateResultLevelCache
queryId