kfaraz commented on code in PR #13369:
URL: https://github.com/apache/druid/pull/13369#discussion_r1035529643
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java: ##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue for {@link SegmentAllocateRequest}s.
+ */
+@ManageLifecycle
+public class SegmentAllocationQueue
+{
+  private static final Logger log = new Logger(SegmentAllocationQueue.class);
+
+  private static final int MAX_QUEUE_SIZE = 2000;
+
+  private final long maxWaitTimeMillis;
+  private final boolean enabled;
+
+  private final TaskLockbox taskLockbox;
+  private final ScheduledExecutorService executor;
+  private final IndexerMetadataStorageCoordinator metadataStorage;
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  private final ServiceEmitter emitter;
+
+  private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
+  private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+
+  @Inject
+  public SegmentAllocationQueue(
+      TaskLockbox taskLockbox,
+      TaskLockConfig taskLockConfig,
+      IndexerMetadataStorageCoordinator metadataStorage,
+      ServiceEmitter emitter,
+      ScheduledExecutorFactory executorFactory
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.metadataStorage = metadataStorage;
+    this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
+    this.enabled = taskLockConfig.isBatchSegmentAllocation();
+
+    this.executor = executorFactory.create(1, "SegmentAllocQueue-%s");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    log.info("Initializing segment allocation queue.");
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    log.info("Tearing down segment allocation queue.");
+    executor.shutdownNow();
+  }
+
+  public boolean isEnabled()
+  {
+    return enabled;
+  }
+
+  private void scheduleQueuePoll(long delay)
+  {
+    executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Gets the number of batches currently in the queue.
+   */
+  public int size()
+  {
+    return processingQueue.size();
+  }
+
+  /**
+   * Queues a SegmentAllocateRequest. The returned future may complete
+   * successfully with a non-null value or exceptionally.
+   */
+  public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
+  {
+    if (!isLeader.get()) {
+      throw new ISE("Cannot allocate segment if not leader.");
+    } else if (!isEnabled()) {
+      throw new ISE("Batched segment allocation is disabled.");
+    }
+
+    final AllocateRequestKey requestKey = new AllocateRequestKey(request, false);
+    final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new AtomicReference<>();
+
+    // Possible race condition:
+    // t1 -> new batch is added to queue or batch already exists in queue
+    // t2 -> executor pops batch, processes all requests in it
+    // t1 -> new request is added to dangling batch and is never picked up
+    // Solution: For an existing batch, call keyToBatch.remove() on the key to
+    // wait on keyToBatch.compute() to finish before proceeding with processBatch().
+    // For a new batch, keyToBatch.remove() would not wait as the key is not in the map yet,
+    // but a new batch is unlikely to be due immediately, so it won't get popped right away.
+    keyToBatch.compute(requestKey, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
+        requestFuture.set(newBatch.add(request));
+        return addBatchToQueue(newBatch) ? newBatch : null;
+      } else {
+        requestFuture.set(existingBatch.add(request));
+        return existingBatch;
+      }
+    });
+
+    return requestFuture.get();
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If the queue is full,
+   * marks the batch as completed, failing all the pending requests.
+   */
+  private boolean addBatchToQueue(AllocateRequestBatch batch)
+  {
+    batch.resetQueueTime();
+    if (processingQueue.offer(batch)) {
+      log.debug("Added a new batch [%s] to queue.", batch.key);
+      return true;
+    } else {
+      log.warn("Cannot add batch [%s] as queue is full. Failing [%d] requests.", batch.key, batch.size());
+      batch.markCompleted();
+      return false;
+    }
+  }
+
+  /**
+   * Tries to add the given batch to the processing queue. If a batch already
+   * exists for this key, transfers all the requests from this batch to the
+   * existing one.
+   */
+  private void requeueBatch(AllocateRequestBatch batch)
+  {
+    log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key);
+    keyToBatch.compute(batch.key, (key, existingBatch) -> {
+      if (existingBatch == null) {
+        return addBatchToQueue(batch) ? batch : null;
+      } else {
+        // Merge requests from this batch into the existing one
+        existingBatch.transferRequestsFrom(batch);
+        return existingBatch;
+      }
+    });
+  }
+
+  private void processBatchesDue()
+  {
+    // If not leader, clear the queue and do not schedule any more rounds of processing
+    if (!isLeader.get()) {
+      log.info("Not leader anymore. Failing [%d] batches in queue.", processingQueue.size());
+
+      // Keep removing items from the queue as long as not leader
+      AllocateRequestBatch nextBatch = processingQueue.peekFirst();
+      while (nextBatch != null && !isLeader.get()) {
+        processingQueue.pollFirst();
+        keyToBatch.remove(nextBatch.key);
+        nextBatch.markCompleted();
+        nextBatch = processingQueue.peekFirst();
+      }
+    }
+
+    // Check once again for leadership
+    if (!isLeader.get()) {
+      return;
+    }

Review Comment:
   I guess that is not likely to happen. New items are rejected if we are not the leader. But maybe it could happen if the leader is flapping?
   - t1: we are not leader, we clear the queue
   - t2: become leader
   - t3: add a request
   - t4: stop being leader
   - t1: we are not leader, return

   The request added by t3 will thus remain stuck in the queue until we become leader again and process it. The future will time out after 5 minutes.

   A simpler alternative to all of this would be to do the leadership check for each batch separately in `processBatch` and fail the batch there if we are not leader. The only difference would be that even non-leaders would keep polling the queue; I wanted to avoid that somehow. Please let me know what you think.
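   For concreteness, a rough sketch of that alternative (the body of `processBatch` and the exact control flow are my guess, not the PR's actual code; it is also simplified to drain every batch rather than only those due):

   ```java
   // Hypothetical sketch: leadership is checked per batch inside processBatch,
   // so the top-level poll loop never needs its own leadership check.
   private void processBatchesDue()
   {
     AllocateRequestBatch nextBatch = processingQueue.pollFirst();
     while (nextBatch != null) {
       keyToBatch.remove(nextBatch.key);
       processBatch(nextBatch);
       nextBatch = processingQueue.pollFirst();
     }
   }

   private void processBatch(AllocateRequestBatch batch)
   {
     // A request enqueued during a brief stint as leader still gets drained
     // (and failed) on the very next poll, instead of sitting in the queue
     // until its future times out.
     if (!isLeader.get()) {
       batch.markCompleted(); // fails all pending requests in the batch
       return;
     }
     // ... perform the actual segment allocation for the batch ...
   }
   ```

   The trade-off is as stated above: the poller keeps running on non-leaders, but no batch can get stuck across leadership flaps.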
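   Separately, the race-condition comment in `add()` leans on a real property of `ConcurrentHashMap`: update operations on the same key (`compute`, `remove`, etc.) are serialized, so a consumer calling `remove(key)` blocks until a concurrent `compute(key, ...)` has finished. A minimal standalone demonstration of just that property (all names here are made up for illustration):

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CountDownLatch;

   public class ComputeRemoveDemo
   {
     public static void main(String[] args) throws InterruptedException
     {
       ConcurrentHashMap<String, StringBuilder> map = new ConcurrentHashMap<>();
       map.put("key", new StringBuilder());
       CountDownLatch insideCompute = new CountDownLatch(1);

       Thread producer = new Thread(() -> map.compute("key", (k, batch) -> {
         insideCompute.countDown();
         sleepQuietly(500);               // simulate a slow "add request to batch"
         return batch.append("request");  // mutation happens under the key's lock
       }));
       producer.start();

       insideCompute.await();             // ensure compute() is in progress
       long start = System.nanoTime();
       StringBuilder batch = map.remove("key"); // blocks until compute() returns
       long waitedMs = (System.nanoTime() - start) / 1_000_000;

       // remove() observes the fully updated batch, after waiting ~500 ms
       System.out.println("waited " + waitedMs + " ms, batch = " + batch);
       producer.join();
     }

     private static void sleepQuietly(long millis)
     {
       try {
         Thread.sleep(millis);
       }
       catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
   }
   ```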