github-code-scanning[bot] commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1217664291


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -345,135 +322,184 @@
         .collect(Collectors.toList());
   }
 
-  private CoordinatorStats doRun(
+  private int getCompactionTaskCapacity(CoordinatorCompactionConfig 
dynamicConfig)
+  {
+    int totalWorkerCapacity;
+    try {
+      totalWorkerCapacity = dynamicConfig.isUseAutoScaleSlots()
+                            ? 
indexingServiceClient.getTotalWorkerCapacityWithAutoScale()
+                            : indexingServiceClient.getTotalWorkerCapacity();
+    }
+    catch (Exception e) {
+      LOG.warn("Failed to get total worker capacity with auto scale slots. 
Falling back to current capacity count");
+      totalWorkerCapacity = indexingServiceClient.getTotalWorkerCapacity();
+    }
+
+    return Math.min(
+        (int) (totalWorkerCapacity * 
dynamicConfig.getCompactionTaskSlotRatio()),
+        dynamicConfig.getMaxCompactionTaskSlots()
+    );
+  }
+
+  private int getAvailableCompactionTaskSlots(int compactionTaskCapacity, int 
busyCompactionTaskSlots)
+  {
+    final int availableCompactionTaskSlots;
+    if (busyCompactionTaskSlots > 0) {
+      availableCompactionTaskSlots = Math.max(0, compactionTaskCapacity - 
busyCompactionTaskSlots);
+    } else {
+      // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
+      // This guarantees that at least one slot is available if
+      // compaction is enabled and estimatedIncompleteCompactionTasks is 0.
+      availableCompactionTaskSlots = Math.max(1, compactionTaskCapacity);
+    }
+    LOG.info(
+        "Found [%d] available task slots for compaction out of max compaction 
task capacity [%d]",
+        availableCompactionTaskSlots, compactionTaskCapacity
+    );
+
+    return availableCompactionTaskSlots;
+  }
+
+  /**
+   * Submits compaction tasks to the Overlord. Returns total number of tasks 
submitted.
+   */
+  private int submitCompactionTasks(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
       Map<String, AutoCompactionSnapshot.Builder> 
currentRunAutoCompactionSnapshotBuilders,
       int numAvailableCompactionTaskSlots,
       CompactionSegmentIterator iterator
   )
   {
+    if (numAvailableCompactionTaskSlots <= 0) {
+      return 0;
+    }
+
     int numSubmittedTasks = 0;
     int numCompactionTasksAndSubtasks = 0;
 
     while (iterator.hasNext() && numCompactionTasksAndSubtasks < 
numAvailableCompactionTaskSlots) {
       final List<DataSegment> segmentsToCompact = iterator.next();
+      if (segmentsToCompact.isEmpty()) {
+        throw new ISE("segmentsToCompact is empty?");
+      }
 
-      if (!segmentsToCompact.isEmpty()) {
-        final String dataSourceName = segmentsToCompact.get(0).getDataSource();
-        // As these segments will be compacted, we will aggregates the 
statistic to the Compacted statistics
+      final String dataSourceName = segmentsToCompact.get(0).getDataSource();
 
-        AutoCompactionSnapshot.Builder snapshotBuilder = 
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-            dataSourceName,
-            k -> new AutoCompactionSnapshot.Builder(k, 
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
-        );
-        
snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
-        
snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
-        
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
-
-        final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSourceName);
-
-        // Create granularitySpec to send to compaction task
-        ClientCompactionTaskGranularitySpec granularitySpec;
-        Granularity segmentGranularityToUse = null;
-        if (config.getGranularitySpec() == null || 
config.getGranularitySpec().getSegmentGranularity() == null) {
-          // Determines segmentGranularity from the segmentsToCompact
-          // Each batch of segmentToCompact from CompactionSegmentIterator 
will contains the same interval as
-          // segmentGranularity is not set in the compaction config
-          Interval interval = segmentsToCompact.get(0).getInterval();
-          if (segmentsToCompact.stream().allMatch(segment -> 
interval.overlaps(segment.getInterval()))) {
-            try {
-              segmentGranularityToUse = 
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
-            }
-            catch (IllegalArgumentException iae) {
-              // This case can happen if the existing segment interval result 
in complicated periods.
-              // Fall back to setting segmentGranularity as null
-              LOG.warn("Cannot determine segmentGranularity from interval 
[%s]", interval);
-            }
-          } else {
-            LOG.warn("segmentsToCompact does not have the same interval. 
Fallback to not setting segmentGranularity for auto compaction task");
+      // As these segments will be compacted, we will aggregates the statistic 
to the Compacted statistics
+      AutoCompactionSnapshot.Builder snapshotBuilder = 
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
+          dataSourceName,
+          k -> new AutoCompactionSnapshot.Builder(k, 
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
+      );
+      snapshotBuilder.incrementBytesCompacted(
+          segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
+      );
+      snapshotBuilder.incrementIntervalCountCompacted(
+          segmentsToCompact.stream()
+                           .map(DataSegment::getInterval)
+                           .distinct().count()
+      );
+      snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
+
+      final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSourceName);
+
+      // Create granularitySpec to send to compaction task
+      ClientCompactionTaskGranularitySpec granularitySpec;
+      Granularity segmentGranularityToUse = null;
+      if (config.getGranularitySpec() == null || 
config.getGranularitySpec().getSegmentGranularity() == null) {
+        // Determines segmentGranularity from the segmentsToCompact
+        // Each batch of segmentToCompact from CompactionSegmentIterator will 
contains the same interval as
+        // segmentGranularity is not set in the compaction config
+        Interval interval = segmentsToCompact.get(0).getInterval();
+        if (segmentsToCompact.stream().allMatch(segment -> 
interval.overlaps(segment.getInterval()))) {
+          try {
+            segmentGranularityToUse = 
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
+          }
+          catch (IllegalArgumentException iae) {
+            // This case can happen if the existing segment interval result in 
complicated periods.
+            // Fall back to setting segmentGranularity as null
+            LOG.warn("Cannot determine segmentGranularity from interval [%s]", 
interval);
           }
         } else {
-          segmentGranularityToUse = 
config.getGranularitySpec().getSegmentGranularity();
+          LOG.warn(
+              "segmentsToCompact does not have the same interval. Fallback to 
not setting segmentGranularity for auto compaction task");
         }
-        granularitySpec = new ClientCompactionTaskGranularitySpec(
-            segmentGranularityToUse,
-            config.getGranularitySpec() != null ? 
config.getGranularitySpec().getQueryGranularity() : null,
-            config.getGranularitySpec() != null ? 
config.getGranularitySpec().isRollup() : null
+      } else {
+        segmentGranularityToUse = 
config.getGranularitySpec().getSegmentGranularity();
+      }
+      granularitySpec = new ClientCompactionTaskGranularitySpec(
+          segmentGranularityToUse,
+          config.getGranularitySpec() != null ? 
config.getGranularitySpec().getQueryGranularity() : null,
+          config.getGranularitySpec() != null ? 
config.getGranularitySpec().isRollup() : null
+      );
 
+      // Create dimensionsSpec to send to compaction task
+      ClientCompactionTaskDimensionsSpec dimensionsSpec;
+      if (config.getDimensionsSpec() != null) {
+        dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
+            config.getDimensionsSpec().getDimensions()
         );
+      } else {
+        dimensionsSpec = null;
+      }
 
-        // Create dimensionsSpec to send to compaction task
-        ClientCompactionTaskDimensionsSpec dimensionsSpec;
-        if (config.getDimensionsSpec() != null) {
-          dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
-              config.getDimensionsSpec().getDimensions()
-          );
-        } else {
-          dimensionsSpec = null;
-        }
-
-        // Create transformSpec to send to compaction task
-        ClientCompactionTaskTransformSpec transformSpec = null;
-        if (config.getTransformSpec() != null) {
-          transformSpec = new ClientCompactionTaskTransformSpec(
-              config.getTransformSpec().getFilter()
-          );
-        }
+      // Create transformSpec to send to compaction task
+      ClientCompactionTaskTransformSpec transformSpec = null;
+      if (config.getTransformSpec() != null) {
+        transformSpec = new ClientCompactionTaskTransformSpec(
+            config.getTransformSpec().getFilter()
+        );
+      }
 
-        Boolean dropExisting = null;
-        if (config.getIoConfig() != null) {
-          dropExisting = config.getIoConfig().isDropExisting();
-        }
+      Boolean dropExisting = null;
+      if (config.getIoConfig() != null) {
+        dropExisting = config.getIoConfig().isDropExisting();
+      }
 
-        // If all the segments found to be compacted are tombstones then 
dropExisting
-        // needs to be forced to true. This forcing needs to  happen in the 
case that
-        // the flag is null, or it is false. It is needed when it is null to 
avoid the
-        // possibility of the code deciding to default it to false later.
-        // Forcing the flag to true will enable the task ingestion code to 
generate new, compacted, tombstones to
-        // cover the tombstones found to be compacted as well as to mark them
-        // as compacted (update their lastCompactionState). If we don't force 
the
-        // flag then every time this compact duty runs it will find the same 
tombstones
-        // in the interval since their lastCompactionState
-        // was not set repeating this over and over and the duty will not make 
progress; it
-        // will become stuck on this set of tombstones.
-        // This forcing code should be revised
-        // when/if the autocompaction code policy to decide which segments to 
compact changes
-        if (dropExisting == null || !dropExisting) {
-          if (segmentsToCompact.stream().allMatch(dataSegment -> 
dataSegment.isTombstone())) {
-            dropExisting = true;
-            LOG.info("Forcing dropExisting to %s since all segments to compact 
are tombstones", dropExisting);
-          }
+      // If all the segments found to be compacted are tombstones then 
dropExisting
+      // needs to be forced to true. This forcing needs to  happen in the case 
that
+      // the flag is null, or it is false. It is needed when it is null to 
avoid the
+      // possibility of the code deciding to default it to false later.
+      // Forcing the flag to true will enable the task ingestion code to 
generate new, compacted, tombstones to
+      // cover the tombstones found to be compacted as well as to mark them
+      // as compacted (update their lastCompactionState). If we don't force the
+      // flag then every time this compact duty runs it will find the same 
tombstones
+      // in the interval since their lastCompactionState
+      // was not set repeating this over and over and the duty will not make 
progress; it
+      // will become stuck on this set of tombstones.
+      // This forcing code should be revised
+      // when/if the autocompaction code policy to decide which segments to 
compact changes
+      if (dropExisting == null || !dropExisting) {
+        if (segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
+          dropExisting = true;
+          LOG.info("Forcing dropExisting to true since all segments to compact 
are tombstones.");
         }
+      }
 
-        // make tuningConfig
-        final String taskId = indexingServiceClient.compactSegments(
-            "coordinator-issued",
-            segmentsToCompact,
-            config.getTaskPriority(),
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
-            granularitySpec,
-            dimensionsSpec,
-            config.getMetricsSpec(),
-            transformSpec,
-            dropExisting,
-            newAutoCompactionContext(config.getTaskContext())
-        );
+      final String taskId = indexingServiceClient.compactSegments(
+          "coordinator-issued",
+          segmentsToCompact,
+          config.getTaskPriority(),
+          ClientCompactionTaskQueryTuningConfig.from(
+              config.getTuningConfig(),
+              config.getMaxRowsPerSegment(),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSourceCompactionConfig.getMaxRowsPerSegment](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5041)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to