This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 608abc6f635 Make SegmentAllocationQueue multithreaded (#18098)
608abc6f635 is described below

commit 608abc6f63575c65513ba74b4bedbddfca1ad94b
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jun 12 00:02:47 2025 +0530

    Make SegmentAllocationQueue multithreaded (#18098)
    
    * Make SegmentAllocationQueue multithreaded
    
    * Do not run multiple jobs for the same datasource
    
    * Add docs, min schedule delay to avoid busy waiting
    
    * Trigger queue poll when worker finishes
    
    * Emit skip metric once per queued batch
    
    * Simplify scheduling condition
---
 docs/configuration/index.md                        |   1 +
 .../common/actions/SegmentAllocationQueue.java     | 174 ++++++++++++++++-----
 .../indexing/overlord/config/TaskLockConfig.java   |   7 +
 .../common/actions/SegmentAllocationQueueTest.java |  54 +++++--
 4 files changed, 186 insertions(+), 50 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 4b34159fd61..6c8ad4ec023 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1121,6 +1121,7 @@ These Overlord static configurations can be defined in 
the `overlord/runtime.pro
 |`druid.indexer.tasklock.forceTimeChunkLock`|**Setting this to false is still 
experimental**<br/> If set, all tasks are enforced to use time chunk lock. If 
not set, each task automatically chooses a lock type to use. This configuration 
can be overwritten by setting `forceTimeChunkLock` in the [task 
context](../ingestion/tasks.md#context-parameters). See [Task lock 
system](../ingestion/tasks.md#task-lock-system) for more details about locking 
in tasks.|true|
 |`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, Druid 
performs segment allocate actions in batches to improve throughput and reduce 
the average `task/action/run/time`. See [batching `segmentAllocate` 
actions](../ingestion/tasks.md#batching-segmentallocate-actions) for 
details.|true|
 |`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds after 
Druid adds the first segment allocate action to a batch, until it executes the 
batch. Allows the batch to add more requests and improve the average segment 
allocation run time. This configuration takes effect only if 
`batchSegmentAllocation` is enabled.|0|
+|`druid.indexer.tasklock.batchAllocationNumThreads`|Number of worker threads 
to use for batch segment allocation. This represents the maximum number of 
allocation batches that can be processed in parallel for distinct datasources. 
Batches for a single datasource are always processed sequentially. This 
configuration takes effect only if `batchSegmentAllocation` is enabled.|5|
 |`druid.indexer.task.default.context`|Default task context that is applied to 
all tasks submitted to the Overlord. Any default in this config does not 
override neither the context values the user provides nor 
`druid.indexer.tasklock.forceTimeChunkLock`.|empty context|
 |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one 
time.|`Integer.MAX_VALUE`|
 |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord 
queue management. This can be useful to give a cluster time to re-orient itself 
(for example, after a widespread network issue).|`PT1M`|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index dd92032c588..70df3af2f90 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -44,8 +44,10 @@ import org.apache.druid.timeline.Partitions;
 import org.joda.time.Interval;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -80,13 +82,30 @@ public class SegmentAllocationQueue
   private final ServiceEmitter emitter;
 
   /**
-   * Single-threaded executor to process allocation queue.
+   * Single-threaded executor to pick up jobs from allocation queue and assign
+   * to worker threads.
    */
-  private final ScheduledExecutorService executor;
+  private final ScheduledExecutorService managerExec;
+
+  /**
+   * Multithreaded executor to process allocation jobs.
+   */
+  private final ScheduledExecutorService workerExec;
+
+  /**
+   * Thread-safe set of datasources for which a segment allocation is 
currently in-progress.
+   */
+  private final Set<String> runningDatasources = 
Collections.synchronizedSet(new HashSet<>());
+
+  /**
+   * Indicates if a processing of the queue is already scheduled.
+   */
+  private final AtomicBoolean isProcessingScheduled = new AtomicBoolean(false);
 
   private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
   private final BlockingDeque<AllocateRequestBatch> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
 
+  private final TaskLockConfig config;
   private final boolean reduceMetadataIO;
 
   @Inject
@@ -103,16 +122,28 @@ public class SegmentAllocationQueue
     this.metadataStorage = metadataStorage;
     this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
     this.reduceMetadataIO = taskLockConfig.isBatchAllocationReduceMetadataIO();
+    this.config = taskLockConfig;
 
-    this.executor = taskLockConfig.isBatchSegmentAllocation()
-                    ? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
+    if (taskLockConfig.isBatchSegmentAllocation()) {
+      this.managerExec = executorFactory.create(1, 
"SegmentAllocQueue-Manager-%s");
+      this.workerExec = executorFactory.create(
+          taskLockConfig.getBatchAllocationNumThreads(),
+          "SegmentAllocQueue-Worker-%s"
+      );
+    } else {
+      this.managerExec = null;
+      this.workerExec = null;
+    }
   }
 
   @LifecycleStart
   public void start()
   {
     if (isEnabled()) {
-      log.info("Initializing segment allocation queue.");
+      log.info(
+          "Initializing segment allocation queue with [%d] worker threads.",
+          config.getBatchAllocationNumThreads()
+      );
       scheduleQueuePoll(maxWaitTimeMillis);
     }
   }
@@ -122,7 +153,8 @@ public class SegmentAllocationQueue
   {
     if (isEnabled()) {
       log.info("Tearing down segment allocation queue.");
-      executor.shutdownNow();
+      managerExec.shutdownNow();
+      workerExec.shutdownNow();
     }
   }
 
@@ -153,16 +185,18 @@ public class SegmentAllocationQueue
 
   public boolean isEnabled()
   {
-    return executor != null && !executor.isShutdown();
+    return managerExec != null && !managerExec.isShutdown();
   }
 
   /**
-   * Schedules a poll of the allocation queue that runs on the {@link 
#executor}.
-   * It is okay to schedule multiple polls since the executor is single 
threaded.
+   * Schedules a poll of the allocation queue that runs on the {@link 
#managerExec},
+   * if not already scheduled.
    */
   private void scheduleQueuePoll(long delay)
   {
-    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+    if (isProcessingScheduled.compareAndSet(false, true)) {
+      managerExec.schedule(this::processBatchesDue, delay, 
TimeUnit.MILLISECONDS);
+    }
   }
 
   /**
@@ -253,46 +287,56 @@ public class SegmentAllocationQueue
     });
   }
 
+  /**
+   * Processes batches in the {@link #processingQueue} that are already due.
+   * This method must be invoked on the {@link #managerExec}.
+   */
   private void processBatchesDue()
   {
     clearQueueIfNotLeader();
-
-    // Process all the batches that are already due
-    int numProcessedBatches = 0;
-    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
-    while (nextBatch != null && nextBatch.isDue()) {
-      // Process the next batch in the queue
-      processingQueue.pollFirst();
-      final AllocateRequestBatch currentBatch = nextBatch;
-      boolean processed;
-      try {
-        processed = processBatch(currentBatch);
-      }
-      catch (Throwable t) {
-        currentBatch.failPendingRequests(t);
-        processed = true;
-        log.error(t, "Error while processing batch[%s].", currentBatch.key);
-      }
-
-      // Requeue if not fully processed yet
-      if (processed) {
-        ++numProcessedBatches;
+    isProcessingScheduled.set(false);
+
+    // Process the batches that are already due
+    int numSubmittedBatches = 0;
+    int numSkippedBatches = 0;
+
+    // Although thread-safe, this iterator might not see entries added by other
+    // concurrent threads. Those entries will be handled in the next 
processBatchesDue().
+    final Iterator<AllocateRequestBatch> queueIterator = 
processingQueue.iterator();
+    while (queueIterator.hasNext() && runningDatasources.size() < 
config.getBatchAllocationNumThreads()) {
+      final AllocateRequestBatch nextBatch = queueIterator.next();
+      final String dataSource = nextBatch.key.dataSource;
+      if (nextBatch.isDue()) {
+        if (runningDatasources.contains(dataSource)) {
+          // Skip this batch as another batch for the same datasource is in 
progress
+          markBatchAsSkipped(nextBatch);
+          ++numSkippedBatches;
+        } else {
+          // Process this batch
+          queueIterator.remove();
+          runningDatasources.add(dataSource);
+          workerExec.submit(() -> runBatchOnWorker(nextBatch));
+          emitBatchMetric("task/action/batch/submitted", 1L, nextBatch.key);
+          ++numSubmittedBatches;
+        }
       } else {
-        requeueBatch(currentBatch);
+        break;
       }
-
-      nextBatch = processingQueue.peek();
     }
+    log.debug("Submitted [%d] batches, skipped [%d] batches.", 
numSubmittedBatches, numSkippedBatches);
 
-    // Schedule the next round of processing if the queue is not empty
-    if (processingQueue.isEmpty()) {
-      log.debug("Processed [%d] batches, not scheduling again since queue is 
empty.", numProcessedBatches);
+    // Schedule the next round of processing if there are available worker 
threads
+    if (runningDatasources.size() >= config.getBatchAllocationNumThreads()) {
+      log.debug("Not scheduling again since all worker threads are busy.");
+    } else if (numSkippedBatches >= processingQueue.size() || 
processingQueue.isEmpty()) {
+      // All remaining entries in the queue were skipped
+      log.debug("Not scheduling again since there are no eligible batches 
(skipped [%d]).", numSkippedBatches);
     } else {
-      nextBatch = processingQueue.peek();
+      final AllocateRequestBatch nextBatch = processingQueue.peek();
       long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime();
       long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
       scheduleQueuePoll(nextScheduleDelay);
-      log.debug("Processed [%d] batches, next execution in [%d ms]", 
numProcessedBatches, nextScheduleDelay);
+      log.debug("Next execution in [%d ms]", nextScheduleDelay);
     }
   }
 
@@ -316,6 +360,48 @@ public class SegmentAllocationQueue
     }
   }
 
+  /**
+   * Runs the given batch. This method must be invoked on the {@link 
#workerExec}.
+   */
+  private void runBatchOnWorker(AllocateRequestBatch batch)
+  {
+    boolean processed;
+    try {
+      processed = processBatch(batch);
+    }
+    catch (Throwable t) {
+      batch.failPendingRequests(t);
+      processed = true;
+      log.error(t, "Error while processing batch[%s].", batch.key);
+    }
+    finally {
+      runningDatasources.remove(batch.key.dataSource);
+    }
+
+    // Requeue if not fully processed yet
+    if (!processed) {
+      requeueBatch(batch);
+    }
+
+    scheduleQueuePoll(0);
+  }
+
+  /**
+   * Marks the given batch as skipped.
+   */
+  private void markBatchAsSkipped(AllocateRequestBatch batch)
+  {
+    keyToBatch.compute(
+        batch.key,
+        (batchKey, latestBatchForKey) -> {
+          if (latestBatchForKey != null) {
+            latestBatchForKey.markSkipped();
+          }
+          return latestBatchForKey;
+        }
+    );
+  }
+
   /**
    * Processes the given batch. This method marks the batch as started and
    * removes it from the map {@link #keyToBatch} so that no more requests can 
be
@@ -552,6 +638,7 @@ public class SegmentAllocationQueue
     private long queueTimeMillis;
     private final AllocateRequestKey key;
     private boolean started = false;
+    private boolean skipped = false;
 
     /**
      * Map from allocate requests (represents a single SegmentAllocateAction)
@@ -579,6 +666,7 @@ public class SegmentAllocationQueue
     {
       queueTimeMillis = System.currentTimeMillis();
       started = false;
+      skipped = false;
     }
 
     void markStarted()
@@ -591,6 +679,14 @@ public class SegmentAllocationQueue
       return started;
     }
 
+    void markSkipped()
+    {
+      if (!skipped) {
+        skipped = true;
+        emitBatchMetric("task/action/batch/skipped", 1L, key);
+      }
+    }
+
     boolean isFull()
     {
       return size() >= MAX_BATCH_SIZE;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
index bc3acdc597d..4447f5fe02e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
@@ -39,6 +39,9 @@ public class TaskLockConfig
   @JsonProperty
   private boolean batchAllocationReduceMetadataIO = true;
 
+  @JsonProperty
+  private int batchAllocationNumThreads = 5;
+
   public boolean isForceTimeChunkLock()
   {
     return forceTimeChunkLock;
@@ -59,4 +62,8 @@ public class TaskLockConfig
     return batchAllocationReduceMetadataIO;
   }
 
+  public int getBatchAllocationNumThreads()
+  {
+    return Math.max(1, batchAllocationNumThreads);
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 0b39b2cbef3..57923cd0f55 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -55,7 +55,8 @@ public class SegmentAllocationQueueTest
   private SegmentAllocationQueue allocationQueue;
 
   private StubServiceEmitter emitter;
-  private BlockingExecutorService executor;
+  private BlockingExecutorService managerExec;
+  private BlockingExecutorService workerExec;
 
   private final boolean reduceMetadataIO;
 
@@ -80,8 +81,9 @@ public class SegmentAllocationQueueTest
   @Before
   public void setUp()
   {
-    executor = new BlockingExecutorService("alloc-test-exec");
-    emitter = new StubServiceEmitter("overlord", "alloc-test");
+    managerExec = new BlockingExecutorService("test-manager-exec");
+    workerExec = new BlockingExecutorService("test-worker-exec");
+    emitter = new StubServiceEmitter();
 
     final TaskLockConfig lockConfig = new TaskLockConfig()
     {
@@ -102,6 +104,12 @@ public class SegmentAllocationQueueTest
       {
         return reduceMetadataIO;
       }
+
+      @Override
+      public int getBatchAllocationNumThreads()
+      {
+        return 20;
+      }
     };
 
     allocationQueue = new SegmentAllocationQueue(
@@ -109,8 +117,11 @@ public class SegmentAllocationQueueTest
         lockConfig,
         taskActionTestKit.getMetadataStorageCoordinator(),
         emitter,
-        (corePoolSize, nameFormat)
-            -> new WrappingScheduledExecutorService(nameFormat, executor, 
false)
+        (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
+            nameFormat,
+            nameFormat.contains("Manager") ? managerExec : workerExec,
+            false
+        )
     );
     allocationQueue.start();
     allocationQueue.becomeLeader();
@@ -122,8 +133,8 @@ public class SegmentAllocationQueueTest
     if (allocationQueue != null) {
       allocationQueue.stop();
     }
-    if (executor != null) {
-      executor.shutdownNow();
+    if (managerExec != null) {
+      managerExec.shutdownNow();
     }
     emitter.flush();
   }
@@ -247,7 +258,8 @@ public class SegmentAllocationQueueTest
                          .build();
     Future<SegmentIdWithShardSpec> halfHourSegmentFuture = 
allocationQueue.add(halfHourSegmentRequest);
 
-    executor.finishNextPendingTask();
+    processDistinctDatasourceBatches();
+    processDistinctDatasourceBatches();
 
     Assert.assertNotNull(getSegmentId(hourSegmentFuture));
     Assert.assertNull(getSegmentId(halfHourSegmentFuture));
@@ -305,13 +317,18 @@ public class SegmentAllocationQueueTest
       segmentFutures.add(allocationQueue.add(request));
     }
 
-    executor.finishNextPendingTask();
+    for (int i = 0; i < 10; ++i) {
+      processDistinctDatasourceBatches();
+    }
 
     SegmentIdWithShardSpec segmentId1 = getSegmentId(segmentFutures.get(0));
 
     for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
       Assert.assertEquals(getSegmentId(future), segmentId1);
     }
+
+    // Verify each datasource batch is marked skipped just once
+    emitter.verifySum("task/action/batch/skipped", 9);
   }
 
   @Test
@@ -331,7 +348,7 @@ public class SegmentAllocationQueueTest
     }
 
     allocationQueue.stopBeingLeader();
-    executor.finishNextPendingTask();
+    processDistinctDatasourceBatches();
 
     for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
       Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
@@ -352,8 +369,11 @@ public class SegmentAllocationQueueTest
     final int expectedCount = canBatch ? 1 : 2;
     Assert.assertEquals(expectedCount, allocationQueue.size());
 
-    executor.finishNextPendingTask();
+    // Process both the jobs
+    processDistinctDatasourceBatches();
+    processDistinctDatasourceBatches();
     emitter.verifyEmitted("task/action/batch/size", expectedCount);
+    emitter.verifySum("task/action/batch/submitted", expectedCount);
 
     Assert.assertNotNull(getSegmentId(futureA));
     Assert.assertNotNull(getSegmentId(futureB));
@@ -389,4 +409,16 @@ public class SegmentAllocationQueueTest
     taskActionTestKit.getTaskLockbox().add(task);
     return task;
   }
+
+  /**
+   * Triggers the {@link #managerExec} and the {@link #workerExec} to process a
+   * queued set of jobs (i.e. segment allocation batches) for distinct 
datasources.
+   * A single invocation of this method can process up to 1 batch for any given
+   * datasource and up to {@code batchAllocationNumThreads} batches total.
+   */
+  private void processDistinctDatasourceBatches()
+  {
+    managerExec.finishNextPendingTask();
+    workerExec.finishAllPendingTasks();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to