maytasm commented on a change in pull request #10371:
URL: https://github.com/apache/druid/pull/10371#discussion_r488385897
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -112,27 +114,38 @@
}
@Override
- public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes()
+ public Map<String, CompactionStatistics> totalRemainingStatistics()
{
- final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>();
- resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE);
- for (QueueEntry entry : queue) {
- final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
- final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
-
- final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval);
-
- long size = 0;
- for (DataSegment segment : FluentIterable
- .from(holders)
- .transformAndConcat(TimelineObjectHolder::getObject)
- .transform(PartitionChunk::getObject)) {
- size += segment.getSize();
- }
+ return remainingSegments;
+ }
+
+ @Override
+ public Map<String, CompactionStatistics> totalProcessedStatistics()
+ {
+ return processedSegments;
+ }
- resultMap.put(entry.getDataSource(), size);
+ @Override
+ public void flushAllSegments()
+ {
+ if (queue.isEmpty()) {
+ return;
+ }
+ QueueEntry entry;
+ while ((entry = queue.poll()) != null) {
+ final List<DataSegment> resultSegments = entry.segments;
+ final String dataSourceName = resultSegments.get(0).getDataSource();
+ // This entry was in the queue, meaning that it was not processed. Hence, also aggregates it's
+ // statistic to the remaining segments counts.
+ collectSegmentStatistics(remainingSegments, dataSourceName, new SegmentsToCompact(entry.segments));
+ final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get(
+ dataSourceName
+ );
+ // WARNING: This iterates the compactibleTimelineObjectHolderCursor.
+ // Since this method is intended to only be use after all necessary iteration is done on this iterator
Review comment:
I mean that this method (`flushAllSegments`) iterates the
`compactibleTimelineObjectHolderCursor` through its call to `iterateAllSegments`.
When this class (`NewestSegmentFirstIterator`) is used as an iterator, its
`next()` method also iterates the `compactibleTimelineObjectHolderCursor`.
Hence, this iterator (`NewestSegmentFirstIterator`) cannot be used to iterate
after this method (`flushAllSegments`) is called.
Basically, you cannot call `flushAllSegments` while iterating the
`NewestSegmentFirstIterator`, and you cannot call `flushAllSegments` and then go
back to iterating the `NewestSegmentFirstIterator`.
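
To illustrate the constraint, here is a minimal, self-contained sketch. It is not the code in this PR: `SharedCursorSketch`, `DrainableIterator`, `flushAll`, and the `flushed` guard are hypothetical names made up for the example, and plain strings stand in for segments. It only shows the general pattern of one underlying cursor being advanced by both `next()` and a flush method, so that flushing leaves nothing for further iteration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class SharedCursorSketch
{
  // Hypothetical stand-in for an iterator such as NewestSegmentFirstIterator.
  static class DrainableIterator implements Iterator<String>
  {
    // Stand-in for compactibleTimelineObjectHolderCursor: a single cursor
    // that both next() and flushAll() advance.
    private final Iterator<String> cursor;
    private final Queue<String> queue = new ArrayDeque<>();
    private final List<String> remaining = new ArrayList<>();
    // Assumption for this sketch only: an explicit guard that makes misuse fail loudly.
    private boolean flushed = false;

    DrainableIterator(List<String> items)
    {
      this.cursor = items.iterator();
      refill();
    }

    // Moves at most one item from the shared cursor into the queue.
    private void refill()
    {
      if (cursor.hasNext()) {
        queue.add(cursor.next());
      }
    }

    @Override
    public boolean hasNext()
    {
      return !queue.isEmpty();
    }

    @Override
    public String next()
    {
      if (flushed) {
        throw new IllegalStateException("cannot iterate after flushAll()");
      }
      final String item = queue.remove();
      refill(); // advances the shared cursor
      return item;
    }

    // Drains the queue and the rest of the shared cursor into 'remaining'.
    void flushAll()
    {
      String entry;
      while ((entry = queue.poll()) != null) {
        remaining.add(entry);
        while (cursor.hasNext()) {
          remaining.add(cursor.next()); // also advances the shared cursor
        }
      }
      flushed = true;
    }

    List<String> remaining()
    {
      return remaining;
    }
  }

  public static void main(String[] args)
  {
    final DrainableIterator it = new DrainableIterator(Arrays.asList("2020-03", "2020-02", "2020-01"));
    System.out.println(it.next());      // processed item
    it.flushAll();
    System.out.println(it.remaining()); // items that were never processed
    System.out.println(it.hasNext());   // nothing left to iterate
  }
}
```

In this sketch, everything that was not returned by `next()` before the flush ends up in `remaining`, and a later `next()` call throws instead of silently returning nothing. Whether the real iterator should carry a similar guard is a design choice for the PR, not something this example settles.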