This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 608abc6f635 Make SegmentAllocationQueue multithreaded (#18098)
608abc6f635 is described below
commit 608abc6f63575c65513ba74b4bedbddfca1ad94b
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jun 12 00:02:47 2025 +0530
Make SegmentAllocationQueue multithreaded (#18098)
* Make SegmentAllocationQueue multithreaded
* Do not run multiple jobs for the same datasource
* Add docs, min schedule delay to avoid busy waiting
* Trigger queue poll when worker finishes
* Emit skip metric once per queued batch
* Simplify scheduling condition
---
docs/configuration/index.md | 1 +
.../common/actions/SegmentAllocationQueue.java | 174 ++++++++++++++++-----
.../indexing/overlord/config/TaskLockConfig.java | 7 +
.../common/actions/SegmentAllocationQueueTest.java | 54 +++++--
4 files changed, 186 insertions(+), 50 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 4b34159fd61..6c8ad4ec023 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1121,6 +1121,7 @@ These Overlord static configurations can be defined in
the `overlord/runtime.pro
|`druid.indexer.tasklock.forceTimeChunkLock`|**Setting this to false is still
experimental**<br/> If set, all tasks are enforced to use time chunk lock. If
not set, each task automatically chooses a lock type to use. This configuration
can be overwritten by setting `forceTimeChunkLock` in the [task
context](../ingestion/tasks.md#context-parameters). See [Task lock
system](../ingestion/tasks.md#task-lock-system) for more details about locking
in tasks.|true|
|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, Druid
performs segment allocate actions in batches to improve throughput and reduce
the average `task/action/run/time`. See [batching `segmentAllocate`
actions](../ingestion/tasks.md#batching-segmentallocate-actions) for
details.|true|
|`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds that
Druid waits after adding the first segment allocate action to a batch before it
executes the batch. Allows the batch to accumulate more requests and improve the
average segment allocation run time. This configuration takes effect only if
`batchSegmentAllocation` is enabled.|0|
+|`druid.indexer.tasklock.batchAllocationNumThreads`|Number of worker threads
to use for batch segment allocation. This represents the maximum number of
allocation batches that can be processed in parallel for distinct datasources.
Batches for a single datasource are always processed sequentially. This
configuration takes effect only if `batchSegmentAllocation` is enabled.|5|
|`druid.indexer.task.default.context`|Default task context that is applied to
all tasks submitted to the Overlord. Any default in this config overrides
neither the context values the user provides nor
`druid.indexer.tasklock.forceTimeChunkLock`.|empty context|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one
time.|`Integer.MAX_VALUE`|
|`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord
queue management. This can be useful to give a cluster time to re-orient itself
(for example, after a widespread network issue).|`PT1M`|
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index dd92032c588..70df3af2f90 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -44,8 +44,10 @@ import org.apache.druid.timeline.Partitions;
import org.joda.time.Interval;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -80,13 +82,30 @@ public class SegmentAllocationQueue
private final ServiceEmitter emitter;
/**
- * Single-threaded executor to process allocation queue.
+ * Single-threaded executor to pick up jobs from allocation queue and assign
+ * to worker threads.
*/
- private final ScheduledExecutorService executor;
+ private final ScheduledExecutorService managerExec;
+
+ /**
+ * Multithreaded executor to process allocation jobs.
+ */
+ private final ScheduledExecutorService workerExec;
+
+ /**
+ * Thread-safe set of datasources for which a segment allocation is
currently in progress.
+ */
+ private final Set<String> runningDatasources =
Collections.synchronizedSet(new HashSet<>());
+
+ /**
+ * Indicates if processing of the queue is already scheduled.
+ */
+ private final AtomicBoolean isProcessingScheduled = new AtomicBoolean(false);
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch>
keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestBatch> processingQueue = new
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+ private final TaskLockConfig config;
private final boolean reduceMetadataIO;
@Inject
@@ -103,16 +122,28 @@ public class SegmentAllocationQueue
this.metadataStorage = metadataStorage;
this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
this.reduceMetadataIO = taskLockConfig.isBatchAllocationReduceMetadataIO();
+ this.config = taskLockConfig;
- this.executor = taskLockConfig.isBatchSegmentAllocation()
- ? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
+ if (taskLockConfig.isBatchSegmentAllocation()) {
+ this.managerExec = executorFactory.create(1,
"SegmentAllocQueue-Manager-%s");
+ this.workerExec = executorFactory.create(
+ taskLockConfig.getBatchAllocationNumThreads(),
+ "SegmentAllocQueue-Worker-%s"
+ );
+ } else {
+ this.managerExec = null;
+ this.workerExec = null;
+ }
}
@LifecycleStart
public void start()
{
if (isEnabled()) {
- log.info("Initializing segment allocation queue.");
+ log.info(
+ "Initializing segment allocation queue with [%d] worker threads.",
+ config.getBatchAllocationNumThreads()
+ );
scheduleQueuePoll(maxWaitTimeMillis);
}
}
@@ -122,7 +153,8 @@ public class SegmentAllocationQueue
{
if (isEnabled()) {
log.info("Tearing down segment allocation queue.");
- executor.shutdownNow();
+ managerExec.shutdownNow();
+ workerExec.shutdownNow();
}
}
@@ -153,16 +185,18 @@ public class SegmentAllocationQueue
public boolean isEnabled()
{
- return executor != null && !executor.isShutdown();
+ return managerExec != null && !managerExec.isShutdown();
}
/**
- * Schedules a poll of the allocation queue that runs on the {@link
#executor}.
- * It is okay to schedule multiple polls since the executor is single
threaded.
+ * Schedules a poll of the allocation queue that runs on the {@link
#managerExec},
+ * if not already scheduled.
*/
private void scheduleQueuePoll(long delay)
{
- executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+ if (isProcessingScheduled.compareAndSet(false, true)) {
+ managerExec.schedule(this::processBatchesDue, delay,
TimeUnit.MILLISECONDS);
+ }
}
/**
@@ -253,46 +287,56 @@ public class SegmentAllocationQueue
});
}
+ /**
+ * Processes batches in the {@link #processingQueue} that are already due.
+ * This method must be invoked on the {@link #managerExec}.
+ */
private void processBatchesDue()
{
clearQueueIfNotLeader();
-
- // Process all the batches that are already due
- int numProcessedBatches = 0;
- AllocateRequestBatch nextBatch = processingQueue.peekFirst();
- while (nextBatch != null && nextBatch.isDue()) {
- // Process the next batch in the queue
- processingQueue.pollFirst();
- final AllocateRequestBatch currentBatch = nextBatch;
- boolean processed;
- try {
- processed = processBatch(currentBatch);
- }
- catch (Throwable t) {
- currentBatch.failPendingRequests(t);
- processed = true;
- log.error(t, "Error while processing batch[%s].", currentBatch.key);
- }
-
- // Requeue if not fully processed yet
- if (processed) {
- ++numProcessedBatches;
+ isProcessingScheduled.set(false);
+
+ // Process the batches that are already due
+ int numSubmittedBatches = 0;
+ int numSkippedBatches = 0;
+
+ // Although thread-safe, this iterator might not see entries added by other
+ // concurrent threads. Those entries will be handled in the next
processBatchesDue().
+ final Iterator<AllocateRequestBatch> queueIterator =
processingQueue.iterator();
+ while (queueIterator.hasNext() && runningDatasources.size() <
config.getBatchAllocationNumThreads()) {
+ final AllocateRequestBatch nextBatch = queueIterator.next();
+ final String dataSource = nextBatch.key.dataSource;
+ if (nextBatch.isDue()) {
+ if (runningDatasources.contains(dataSource)) {
+ // Skip this batch as another batch for the same datasource is in
progress
+ markBatchAsSkipped(nextBatch);
+ ++numSkippedBatches;
+ } else {
+ // Process this batch
+ queueIterator.remove();
+ runningDatasources.add(dataSource);
+ workerExec.submit(() -> runBatchOnWorker(nextBatch));
+ emitBatchMetric("task/action/batch/submitted", 1L, nextBatch.key);
+ ++numSubmittedBatches;
+ }
} else {
- requeueBatch(currentBatch);
+ break;
}
-
- nextBatch = processingQueue.peek();
}
+ log.debug("Submitted [%d] batches, skipped [%d] batches.",
numSubmittedBatches, numSkippedBatches);
- // Schedule the next round of processing if the queue is not empty
- if (processingQueue.isEmpty()) {
- log.debug("Processed [%d] batches, not scheduling again since queue is
empty.", numProcessedBatches);
+ // Schedule the next round of processing if there are available worker
threads
+ if (runningDatasources.size() >= config.getBatchAllocationNumThreads()) {
+ log.debug("Not scheduling again since all worker threads are busy.");
+ } else if (numSkippedBatches >= processingQueue.size() ||
processingQueue.isEmpty()) {
+ // All remaining entries in the queue were skipped
+ log.debug("Not scheduling again since there are no eligible batches
(skipped [%d]).", numSkippedBatches);
} else {
- nextBatch = processingQueue.peek();
+ final AllocateRequestBatch nextBatch = processingQueue.peek();
long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime();
long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
scheduleQueuePoll(nextScheduleDelay);
- log.debug("Processed [%d] batches, next execution in [%d ms]",
numProcessedBatches, nextScheduleDelay);
+ log.debug("Next execution in [%d ms]", nextScheduleDelay);
}
}
@@ -316,6 +360,48 @@ public class SegmentAllocationQueue
}
}
+ /**
+ * Runs the given batch. This method must be invoked on the {@link
#workerExec}.
+ */
+ private void runBatchOnWorker(AllocateRequestBatch batch)
+ {
+ boolean processed;
+ try {
+ processed = processBatch(batch);
+ }
+ catch (Throwable t) {
+ batch.failPendingRequests(t);
+ processed = true;
+ log.error(t, "Error while processing batch[%s].", batch.key);
+ }
+ finally {
+ runningDatasources.remove(batch.key.dataSource);
+ }
+
+ // Requeue if not fully processed yet
+ if (!processed) {
+ requeueBatch(batch);
+ }
+
+ scheduleQueuePoll(0);
+ }
+
+ /**
+ * Marks the given batch as skipped.
+ */
+ private void markBatchAsSkipped(AllocateRequestBatch batch)
+ {
+ keyToBatch.compute(
+ batch.key,
+ (batchKey, latestBatchForKey) -> {
+ if (latestBatchForKey != null) {
+ latestBatchForKey.markSkipped();
+ }
+ return latestBatchForKey;
+ }
+ );
+ }
+
/**
* Processes the given batch. This method marks the batch as started and
* removes it from the map {@link #keyToBatch} so that no more requests can
be
@@ -552,6 +638,7 @@ public class SegmentAllocationQueue
private long queueTimeMillis;
private final AllocateRequestKey key;
private boolean started = false;
+ private boolean skipped = false;
/**
* Map from allocate requests (represents a single SegmentAllocateAction)
@@ -579,6 +666,7 @@ public class SegmentAllocationQueue
{
queueTimeMillis = System.currentTimeMillis();
started = false;
+ skipped = false;
}
void markStarted()
@@ -591,6 +679,14 @@ public class SegmentAllocationQueue
return started;
}
+ void markSkipped()
+ {
+ if (!skipped) {
+ skipped = true;
+ emitBatchMetric("task/action/batch/skipped", 1L, key);
+ }
+ }
+
boolean isFull()
{
return size() >= MAX_BATCH_SIZE;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
index bc3acdc597d..4447f5fe02e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
@@ -39,6 +39,9 @@ public class TaskLockConfig
@JsonProperty
private boolean batchAllocationReduceMetadataIO = true;
+ @JsonProperty
+ private int batchAllocationNumThreads = 5;
+
public boolean isForceTimeChunkLock()
{
return forceTimeChunkLock;
@@ -59,4 +62,8 @@ public class TaskLockConfig
return batchAllocationReduceMetadataIO;
}
+ public int getBatchAllocationNumThreads()
+ {
+ return Math.max(1, batchAllocationNumThreads);
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 0b39b2cbef3..57923cd0f55 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -55,7 +55,8 @@ public class SegmentAllocationQueueTest
private SegmentAllocationQueue allocationQueue;
private StubServiceEmitter emitter;
- private BlockingExecutorService executor;
+ private BlockingExecutorService managerExec;
+ private BlockingExecutorService workerExec;
private final boolean reduceMetadataIO;
@@ -80,8 +81,9 @@ public class SegmentAllocationQueueTest
@Before
public void setUp()
{
- executor = new BlockingExecutorService("alloc-test-exec");
- emitter = new StubServiceEmitter("overlord", "alloc-test");
+ managerExec = new BlockingExecutorService("test-manager-exec");
+ workerExec = new BlockingExecutorService("test-worker-exec");
+ emitter = new StubServiceEmitter();
final TaskLockConfig lockConfig = new TaskLockConfig()
{
@@ -102,6 +104,12 @@ public class SegmentAllocationQueueTest
{
return reduceMetadataIO;
}
+
+ @Override
+ public int getBatchAllocationNumThreads()
+ {
+ return 20;
+ }
};
allocationQueue = new SegmentAllocationQueue(
@@ -109,8 +117,11 @@ public class SegmentAllocationQueueTest
lockConfig,
taskActionTestKit.getMetadataStorageCoordinator(),
emitter,
- (corePoolSize, nameFormat)
- -> new WrappingScheduledExecutorService(nameFormat, executor,
false)
+ (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
+ nameFormat,
+ nameFormat.contains("Manager") ? managerExec : workerExec,
+ false
+ )
);
allocationQueue.start();
allocationQueue.becomeLeader();
@@ -122,8 +133,8 @@ public class SegmentAllocationQueueTest
if (allocationQueue != null) {
allocationQueue.stop();
}
- if (executor != null) {
- executor.shutdownNow();
+ if (managerExec != null) {
+ managerExec.shutdownNow();
}
emitter.flush();
}
@@ -247,7 +258,8 @@ public class SegmentAllocationQueueTest
.build();
Future<SegmentIdWithShardSpec> halfHourSegmentFuture =
allocationQueue.add(halfHourSegmentRequest);
- executor.finishNextPendingTask();
+ processDistinctDatasourceBatches();
+ processDistinctDatasourceBatches();
Assert.assertNotNull(getSegmentId(hourSegmentFuture));
Assert.assertNull(getSegmentId(halfHourSegmentFuture));
@@ -305,13 +317,18 @@ public class SegmentAllocationQueueTest
segmentFutures.add(allocationQueue.add(request));
}
- executor.finishNextPendingTask();
+ for (int i = 0; i < 10; ++i) {
+ processDistinctDatasourceBatches();
+ }
SegmentIdWithShardSpec segmentId1 = getSegmentId(segmentFutures.get(0));
for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
Assert.assertEquals(getSegmentId(future), segmentId1);
}
+
+ // Verify each datasource batch is marked skipped just once
+ emitter.verifySum("task/action/batch/skipped", 9);
}
@Test
@@ -331,7 +348,7 @@ public class SegmentAllocationQueueTest
}
allocationQueue.stopBeingLeader();
- executor.finishNextPendingTask();
+ processDistinctDatasourceBatches();
for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
@@ -352,8 +369,11 @@ public class SegmentAllocationQueueTest
final int expectedCount = canBatch ? 1 : 2;
Assert.assertEquals(expectedCount, allocationQueue.size());
- executor.finishNextPendingTask();
+ // Process both the jobs
+ processDistinctDatasourceBatches();
+ processDistinctDatasourceBatches();
emitter.verifyEmitted("task/action/batch/size", expectedCount);
+ emitter.verifySum("task/action/batch/submitted", expectedCount);
Assert.assertNotNull(getSegmentId(futureA));
Assert.assertNotNull(getSegmentId(futureB));
@@ -389,4 +409,16 @@ public class SegmentAllocationQueueTest
taskActionTestKit.getTaskLockbox().add(task);
return task;
}
+
+ /**
+ * Triggers the {@link #managerExec} and the {@link #workerExec} to process a
+ * queued set of jobs (i.e. segment allocation batches) for distinct
datasources.
+ * A single invocation of this method can process up to 1 batch for any given
+ * datasource and up to {@code batchAllocationNumThreads} batches total.
+ */
+ private void processDistinctDatasourceBatches()
+ {
+ managerExec.finishNextPendingTask();
+ workerExec.finishAllPendingTasks();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]