maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r490710835
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -234,29 +294,174 @@ private CoordinatorStats doRun(
return newContext;
}
- private CoordinatorStats makeStats(int numCompactionTasks,
CompactionSegmentIterator iterator)
+ /**
+ * This method can be use to atomically update the snapshots in {@code
autoCompactionSnapshotPerDataSource} when
+ * no compaction task is schedule in this run. Currently, this method does
not update compaction statistics
+ * (bytes, interval count, segment count, etc) since we skip iterating
through the segments and cannot get an update
+ * on those statistics. Thus, this method only updates the schedule status
and task list (compaction statistics
+ * remains the same as the previous snapshot).
+ */
+ private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
+ Map<String, AutoCompactionSnapshot.Builder>
currentRunAutoCompactionSnapshotBuilders
+ )
+ {
+ Map<String, AutoCompactionSnapshot> previousSnapshots =
autoCompactionSnapshotPerDataSource.get();
+ for (Map.Entry<String, AutoCompactionSnapshot.Builder>
autoCompactionSnapshotBuilderEntry :
currentRunAutoCompactionSnapshotBuilders.entrySet()) {
+ final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
+ AutoCompactionSnapshot previousSnapshot =
previousSnapshots.get(dataSource);
+ if (previousSnapshot != null) {
+
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
+
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
+ }
+ }
+
+ Map<String, AutoCompactionSnapshot>
currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
+ currentRunAutoCompactionSnapshotBuilders,
+ AutoCompactionSnapshot.Builder::build
+ );
+ // Atomic update of autoCompactionSnapshotPerDataSource with the latest
from this coordinator run
+
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
+ }
+
+ private CoordinatorStats makeStats(
+ Map<String, AutoCompactionSnapshot.Builder>
currentRunAutoCompactionSnapshotBuilders,
+ int numCompactionTasks,
+ CompactionSegmentIterator iterator
+ )
{
+ final Map<String, AutoCompactionSnapshot>
currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
final CoordinatorStats stats = new CoordinatorStats();
stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
- totalSizesOfSegmentsAwaitingCompactionPerDataSource =
iterator.totalRemainingSegmentsSizeBytes();
-
totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach(
- entry -> {
- final String dataSource = entry.getKey();
- final long totalSizeOfSegmentsAwaitingCompaction =
entry.getLongValue();
- stats.addToDataSourceStat(
- TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION,
- dataSource,
- totalSizeOfSegmentsAwaitingCompaction
- );
- }
- );
+
+ // Iterate through all the remaining segments in the iterator.
+ // As these segments could be compacted but were not compacted due to lack
of task slot, we will aggregates
+ // the statistic to the AwaitingCompaction statistics
+ for (; iterator.hasNext();) {
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]