abhishekagarwal87 commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1027900519


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private static final int MAX_QUEUE_SIZE = 5000;
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final DruidLeaderSelector leaderSelector;
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  private final BlockingDeque<AllocateRequestBatch> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+
+  @Inject
+  public SegmentAllocationQueue(
+      TaskLockbox taskLockbox,
+      TaskLockConfig taskLockConfig,
+      IndexerMetadataStorageCoordinator metadataStorage,
+      @IndexingService DruidLeaderSelector leaderSelector,
+      ServiceEmitter emitter
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.metadataStorage = metadataStorage;
+    this.leaderSelector = leaderSelector;
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.enabled = taskLockConfig.isBatchSegmentAllocation();
+
+    this.executor = ScheduledExecutors.fixed(1, "SegmentAllocQueue-%s");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Initializing segment allocation queue.");
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.info("Tearing down segment allocation queue.");
+    executor.shutdownNow();
+  }
+
+  public boolean isEnabled()
+  {
+    return enabled;
+  }
+
+  private void scheduleQueuePoll(long delay)
+  {
+    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queues a SegmentAllocateRequest. The returned future may complete 
successfully
+   * with a non-null value or with a non-null value.
+   */
+  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
+  {
+    if (!leaderSelector.isLeader()) {
+      throw new ISE("Cannot allocate segment if not leader.");
+    } else if (!isEnabled()) {
+      throw new ISE("Batched segment allocation is disabled.");
+    }
+
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request, 
false);
+    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new 
AtomicReference<>();
+
+    // Possible race condition:
+    // t1 -> new batch is added to queue or batch already exists in queue
+    // t2 -> executor pops batch, processes all requests in it
+    // t1 -> new request is added to dangling batch and is never picked up
+    // Solution: For existing batch, call keyToBatch.remove() on the key to
+    // wait on keyToBatch.compute() to finish before proceeding with 
processBatch().
+    // For new batch, keyToBatch.remove() would not wait as key is not in map 
yet
+    // but a new batch is unlikely to be due immediately, so it won't get 
popped right away.
+    keyToBatch.compute(requestKey, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
+        requestFuture.set(newBatch.add(request));
+        return addBatchToQueue(newBatch) ? newBatch : null;
+      } else {
+        requestFuture.set(existingBatch.add(request));
+        return existingBatch;
+      }
+    });
+
+    return requestFuture.get();
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If the queue is 
full,
+   * marks the batch as completed failing all the pending requests.
+   */
+  private boolean addBatchToQueue(AllocateRequestBatch batch)
+  {
+    batch.resetQueueTime();
+    if (processingQueue.offer(batch)) {
+      log.debug("Added a new batch [%s] to queue.", batch.key);
+      return true;
+    } else {
+      log.warn("Cannot add batch [%s] as queue is full. Failing [%d] 
requests.", batch.key, batch.size());
+      batch.markCompleted();
+      return false;
+    }
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If a batch already
+   * exists for this key, transfers all the requests from this batch to the
+   * existing one.
+   */
+  private void requeueBatch(AllocateRequestBatch batch)
+  {
+    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), 
batch.key);
+    keyToBatch.compute(batch.key, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        batch.resetQueueTime();
+        return addBatchToQueue(batch) ? batch : null;
+      } else {
+        // Merge requests from this batch to existing one
+        existingBatch.transferRequestsFrom(batch);
+        return existingBatch;
+      }
+    });
+  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of 
processing
+    if (!leaderSelector.isLeader()) {
+      log.info("Not leader anymore. Failing [%d] batches in queue.", 
processingQueue.size());
+
+      AllocateRequestBatch nextBatch = processingQueue.pollFirst();
+      while (nextBatch != null) {
+        nextBatch.markCompleted();
+        nextBatch = processingQueue.pollFirst();
+      }
+
+      keyToBatch.clear();
+      return;
+    }
+
+    // Process all batches which are due
+    log.debug("Processing all batches which are due for execution. Overall 
queue size [%d].", processingQueue.size());
+    int numProcessedBatches = 0;
+
+    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
+    while (nextBatch != null && nextBatch.isDue()) {
+      processingQueue.pollFirst();
+      boolean processed;
+      try {
+        processed = processBatch(nextBatch);
+        ++numProcessedBatches;
+      }
+      catch (Throwable t) {
+        processed = true;
+        log.error(t, "Error while processing batch [%s]", nextBatch.key);
+      }
+
+      if (processed) {
+        nextBatch.markCompleted();
+      } else {
+        requeueBatch(nextBatch);
+      }
+
+      nextBatch = processingQueue.peek();
+    }
+
+    // Schedule the next round of processing
+    final long nextScheduleDelay;
+    if (processingQueue.isEmpty()) {
+      nextScheduleDelay = maxWaitTimeMillis;
+    } else {
+      nextBatch = processingQueue.peek();
+      long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime();
+      nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
+    }
+    scheduleQueuePoll(nextScheduleDelay);
+    log.debug("Processed [%d] batches, next execution in [%d ms]", 
numProcessedBatches, nextScheduleDelay);
+  }
+
+  /**
+   * Processes the given batch. Returns true if the batch was completely 
processed
+   * and should not be requeued.
+   */
+  private boolean processBatch(AllocateRequestBatch requestBatch)
+  {
+    final AllocateRequestKey requestKey = requestBatch.key;
+    keyToBatch.remove(requestKey);
+    if (requestBatch.isEmpty()) {
+      return true;
+    }
+
+    log.info(
+        "Processing [%d] requests for batch [%s], queue time [%s].",
+        requestBatch.size(),
+        requestKey,
+        requestBatch.getQueueTime()
+    );
+
+    final long startTimeMillis = System.currentTimeMillis();
+    final int batchSize = requestBatch.size();
+    emitBatchMetric("task/action/batch/size", batchSize, requestKey);
+    emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - 
requestBatch.getQueueTime()), requestKey);
+
+    final Set<DataSegment> usedSegments = retrieveUsedSegments(requestKey);
+    final int successCount = allocateSegmentsForBatch(requestBatch, 
usedSegments);
+
+    emitBatchMetric("task/action/batch/retries", 1L, requestKey);
+    emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - 
startTimeMillis), requestKey);
+    log.info("Successfully processed [%d / %d] requests in batch [%s].", 
successCount, batchSize, requestKey);
+
+    if (requestBatch.isEmpty()) {
+      log.info("All requests in batch [%s] have been processed.", requestKey);
+      return true;
+    }
+
+    // Requeue the batch only if used segments have changed
+    log.info("There are [%d] failed requests in batch [%s].", 
requestBatch.size(), requestKey);
+    final Set<DataSegment> updatedUsedSegments = 
retrieveUsedSegments(requestKey);
+
+    if (updatedUsedSegments.equals(usedSegments)) {
+      log.error("Used segments have not changed. Not requeueing failed 
requests.");
+      return true;
+    } else {
+      log.info("Used segments have changed. Requeuing failed requests");
+      return false;
+    }
+  }
+
+  private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
+  {
+    return new HashSet<>(
+        metadataStorage.retrieveUsedSegmentsForInterval(
+            key.dataSource,
+            key.preferredAllocationInterval,
+            Segments.ONLY_VISIBLE
+        )
+    );
+  }
+
+  private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, 
Set<DataSegment> usedSegments)
+  {
+    int successCount = 0;
+
+    final List<SegmentAllocateRequest> allRequests = 
requestBatch.getRequests();
+    final Set<SegmentAllocateRequest> pendingRequests = new HashSet<>();
+
+    if (usedSegments.isEmpty()) {
+      pendingRequests.addAll(allRequests);
+    } else {
+      final Interval[] sortedUsedSegmentIntervals = 
getSortedIntervals(usedSegments);
+      final Map<Interval, List<SegmentAllocateRequest>> usedIntervalToRequests 
= new HashMap<>();
+
+      for (SegmentAllocateRequest request : allRequests) {
+        // If there is an overlapping used segment interval, that interval is
+        // the only candidate for allocation
+        Interval overlappingInterval = findOverlappingInterval(
+            request.getRowInterval(),
+            sortedUsedSegmentIntervals
+        );
+
+        if (overlappingInterval == null) {
+          pendingRequests.add(request);
+        } else if (overlappingInterval.contains(request.getRowInterval())) {
+          // Found an enclosing interval, use this for allocation
+          usedIntervalToRequests.computeIfAbsent(overlappingInterval, i -> new 
ArrayList<>())
+                                .add(request);
+        }
+      }
+
+      // Try to allocate segments for the identified used segment intervals
+      // Do not retry the failed requests with other intervals unless the 
batch is requeued
+      for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : 
usedIntervalToRequests.entrySet()) {
+        List<SegmentAllocateRequest> successfulRequests = 
allocateSegmentsForInterval(
+            entry.getKey(),
+            entry.getValue(),
+            requestBatch
+        );
+        successCount += successfulRequests.size();
+      }
+    }
+
+    // For requests that do not overlap with a used segment, first try to 
allocate
+    // using the preferred granularity, then smaller granularities
+    for (Granularity granularity :
+        
Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity))
 {
+      Map<Interval, List<SegmentAllocateRequest>> requestsByInterval =
+          getRequestsByInterval(pendingRequests, granularity);
+
+      for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : 
requestsByInterval.entrySet()) {
+        List<SegmentAllocateRequest> successfulRequests = 
allocateSegmentsForInterval(
+            entry.getKey(),
+            entry.getValue(),
+            requestBatch
+        );
+        successCount += successfulRequests.size();
+        pendingRequests.removeAll(successfulRequests);
+      }
+    }
+
+    return successCount;
+  }
+
+  private Interval findOverlappingInterval(Interval searchInterval, Interval[] 
sortedIntervals)
+  {
+    int index = Arrays.binarySearch(
+        sortedIntervals,
+        searchInterval,
+        Comparators.intervalsByStartThenEnd()
+    );
+    if (index >= 0) {
+      return sortedIntervals[index];
+    }
+
+    // Key was not found, index returned from binarySearch is 
(-(insertionPoint) - 1)
+    index = -(index + 1);
+
+    // If the interval at index doesn't overlap, (index + 1) wouldn't overlap 
either
+    if (index < sortedIntervals.length) {
+      if (sortedIntervals[index].overlaps(searchInterval)) {
+        return sortedIntervals[index];
+      }
+    }
+
+    // If the interval at (index - 1) doesn't overlap, (index - 2) wouldn't 
overlap either
+    if (index > 0) {
+      if (sortedIntervals[index - 1].overlaps(searchInterval)) {
+        return sortedIntervals[index - 1];
+      }
+    }
+
+    return null;
+  }
+
+  private Interval[] getSortedIntervals(Set<DataSegment> usedSegments)
+  {
+    TreeSet<Interval> sortedSet = new 
TreeSet<>(Comparators.intervalsByStartThenEnd());
+    usedSegments.forEach(segment -> sortedSet.add(segment.getInterval()));
+    return sortedSet.toArray(new Interval[0]);
+  }
+
+  /**
+   * Tries to allocate segments for the given requests over the specified 
interval.
+   * Returns the list of requests for which segments were successfully 
allocated.
+   */
+  private List<SegmentAllocateRequest> allocateSegmentsForInterval(
+      Interval tryInterval,
+      List<SegmentAllocateRequest> requests,
+      AllocateRequestBatch requestBatch
+  )
+  {
+    if (requests.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final AllocateRequestKey requestKey = requestBatch.key;
+    log.info(
+        "Trying allocation for [%d] requests, interval [%s] in batch [%s]",
+        requests.size(),
+        tryInterval,
+        requestKey
+    );
+
+    final List<SegmentAllocateResult> results = taskLockbox.allocateSegments(
+        requests,
+        requestKey.dataSource,
+        tryInterval,
+        requestKey.skipSegmentLineageCheck,
+        requestKey.lockGranularity
+    );
+
+    final List<SegmentAllocateRequest> successfulRequests = new ArrayList<>();
+    for (int i = 0; i < requests.size(); ++i) {
+      SegmentAllocateRequest request = requests.get(i);
+      SegmentAllocateResult result = results.get(i);
+      if (result.isSuccess()) {
+        successfulRequests.add(request);
+      }
+
+      requestBatch.handleResult(result, request);
+    }
+
+    return successfulRequests;
+  }
+
+  private Map<Interval, List<SegmentAllocateRequest>> getRequestsByInterval(
+      Set<SegmentAllocateRequest> requests,
+      Granularity tryGranularity
+  )
+  {
+    final Map<Interval, List<SegmentAllocateRequest>> tryIntervalToRequests = 
new HashMap<>();
+    for (SegmentAllocateRequest request : requests) {
+      Interval tryInterval = 
tryGranularity.bucket(request.getAction().getTimestamp());
+      if (tryInterval.contains(request.getRowInterval())) {
+        tryIntervalToRequests.computeIfAbsent(tryInterval, i -> new 
ArrayList<>()).add(request);
+      }
+    }
+    return tryIntervalToRequests;
+  }
+
+  private void emitTaskMetric(String metric, long value, 
SegmentAllocateRequest request)
+  {
+    final ServiceMetricEvent.Builder metricBuilder = 
ServiceMetricEvent.builder();
+    IndexTaskUtils.setTaskDimensions(metricBuilder, request.getTask());
+    metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
+    emitter.emit(metricBuilder.build(metric, value));
+  }
+
+  private void emitBatchMetric(String metric, long value, AllocateRequestKey 
key)
+  {
+    final ServiceMetricEvent.Builder metricBuilder = 
ServiceMetricEvent.builder();
+    metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
+    metricBuilder.setDimension(DruidMetrics.DATASOURCE, key.dataSource);
+    metricBuilder.setDimension(DruidMetrics.INTERVAL, 
key.preferredAllocationInterval.toString());
+    emitter.emit(metricBuilder.build(metric, value));
+  }
+
+  public void becomeLeader()
+  {
+    if (isEnabled()) {
+      // Start polling the queue
+      log.info("Elected leader. Starting queue processing.");
+      scheduleQueuePoll(maxWaitTimeMillis);
+    } else {
+      log.info(
+          "Elected leader but batched segment allocation is disabled. "
+          + "Segment allocation queue will not be used."
+      );
+    }
+  }
+
+  public void stopBeingLeader()
+  {
+    if (isEnabled()) {
+      log.info("Not leader anymore. Stopping queue processing.");
+    } else {
+      log.info("Not leader anymore. Segment allocation queue is already 
disabled.");
+    }
+  }
+
+  /**
+   * A batch of segment allocation requests.
+   */
+  private class AllocateRequestBatch
+  {
+    private long queueTimeMillis;
+    private final AllocateRequestKey key;
+
+    /**
+     * Map from allocate requests (represents a single SegmentAllocateAction)
+     * to the future of allocated segment id.
+     * <p>
+     * This must be accessed through methods synchronized on this batch.
+     * It is to avoid races between a new request being added just when the 
batch
+     * is being processed.
+     */
+    private final Map<SegmentAllocateRequest, 
CompletableFuture<SegmentIdWithShardSpec>>
+        requestToFuture = new HashMap<>();
+
+    AllocateRequestBatch(AllocateRequestKey key)
+    {
+      this.key = key;
+    }
+
+    synchronized Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest 
request)
+    {
+      log.info("Adding request to batch [%s]: %s", key, request.getAction());
+      return requestToFuture.computeIfAbsent(request, req -> new 
CompletableFuture<>());
+    }
+
+    synchronized void transferRequestsFrom(AllocateRequestBatch batch)
+    {
+      requestToFuture.putAll(batch.requestToFuture);
+      batch.requestToFuture.clear();
+    }
+
+    synchronized List<SegmentAllocateRequest> getRequests()
+    {
+      return new ArrayList<>(requestToFuture.keySet());
+    }
+
+    synchronized void markCompleted()
+    {
+      if (!requestToFuture.isEmpty()) {
+        log.info("Marking [%d] requests in batch [%s] as failed.", size(), 
key);
+        requestToFuture.values().forEach(future -> future.complete(null));
+        requestToFuture.keySet().forEach(
+            request -> emitTaskMetric("task/action/failed/count", 1L, request)
+        );
+        requestToFuture.clear();
+      }
+    }
+
+    synchronized void handleResult(SegmentAllocateResult result, 
SegmentAllocateRequest request)
+    {
+      request.incrementAttempts();
+
+      if (result.isSuccess()) {
+        emitTaskMetric("task/action/success/count", 1L, request);
+        requestToFuture.remove(request).complete(result.getSegmentId());
+      } else if (request.canRetry()) {
+        log.debug(

Review Comment:
   this should be `info` or `warn` I think. If the retry is causing a delay, we 
will only know it from logs. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final DruidLeaderSelector leaderSelector;
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  private final Deque<AllocateRequestBatch> processingQueue = new 
ConcurrentLinkedDeque<>();
+
+  @Inject
+  public SegmentAllocationQueue(
+      TaskLockbox taskLockbox,
+      TaskLockConfig taskLockConfig,
+      IndexerMetadataStorageCoordinator metadataStorage,
+      @IndexingService DruidLeaderSelector leaderSelector,
+      ServiceEmitter emitter
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.metadataStorage = metadataStorage;
+    this.leaderSelector = leaderSelector;
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.enabled = taskLockConfig.isBatchSegmentAllocation();
+
+    this.executor = ScheduledExecutors.fixed(1, "SegmentAllocQueue-%s");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Starting queue.");
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.info("Stopping queue.");
+    executor.shutdownNow();
+  }
+
+  public boolean isEnabled()
+  {
+    return enabled;
+  }
+
+  private void scheduleQueuePoll(long delay)
+  {
+    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queues a SegmentAllocateRequest. The returned future may complete 
successfully
+   * with a non-null value or with a non-null value.
+   */
+  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
+  {
+    if (!leaderSelector.isLeader()) {
+      throw new ISE("Cannot allocate segment if not leader.");
+    } else if (!isEnabled()) {
+      throw new ISE("Batched segment allocation is disabled.");
+    }
+
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request, 
false);
+    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new 
AtomicReference<>();
+
+    keyToBatch.compute(requestKey, (key, existingBatch) -> {
+      AllocateRequestBatch computedBatch = existingBatch;
+      if (computedBatch == null) {
+        computedBatch = new AllocateRequestBatch(key);
+        computedBatch.resetQueueTime();
+        processingQueue.offer(computedBatch);
+      }
+
+      // Possible race condition:
+      // t1 -> new batch is added to queue or batch already exists in queue
+      // t2 -> executor pops batch, processes all requests in it
+      // t1 -> new request is added to dangling batch and is never picked up
+      // Solution: For existing batch, call keyToBatch.remove() on the key to
+      // wait on keyToBatch.compute() to finish before proceeding with 
processBatch().
+      // For new batch, keyToBatch.remove() would not wait as key is not in 
map yet
+      // but a new batch is unlikely to be due immediately, so it won't get 
popped right away.
+      requestFuture.set(computedBatch.add(request));
+      return computedBatch;
+    });
+
+    return requestFuture.get();
+  }
+
+  private void requeueBatch(AllocateRequestBatch batch)
+  {
+    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), 
batch.key);
+    keyToBatch.compute(batch.key, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        batch.resetQueueTime();
+        processingQueue.offer(batch);
+        return batch;
+      } else {
+        // Merge requests from this batch to existing one
+        existingBatch.merge(batch);
+        return existingBatch;
+      }
+    });
+  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of 
processing
+    if (!leaderSelector.isLeader()) {
+      log.info("Not leader anymore. Clearing [%d] batches from queue.", 
processingQueue.size());
+      processingQueue.clear();
+      keyToBatch.clear();
+      return;
+    }
+
+    // Process all batches which are due
+    log.debug("Processing all batches which are due for execution.");
+    int numProcessedBatches = 0;
+
+    AllocateRequestBatch nextBatch = processingQueue.peek();
+    while (nextBatch != null && nextBatch.isDue()) {
+      processingQueue.poll();
+      boolean processed;
+      try {
+        processed = processBatch(nextBatch);
+        ++numProcessedBatches;
+      }
+      catch (Throwable t) {
+        processed = true;

Review Comment:
   well there could be db failures for example. If we don't retry, we should 
surface the failure to the caller. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private static final int MAX_QUEUE_SIZE = 2000;
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  private final BlockingDeque<AllocateRequestBatch> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+
  /**
   * Creates the segment allocation queue.
   *
   * @param taskLockbox     lockbox used by batch processing for task locks and allocation
   * @param taskLockConfig  provides the max batch wait time and the batching enabled flag
   * @param metadataStorage coordinator used to retrieve used segments from the metadata store
   * @param emitter         emitter for batch-level metrics
   * @param executorFactory factory used to create the single-threaded processing executor
   */
  @Inject
  public SegmentAllocationQueue(
      TaskLockbox taskLockbox,
      TaskLockConfig taskLockConfig,
      IndexerMetadataStorageCoordinator metadataStorage,
      ServiceEmitter emitter,
      ScheduledExecutorFactory executorFactory
  )
  {
    this.emitter = emitter;
    this.taskLockbox = taskLockbox;
    this.metadataStorage = metadataStorage;
    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
    this.enabled = taskLockConfig.isBatchSegmentAllocation();

    // Single-threaded executor so that queued batches are processed serially
    this.executor = executorFactory.create(1, "SegmentAllocQueue-%s");
  }
+
  /**
   * Lifecycle start hook; only logs initialization here.
   */
  @LifecycleStart
  public void start()
  {
    log.info("Initializing segment allocation queue.");
  }

  /**
   * Lifecycle stop hook. Interrupts the processing executor immediately;
   * batches still in the queue are not drained here.
   */
  @LifecycleStop
  public void stop()
  {
    log.info("Tearing down segment allocation queue.");
    executor.shutdownNow();
  }

  /**
   * Whether batched segment allocation is enabled, as read from TaskLockConfig.
   */
  public boolean isEnabled()
  {
    return enabled;
  }

  /**
   * Schedules a single run of {@link #processBatchesDue} after the given delay.
   *
   * @param delay delay in milliseconds
   */
  private void scheduleQueuePoll(long delay)
  {
    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
  }

  /**
   * Gets the number of batches currently in the queue.
   */
  public int size()
  {
    return processingQueue.size();
  }
+
  /**
   * Queues a SegmentAllocateRequest. The returned future may complete
   * successfully with a non-null segment id or fail with an exception
   * (e.g. when the batch cannot be queued or processed).
   *
   * @throws ISE if this service is not the leader or batching is disabled
   */
  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
  {
    if (!isLeader.get()) {
      throw new ISE("Cannot allocate segment if not leader.");
    } else if (!isEnabled()) {
      throw new ISE("Batched segment allocation is disabled.");
    }

    final AllocateRequestKey requestKey = new AllocateRequestKey(request, false);
    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new AtomicReference<>();

    // Possible race condition:
    // t1 -> new batch is added to queue or batch already exists in queue
    // t2 -> executor pops batch, processes all requests in it
    // t1 -> new request is added to dangling batch and is never picked up
    // Solution: For existing batch, call keyToBatch.remove() on the key to
    // wait on keyToBatch.compute() to finish before proceeding with processBatch().
    // For new batch, keyToBatch.remove() would not wait as key is not in map yet
    // but a new batch is unlikely to be due immediately, so it won't get popped right away.
    keyToBatch.compute(requestKey, (key, existingBatch) -> {
      if (existingBatch == null) {
        AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
        requestFuture.set(newBatch.add(request));
        // If the queue is full, addBatchToQueue fails the batch (completing its
        // futures) and we keep no mapping for this key
        return addBatchToQueue(newBatch) ? newBatch : null;
      } else {
        requestFuture.set(existingBatch.add(request));
        return existingBatch;
      }
    });

    return requestFuture.get();
  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If the queue is 
full,
+   * marks the batch as completed failing all the pending requests.
+   */
+  private boolean addBatchToQueue(AllocateRequestBatch batch)
+  {
+    batch.resetQueueTime();
+    if (processingQueue.offer(batch)) {
+      log.debug("Added a new batch [%s] to queue.", batch.key);
+      return true;
+    } else {
+      log.warn("Cannot add batch [%s] as queue is full. Failing [%d] 
requests.", batch.key, batch.size());
+      batch.markCompleted();
+      return false;
+    }
+  }
+
  /**
   * Tries to add the given batch back to the processing queue. If a batch already
   * exists for this key, transfers all the requests from this batch to the
   * existing one.
   * <p>
   * The update runs inside {@code keyToBatch.compute()} so that it is atomic
   * with respect to other modifications for the same key.
   */
  private void requeueBatch(AllocateRequestBatch batch)
  {
    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key);
    keyToBatch.compute(batch.key, (key, existingBatch) -> {
      if (existingBatch == null) {
        // No batch mapped for this key: requeue this one. If the queue is full,
        // the batch is failed and no mapping is kept.
        return addBatchToQueue(batch) ? batch : null;
      } else {
        // Merge requests from this batch to existing one
        existingBatch.transferRequestsFrom(batch);
        return existingBatch;
      }
    });
  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of 
processing
+    if (!isLeader.get()) {
+      log.info("Not leader anymore. Failing [%d] batches in queue.", 
processingQueue.size());
+
+      // Keep removing items from the queue as long as not leader
+      AllocateRequestBatch nextBatch = processingQueue.peekFirst();
+      while (nextBatch != null && !isLeader.get()) {
+        processingQueue.pollFirst();
+        keyToBatch.remove(nextBatch.key);
+        nextBatch.markCompleted();
+        nextBatch = processingQueue.peekFirst();
+      }
+    }
+
+    // Check once again for leadership
+    if (!isLeader.get()) {
+      return;
+    }

Review Comment:
   is this necessary? what if some items were added to the queue just before 
line 225? 



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private static final int MAX_QUEUE_SIZE = 2000;
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  private final BlockingDeque<AllocateRequestBatch> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+
+  @Inject
+  public SegmentAllocationQueue(
+      TaskLockbox taskLockbox,
+      TaskLockConfig taskLockConfig,
+      IndexerMetadataStorageCoordinator metadataStorage,
+      ServiceEmitter emitter,
+      ScheduledExecutorFactory executorFactory
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.metadataStorage = metadataStorage;
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.enabled = taskLockConfig.isBatchSegmentAllocation();
+
+    this.executor = executorFactory.create(1, "SegmentAllocQueue-%s");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Initializing segment allocation queue.");
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.info("Tearing down segment allocation queue.");
+    executor.shutdownNow();
+  }
+
+  public boolean isEnabled()
+  {
+    return enabled;
+  }
+
+  private void scheduleQueuePoll(long delay)
+  {
+    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Gets the number of batches currently in the queue.
+   */
+  public int size()
+  {
+    return processingQueue.size();
+  }
+
+  /**
+   * Queues a SegmentAllocateRequest. The returned future may complete 
successfully
+   * with a non-null value or with a non-null value.
+   */
+  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
+  {
+    if (!isLeader.get()) {
+      throw new ISE("Cannot allocate segment if not leader.");
+    } else if (!isEnabled()) {
+      throw new ISE("Batched segment allocation is disabled.");
+    }
+
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request, 
false);
+    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new 
AtomicReference<>();
+
+    // Possible race condition:
+    // t1 -> new batch is added to queue or batch already exists in queue
+    // t2 -> executor pops batch, processes all requests in it
+    // t1 -> new request is added to dangling batch and is never picked up
+    // Solution: For existing batch, call keyToBatch.remove() on the key to
+    // wait on keyToBatch.compute() to finish before proceeding with 
processBatch().
+    // For new batch, keyToBatch.remove() would not wait as key is not in map 
yet
+    // but a new batch is unlikely to be due immediately, so it won't get 
popped right away.
+    keyToBatch.compute(requestKey, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
+        requestFuture.set(newBatch.add(request));
+        return addBatchToQueue(newBatch) ? newBatch : null;
+      } else {
+        requestFuture.set(existingBatch.add(request));
+        return existingBatch;
+      }
+    });
+
+    return requestFuture.get();
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If the queue is 
full,
+   * marks the batch as completed failing all the pending requests.
+   */
+  private boolean addBatchToQueue(AllocateRequestBatch batch)
+  {
+    batch.resetQueueTime();
+    if (processingQueue.offer(batch)) {
+      log.debug("Added a new batch [%s] to queue.", batch.key);
+      return true;
+    } else {
+      log.warn("Cannot add batch [%s] as queue is full. Failing [%d] 
requests.", batch.key, batch.size());

Review Comment:
   what happens as a result of this? I assume it will happen when the 
metadata store is slow. can there be any other reason? should we add some 
possible corrective action here like checking metadata store sizing? 



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final DruidLeaderSelector leaderSelector;
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  private final Deque<AllocateRequestBatch> processingQueue = new 
ConcurrentLinkedDeque<>();
+
  /**
   * Creates the segment allocation queue.
   *
   * @param taskLockbox     lockbox used by batch processing for task locks and allocation
   * @param taskLockConfig  provides the max batch wait time and the batching enabled flag
   * @param metadataStorage coordinator used to retrieve used segments from the metadata store
   * @param leaderSelector  used to verify leadership before queueing or processing batches
   * @param emitter         emitter for batch-level metrics
   */
  @Inject
  public SegmentAllocationQueue(
      TaskLockbox taskLockbox,
      TaskLockConfig taskLockConfig,
      IndexerMetadataStorageCoordinator metadataStorage,
      @IndexingService DruidLeaderSelector leaderSelector,
      ServiceEmitter emitter
  )
  {
    this.emitter = emitter;
    this.taskLockbox = taskLockbox;
    this.metadataStorage = metadataStorage;
    this.leaderSelector = leaderSelector;
    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
    this.enabled = taskLockConfig.isBatchSegmentAllocation();

    // Single-threaded executor so that queued batches are processed serially
    this.executor = ScheduledExecutors.fixed(1, "SegmentAllocQueue-%s");
  }
+
  /**
   * Lifecycle start hook; only logs startup here.
   */
  @LifecycleStart
  public void start()
  {
    log.info("Starting queue.");
  }

  /**
   * Lifecycle stop hook. Interrupts the processing executor immediately;
   * batches still in the queue are not drained here.
   */
  @LifecycleStop
  public void stop()
  {
    log.info("Stopping queue.");
    executor.shutdownNow();
  }

  /**
   * Whether batched segment allocation is enabled, as read from TaskLockConfig.
   */
  public boolean isEnabled()
  {
    return enabled;
  }

  /**
   * Schedules a single run of {@link #processBatchesDue} after the given delay.
   *
   * @param delay delay in milliseconds
   */
  private void scheduleQueuePoll(long delay)
  {
    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
  }
+
  /**
   * Queues a SegmentAllocateRequest. The returned future may complete
   * successfully with a non-null segment id or fail with an exception
   * (e.g. when the batch cannot be queued or processed).
   *
   * @throws ISE if this service is not the leader or batching is disabled
   */
  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
  {
    if (!leaderSelector.isLeader()) {
      throw new ISE("Cannot allocate segment if not leader.");
    } else if (!isEnabled()) {
      throw new ISE("Batched segment allocation is disabled.");
    }

    final AllocateRequestKey requestKey = new AllocateRequestKey(request, false);
    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new AtomicReference<>();

    keyToBatch.compute(requestKey, (key, existingBatch) -> {
      AllocateRequestBatch computedBatch = existingBatch;
      if (computedBatch == null) {
        // No batch mapped for this key yet: create one and enqueue it
        computedBatch = new AllocateRequestBatch(key);
        computedBatch.resetQueueTime();
        processingQueue.offer(computedBatch);
      }

      // Possible race condition:
      // t1 -> new batch is added to queue or batch already exists in queue
      // t2 -> executor pops batch, processes all requests in it
      // t1 -> new request is added to dangling batch and is never picked up
      // Solution: For existing batch, call keyToBatch.remove() on the key to
      // wait on keyToBatch.compute() to finish before proceeding with processBatch().
      // For new batch, keyToBatch.remove() would not wait as key is not in map yet
      // but a new batch is unlikely to be due immediately, so it won't get popped right away.
      requestFuture.set(computedBatch.add(request));
      return computedBatch;
    });

    return requestFuture.get();
  }
+
  /**
   * Puts the given batch back into the processing queue, typically after some
   * of its requests have failed and should be retried.
   * <p>
   * Runs inside {@code keyToBatch.compute()} so the update is atomic with
   * respect to other modifications for the same key: if another batch has
   * already been created for this key in the meantime, the requests are merged
   * into it instead of enqueueing a duplicate batch.
   */
  private void requeueBatch(AllocateRequestBatch batch)
  {
    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key);
    keyToBatch.compute(batch.key, (key, existingBatch) -> {
      if (existingBatch == null) {
        // No batch currently mapped for this key: re-enqueue this one with a fresh wait deadline
        batch.resetQueueTime();
        processingQueue.offer(batch);
        return batch;
      } else {
        // Merge requests from this batch to existing one
        existingBatch.merge(batch);
        return existingBatch;
      }
    });
  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of 
processing
+    if (!leaderSelector.isLeader()) {
+      log.info("Not leader anymore. Clearing [%d] batches from queue.", 
processingQueue.size());
+      processingQueue.clear();
+      keyToBatch.clear();
+      return;
+    }
+
+    // Process all batches which are due
+    log.debug("Processing all batches which are due for execution.");
+    int numProcessedBatches = 0;
+
+    AllocateRequestBatch nextBatch = processingQueue.peek();
+    while (nextBatch != null && nextBatch.isDue()) {
+      processingQueue.poll();
+      boolean processed;
+      try {
+        processed = processBatch(nextBatch);
+        ++numProcessedBatches;
+      }
+      catch (Throwable t) {
+        processed = true;
+        log.error(t, "Error while processing batch [%s]", nextBatch.key);
+      }
+
+      if (processed) {
+        nextBatch.markCompleted();
+      } else {
+        requeueBatch(nextBatch);
+      }
+
+      nextBatch = processingQueue.peek();
+    }
+
+    // Schedule the next round of processing
+    final long nextScheduleDelay;
+    if (processingQueue.isEmpty()) {
+      nextScheduleDelay = maxWaitTimeMillis;
+    } else {
+      nextBatch = processingQueue.peek();
+      long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime();
+      nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
+    }
+    scheduleQueuePoll(nextScheduleDelay);
+    log.debug("Processed [%d] batches, next execution in [%d ms]", 
numProcessedBatches, nextScheduleDelay);
+  }
+
  /**
   * Processes the given batch. Returns true if the batch was completely processed
   * and should not be requeued.
   * <p>
   * The key is removed from {@code keyToBatch} up front; since the removal waits
   * for any in-flight {@code keyToBatch.compute()} on the same key, no new request
   * can be added to this batch once processing has started.
   */
  private boolean processBatch(AllocateRequestBatch requestBatch)
  {
    final AllocateRequestKey requestKey = requestBatch.key;
    keyToBatch.remove(requestKey);
    if (requestBatch.isEmpty()) {
      return true;
    }

    log.info(
        "Processing [%d] requests for batch [%s], queue time [%s].",
        requestBatch.size(),
        requestKey,
        requestBatch.getQueueTime()
    );

    final long startTimeMillis = System.currentTimeMillis();
    final int batchSize = requestBatch.size();
    emitBatchMetric("task/action/batch/size", batchSize, requestKey);
    emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestBatch.getQueueTime()), requestKey);

    // Snapshot used segments before allocation so that a change can be detected later
    final Set<DataSegment> usedSegments = retrieveUsedSegments(requestKey);
    final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments);

    emitBatchMetric("task/action/batch/retries", 1L, requestKey);
    emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - startTimeMillis), requestKey);
    log.info("Successfully processed [%d / %d] requests in batch [%s].", successCount, batchSize, requestKey);

    if (requestBatch.isEmpty()) {
      log.info("All requests in batch [%s] have been processed.", requestKey);
      return true;
    }

    // Requeue the batch only if used segments have changed
    log.info("There are [%d] failed requests in batch [%s].", requestBatch.size(), requestKey);
    final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);

    if (updatedUsedSegments.equals(usedSegments)) {
      // A retry against the same used-segment set would just fail again
      log.error("Used segments have not changed. Not requeueing failed requests.");
      return true;
    } else {
      log.info("Used segments have changed. Requeuing failed requests");
      return false;
    }
  }
+
+  private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
+  {
+    return new HashSet<>(
+        metadataStorage.retrieveUsedSegmentsForInterval(
+            key.dataSource,
+            key.preferredAllocationInterval,
+            Segments.ONLY_VISIBLE
+        )
+    );
+  }
+
  /**
   * Tries to allocate segments for all the requests in the given batch.
   * <p>
   * A request whose row interval is enclosed by an existing used segment
   * interval may only be allocated within that interval. The remaining requests
   * are then tried with the preferred segment granularity first, followed by
   * progressively finer granularities.
   *
   * @return number of requests in the batch for which allocation succeeded
   */
  private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<DataSegment> usedSegments)
  {
    int successCount = 0;

    final List<SegmentAllocateRequest> allRequests = requestBatch.getRequests();
    final Set<SegmentAllocateRequest> pendingRequests = new HashSet<>();

    if (usedSegments.isEmpty()) {
      // No used segments: every request can be tried with the preferred granularity
      pendingRequests.addAll(allRequests);
    } else {
      final Interval[] sortedUsedSegmentIntervals = getSortedIntervals(usedSegments);
      final Map<Interval, List<SegmentAllocateRequest>> usedIntervalToRequests = new HashMap<>();

      for (SegmentAllocateRequest request : allRequests) {
        // If there is an overlapping used segment interval, that interval is
        // the only candidate for allocation
        Interval overlappingInterval = findOverlappingInterval(
            request.getRowInterval(),
            sortedUsedSegmentIntervals
        );

        if (overlappingInterval == null) {
          pendingRequests.add(request);
        } else if (overlappingInterval.contains(request.getRowInterval())) {
          // Found an enclosing interval, use this for allocation
          usedIntervalToRequests.computeIfAbsent(overlappingInterval, i -> new ArrayList<>())
                                .add(request);
        }
        // NOTE(review): a request whose row interval overlaps a used segment
        // interval only partially (not enclosed) falls through both branches
        // and is not retried here — confirm this is intentional.
      }

      // Try to allocate segments for the identified used segment intervals
      // Do not retry the failed requests with other intervals unless the batch is requeued
      for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : usedIntervalToRequests.entrySet()) {
        List<SegmentAllocateRequest> successfulRequests = allocateSegmentsForInterval(
            entry.getKey(),
            entry.getValue(),
            requestBatch
        );
        successCount += successfulRequests.size();
      }
    }

    // For requests that do not overlap with a used segment, first try to allocate
    // using the preferred granularity, then smaller granularities
    for (Granularity granularity :
        Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) {
      Map<Interval, List<SegmentAllocateRequest>> requestsByInterval =
          getRequestsByInterval(pendingRequests, granularity);

      for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : requestsByInterval.entrySet()) {
        List<SegmentAllocateRequest> successfulRequests = allocateSegmentsForInterval(
            entry.getKey(),
            entry.getValue(),
            requestBatch
        );
        successCount += successfulRequests.size();
        pendingRequests.removeAll(successfulRequests);
      }
    }

    return successCount;
  }
