kfaraz commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1035529643


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private static final int MAX_QUEUE_SIZE = 2000;
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  private final BlockingDeque<AllocateRequestBatch> processingQueue = new 
LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+
+  @Inject
+  public SegmentAllocationQueue(
+      TaskLockbox taskLockbox,
+      TaskLockConfig taskLockConfig,
+      IndexerMetadataStorageCoordinator metadataStorage,
+      ServiceEmitter emitter,
+      ScheduledExecutorFactory executorFactory
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.metadataStorage = metadataStorage;
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.enabled = taskLockConfig.isBatchSegmentAllocation();
+
+    this.executor = executorFactory.create(1, "SegmentAllocQueue-%s");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Initializing segment allocation queue.");
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.info("Tearing down segment allocation queue.");
+    executor.shutdownNow();
+  }
+
+  public boolean isEnabled()
+  {
+    return enabled;
+  }
+
+  private void scheduleQueuePoll(long delay)
+  {
+    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Gets the number of batches currently in the queue.
+   */
+  public int size()
+  {
+    return processingQueue.size();
+  }
+
+  /**
+   * Queues a SegmentAllocateRequest. The returned future may complete 
successfully
+   * with a non-null value or with a non-null value.
+   */
+  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
+  {
+    if (!isLeader.get()) {
+      throw new ISE("Cannot allocate segment if not leader.");
+    } else if (!isEnabled()) {
+      throw new ISE("Batched segment allocation is disabled.");
+    }
+
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request, 
false);
+    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new 
AtomicReference<>();
+
+    // Possible race condition:
+    // t1 -> new batch is added to queue or batch already exists in queue
+    // t2 -> executor pops batch, processes all requests in it
+    // t1 -> new request is added to dangling batch and is never picked up
+    // Solution: For existing batch, call keyToBatch.remove() on the key to
+    // wait on keyToBatch.compute() to finish before proceeding with 
processBatch().
+    // For new batch, keyToBatch.remove() would not wait as key is not in map 
yet
+    // but a new batch is unlikely to be due immediately, so it won't get 
popped right away.
+    keyToBatch.compute(requestKey, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
+        requestFuture.set(newBatch.add(request));
+        return addBatchToQueue(newBatch) ? newBatch : null;
+      } else {
+        requestFuture.set(existingBatch.add(request));
+        return existingBatch;
+      }
+    });
+
+    return requestFuture.get();
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If the queue is 
full,
+   * marks the batch as completed failing all the pending requests.
+   */
+  private boolean addBatchToQueue(AllocateRequestBatch batch)
+  {
+    batch.resetQueueTime();
+    if (processingQueue.offer(batch)) {
+      log.debug("Added a new batch [%s] to queue.", batch.key);
+      return true;
+    } else {
+      log.warn("Cannot add batch [%s] as queue is full. Failing [%d] 
requests.", batch.key, batch.size());
+      batch.markCompleted();
+      return false;
+    }
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If a batch already
+   * exists for this key, transfers all the requests from this batch to the
+   * existing one.
+   */
+  private void requeueBatch(AllocateRequestBatch batch)
+  {
+    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), 
batch.key);
+    keyToBatch.compute(batch.key, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        return addBatchToQueue(batch) ? batch : null;
+      } else {
+        // Merge requests from this batch to existing one
+        existingBatch.transferRequestsFrom(batch);
+        return existingBatch;
+      }
+    });
+  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of 
processing
+    if (!isLeader.get()) {
+      log.info("Not leader anymore. Failing [%d] batches in queue.", 
processingQueue.size());
+
+      // Keep removing items from the queue as long as not leader
+      AllocateRequestBatch nextBatch = processingQueue.peekFirst();
+      while (nextBatch != null && !isLeader.get()) {
+        processingQueue.pollFirst();
+        keyToBatch.remove(nextBatch.key);
+        nextBatch.markCompleted();
+        nextBatch = processingQueue.peekFirst();
+      }
+    }
+
+    // Check once again for leadership
+    if (!isLeader.get()) {
+      return;
+    }

Review Comment:
   I guess that is not likely to happen, since new items are rejected if we are 
not the leader.
   But maybe it could happen if leadership is flapping:
   - t1: we are not leader, we clear the queue
   - t2: become leader
   - t3: add a request
   - t4: stop being leader
   - t1: we are not leader, return
   The request added by t3 will thus remain stuck in the queue until we become 
leader again and process it. The future will time out after 5 minutes.
   
   A simpler alternative to all of this would be to do the leadership check 
for each batch separately in `processBatch` and fail the batch there if we are 
not the leader. The only difference would be that even non-leaders would keep 
polling the queue, which I wanted to avoid somehow. Please let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to