kfaraz commented on code in PR #18402:
URL: https://github.com/apache/druid/pull/18402#discussion_r2476633181


##########
server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientMSQContext;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.duty.CoordinatorDutyUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Fetches running compaction tasks from the Overlord and tracks their 
compaction
+ * intervals and task slots.
+ */
public class CompactionSlotManager
{
  /**
   * Task type for native compaction tasks.
   */
  public static final String COMPACTION_TASK_TYPE = "compact";

  private static final Logger log = new Logger(CompactionSlotManager.class);

  // Client used to query the Overlord for task statuses, payloads and locks
  private final OverlordClient overlordClient;
  // Tracker updated with the statuses of recently completed compaction tasks
  private final CompactionStatusTracker statusTracker;

  // Datasource name -> intervals to skip in the current compaction run
  // (populated from running tasks and from locked intervals)
  private final Map<String, List<Interval>> intervalsToSkipCompaction;

  // Remaining compaction task slots; initialized from the cluster config and
  // decremented as slots are reserved for running tasks
  private int numAvailableTaskSlots;
+  public CompactionSlotManager(
+      OverlordClient overlordClient,
+      CompactionStatusTracker statusTracker,
+      ClusterCompactionConfig clusterCompactionConfig
+  )
+  {
+    this.overlordClient = overlordClient;
+    this.statusTracker = statusTracker;
+    this.numAvailableTaskSlots = 
getCompactionTaskCapacity(clusterCompactionConfig);
+    this.intervalsToSkipCompaction = new HashMap<>();
+  }
+
  /**
   * @return Number of compaction task slots still unreserved in this run.
   */
  public int getNumAvailableTaskSlots()
  {
    return numAvailableTaskSlots;
  }
+
  /**
   * Returns the mapping from datasource name to the intervals that should be
   * skipped in the current compaction run.
   * <p>
   * NOTE(review): this returns the internal mutable map directly rather than
   * an unmodifiable view — presumably callers only read it; confirm no caller
   * mutates it out-of-band.
   */
  public Map<String, List<Interval>> getDatasourceIntervalsToSkipCompaction()
  {
    return intervalsToSkipCompaction;
  }
+
+  public void reserveTaskSlots(int numSlotsToReserve)
+  {
+    numAvailableTaskSlots -= numSlotsToReserve;
+  }
+
+  /**
+   * Reserves task slots for the given task from the overall compaction task 
capacity.
+   */
+  public void reserveTaskSlots(ClientCompactionTaskQuery compactionTaskQuery)
+  {
+    // Note: The default compactionRunnerType used here should match the 
default runner used in CompactionTask when
+    // no runner is provided there.
+    CompactionEngine compactionRunnerType = 
compactionTaskQuery.getCompactionRunner() == null
+                                            ? CompactionEngine.NATIVE
+                                            : 
compactionTaskQuery.getCompactionRunner().getType();
+    if (compactionRunnerType == CompactionEngine.NATIVE) {
+      numAvailableTaskSlots -=
+          
getMaxTaskSlotsForNativeCompactionTask(compactionTaskQuery.getTuningConfig());
+    } else {
+      numAvailableTaskSlots -=
+          
getMaxTaskSlotsForMSQCompactionTask(compactionTaskQuery.getContext());
+    }
+  }
+
+  /**
+   * Retrieves currently running tasks of type {@link #COMPACTION_TASK_TYPE} 
from
+   * the Overlord.
+   * <p>
+   * Also queries the Overlord for the status of all tasks that were submitted
+   * recently but are not active anymore. The statuses are then updated in the
+   * {@link CompactionStatusTracker}.
+   */
+  public List<ClientCompactionTaskQuery> fetchRunningCompactionTasks()
+  {
+    // Fetch currently running compaction tasks
+    final List<TaskStatusPlus> compactionTasks = 
CoordinatorDutyUtils.getStatusOfActiveTasks(
+        overlordClient,
+        status -> status != null && 
COMPACTION_TASK_TYPE.equals(status.getType())
+    );
+
+    final Set<String> activeTaskIds
+        = 
compactionTasks.stream().map(TaskStatusPlus::getId).collect(Collectors.toSet());
+    trackStatusOfCompletedTasks(activeTaskIds);
+
+    final List<ClientCompactionTaskQuery> runningCompactTasks = new 
ArrayList<>();
+    for (TaskStatusPlus status : compactionTasks) {
+      final TaskPayloadResponse response =
+          FutureUtils.getUnchecked(overlordClient.taskPayload(status.getId()), 
true);
+      if (response == null) {
+        throw new ISE("Could not find payload for active compaction task[%s]", 
status.getId());
+      } else if 
(!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
+        throw new ISE(
+            "Payload of active compaction task[%s] is of invalid type[%s]",
+            status.getId(), response.getPayload().getType()
+        );
+      }
+
+      runningCompactTasks.add((ClientCompactionTaskQuery) 
response.getPayload());
+    }
+
+    return runningCompactTasks;
+  }
+
+  /**
+   * Cancels a currently running compaction task only if the segment 
granularity
+   * has changed in the datasource compaction config. Otherwise, the task is
+   * retained and its intervals are skipped from the current round of 
compaction.
+   *
+   * @return true if the task was canceled, false otherwise.
+   */
+  public boolean cancelTaskOnlyIfGranularityChanged(
+      ClientCompactionTaskQuery compactionTaskQuery,
+      DataSourceCompactionConfig dataSourceCompactionConfig
+  )
+  {
+    if (dataSourceCompactionConfig == null
+        || dataSourceCompactionConfig.getGranularitySpec() == null
+        || compactionTaskQuery.getGranularitySpec() == null) {
+      skipTaskInterval(compactionTaskQuery);
+      reserveTaskSlots(compactionTaskQuery);
+      return false;
+    }
+
+    Granularity configuredSegmentGranularity = 
dataSourceCompactionConfig.getGranularitySpec()
+                                                                         
.getSegmentGranularity();
+    Granularity taskSegmentGranularity = 
compactionTaskQuery.getGranularitySpec().getSegmentGranularity();
+    if (configuredSegmentGranularity == null || 
configuredSegmentGranularity.equals(taskSegmentGranularity)) {
+      skipTaskInterval(compactionTaskQuery);
+      reserveTaskSlots(compactionTaskQuery);
+      return false;
+    }
+
+    log.info(
+        "Cancelling task[%s] as task segmentGranularity[%s] differs from 
compaction config segmentGranularity[%s].",
+        compactionTaskQuery.getId(), taskSegmentGranularity, 
configuredSegmentGranularity
+    );
+    overlordClient.cancelTask(compactionTaskQuery.getId());
+    return true;
+  }
+
+  /**
+   * Retrieves the list of intervals locked by higher priority tasks for each datasource.
+   * Since compaction tasks submitted for these intervals would have to wait anyway,
+   * we skip these intervals until the next compaction run by adding them to
+   * {@link #intervalsToSkipCompaction}.
+   */
+  public void skipLockedIntervals(List<DataSourceCompactionConfig> 
compactionConfigs)
+  {
+    final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
+        .stream()
+        .map(config ->
+                 new LockFilterPolicy(config.getDataSource(), 
config.getTaskPriority(), null, config.getTaskContext()))
+        .collect(Collectors.toList());
+    final Map<String, List<Interval>> datasourceToLockedIntervals =
+        new 
HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies),
 true));
+    log.debug(
+        "Skipping the following intervals for Compaction as they are currently 
locked: %s",
+        datasourceToLockedIntervals
+    );
+
+    // Skip all the intervals locked by higher priority tasks for each 
datasource
+    // This must be done after the invalid compaction tasks are cancelled

Review Comment:
   Updated — thanks for the suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to