jon-wei commented on a change in pull request #10843:
URL: https://github.com/apache/druid/pull/10843#discussion_r572460287
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -75,6 +85,10 @@
   // searchIntervals keeps track of the current state of which interval should be considered to search segments to
   // compact.
   private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
+  // This is needed for datasource that has segmentGranularity configured
+  // If configured segmentGranularity is finer than current segmentGranularity, the same set of segments
+  // can belong to multiple intervals in the timeline. We keep track of the
+  private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();
Review comment:
Did you mean to add more to the comment?
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -93,12 +107,53 @@
     dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
-
+      Granularity configuredSegmentGranularity = null;
       if (config != null && !timeline.isEmpty()) {
+        Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new HashMap<>();
+        if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
+          Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
+          configuredSegmentGranularity = config.getGranularitySpec().getSegmentGranularity();
+          // Create a new timeline to hold segments in the new configured segment granularity
+          VersionedIntervalTimeline<String, DataSegment> timelineWithConfiguredSegmentGranularity = new VersionedIntervalTimeline<>(Comparator.naturalOrder());
+          Set<DataSegment> segments = timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
+          for (DataSegment segment : segments) {
+            // Convert original segmentGranularity to new granularities bucket by configuredSegmentGranularity
+            // For example, if the original is interval of 2020-01-28/2020-02-03 with WEEK granularity
+            // and the configuredSegmentGranularity is MONTH, the segment will be split to two segments
+            // of 2020-01/2020-02 and 2020-02/2020-03.
+            for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) {
+              intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment);
+            }
+          }
+          for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : intervalToPartitionMap.entrySet()) {
+            Interval interval = partitionsPerInterval.getKey();
+            int partitionNum = 0;
+            Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
+            int partitions = segmentSet.size();
+            for (DataSegment segment : segmentSet) {
+              DataSegment segmentsForCompact = segment.withShardSpec(new NumberedShardSpec(partitionNum, partitions));
+              // PartitionHolder can only holds chucks of one partition space
Review comment:
chucks -> chunks
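(Editor's note, for readers of this thread: the WEEK-to-MONTH bucketing that the diff's inline comment describes can be sketched with plain `java.time`. This is a hypothetical standalone illustration, not Druid's actual `Granularity.getIterable`, which operates on Joda-Time `Interval`s; the class and method names below are invented for the sketch.)

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

public class MonthBuckets {
    // Hypothetical stand-in for Granularity.getIterable(interval) with MONTH
    // granularity: returns every month bucket overlapping [start, end).
    static List<YearMonth> monthBuckets(LocalDate start, LocalDate end) {
        List<YearMonth> buckets = new ArrayList<>();
        YearMonth month = YearMonth.from(start);
        // end is exclusive, so the last covered day is end - 1
        YearMonth last = YearMonth.from(end.minusDays(1));
        while (!month.isAfter(last)) {
            buckets.add(month);
            month = month.plusMonths(1);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // The WEEK segment 2020-01-28/2020-02-03 overlaps two MONTH buckets,
        // so it would be added to both intervals in intervalToPartitionMap.
        List<YearMonth> buckets = monthBuckets(
            LocalDate.of(2020, 1, 28), LocalDate.of(2020, 2, 3));
        System.out.println(buckets); // [2020-01, 2020-02]
    }
}
```

This mirrors why the patch needs `intervalCompactedForDatasource`: a single source segment can appear under more than one target interval.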
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]