jihoonson commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488943398



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;
+  @JsonProperty
+  private long byteCountProcessed;
+  @JsonProperty
+  private long segmentCountAwaitingCompaction;
+  @JsonProperty
+  private long segmentCountProcessed;
+  @JsonProperty
+  private long intervalCountAwaitingCompaction;
+  @JsonProperty
+  private long intervalCountProcessed;
+
+  @JsonCreator
+  public AutoCompactionSnapshot(
+      @JsonProperty @NotNull String dataSource,
+      @JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus
+  )
+  {
+    this.dataSource = dataSource;
+    this.scheduleStatus = scheduleStatus;
+  }
+
+  @NotNull
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @NotNull
+  public AutoCompactionScheduleStatus getScheduleStatus()
+  {
+    return scheduleStatus;
+  }
+
+  @Nullable
+  public String getLatestScheduledTaskId()
+  {
+    return latestScheduledTaskId;
+  }
+
+  public long getByteCountAwaitingCompaction()
+  {
+    return byteCountAwaitingCompaction;
+  }
+
+  public long getByteCountProcessed()
+  {
+    return byteCountProcessed;
+  }
+
+  public long getSegmentCountAwaitingCompaction()
+  {
+    return segmentCountAwaitingCompaction;
+  }
+
+  public long getSegmentCountProcessed()
+  {
+    return segmentCountProcessed;
+  }
+
+  public long getIntervalCountAwaitingCompaction()
+  {
+    return intervalCountAwaitingCompaction;
+  }
+
+  public long getIntervalCountProcessed()
+  {
+    return intervalCountProcessed;
+  }
+
+  public void setScheduleStatus(AutoCompactionScheduleStatus scheduleStatus)
+  {
+    this.scheduleStatus = scheduleStatus;
+  }
+
+  public void setLatestScheduledTaskId(String latestScheduledTaskId)
+  {
+    this.latestScheduledTaskId = latestScheduledTaskId;
+  }
+
+  public void setByteCountAwaitingCompaction(long byteCountAwaitingCompaction)
+  {
+    this.byteCountAwaitingCompaction = byteCountAwaitingCompaction;
+  }
+
+  public void setByteCountProcessed(long byteCountProcessed)
+  {
+    this.byteCountProcessed = byteCountProcessed;
+  }
+
+  public void setSegmentCountAwaitingCompaction(long segmentCountAwaitingCompaction)
+  {
+    this.segmentCountAwaitingCompaction = segmentCountAwaitingCompaction;
+  }
+
+  public void setSegmentCountProcessed(long segmentCountProcessed)
+  {
+    this.segmentCountProcessed = segmentCountProcessed;
+  }
+
+  public void setIntervalCountAwaitingCompaction(long intervalCountAwaitingCompaction)
+  {
+    this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction;
+  }
+
+  public void setIntervalCountProcessed(long intervalCountProcessed)
+  {
+    this.intervalCountProcessed = intervalCountProcessed;
+  }
+
+
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AutoCompactionSnapshot that = (AutoCompactionSnapshot) o;
+    return byteCountAwaitingCompaction == that.byteCountAwaitingCompaction &&
+           byteCountProcessed == that.byteCountProcessed &&
+           segmentCountAwaitingCompaction == that.segmentCountAwaitingCompaction &&
+           segmentCountProcessed == that.segmentCountProcessed &&
+           intervalCountAwaitingCompaction == that.intervalCountAwaitingCompaction &&
+           intervalCountProcessed == that.intervalCountProcessed &&
+           dataSource.equals(that.dataSource) &&
+           Objects.equals(scheduleStatus, that.scheduleStatus) &&
+           Objects.equals(latestScheduledTaskId, that.latestScheduledTaskId);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        dataSource,
+        scheduleStatus,
+        latestScheduledTaskId,
+        byteCountAwaitingCompaction,

Review comment:
       This looks error-prone since all fields in this class are mutable. Suppose an instance of this class were used as a key in a hash set. If you updated a field after adding the instance to the set, its hash code would change and lookups would silently fail. Even though `hashCode()` and `equals()` seem to be used only in unit tests, I would suggest making this class immutable.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       It was my bad that updating `totalSizesOfSegmentsAwaitingCompactionPerDataSource` was not thread-safe, but it should be. The same applies to `autoCompactionSnapshotPerDataSource`. I think an easy way is storing `autoCompactionSnapshotPerDataSource` in an `AtomicReference` so that we can atomically swap in a reference to a new hash map whenever we compute new snapshots.
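       A rough sketch of the suggested pattern (field and method names below are illustrative, not the actual `CompactSegments` code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// The duty thread builds a brand-new map each run and publishes it with a
// single atomic swap; reader threads never observe a half-updated map because
// a published map is never mutated afterwards.
public class SnapshotHolder
{
  private final AtomicReference<Map<String, Long>> snapshotPerDataSource =
      new AtomicReference<>(new HashMap<>());

  // Writer side: called once per coordinator run with a freshly built map.
  public void publish(Map<String, Long> freshSnapshots)
  {
    snapshotPerDataSource.set(freshSnapshots);
  }

  // Reader side (e.g. an HTTP handler thread): plain read of the current reference.
  public Long lookup(String dataSource)
  {
    return snapshotPerDataSource.get().get(dataSource);
  }
}
```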

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -569,7 +580,7 @@ private void becomeLeader()
       }
 
      for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
-        ScheduledExecutors.scheduleWithFixedDelay(
+        ScheduledExecutors.scheduleAtFixedRate(

Review comment:
       :+1: 

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -632,8 +643,9 @@ private void stopBeingLeader()
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
-    duties.addAll(makeCompactSegmentsDuty());
     duties.addAll(indexingServiceDuties);
+    // CompactSegmentsDuty should be the last duty as it can takea long time

Review comment:
       typo: `takea` -> `take a`

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
##########
@@ -19,22 +19,45 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
 * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
 public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>

Review comment:
       Thinking about the new methods in this interface, I'm not sure they are a good idea since every implementation must now keep track of remaining and processed segments, and that tracking logic will likely be duplicated (even though we have only one implementation so far :slightly_smiling_face:). How about adding `next(Map<String, CompactionStatistics> stats)` so that `CompactSegments` can pass in an appropriate map? Then it can just iterate over all remaining entries in the iterator without introducing methods such as `flushAllSegments()`, which seems to have a complicated contract. `CompactionSegmentIterator` would no longer extend `Iterator` in this case.
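       A compilable sketch of the suggested shape (all types below are stand-ins: `String` replaces `DataSegment` and `Long` replaces `CompactionStatistics`, purely to illustrate the caller-supplied stats map):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface shape: the caller supplies the statistics map, so
// each implementation records what it examined as a side effect of next()
// instead of exposing flushAllSegments()/totalRemainingStatistics().
interface StatsCollectingSegmentIterator
{
  boolean hasNext();

  List<String> next(Map<String, Long> segmentCountPerDataSource);
}

// Toy implementation over two fixed batches for a single "wiki" datasource.
class FixedBatchIterator implements StatsCollectingSegmentIterator
{
  private final List<List<String>> batches = new ArrayList<>(Arrays.asList(
      Arrays.asList("seg-1", "seg-2"),
      Arrays.asList("seg-3")
  ));

  @Override
  public boolean hasNext()
  {
    return !batches.isEmpty();
  }

  @Override
  public List<String> next(Map<String, Long> segmentCountPerDataSource)
  {
    List<String> batch = batches.remove(0);
    // Record statistics into the caller's map while iterating.
    segmentCountPerDataSource.merge("wiki", (long) batch.size(), Long::sum);
    return batch;
  }
}
```

       With this shape, draining the iterator at the end of a run is just a plain loop over `hasNext()`/`next(stats)`; no extra flushing method is needed.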

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterate through all the remaining segments so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to do this explicitly here is because
+    // earlier (when we are iterating to submit compaction tasks) we may have ran out of task slot and were not able
+    // to iterate to the first segment that needs compaction for some datasource.
+    iterator.flushAllSegments();
+    // Statistics of all segments that still need compaction after this run
+    Map<String, CompactionStatistics> allRemainingStatistics = iterator.totalRemainingStatistics();
+    // Statistics of all segments either compacted or skipped after this run
+    Map<String, CompactionStatistics> allProcessedStatistics = iterator.totalProcessedStatistics();
+
+    for (Map.Entry<String, AutoCompactionSnapshot> autoCompactionSnapshotEntry : autoCompactionSnapshotPerDataSource.entrySet()) {
+      final String dataSource = autoCompactionSnapshotEntry.getKey();
+      CompactionStatistics remainingStatistics = allRemainingStatistics.get(dataSource);
+      CompactionStatistics processedStatistics = allProcessedStatistics.get(dataSource);
+
+      long byteAwaitingCompaction = 0;
+      long segmentCountAwaitingCompaction = 0;
+      long intervalCountAwaitingCompaction = 0;
+      if (remainingStatistics != null) {
+        // If null means that all segments are either compacted or skipped.
+        // Hence, we can leave these set to default value of 0. If not null, we set it to the collected statistic.
+        byteAwaitingCompaction = remainingStatistics.getByteSum();
+        segmentCountAwaitingCompaction = remainingStatistics.getSegmentNumberCountSum();
+        intervalCountAwaitingCompaction = remainingStatistics.getSegmentIntervalCountSum();
+      }
+
+      long byteProcessed = 0;
+      long segmentCountProcessed = 0;
+      long intervalCountProcessed = 0;
+      if (processedStatistics != null) {
+        byteProcessed = processedStatistics.getByteSum();
+        segmentCountProcessed = processedStatistics.getSegmentNumberCountSum();
+        intervalCountProcessed = processedStatistics.getSegmentIntervalCountSum();
+      }
+
+      autoCompactionSnapshotEntry.getValue().setByteCountAwaitingCompaction(byteAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setByteCountProcessed(byteProcessed);
+      autoCompactionSnapshotEntry.getValue().setSegmentCountAwaitingCompaction(segmentCountAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setSegmentCountProcessed(segmentCountProcessed);
+      autoCompactionSnapshotEntry.getValue().setIntervalCountAwaitingCompaction(intervalCountAwaitingCompaction);
+      autoCompactionSnapshotEntry.getValue().setIntervalCountProcessed(intervalCountProcessed);
+
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          byteAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          segmentCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          intervalCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          byteProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          segmentCountProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          intervalCountProcessed
+      );
+    }
+
     return stats;
   }
 
-  @SuppressWarnings("deprecation") // Intentionally using boxing get() to return null if dataSource is unknown
   @Nullable
   public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
   {
-    return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource);
+    AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get(dataSource);
+    if (autoCompactionSnapshot == null) {
+      return null;
+    }
+    return autoCompactionSnapshotPerDataSource.get(dataSource).getByteCountAwaitingCompaction();

Review comment:
       Duplicate call to `autoCompactionSnapshotPerDataSource.get()`.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)

Review comment:
       Same here. I think it should be `segmentBytes`.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)

Review comment:
       I think it should be `segmentBytes`.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,82 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentByte", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/compacted/segmentCount", count)

Review comment:
       Should we emit metrics for skipped segments and intervals as well?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -336,25 +353,72 @@ private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConf
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, true)) {
+        return candidates;
+      } else {
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+    log.info("All segments look good! Nothing to compact");
+    return new SegmentsToCompact();
+  }
 
+  /**
+   * Progressively iterates all remaining time intervals (latest first) in the
+   * timeline {@param compactibleTimelineObjectHolderCursor}. Note that the timeline lookup duration is one day.
+   * The logic for checking if the segments can be compacted or not is then perform on each iteration.
+   * This is repeated until no remaining time intervals in {@param compactibleTimelineObjectHolderCursor}.
+   */
+  private void iterateAllSegments(
+      final String dataSourceName,
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
+      final DataSourceCompactionConfig config
+  )
+  {
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, false)) {
+        // Collect statistic for segments that need compaction
+        collectSegmentStatistics(remainingSegments, dataSourceName, candidates);
+      } else {
+        // Collect statistic for segments that does not need compaction
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+  }
 
-      if (!candidates.isEmpty()) {
-        final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
-        final boolean needsCompaction = needsCompaction(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
-            candidates
-        );
+  /**
+   * This method encapsulates the logic for checking if a given {@param candidates} needs compaction or not.
+   * If {@param logCannotCompactReason} is true then the reason for {@param candidates} not needing compaction is
+   * logged (for the case that {@param candidates} does not needs compaction).
+   *
+   * @return true if the {@param candidates} needs compaction, false if the {@param candidates} does not needs compaction
+   */
+  private boolean isSegmentsNeedCompact(

Review comment:
       `doSegmentsNeedCompaction()`?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.Objects;
+
+public class AutoCompactionSnapshot
+{
+  public enum AutoCompactionScheduleStatus
+  {
+    NOT_ENABLED,
+    RUNNING
+  }
+
+  @JsonProperty
+  private String dataSource;
+  @JsonProperty
+  private AutoCompactionScheduleStatus scheduleStatus;
+  @JsonProperty
+  private String latestScheduledTaskId;
+  @JsonProperty
+  private long byteCountAwaitingCompaction;

Review comment:
       IIRC, it was `byte` not `bytes`. `bytes` sounds better to me too.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -336,25 +353,72 @@ private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig tuningConf
    * @return segments to compact
    */
   private SegmentsToCompact findSegmentsToCompact(
+      final String dataSourceName,
      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
-    final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, true)) {
+        return candidates;
+      } else {
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);
+      }
+    }
+    log.info("All segments look good! Nothing to compact");
+    return new SegmentsToCompact();
+  }
 
+  /**
+   * Progressively iterates all remaining time intervals (latest first) in the
+   * timeline {@param compactibleTimelineObjectHolderCursor}. Note that the timeline lookup duration is one day.
+   * The logic for checking if the segments can be compacted or not is then perform on each iteration.
+   * This is repeated until no remaining time intervals in {@param compactibleTimelineObjectHolderCursor}.
+   */
+  private void iterateAllSegments(
+      final String dataSourceName,
+      final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
+      final DataSourceCompactionConfig config
+  )
+  {
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      if (isSegmentsNeedCompact(candidates, config, false)) {
+        // Collect statistic for segments that need compaction
+        collectSegmentStatistics(remainingSegments, dataSourceName, candidates);
+      } else {
+        // Collect statistic for segments that does not need compaction
+        collectSegmentStatistics(processedSegments, dataSourceName, candidates);

Review comment:
       Should we distinguish between segments processed and segments skipped? Information about skipped segments would be useful so that you can see how many segments (or intervals) auto compaction has skipped.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +70,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private HashMap<String, AutoCompactionSnapshot> autoCompactionSnapshotPerDataSource = new HashMap<>();

Review comment:
       When you fix this concurrency issue, please add a sufficient description of what the concurrency issue is and how it is handled. It would also be useful to check out our [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md).

##########
File path: server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
##########
@@ -270,12 +275,246 @@ public String get()
     assertLastSegmentNotCompacted(compactSegments);
   }
 
+  @Test
+  public void testMakeStats()

Review comment:
       Hmm, this test looks similar to `testRun()`. Can they be merged by moving the snapshot verification into `assertCompactSegments()` (or merging `assertCompactSegmentStatistics` and `assertCompactSegments`)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
