maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r489189326



##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -238,25 +272,102 @@ private CoordinatorStats makeStats(int 
numCompactionTasks, CompactionSegmentIter
   {
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = 
iterator.totalRemainingSegmentsSizeBytes();
-    
totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = 
entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Make sure that the iterator iterates through all the remaining segments 
so that we can get accurate and correct
+    // statistics (remaining, skipped, processed, etc.). The reason we have to 
do this explicitly here is because
+    // earlier (when we were iterating to submit compaction tasks) we may have 
run out of task slots and were not able
+    // to iterate to the first segment that needs compaction for some 
datasource.
+    iterator.flushAllSegments();
+    // Statistics of all segments that still need compaction after this run
+    Map<String, CompactionStatistics> allRemainingStatistics = 
iterator.totalRemainingStatistics();
+    // Statistics of all segments either compacted or skipped after this run
+    Map<String, CompactionStatistics> allProcessedStatistics = 
iterator.totalProcessedStatistics();
+
+    for (Map.Entry<String, AutoCompactionSnapshot> autoCompactionSnapshotEntry 
: autoCompactionSnapshotPerDataSource.entrySet()) {
+      final String dataSource = autoCompactionSnapshotEntry.getKey();
+      CompactionStatistics remainingStatistics = 
allRemainingStatistics.get(dataSource);
+      CompactionStatistics processedStatistics = 
allProcessedStatistics.get(dataSource);
+
+      long byteAwaitingCompaction = 0;
+      long segmentCountAwaitingCompaction = 0;
+      long intervalCountAwaitingCompaction = 0;
+      if (remainingStatistics != null) {
+        // If null, it means that all segments are either compacted or skipped.
+        // Hence, we can leave these set to the default value of 0. If not null, 
we set them to the collected statistics.
+        byteAwaitingCompaction = remainingStatistics.getByteSum();
+        segmentCountAwaitingCompaction = 
remainingStatistics.getSegmentNumberCountSum();
+        intervalCountAwaitingCompaction = 
remainingStatistics.getSegmentIntervalCountSum();
+      }
+
+      long byteProcessed = 0;
+      long segmentCountProcessed = 0;
+      long intervalCountProcessed = 0;
+      if (processedStatistics != null) {
+        byteProcessed = processedStatistics.getByteSum();
+        segmentCountProcessed = processedStatistics.getSegmentNumberCountSum();
+        intervalCountProcessed = 
processedStatistics.getSegmentIntervalCountSum();
+      }
+
+      
autoCompactionSnapshotEntry.getValue().setByteCountAwaitingCompaction(byteAwaitingCompaction);
+      
autoCompactionSnapshotEntry.getValue().setByteCountProcessed(byteProcessed);
+      
autoCompactionSnapshotEntry.getValue().setSegmentCountAwaitingCompaction(segmentCountAwaitingCompaction);
+      
autoCompactionSnapshotEntry.getValue().setSegmentCountProcessed(segmentCountProcessed);
+      
autoCompactionSnapshotEntry.getValue().setIntervalCountAwaitingCompaction(intervalCountAwaitingCompaction);
+      
autoCompactionSnapshotEntry.getValue().setIntervalCountProcessed(intervalCountProcessed);
+
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          byteAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          segmentCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_AWAITING_COMPACTION,
+          dataSource,
+          intervalCountAwaitingCompaction
+      );
+      stats.addToDataSourceStat(
+          TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          byteProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          segmentCountProcessed
+      );
+      stats.addToDataSourceStat(
+          TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
+          dataSource,
+          intervalCountProcessed
+      );
+    }
+
     return stats;
   }
 
-  @SuppressWarnings("deprecation") // Intentionally using boxing get() to 
return null if dataSource is unknown
   @Nullable
   public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
   {
-    return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource);
+    AutoCompactionSnapshot autoCompactionSnapshot = 
autoCompactionSnapshotPerDataSource.get(dataSource);
+    if (autoCompactionSnapshot == null) {
+      return null;
+    }
+    return 
autoCompactionSnapshotPerDataSource.get(dataSource).getByteCountAwaitingCompaction();

Review comment:
       Oops. Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to