kfaraz commented on code in PR #18402: URL: https://github.com/apache/druid/pull/18402#discussion_r2476633181
########## server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java: ########## @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.client.indexing.ClientMSQContext; +import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; +import 
org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.duty.CoordinatorDutyUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Fetches running compaction tasks from the Overlord and tracks their compaction + * intervals and task slots. + */ +public class CompactionSlotManager +{ + /** + * Task type for native compaction tasks. + */ + public static final String COMPACTION_TASK_TYPE = "compact"; + + private static final Logger log = new Logger(CompactionSlotManager.class); + + private final OverlordClient overlordClient; + private final CompactionStatusTracker statusTracker; + + private final Map<String, List<Interval>> intervalsToSkipCompaction; + + private int numAvailableTaskSlots; + + public CompactionSlotManager( + OverlordClient overlordClient, + CompactionStatusTracker statusTracker, + ClusterCompactionConfig clusterCompactionConfig + ) + { + this.overlordClient = overlordClient; + this.statusTracker = statusTracker; + this.numAvailableTaskSlots = getCompactionTaskCapacity(clusterCompactionConfig); + this.intervalsToSkipCompaction = new HashMap<>(); + } + + public int getNumAvailableTaskSlots() + { + return numAvailableTaskSlots; + } + + public Map<String, List<Interval>> getDatasourceIntervalsToSkipCompaction() + { + return intervalsToSkipCompaction; + } + + public void reserveTaskSlots(int numSlotsToReserve) + { + numAvailableTaskSlots -= numSlotsToReserve; + } + + /** + * Reserves task slots for the given task from the overall compaction task capacity. + */ + public void reserveTaskSlots(ClientCompactionTaskQuery compactionTaskQuery) + { + // Note: The default compactionRunnerType used here should match the default runner used in CompactionTask when + // no runner is provided there. 
+ CompactionEngine compactionRunnerType = compactionTaskQuery.getCompactionRunner() == null + ? CompactionEngine.NATIVE + : compactionTaskQuery.getCompactionRunner().getType(); + if (compactionRunnerType == CompactionEngine.NATIVE) { + numAvailableTaskSlots -= + getMaxTaskSlotsForNativeCompactionTask(compactionTaskQuery.getTuningConfig()); + } else { + numAvailableTaskSlots -= + getMaxTaskSlotsForMSQCompactionTask(compactionTaskQuery.getContext()); + } + } + + /** + * Retrieves currently running tasks of type {@link #COMPACTION_TASK_TYPE} from + * the Overlord. + * <p> + * Also queries the Overlord for the status of all tasks that were submitted + * recently but are not active anymore. The statuses are then updated in the + * {@link CompactionStatusTracker}. + */ + public List<ClientCompactionTaskQuery> fetchRunningCompactionTasks() + { + // Fetch currently running compaction tasks + final List<TaskStatusPlus> compactionTasks = CoordinatorDutyUtils.getStatusOfActiveTasks( + overlordClient, + status -> status != null && COMPACTION_TASK_TYPE.equals(status.getType()) + ); + + final Set<String> activeTaskIds + = compactionTasks.stream().map(TaskStatusPlus::getId).collect(Collectors.toSet()); + trackStatusOfCompletedTasks(activeTaskIds); + + final List<ClientCompactionTaskQuery> runningCompactTasks = new ArrayList<>(); + for (TaskStatusPlus status : compactionTasks) { + final TaskPayloadResponse response = + FutureUtils.getUnchecked(overlordClient.taskPayload(status.getId()), true); + if (response == null) { + throw new ISE("Could not find payload for active compaction task[%s]", status.getId()); + } else if (!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) { + throw new ISE( + "Payload of active compaction task[%s] is of invalid type[%s]", + status.getId(), response.getPayload().getType() + ); + } + + runningCompactTasks.add((ClientCompactionTaskQuery) response.getPayload()); + } + + return runningCompactTasks; + } + + /** + * Cancels a currently running 
compaction task only if the segment granularity + * has changed in the datasource compaction config. Otherwise, the task is + * retained and its intervals are skipped from the current round of compaction. + * + * @return true if the task was cancelled, false otherwise. + */ + public boolean cancelTaskOnlyIfGranularityChanged( + ClientCompactionTaskQuery compactionTaskQuery, + DataSourceCompactionConfig dataSourceCompactionConfig + ) + { + if (dataSourceCompactionConfig == null + || dataSourceCompactionConfig.getGranularitySpec() == null + || compactionTaskQuery.getGranularitySpec() == null) { + skipTaskInterval(compactionTaskQuery); + reserveTaskSlots(compactionTaskQuery); + return false; + } + + Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec() + .getSegmentGranularity(); + Granularity taskSegmentGranularity = compactionTaskQuery.getGranularitySpec().getSegmentGranularity(); + if (configuredSegmentGranularity == null || configuredSegmentGranularity.equals(taskSegmentGranularity)) { + skipTaskInterval(compactionTaskQuery); + reserveTaskSlots(compactionTaskQuery); + return false; + } + + log.info( + "Cancelling task[%s] as task segmentGranularity[%s] differs from compaction config segmentGranularity[%s].", + compactionTaskQuery.getId(), taskSegmentGranularity, configuredSegmentGranularity + ); + overlordClient.cancelTask(compactionTaskQuery.getId()); + return true; + } + + /** + * Retrieves the list of intervals locked by higher priority tasks for each datasource. + * Since compaction tasks submitted for these intervals would have to wait anyway, + * we skip these intervals until the next compaction run by adding them to + * {@link #intervalsToSkipCompaction}. 
+ */ + public void skipLockedIntervals(List<DataSourceCompactionConfig> compactionConfigs) + { + final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs + .stream() + .map(config -> + new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext())) + .collect(Collectors.toList()); + final Map<String, List<Interval>> datasourceToLockedIntervals = + new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true)); + log.debug( + "Skipping the following intervals for Compaction as they are currently locked: %s", + datasourceToLockedIntervals + ); + + // Skip all the intervals locked by higher priority tasks for each datasource + // This must be done after the invalid compaction tasks are cancelled Review Comment: updated, thanks for the suggestion! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