+
+  private Interval findOverlappingInterval(Interval searchInterval, Interval[] 
sortedIntervals)
+  {
+    int index = Arrays.binarySearch(
+        sortedIntervals,
+        searchInterval,
+        Comparators.intervalsByStartThenEnd()
+    );
+    if (index >= 0) {
+      return sortedIntervals[index];
+    }
+
+    // Key was not found, returned index is (-(insertionPoint) - 1)
+    index = -(index + 1);

Review Comment:
   oh, I see. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -446,6 +449,140 @@ public LockResult tryLock(final Task task, final 
LockRequest request)
     }
   }
 
+  /**
+   * Attempts to allocate segments for the given requests. Each request 
contains
+   * a {@link Task} and a {@link SegmentAllocateAction}. This method tries to
+   * acquire the task locks on the required intervals/segments and then 
performs
+   * a batch allocation of segments. It is possible that some requests succeed
+   * while others fail. In that case, only the failed ones should be
+   * retried.
+   *
+   * @param requests                List of allocation requests
+   * @param dataSource              Datasource for which segment is to be 
allocated.
+   * @param interval                Interval for which segment is to be 
allocated.
+   * @param skipSegmentLineageCheck Whether lineage check is to be skipped
+   *                                (this is true for streaming ingestion)
+   * @param lockGranularity         Granularity of task lock
+   * @return List of allocation results in the same order as the requests.
+   */
+  public List<SegmentAllocateResult> allocateSegments(
+      List<SegmentAllocateRequest> requests,
+      String dataSource,
+      Interval interval,
+      boolean skipSegmentLineageCheck,
+      LockGranularity lockGranularity
+  )
+  {
+    log.info("Allocating [%d] segments for datasource [%s], interval [%s]", 
requests.size(), dataSource, interval);
+    final boolean isTimeChunkLock = lockGranularity == 
LockGranularity.TIME_CHUNK;
+
+    final AllocationHolderList holderList = new AllocationHolderList(requests, 
interval);
+    holderList.getPending().forEach(this::verifyTaskIsActive);
+
+    giant.lock();
+    try {
+      if (isTimeChunkLock) {
+        holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
true));
+        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, 
holderList.getPending());
+      } else {
+        allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, 
holderList.getPending());
+        holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
false));
+      }
+
+      // TODO: for failed allocations, cleanup newly created locks from the 
posse map

Review Comment:
   can this todo be resolved? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to