capistrant commented on code in PR #18402:
URL: https://github.com/apache/druid/pull/18402#discussion_r2470832205


##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -86,50 +75,46 @@ public Set<String> getSubmittedTaskIds()
     return submittedTaskIdToSegments.keySet();
   }
 
+  /**
+   * Checks if compaction can be started for the given {@link CompactionCandidate}.
+   * This method assumes that the given candidate is eligible for compaction
+   * based on the current compaction config/supervisor of the datasource.
+   */
   public CompactionStatus computeCompactionStatus(
       CompactionCandidate candidate,
-      DataSourceCompactionConfig config,
       CompactionCandidateSearchPolicy searchPolicy
   )
   {
-    final CompactionStatus compactionStatus = CompactionStatus.compute(candidate, config, objectMapper);
-    if (compactionStatus.isComplete()) {
-      return compactionStatus;
-    }
-
-    // Skip intervals that violate max allowed input segment size
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
-    if (candidate.getTotalBytes() > inputSegmentSize) {
-      return CompactionStatus.skipped(
-          "'inputSegmentSize' exceeded: Total segment size[%d] is larger than allowed inputSegmentSize[%d]",
-          candidate.getTotalBytes(), inputSegmentSize
-      );
-    }
+    final CompactionStatus pendingStatus = CompactionStatus.pending("not compacted yet");

Review Comment:
   nit: naming this status `pendingStatus` feels odd when there is an actual
   status called pending and this may not be the final status we return. I
   didn't mind the previous name, or maybe something like `computedStatus`; not
   sure. In the big scheme of things it's not a big deal, and it's up to you if
   you just want to leave it as is.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionStateMatcher.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.transform.CompactionTransformSpec;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Target compacted state of segments used to determine if compaction is needed
+ * for an interval. An explicitly defined target state helps avoid superfluous
+ * compaction when only the job definition has changed.
+ * <p>
+ * This class is mostly a duplicate of {@code CompactionState} but is kept
+ * separate to allow:
+ * <ul>
+ * <li>fields to be nullable so that only non-null fields are used for matching</li>
+ * <li>legacy "compaction-incompatible" fields to be removed</li>

Review Comment:
   I think this may be me coming late to the party on compaction. For my sake,
   could you elaborate on this list item to get me up to speed? I guess I'm
   unclear on what you mean by compaction-incompatible fields.
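
   For reference, the first list item ("only non-null fields are used for
   matching") can be illustrated with a minimal sketch. `TargetStateSketch`,
   its two string fields, and `matches` are hypothetical simplifications for
   illustration and are not the actual `CompactionStateMatcher` API in this PR.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for a target-state matcher.
 * A null field means "do not check this field".
 */
final class TargetStateSketch
{
  private final String segmentGranularity; // assumption: plain strings instead of Druid types
  private final String partitionsSpec;

  TargetStateSketch(String segmentGranularity, String partitionsSpec)
  {
    this.segmentGranularity = segmentGranularity;
    this.partitionsSpec = partitionsSpec;
  }

  /** A null target field is ignored; a non-null one must equal the current value. */
  private static boolean fieldMatches(Object target, Object current)
  {
    return target == null || Objects.equals(target, current);
  }

  /** Returns true if every non-null target field equals the current state. */
  boolean matches(String currentGranularity, String currentPartitionsSpec)
  {
    return fieldMatches(segmentGranularity, currentGranularity)
           && fieldMatches(partitionsSpec, currentPartitionsSpec);
  }

  public static void main(String[] args)
  {
    // Only segmentGranularity is specified, so partitionsSpec is not compared
    final TargetStateSketch target = new TargetStateSketch("DAY", null);
    System.out.println(target.matches("DAY", "dynamic"));
    System.out.println(target.matches("HOUR", "dynamic"));
  }
}
```

   Under this reading, an interval would need compaction only when `matches`
   returns false for the current state of its segments.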



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/MSQCompactionJobTemplate.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.compaction.CompactionCandidate;
+import org.apache.druid.server.compaction.CompactionSlotManager;
+import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.joda.time.Interval;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compaction template that creates MSQ SQL jobs using a templatized SQL with
+ * variables of the format {@code ${variableName}} for fields such as datasource
+ * name and start timestamp.
+ * <p>
+ * Compaction is triggered for an interval only if the current compaction state
+ * of the underlying segments DOES NOT match with the {@link #targetState}.
+ */
+public class MSQCompactionJobTemplate implements CompactionJobTemplate
+{
+  public static final String TYPE = "compactMsq";
+
+  public static final String VAR_DATASOURCE = "${dataSource}";
+  public static final String VAR_START_TIMESTAMP = "${startTimestamp}";
+  public static final String VAR_END_TIMESTAMP = "${endTimestamp}";
+
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+  private final ClientSqlQuery sqlTemplate;
+  private final CompactionStateMatcher targetState;
+
+  @JsonCreator
+  public MSQCompactionJobTemplate(
+      @JsonProperty("sqlTemplate") ClientSqlQuery sqlTemplate,
+      @JsonProperty("targetState") CompactionStateMatcher targetState
+  )
+  {
+    this.sqlTemplate = sqlTemplate;
+    this.targetState = targetState;
+  }
+
+  @JsonProperty
+  public ClientSqlQuery getSqlTemplate()
+  {
+    return sqlTemplate;
+  }
+
+  @JsonProperty
+  public CompactionStateMatcher getTargetState()
+  {
+    return targetState;
+  }
+
+  @Nullable
+  @Override
+  public Granularity getSegmentGranularity()
+  {
+    return targetState.getSegmentGranularity();
+  }
+
+  @Override
+  public List<CompactionJob> createCompactionJobs(
+      DruidInputSource source,
+      CompactionJobParams jobParams
+  )
+  {
+    final String dataSource = source.getDataSource();
+
+    // Identify the compactible candidate segments
+    final CompactionConfigBasedJobTemplate delegate =
+        CompactionConfigBasedJobTemplate.create(dataSource, targetState);
+    final DataSourceCompactibleSegmentIterator candidateIterator =
+        delegate.getCompactibleCandidates(source, jobParams);
+
+    // Create MSQ jobs for each candidate by interpolating the template variables
+    final List<CompactionJob> jobs = new ArrayList<>();
+    while (candidateIterator.hasNext()) {
+      final CompactionCandidate candidate = candidateIterator.next();
+      jobs.add(
+          new CompactionJob(
+              createQueryForJob(dataSource, candidate.getCompactionInterval()),
+              candidate,
+              CompactionSlotManager.getMaxTaskSlotsForMSQCompactionTask(sqlTemplate.getContext())
+          )
+      );
+    }
+
+    return jobs;
+  }
+
+  private ClientSqlQuery createQueryForJob(String dataSource, Interval compactionInterval)

Review Comment:
   Wondering aloud whether there is a way to make the query and formatting more
   flexible/extensible if folks want to use more than these standard format
   variables, for example if someone wanted to do some filtering during
   compaction. Or would you suggest that anyone who wants that should roll
   their own custom template type?
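
   To make the question concrete, here is a minimal sketch of what a more
   extensible interpolation could look like: the template accepts an arbitrary
   map of variable names to values instead of a fixed set. `TemplateInterpolator`
   and `interpolate` are hypothetical names and are not part of this PR; only the
   `${variableName}` format and the three standard variable names come from the
   code above. The SQL string is illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the PR. */
final class TemplateInterpolator
{
  // Matches ${name}, where name is an identifier-like token
  private static final Pattern VARIABLE =
      Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

  /**
   * Replaces every ${name} in the template with the mapped value.
   * Fails fast on a variable that has no value, rather than leaving it in the SQL.
   */
  static String interpolate(String template, Map<String, String> variables)
  {
    final Matcher matcher = VARIABLE.matcher(template);
    final StringBuffer result = new StringBuffer();
    while (matcher.find()) {
      final String name = matcher.group(1);
      final String value = variables.get(name);
      if (value == null) {
        throw new IllegalArgumentException("No value for template variable: " + name);
      }
      // quoteReplacement keeps '$' and '\' in the value from being treated specially
      matcher.appendReplacement(result, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(result);
    return result.toString();
  }

  public static void main(String[] args)
  {
    final Map<String, String> variables = new LinkedHashMap<>();
    variables.put("dataSource", "wiki");
    variables.put("startTimestamp", "2025-01-01 00:00:00.000");
    variables.put("endTimestamp", "2025-01-02 00:00:00.000");

    final String sql = "SELECT * FROM ${dataSource}"
                       + " WHERE __time >= TIMESTAMP '${startTimestamp}'"
                       + " AND __time < TIMESTAMP '${endTimestamp}'";
    System.out.println(interpolate(sql, variables));
  }
}
```

   With something along these lines, a user-supplied variable (say, a filter
   value) would only need a new map entry rather than a new constant and a new
   replace call.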



##########
server/src/main/java/org/apache/druid/server/compaction/CompactionSlotManager.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientMSQContext;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.CompactionEngine;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.duty.CoordinatorDutyUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Fetches running compaction tasks from the Overlord and tracks their compaction
+ * intervals and task slots.
+ */
+public class CompactionSlotManager
+{
+  /**
+   * Task type for native compaction tasks.
+   */
+  public static final String COMPACTION_TASK_TYPE = "compact";
+
+  private static final Logger log = new Logger(CompactionSlotManager.class);
+
+  private final OverlordClient overlordClient;
+  private final CompactionStatusTracker statusTracker;
+
+  private final Map<String, List<Interval>> intervalsToSkipCompaction;
+
+  private int numAvailableTaskSlots;
+
+  public CompactionSlotManager(
+      OverlordClient overlordClient,
+      CompactionStatusTracker statusTracker,
+      ClusterCompactionConfig clusterCompactionConfig
+  )
+  {
+    this.overlordClient = overlordClient;
+    this.statusTracker = statusTracker;
+    this.numAvailableTaskSlots = getCompactionTaskCapacity(clusterCompactionConfig);
+    this.intervalsToSkipCompaction = new HashMap<>();
+  }
+
+  public int getNumAvailableTaskSlots()
+  {
+    return numAvailableTaskSlots;
+  }
+
+  public Map<String, List<Interval>> getDatasourceIntervalsToSkipCompaction()
+  {
+    return intervalsToSkipCompaction;
+  }
+
+  public void reserveTaskSlots(int numSlotsToReserve)
+  {
+    numAvailableTaskSlots -= numSlotsToReserve;
+  }
+
+  /**
+   * Reserves task slots for the given task from the overall compaction task capacity.
+   */
+  public void reserveTaskSlots(ClientCompactionTaskQuery compactionTaskQuery)
+  {
+    // Note: The default compactionRunnerType used here should match the default runner used in CompactionTask when
+    // no runner is provided there.
+    CompactionEngine compactionRunnerType = compactionTaskQuery.getCompactionRunner() == null
+                                            ? CompactionEngine.NATIVE
+                                            : compactionTaskQuery.getCompactionRunner().getType();
+    if (compactionRunnerType == CompactionEngine.NATIVE) {
+      numAvailableTaskSlots -=
+          getMaxTaskSlotsForNativeCompactionTask(compactionTaskQuery.getTuningConfig());
+    } else {
+      numAvailableTaskSlots -=
+          getMaxTaskSlotsForMSQCompactionTask(compactionTaskQuery.getContext());
+    }
+  }
+
+  /**
+   * Retrieves currently running tasks of type {@link #COMPACTION_TASK_TYPE} from
+   * the Overlord.
+   * <p>
+   * Also queries the Overlord for the status of all tasks that were submitted
+   * recently but are not active anymore. The statuses are then updated in the
+   * {@link CompactionStatusTracker}.
+   */
+  public List<ClientCompactionTaskQuery> fetchRunningCompactionTasks()
+  {
+    // Fetch currently running compaction tasks
+    final List<TaskStatusPlus> compactionTasks = CoordinatorDutyUtils.getStatusOfActiveTasks(
+        overlordClient,
+        status -> status != null && COMPACTION_TASK_TYPE.equals(status.getType())
+    );
+
+    final Set<String> activeTaskIds
+        = compactionTasks.stream().map(TaskStatusPlus::getId).collect(Collectors.toSet());
+    trackStatusOfCompletedTasks(activeTaskIds);
+
+    final List<ClientCompactionTaskQuery> runningCompactTasks = new ArrayList<>();
+    for (TaskStatusPlus status : compactionTasks) {
+      final TaskPayloadResponse response =
+          FutureUtils.getUnchecked(overlordClient.taskPayload(status.getId()), true);
+      if (response == null) {
+        throw new ISE("Could not find payload for active compaction task[%s]", status.getId());
+      } else if (!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
+        throw new ISE(
+            "Payload of active compaction task[%s] is of invalid type[%s]",
+            status.getId(), response.getPayload().getType()
+        );
+      }
+
+      runningCompactTasks.add((ClientCompactionTaskQuery) response.getPayload());
+    }
+
+    return runningCompactTasks;
+  }
+
+  /**
+   * Cancels a currently running compaction task only if the segment granularity
+   * has changed in the datasource compaction config. Otherwise, the task is
+   * retained and its intervals are skipped from the current round of compaction.
+   *
+   * @return true if the task was canceled, false otherwise.
+   */
+  public boolean cancelTaskOnlyIfGranularityChanged(
+      ClientCompactionTaskQuery compactionTaskQuery,
+      DataSourceCompactionConfig dataSourceCompactionConfig
+  )
+  {
+    if (dataSourceCompactionConfig == null
+        || dataSourceCompactionConfig.getGranularitySpec() == null
+        || compactionTaskQuery.getGranularitySpec() == null) {
+      skipTaskInterval(compactionTaskQuery);
+      reserveTaskSlots(compactionTaskQuery);
+      return false;
+    }
+
+    Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec()
+                                                                         .getSegmentGranularity();
+    Granularity taskSegmentGranularity = compactionTaskQuery.getGranularitySpec().getSegmentGranularity();
+    if (configuredSegmentGranularity == null || configuredSegmentGranularity.equals(taskSegmentGranularity)) {
+      skipTaskInterval(compactionTaskQuery);
+      reserveTaskSlots(compactionTaskQuery);
+      return false;
+    }
+
+    log.info(
+        "Cancelling task[%s] as task segmentGranularity[%s] differs from compaction config segmentGranularity[%s].",
+        compactionTaskQuery.getId(), taskSegmentGranularity, configuredSegmentGranularity
+    );
+    overlordClient.cancelTask(compactionTaskQuery.getId());
+    return true;
+  }
+
+  /**
+   * Retrieves the list of intervals locked by higher priority tasks for each datasource.
+   * Since compaction tasks submitted for these Intervals would have to wait anyway,
+   * we skip these Intervals until the next compaction run by adding them to
+   * {@link #intervalsToSkipCompaction}.
+   * <p>
+   */
+  public void skipLockedIntervals(List<DataSourceCompactionConfig> compactionConfigs)
+  {
+    final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
+        .stream()
+        .map(config ->
+                 new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext()))
+        .collect(Collectors.toList());
+    final Map<String, List<Interval>> datasourceToLockedIntervals =
+        new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true));
+    log.debug(
+        "Skipping the following intervals for Compaction as they are currently locked: %s",
+        datasourceToLockedIntervals
+    );
+
+    // Skip all the intervals locked by higher priority tasks for each datasource
+    // This must be done after the invalid compaction tasks are cancelled

Review Comment:
   Should this requirement be in the javadoc instead of a normal code comment?



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java:
##########
@@ -51,6 +54,32 @@
     this.dataSource = supervisorSpec.getSpec().getDataSource();
   }
 
+  public CompactionSupervisorSpec getSpec()
+  {
+    return supervisorSpec;
+  }
+
+  /**
+   * Checks if this supervisor is ready to create jobs in the current run of the
+   * scheduler.
+   */
+  public boolean shouldCreateJobs()
+  {
+    return !supervisorSpec.isSuspended();
+  }
+
+  /**
+   * Creates compaction jobs for this supervisor.
+   */
+  public List<CompactionJob> createJobs(
+      DruidInputSource inputSource,
+      DruidDatasourceDestination destination,

Review Comment:
   Bumping this notice for awareness. Seems like the signature needs a refactor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

