kfaraz commented on code in PR #18098: URL: https://github.com/apache/druid/pull/18098#discussion_r2140519121
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java: ########## @@ -253,46 +287,58 @@ private void requeueBatch(AllocateRequestBatch batch) }); } + /** + * Processes batches in the {@link #processingQueue} that are already due. + * This method must be invoked on the {@link #managerExec}. + */ private void processBatchesDue() { clearQueueIfNotLeader(); - - // Process all the batches that are already due - int numProcessedBatches = 0; - AllocateRequestBatch nextBatch = processingQueue.peekFirst(); - while (nextBatch != null && nextBatch.isDue()) { - // Process the next batch in the queue - processingQueue.pollFirst(); - final AllocateRequestBatch currentBatch = nextBatch; - boolean processed; - try { - processed = processBatch(currentBatch); - } - catch (Throwable t) { - currentBatch.failPendingRequests(t); - processed = true; - log.error(t, "Error while processing batch[%s].", currentBatch.key); - } - - // Requeue if not fully processed yet - if (processed) { - ++numProcessedBatches; + isProcessingScheduled.set(false); + + // Process the batches that are already due + int numSubmittedBatches = 0; + int numSkippedBatches = 0; + + // Although thread-safe, this iterator might not see entries added by other + // concurrent threads. Those entries will be handled in the next processBatchesDue(). + final Iterator<AllocateRequestBatch> queueIterator = processingQueue.iterator(); + while (queueIterator.hasNext() && runningDatasources.size() < config.getBatchAllocationNumThreads()) { + final AllocateRequestBatch nextBatch = queueIterator.next(); + final String dataSource = nextBatch.key.dataSource; + if (nextBatch.isDue()) { + if (runningDatasources.contains(dataSource)) { + // Skip this batch as another batch for the same datasource is in progress + markBatchAsSkipped(nextBatch); + ++numSkippedBatches; + } else { + // Process this batch + queueIterator.remove(); + runningDatasources.add(dataSource); + workerExec.submit(() -> runBatchOnWorker(nextBatch)); + emitBatchMetric("task/action/batch/submitted", 1L, nextBatch.key); + ++numSubmittedBatches; + } } else { - requeueBatch(currentBatch); + break; } - - nextBatch = processingQueue.peek(); } + log.debug("Submitted [%d] batches, skipped [%d] batches.", numSubmittedBatches, numSkippedBatches); - // Schedule the next round of processing if the queue is not empty - if (processingQueue.isEmpty()) { - log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches); + // Schedule the next round of processing if there are available worker threads + if (runningDatasources.size() >= config.getBatchAllocationNumThreads()) { + log.debug("Not scheduling again since all worker threads are busy."); + } else if (numSkippedBatches >= processingQueue.size()) { + // All remaining entries in the queue were skipped + log.debug("Not scheduling again since datasources are already being processed."); + } else if (processingQueue.isEmpty()) { + log.debug("Not scheduling again since queue is empty."); Review Comment: Is it okay to keep it as-is for now since it seems to make the compiler happy? Otherwise it warns about a possible NPE. <img width="1505" alt="Screenshot 2025-06-11 at 9 11 31 PM" src="https://github.com/user-attachments/assets/393065d6-2150-4a86-8d88-00546c71c445" /> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org