maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490710835



##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -234,29 +294,174 @@ private CoordinatorStats doRun(
     return newContext;
   }
 
-  private CoordinatorStats makeStats(int numCompactionTasks, 
CompactionSegmentIterator iterator)
+  /**
+   * This method can be used to atomically update the snapshots in {@code 
autoCompactionSnapshotPerDataSource} when
+   * no compaction task is scheduled in this run. Currently, this method does 
not update compaction statistics
+   * (bytes, interval count, segment count, etc.) since we skip iterating 
through the segments and cannot get an update
+   * on those statistics. Thus, this method only updates the schedule status 
and task list (compaction statistics
+   * remain the same as the previous snapshot).
+   */
+  private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+      Map<String, AutoCompactionSnapshot.Builder> 
currentRunAutoCompactionSnapshotBuilders
+  )
+  {
+    Map<String, AutoCompactionSnapshot> previousSnapshots = 
autoCompactionSnapshotPerDataSource.get();
+    for (Map.Entry<String, AutoCompactionSnapshot.Builder> 
autoCompactionSnapshotBuilderEntry : 
currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+      AutoCompactionSnapshot previousSnapshot = 
previousSnapshots.get(dataSource);
+      if (previousSnapshot != null) {
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+        
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+      }
+    }
+
+    Map<String, AutoCompactionSnapshot> 
currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+        currentRunAutoCompactionSnapshotBuilders,
+        AutoCompactionSnapshot.Builder::build
+    );
+    // Atomic update of autoCompactionSnapshotPerDataSource with the latest 
from this coordinator run
+    
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+  }
+
+  private CoordinatorStats makeStats(
+      Map<String, AutoCompactionSnapshot.Builder> 
currentRunAutoCompactionSnapshotBuilders,
+      int numCompactionTasks,
+      CompactionSegmentIterator iterator
+  )
   {
+    final Map<String, AutoCompactionSnapshot> 
currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = 
iterator.totalRemainingSegmentsSizeBytes();
-    
totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = 
entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Iterate through all the remaining segments in the iterator.
+    // As these segments could be compacted but were not compacted due to lack 
of task slots, we will aggregate
+    // their statistics into the AwaitingCompaction statistics
+    for (; iterator.hasNext();) {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to