kfaraz commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1027784364


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  // Read from TaskLockConfig#getBatchAllocationMaxWaitTime() in the constructor.
+  // NOTE(review): presumably the max time a batch waits before it is processed — confirm.
+  private final long maxWaitTimeMillis;
+  // Read from TaskLockConfig#isBatchSegmentAllocation(); add() rejects requests when false.
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  // Executor with a single thread on which processBatchesDue() is scheduled.
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final DruidLeaderSelector leaderSelector;
+  private final ServiceEmitter emitter;
+
+  // Maps each request key to the batch that new requests with that key are added to.
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> 
keyToBatch = new ConcurrentHashMap<>();
+  // Batches offered (to the tail) when created in add() or requeued in requeueBatch().
+  private final Deque<AllocateRequestBatch> processingQueue = new 
ConcurrentLinkedDeque<>();
+
+  /**
+   * Creates the queue. The batching settings are read once from the given
+   * {@link TaskLockConfig}, and a single-threaded scheduled executor is created
+   * on which queued batches are polled.
+   */
+  @Inject
+  public SegmentAllocationQueue(
+      TaskLockbox taskLockbox,
+      TaskLockConfig taskLockConfig,
+      IndexerMetadataStorageCoordinator metadataStorage,
+      @IndexingService DruidLeaderSelector leaderSelector,
+      ServiceEmitter emitter
+  )
+  {
+    // Collaborators
+    this.taskLockbox = taskLockbox;
+    this.metadataStorage = metadataStorage;
+    this.leaderSelector = leaderSelector;
+    this.emitter = emitter;
+
+    // Batching settings
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.enabled = taskLockConfig.isBatchSegmentAllocation();
+
+    this.executor = ScheduledExecutors.fixed(1, "SegmentAllocQueue-%s");
+  }
+
+  /**
+   * Lifecycle start hook. Currently it only logs; no batch processing is
+   * scheduled from here.
+   */
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Starting queue.");
+  }
+
+  /**
+   * Lifecycle stop hook. Shuts the executor down with {@code shutdownNow()}, so
+   * scheduled polls that have not started are never run, and a running poll is
+   * asked to stop.
+   * NOTE(review): batches still in the queue are not completed here, so callers
+   * waiting on their futures may never be notified — confirm this is handled elsewhere.
+   */
+  @LifecycleStop
+  public void stop()
+  {
+    log.info("Stopping queue.");
+    this.executor.shutdownNow();
+  }
+
+  /**
+   * @return whether batched segment allocation is enabled, as read from
+   *     {@code TaskLockConfig#isBatchSegmentAllocation()} at construction time
+   */
+  public boolean isEnabled()
+  {
+    return this.enabled;
+  }
+
+  /**
+   * Schedules a single run of {@code processBatchesDue()} on the queue executor.
+   *
+   * @param delayMillis delay before the run, in milliseconds
+   */
+  private void scheduleQueuePoll(long delayMillis)
+  {
+    executor.schedule(() -> processBatchesDue(), delayMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queues a SegmentAllocateRequest. The request is added to the batch currently
+   * mapped to its {@link AllocateRequestKey}; if there is none, a new batch is
+   * created and offered to the processing queue.
+   *
+   * <p>The returned future may complete successfully with either a non-null value
+   * or a null value. NOTE(review): a null value presumably means the allocation did
+   * not succeed — confirm against AllocateRequestBatch.
+   *
+   * @param request the allocation request to queue
+   * @return future for the allocated segment ID
+   * @throws ISE if this node is not the leader, or if batched segment allocation is disabled
+   */
+  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
+  {
+    if (!leaderSelector.isLeader()) {
+      throw new ISE("Cannot allocate segment if not leader.");
+    } else if (!isEnabled()) {
+      throw new ISE("Batched segment allocation is disabled.");
+    }
+
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request, false);
+    // The request is added inside compute(), which ConcurrentHashMap runs atomically
+    // per key; the resulting future is captured here so it can be returned afterwards.
+    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new AtomicReference<>();
+
+    keyToBatch.compute(requestKey, (key, existingBatch) -> {
+      AllocateRequestBatch computedBatch = existingBatch;
+      if (computedBatch == null) {
+        computedBatch = new AllocateRequestBatch(key);
+        computedBatch.resetQueueTime();
+        processingQueue.offer(computedBatch);
+      }
+
+      // Possible race condition:
+      //   t1 -> a new batch is added to the queue, or the batch already exists in it
+      //   t2 -> the executor pops the batch and processes all requests in it
+      //   t1 -> the new request is added to the now-dangling batch and never picked up
+      // Mitigation: before processing an existing batch, the processor is expected to call
+      // keyToBatch.remove() on its key, which waits for any in-flight compute() to finish.
+      // For a new batch, remove() would not wait because the key is not in the map yet,
+      // but a new batch is unlikely to be due immediately, so it won't be popped right away.
+      requestFuture.set(computedBatch.add(request));
+      return computedBatch;
+    });
+
+    return requestFuture.get();
+  }
+
+  /**
+   * Puts a batch back into the queue for another processing attempt. If a batch for
+   * the same key has been registered in the meantime, the given batch's requests are
+   * merged into it. Otherwise the given batch has its queue time reset and is offered
+   * to the processing queue again.
+   *
+   * @param batch batch whose failed requests should be retried
+   */
+  private void requeueBatch(AllocateRequestBatch batch)
+  {
+    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key);
+    keyToBatch.compute(batch.key, (requestKey, currentBatch) -> {
+      if (currentBatch != null) {
+        // A batch for this key is already pending: fold our requests into it
+        currentBatch.merge(batch);
+        return currentBatch;
+      }
+
+      // No pending batch for this key: re-register this one and put it back in the queue
+      batch.resetQueueTime();
+      processingQueue.offer(batch);
+      return batch;
+    });
+  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of 
processing
+    if (!leaderSelector.isLeader()) {
+      log.info("Not leader anymore. Clearing [%d] batches from queue.", 
processingQueue.size());
+      processingQueue.clear();
+      keyToBatch.clear();

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to