kfaraz commented on code in PR #18098:
URL: https://github.com/apache/druid/pull/18098#discussion_r2140519121


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -253,46 +287,58 @@ private void requeueBatch(AllocateRequestBatch batch)
     });
   }
 
+  /**
+   * Processes batches in the {@link #processingQueue} that are already due.
+   * This method must be invoked on the {@link #managerExec}.
+   */
   private void processBatchesDue()
   {
     clearQueueIfNotLeader();
-
-    // Process all the batches that are already due
-    int numProcessedBatches = 0;
-    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
-    while (nextBatch != null && nextBatch.isDue()) {
-      // Process the next batch in the queue
-      processingQueue.pollFirst();
-      final AllocateRequestBatch currentBatch = nextBatch;
-      boolean processed;
-      try {
-        processed = processBatch(currentBatch);
-      }
-      catch (Throwable t) {
-        currentBatch.failPendingRequests(t);
-        processed = true;
-        log.error(t, "Error while processing batch[%s].", currentBatch.key);
-      }
-
-      // Requeue if not fully processed yet
-      if (processed) {
-        ++numProcessedBatches;
+    isProcessingScheduled.set(false);
+
+    // Process the batches that are already due
+    int numSubmittedBatches = 0;
+    int numSkippedBatches = 0;
+
+    // Although thread-safe, this iterator might not see entries added by other
+    // concurrent threads. Those entries will be handled in the next processBatchesDue().
+    final Iterator<AllocateRequestBatch> queueIterator = processingQueue.iterator();
+    while (queueIterator.hasNext() && runningDatasources.size() < config.getBatchAllocationNumThreads()) {
+      final AllocateRequestBatch nextBatch = queueIterator.next();
+      final String dataSource = nextBatch.key.dataSource;
+      if (nextBatch.isDue()) {
+        if (runningDatasources.contains(dataSource)) {
+          // Skip this batch as another batch for the same datasource is in progress
+          markBatchAsSkipped(nextBatch);
+          ++numSkippedBatches;
+        } else {
+          // Process this batch
+          queueIterator.remove();
+          runningDatasources.add(dataSource);
+          workerExec.submit(() -> runBatchOnWorker(nextBatch));
+          emitBatchMetric("task/action/batch/submitted", 1L, nextBatch.key);
+          ++numSubmittedBatches;
+        }
       } else {
-        requeueBatch(currentBatch);
+        break;
       }
-
-      nextBatch = processingQueue.peek();
     }
+    log.debug("Submitted [%d] batches, skipped [%d] batches.", numSubmittedBatches, numSkippedBatches);
 
-    // Schedule the next round of processing if the queue is not empty
-    if (processingQueue.isEmpty()) {
-      log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches);
+    // Schedule the next round of processing if there are available worker threads
+    if (runningDatasources.size() >= config.getBatchAllocationNumThreads()) {
+      log.debug("Not scheduling again since all worker threads are busy.");
+    } else if (numSkippedBatches >= processingQueue.size()) {
+      // All remaining entries in the queue were skipped
+      log.debug("Not scheduling again since datasources are already being processed.");
+    } else if (processingQueue.isEmpty()) {
+      log.debug("Not scheduling again since queue is empty.");

Review Comment:
   Is it okay to keep this as-is for now, since it seems to keep the compiler happy?
   
   Otherwise, it warns about a possible NPE.
   
   <img width="1505" alt="Screenshot 2025-06-11 at 9 11 31 PM" src="https://github.com/user-attachments/assets/393065d6-2150-4a86-8d88-00546c71c445" />
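
For readers following the hunk above, here is a minimal standalone sketch of the selection loop it introduces: walk the queue in order, stop at the first batch that is not yet due, skip batches whose datasource is already running, and stop once the thread limit is reached. `DueBatchScheduler`, `Batch`, and `pickBatches` are hypothetical names used only for illustration and are not part of Druid. The sketch assumes the queue is a `ConcurrentLinkedDeque`, and it returns the chosen batches instead of submitting them to an executor or emitting metrics.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;

class DueBatchScheduler
{
  /** Hypothetical stand-in for AllocateRequestBatch: only a datasource and a due flag. */
  static final class Batch
  {
    final String dataSource;
    final boolean due;

    Batch(String dataSource, boolean due)
    {
      this.dataSource = dataSource;
      this.due = due;
    }
  }

  /**
   * Removes and returns the due batches that can start now. Batches whose
   * datasource is already in {@code running} are skipped and stay in the queue.
   * Each chosen datasource is added to {@code running}.
   */
  static List<Batch> pickBatches(ConcurrentLinkedDeque<Batch> queue, Set<String> running, int maxThreads)
  {
    final List<Batch> submitted = new ArrayList<>();
    // The iterator is weakly consistent: entries added concurrently may not be
    // seen in this pass and would be picked up by a later call.
    final Iterator<Batch> it = queue.iterator();
    while (it.hasNext() && running.size() < maxThreads) {
      final Batch next = it.next();
      if (!next.due) {
        // Assumes the queue is ordered by due time, so nothing later is due either
        break;
      }
      if (running.contains(next.dataSource)) {
        // Another batch for this datasource is in progress; leave it queued
        continue;
      }
      it.remove();
      running.add(next.dataSource);
      submitted.add(next);
    }
    return submitted;
  }

  public static void main(String[] args)
  {
    final ConcurrentLinkedDeque<Batch> queue = new ConcurrentLinkedDeque<>();
    queue.add(new Batch("wiki", true));
    queue.add(new Batch("wiki", true));
    queue.add(new Batch("koala", true));
    queue.add(new Batch("later", false));

    final Set<String> running = new HashSet<>();
    final List<Batch> submitted = pickBatches(queue, running, 3);
    System.out.println("submitted=" + submitted.size() + ", remaining=" + queue.size());
  }
}
```

In this sketch, the caller would be responsible for removing a datasource from `running` once its batch finishes, which is what frees a skipped batch to be picked up on a later pass.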
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

