jihoonson commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490690136



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +77,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

Review comment:
       Could you please add a Javadoc about the concurrent access pattern on `autoCompactionSnapshotPerDataSource`? It could say something like "This variable is updated by the Coordinator thread executing duties and read by HTTP threads processing Coordinator API calls."
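A minimal sketch of the single-writer/multi-reader pattern that Javadoc would describe. The class `SnapshotHolderSketch` is hypothetical and reduces each per-datasource `AutoCompactionSnapshot` to a `Long`; it is not Druid's code. The field is `final` because only the referenced map is swapped, never the holder. It is also initialized to an empty map, unlike the `new AtomicReference<>()` in the diff, which starts as `null`, so an HTTP call that arrives before the first duty run does not observe `null`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for CompactSegments: only the snapshot field is modeled,
// and each per-datasource snapshot is simplified to a Long (e.g. bytes awaiting compaction).
class SnapshotHolderSketch
{
  /**
   * This variable is updated by the Coordinator thread executing duties and read by HTTP threads
   * processing Coordinator API calls. The writer always publishes a fresh, unmodifiable map,
   * so readers never observe a partially updated map.
   */
  private final AtomicReference<Map<String, Long>> snapshotPerDataSource =
      new AtomicReference<>(Collections.<String, Long>emptyMap());

  /** Called once per duty run by the single writer (Coordinator) thread. */
  public void publish(Map<String, Long> currentRun)
  {
    // Copy first so later mutations of the caller's map cannot leak into the published snapshot.
    snapshotPerDataSource.set(Collections.unmodifiableMap(new HashMap<>(currentRun)));
  }

  /** Called from HTTP threads; returns null if the datasource has no snapshot. */
  public Long getSnapshot(String dataSource)
  {
    return snapshotPerDataSource.get().get(dataSource);
  }
}
```

The sketch copies the map before publishing because Guava's `Maps.transformValues` returns a lazily computed view: publishing such a view would run the transform function on each read from an HTTP thread instead of once on the Coordinator thread.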

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",

Review comment:
       Maybe `compactTask/maxSlot/count`?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)

Review comment:
       I think `segment/waitCompact/bytes` would be enough. The second `segment` (in `segmentBytes`) seems redundant.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)

Review comment:
       Same comment for the other segment size and count metrics.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -61,7 +77,7 @@
   private final CompactionSegmentSearchPolicy policy;
   private final IndexingServiceClient indexingServiceClient;
 
-  private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource;
+  private AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();

Review comment:
       nit: this variable can be final.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)

Review comment:
       Same question for the other interval metrics.

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -234,29 +294,174 @@ private CoordinatorStats doRun(
     return newContext;
   }
 
-  private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator)
+  /**
+   * This method can be used to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
+   * no compaction task is scheduled in this run. Currently, this method does not update compaction statistics
+   * (bytes, interval count, segment count, etc.) since we skip iterating through the segments and cannot get an update
+   * on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
+   * remain the same as the previous snapshot).
+   */
+  private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
+  )
+  {
+    Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
+    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+      AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
+      if (previousSnapshot != null) {
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+        autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+      }
+    }
+
+    Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+        currentRunAutoCompactionSnapshotBuilders,
+        AutoCompactionSnapshot.Builder::build
+    );
+    // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
+    autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+  }
+
+  private CoordinatorStats makeStats(
+      Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
+      int numCompactionTasks,
+      CompactionSegmentIterator iterator
+  )
   {
+    final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
     final CoordinatorStats stats = new CoordinatorStats();
     stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
-    totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
-        entry -> {
-          final String dataSource = entry.getKey();
-          final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue();
-          stats.addToDataSourceStat(
-              TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
-              dataSource,
-              totalSizeOfSegmentsAwaitingCompaction
-          );
-        }
-    );
+
+    // Iterate through all the remaining segments in the iterator.
+    // These segments could have been compacted but were not, due to a lack of task slots, so we aggregate
+    // their statistics into the AwaitingCompaction statistics.
+    for (; iterator.hasNext();) {

Review comment:
       nit: you can use `while(iterator.hasNext())`
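The hunk above handles two cases: when no task is scheduled, it carries the previous snapshot's statistics forward for each current datasource; otherwise, it drains the iterator and counts every remaining candidate as awaiting compaction. A minimal sketch of both, using the suggested `while` loop. `AwaitingStatsSketch`, `Stats`, and `Candidate` are hypothetical simplifications (Druid's `AutoCompactionSnapshot` tracks more fields), and the sketch assumes a datasource with no previous snapshot starts at zero.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class AwaitingStatsSketch
{
  /** Hypothetical per-datasource statistics; a fresh instance starts at zero. */
  static final class Stats
  {
    long bytes;
    long segmentCount;
    long intervalCount;
  }

  /** Hypothetical iterator element: the segments of one interval that were not scheduled. */
  static final class Candidate
  {
    final String dataSource;
    final long bytes;
    final int segmentCount;

    Candidate(String dataSource, long bytes, int segmentCount)
    {
      this.dataSource = dataSource;
      this.bytes = bytes;
      this.segmentCount = segmentCount;
    }
  }

  /** Drains the iterator; every remaining candidate counts as awaiting compaction. */
  static Map<String, Stats> aggregateAwaiting(Iterator<Candidate> iterator)
  {
    final Map<String, Stats> awaiting = new HashMap<>();
    // `while` states the intent more directly than `for (; iterator.hasNext();)`.
    while (iterator.hasNext()) {
      final Candidate candidate = iterator.next();
      final Stats stats = awaiting.computeIfAbsent(candidate.dataSource, k -> new Stats());
      stats.bytes += candidate.bytes;
      stats.segmentCount += candidate.segmentCount;
      stats.intervalCount += 1; // one candidate corresponds to one interval in this sketch
    }
    return awaiting;
  }

  /**
   * No task was scheduled, so segments were not iterated: copy the previous statistics for each
   * current datasource. Datasources absent from this run are dropped from the new snapshot.
   */
  static Map<String, Stats> carryForward(Iterable<String> currentDataSources, Map<String, Stats> previous)
  {
    final Map<String, Stats> current = new HashMap<>();
    for (String dataSource : currentDataSources) {
      final Stats copy = new Stats();
      final Stats old = previous.get(dataSource);
      if (old != null) {
        copy.bytes = old.bytes;
        copy.segmentCount = old.segmentCount;
        copy.intervalCount = old.intervalCount;
      }
      current.put(dataSource, copy);
    }
    return current;
  }
}
```

Copying into new `Stats` objects keeps the new snapshot independent of the previous one, mirroring how the diff increments fresh builders instead of mutating the previous snapshot.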

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",

Review comment:
       Similarly, maybe `compactTask/availableSlot/count`?

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
##########
@@ -301,13 +301,115 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         )
     );
 
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/maxSlot/count",
+            stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    emitter.emit(
+        new ServiceMetricEvent.Builder().build(
+            "compact/availableSlot/count",
+            stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
+        )
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentBytes", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/segmentCount", count)
+          );
+        }
+    );
+
+    stats.forEachDataSourceStat(
+        CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
+        (final String dataSource, final long count) -> {
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                  .build("segment/waitCompact/intervalCount", count)

Review comment:
       Hmm, should it be `interval/waitCompact/count`?
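To see the naming proposals from these comments side by side, here is a sketch that emits the metrics under the suggested names: `compactTask/maxSlot/count`, `compactTask/availableSlot/count`, `segment/waitCompact/bytes`, and `interval/waitCompact/count`, plus `segment/waitCompact/count`, which is one reading of "same comment for ... segment size and count". These are review suggestions, not necessarily the names that were merged. `MetricSink` is a hypothetical stand-in for Druid's `ServiceEmitter` and `ServiceMetricEvent.Builder`; a `null` datasource marks a cluster-wide metric, where the diff simply omits the `DruidMetrics.DATASOURCE` dimension.

```java
import java.util.Map;

class CompactionMetricNamesSketch
{
  // Names as suggested in this review (hypothetical until finalized in the PR).
  static final String MAX_SLOT = "compactTask/maxSlot/count";
  static final String AVAILABLE_SLOT = "compactTask/availableSlot/count";
  static final String WAIT_BYTES = "segment/waitCompact/bytes";
  static final String WAIT_SEGMENT_COUNT = "segment/waitCompact/count";
  static final String WAIT_INTERVAL_COUNT = "interval/waitCompact/count";

  /** Hypothetical sink; dataSource is null for cluster-wide metrics. */
  interface MetricSink
  {
    void emit(String metric, String dataSource, long value);
  }

  /** awaitingPerDataSource values are {bytes, segmentCount, intervalCount}. */
  static void emitAll(MetricSink sink, long maxSlots, long availableSlots, Map<String, long[]> awaitingPerDataSource)
  {
    // Global slot metrics carry no datasource dimension.
    sink.emit(MAX_SLOT, null, maxSlots);
    sink.emit(AVAILABLE_SLOT, null, availableSlots);
    // Per-datasource metrics: the noun comes first and the unit last, with no repeated "segment".
    for (Map.Entry<String, long[]> entry : awaitingPerDataSource.entrySet()) {
      final long[] totals = entry.getValue();
      sink.emit(WAIT_BYTES, entry.getKey(), totals[0]);
      sink.emit(WAIT_SEGMENT_COUNT, entry.getKey(), totals[1]);
      sink.emit(WAIT_INTERVAL_COUNT, entry.getKey(), totals[2]);
    }
  }
}
```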




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
