This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c937068625e Improve polling in segment allocation queue (#15590)
c937068625e is described below

commit c937068625e41d49b91316102fe5d41d5f38a119
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jan 4 17:42:02 2024 +0530

    Improve polling in segment allocation queue (#15590)
    
    Description
    When batchAllocationWaitTime is set to 0, the segment allocation queue is 
polled continuously even when it is empty. This would take up CPU cycles 
unnecessarily.
    
    Some existing race conditions would also become more frequent when the 
batchAllocationWaitTime is 0. This PR tries to better address those race 
conditions as well.
    
    Changes
    Do not reschedule a poll if queue is empty
    When a new batch is added to queue, schedule a poll
    Simplify keyToBatch map
    Handle race conditions better
    As soon as a batch starts getting processed, do not add any more requests 
to it
---
 .../common/actions/SegmentAllocationQueue.java     | 199 +++++++++++----------
 1 file changed, 100 insertions(+), 99 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 6638c2f2578..6986ec683a5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -75,13 +75,17 @@ public class SegmentAllocationQueue
   private final long maxWaitTimeMillis;
 
   private final TaskLockbox taskLockbox;
-  private final ScheduledExecutorService executor;
   private final IndexerMetadataStorageCoordinator metadataStorage;
   private final AtomicBoolean isLeader = new AtomicBoolean(false);
   private final ServiceEmitter emitter;
 
+  /**
+   * Single-threaded executor to process allocation queue.
+   */
+  private final ScheduledExecutorService executor;
+
   private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
-  private final BlockingDeque<AllocateRequestKey> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+  private final BlockingDeque<AllocateRequestBatch> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
 
   @Inject
   public SegmentAllocationQueue(
@@ -149,6 +153,10 @@ public class SegmentAllocationQueue
     return executor != null && !executor.isShutdown();
   }
 
+  /**
+   * Schedules a poll of the allocation queue that runs on the {@link 
#executor}.
+   * It is okay to schedule multiple polls since the executor is single 
threaded.
+   */
   private void scheduleQueuePoll(long delay)
   {
     executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
@@ -174,19 +182,20 @@ public class SegmentAllocationQueue
       throw new ISE("Batched segment allocation is disabled.");
     }
 
-    final AllocateRequestKey requestKey = getKeyForAvailableBatch(request);
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request);
     final AtomicReference<Future<SegmentIdWithShardSpec>> futureReference = 
new AtomicReference<>();
 
     // Possible race condition:
     // t1 -> new batch is added to queue or batch already exists in queue
     // t2 -> executor pops batch, processes all requests in it
     // t1 -> new request is added to dangling batch and is never picked up
-    // Solution: For existing batch, call keyToBatch.remove() on the key to
-    // wait on keyToBatch.compute() to finish before proceeding with 
processBatch().
-    // For new batch, keyToBatch.remove() would not wait as key is not in map 
yet
-    // but a new batch is unlikely to be due immediately, so it won't get 
popped right away.
+    // Solution: Perform the following operations only inside 
keyToBatch.compute():
+    // 1. Add or remove from map
+    // 2. Add batch to queue
+    // 3. Mark batch as started
+    // 4. Update requests in batch
     keyToBatch.compute(requestKey, (key, existingBatch) -> {
-      if (existingBatch == null) {
+      if (existingBatch == null || existingBatch.isStarted() || 
existingBatch.isFull()) {
         AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
         futureReference.set(newBatch.add(request));
         return addBatchToQueue(newBatch) ? newBatch : null;
@@ -199,36 +208,19 @@ public class SegmentAllocationQueue
     return futureReference.get();
   }
 
-  /**
-   * Returns the key for a batch that is not added to the queue yet and/or has
-   * available space. Throws an exception if the queue is already full and no
-   * batch has available capacity.
-   */
-  private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest 
request)
-  {
-    for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; 
++batchIncrementalId) {
-      AllocateRequestKey nextKey = new AllocateRequestKey(request, 
maxWaitTimeMillis, batchIncrementalId);
-      AllocateRequestBatch nextBatch = keyToBatch.get(nextKey);
-      if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) {
-        return nextKey;
-      }
-    }
-
-    throw new ISE("Allocation queue is at capacity, all batches are full.");
-  }
-
   /**
    * Tries to add the given batch to the processing queue. Fails all the 
pending
    * requests in the batch if we are not leader or if the queue is full.
    */
   private boolean addBatchToQueue(AllocateRequestBatch batch)
   {
-    batch.key.resetQueueTime();
+    batch.resetQueueTime();
     if (!isLeader.get()) {
       batch.failPendingRequests("Not leader anymore");
       return false;
-    } else if (processingQueue.offer(batch.key)) {
-      log.debug("Added a new batch [%s] to queue.", batch.key);
+    } else if (processingQueue.offer(batch)) {
+      log.debug("Added a new batch for key[%s] to queue.", batch.key);
+      scheduleQueuePoll(maxWaitTimeMillis);
       return true;
     } else {
       batch.failPendingRequests(
@@ -248,7 +240,7 @@ public class SegmentAllocationQueue
   {
     log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), 
batch.key);
     keyToBatch.compute(batch.key, (key, existingBatch) -> {
-      if (existingBatch == null) {
+      if (existingBatch == null || existingBatch.isFull() || 
existingBatch.isStarted()) {
         return addBatchToQueue(batch) ? batch : null;
       } else {
         // Merge requests from this batch to existing one
@@ -262,44 +254,43 @@ public class SegmentAllocationQueue
   {
     clearQueueIfNotLeader();
 
+    // Process all the batches that are already due
     int numProcessedBatches = 0;
-    AllocateRequestKey nextKey = processingQueue.peekFirst();
-    while (nextKey != null && nextKey.isDue()) {
-      processingQueue.pollFirst();
-
+    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
+    while (nextBatch != null && nextBatch.isDue()) {
       // Process the next batch in the queue
+      processingQueue.pollFirst();
+      final AllocateRequestBatch currentBatch = nextBatch;
       boolean processed;
-      AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
       try {
-        processed = processBatch(nextBatch);
+        processed = processBatch(currentBatch);
       }
       catch (Throwable t) {
-        nextBatch.failPendingRequests(t);
+        currentBatch.failPendingRequests(t);
         processed = true;
-        log.error(t, "Error while processing batch [%s]", nextKey);
+        log.error(t, "Error while processing batch [%s]", currentBatch.key);
       }
 
       // Requeue if not fully processed yet
       if (processed) {
         ++numProcessedBatches;
       } else {
-        requeueBatch(nextBatch);
+        requeueBatch(currentBatch);
       }
 
-      nextKey = processingQueue.peek();
+      nextBatch = processingQueue.peek();
     }
 
-    // Schedule the next round of processing
-    final long nextScheduleDelay;
+    // Schedule the next round of processing if the queue is not empty
     if (processingQueue.isEmpty()) {
-      nextScheduleDelay = maxWaitTimeMillis;
+      log.debug("Processed [%d] batches, not scheduling again since queue is 
empty.", numProcessedBatches);
     } else {
-      nextKey = processingQueue.peek();
-      long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime();
-      nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
+      nextBatch = processingQueue.peek();
+      long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime();
+      long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
+      scheduleQueuePoll(nextScheduleDelay);
+      log.debug("Processed [%d] batches, next execution in [%d ms]", 
numProcessedBatches, nextScheduleDelay);
     }
-    scheduleQueuePoll(nextScheduleDelay);
-    log.debug("Processed [%d] batches, next execution in [%d ms]", 
numProcessedBatches, nextScheduleDelay);
   }
 
   /**
@@ -308,14 +299,14 @@ public class SegmentAllocationQueue
   private void clearQueueIfNotLeader()
   {
     int failedBatches = 0;
-    AllocateRequestKey nextKey = processingQueue.peekFirst();
-    while (nextKey != null && !isLeader.get()) {
+    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
+    while (nextBatch != null && !isLeader.get()) {
       processingQueue.pollFirst();
-      AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
+      keyToBatch.remove(nextBatch.key);
       nextBatch.failPendingRequests("Not leader anymore");
       ++failedBatches;
 
-      nextKey = processingQueue.peekFirst();
+      nextBatch = processingQueue.peekFirst();
     }
     if (failedBatches > 0) {
       log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", 
failedBatches, processingQueue.size());
@@ -323,11 +314,21 @@ public class SegmentAllocationQueue
   }
 
   /**
-   * Processes the given batch. Returns true if the batch was completely 
processed
-   * and should not be requeued.
+   * Processes the given batch. This method marks the batch as started and
+   * removes it from the map {@link #keyToBatch} so that no more requests can 
be
+   * added to it.
+   *
+   * @return true if the batch was completely processed and should not be 
requeued.
    */
   private boolean processBatch(AllocateRequestBatch requestBatch)
   {
+    keyToBatch.compute(requestBatch.key, (batchKey, latestBatchForKey) -> {
+      // Mark the batch as started so that no more requests are added to it
+      requestBatch.markStarted();
+      // Remove the corresponding key from the map if this is the latest batch 
for the key
+      return requestBatch.equals(latestBatchForKey) ? null : latestBatchForKey;
+    });
+
     final AllocateRequestKey requestKey = requestBatch.key;
     if (requestBatch.isEmpty()) {
       return true;
@@ -338,13 +339,13 @@ public class SegmentAllocationQueue
 
     log.debug(
         "Processing [%d] requests for batch [%s], queue time [%s].",
-        requestBatch.size(), requestKey, requestKey.getQueueTime()
+        requestBatch.size(), requestKey, requestBatch.getQueueTime()
     );
 
     final long startTimeMillis = System.currentTimeMillis();
     final int batchSize = requestBatch.size();
     emitBatchMetric("task/action/batch/size", batchSize, requestKey);
-    emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - 
requestKey.getQueueTime()), requestKey);
+    emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - 
requestBatch.getQueueTime()), requestKey);
 
     final Set<DataSegment> usedSegments = retrieveUsedSegments(requestKey);
     final int successCount = allocateSegmentsForBatch(requestBatch, 
usedSegments);
@@ -546,15 +547,13 @@ public class SegmentAllocationQueue
    */
   private class AllocateRequestBatch
   {
+    private long queueTimeMillis;
     private final AllocateRequestKey key;
+    private boolean started = false;
 
     /**
      * Map from allocate requests (represents a single SegmentAllocateAction)
      * to the future of allocated segment id.
-     * <p>
-     * This must be accessed through methods synchronized on this batch.
-     * It is to avoid races between a new request being added just when the 
batch
-     * is being processed.
      */
     private final Map<SegmentAllocateRequest, 
CompletableFuture<SegmentIdWithShardSpec>>
         requestToFuture = new HashMap<>();
@@ -564,29 +563,60 @@ public class SegmentAllocationQueue
       this.key = key;
     }
 
-    synchronized Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest 
request)
+    long getQueueTime()
+    {
+      return queueTimeMillis;
+    }
+
+    boolean isDue()
+    {
+      return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis;
+    }
+
+    void resetQueueTime()
+    {
+      queueTimeMillis = System.currentTimeMillis();
+      started = false;
+    }
+
+    void markStarted()
+    {
+      started = true;
+    }
+
+    boolean isStarted()
+    {
+      return started;
+    }
+
+    boolean isFull()
+    {
+      return size() >= MAX_BATCH_SIZE;
+    }
+
+    Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
     {
       log.debug("Adding request to batch [%s]: %s", key, request.getAction());
       return requestToFuture.computeIfAbsent(request, req -> new 
CompletableFuture<>());
     }
 
-    synchronized void transferRequestsFrom(AllocateRequestBatch batch)
+    void transferRequestsFrom(AllocateRequestBatch batch)
     {
       requestToFuture.putAll(batch.requestToFuture);
       batch.requestToFuture.clear();
     }
 
-    synchronized Set<SegmentAllocateRequest> getRequests()
+    Set<SegmentAllocateRequest> getRequests()
     {
       return new HashSet<>(requestToFuture.keySet());
     }
 
-    synchronized void failPendingRequests(String reason)
+    void failPendingRequests(String reason)
     {
       failPendingRequests(new ISE(reason));
     }
 
-    synchronized void failPendingRequests(Throwable cause)
+    void failPendingRequests(Throwable cause)
     {
       if (!requestToFuture.isEmpty()) {
         log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), 
cause.getMessage(), key);
@@ -598,7 +628,7 @@ public class SegmentAllocationQueue
       }
     }
 
-    synchronized void completePendingRequestsWithNull()
+    void completePendingRequestsWithNull()
     {
       if (requestToFuture.isEmpty()) {
         return;
@@ -611,7 +641,7 @@ public class SegmentAllocationQueue
       requestToFuture.clear();
     }
 
-    synchronized void handleResult(SegmentAllocateResult result, 
SegmentAllocateRequest request)
+    void handleResult(SegmentAllocateResult result, SegmentAllocateRequest 
request)
     {
       request.incrementAttempts();
 
@@ -634,12 +664,12 @@ public class SegmentAllocationQueue
       }
     }
 
-    synchronized boolean isEmpty()
+    boolean isEmpty()
     {
       return requestToFuture.isEmpty();
     }
 
-    synchronized int size()
+    int size()
     {
       return requestToFuture.size();
     }
@@ -650,14 +680,6 @@ public class SegmentAllocationQueue
    */
   private static class AllocateRequestKey
   {
-    /**
-     * ID to distinguish between two batches for the same datasource, groupId, 
etc.
-     */
-    private final int batchIncrementalId;
-
-    private long queueTimeMillis;
-    private final long maxWaitTimeMillis;
-
     private final String dataSource;
     private final String groupId;
     private final Interval preferredAllocationInterval;
@@ -675,12 +697,11 @@ public class SegmentAllocationQueue
      * Creates a new key for the given request. The batch for a unique key will
      * always contain a single request.
      */
-    AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, 
int batchIncrementalId)
+    AllocateRequestKey(SegmentAllocateRequest request)
     {
       final SegmentAllocateAction action = request.getAction();
       final Task task = request.getTask();
 
-      this.batchIncrementalId = batchIncrementalId;
       this.dataSource = action.getDataSource();
       this.groupId = task.getGroupId();
       this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
@@ -694,30 +715,12 @@ public class SegmentAllocationQueue
       this.hash = Objects.hash(
           dataSource,
           groupId,
-          batchIncrementalId,
           skipSegmentLineageCheck,
           useNonRootGenPartitionSpace,
           preferredAllocationInterval,
           lockGranularity
       );
       this.serialized = serialize();
-
-      this.maxWaitTimeMillis = maxWaitTimeMillis;
-    }
-
-    void resetQueueTime()
-    {
-      queueTimeMillis = System.currentTimeMillis();
-    }
-
-    long getQueueTime()
-    {
-      return queueTimeMillis;
-    }
-
-    boolean isDue()
-    {
-      return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis;
     }
 
     @Override
@@ -732,7 +735,6 @@ public class SegmentAllocationQueue
       AllocateRequestKey that = (AllocateRequestKey) o;
       return dataSource.equals(that.dataSource)
              && groupId.equals(that.groupId)
-             && batchIncrementalId == that.batchIncrementalId
              && skipSegmentLineageCheck == that.skipSegmentLineageCheck
              && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
              && 
preferredAllocationInterval.equals(that.preferredAllocationInterval)
@@ -756,7 +758,6 @@ public class SegmentAllocationQueue
       return "{" +
              "datasource='" + dataSource + '\'' +
              ", groupId='" + groupId + '\'' +
-             ", batchId=" + batchIncrementalId +
              ", lock=" + lockGranularity +
              ", allocInterval=" + preferredAllocationInterval +
              ", skipLineageCheck=" + skipSegmentLineageCheck +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to