maytasm commented on a change in pull request #10843:
URL: https://github.com/apache/druid/pull/10843#discussion_r573559553
##########
File path:
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -93,12 +107,53 @@
dataSources.forEach((String dataSource, VersionedIntervalTimeline<String,
DataSegment> timeline) -> {
final DataSourceCompactionConfig config =
compactionConfigs.get(dataSource);
-
+ Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
+ Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new
HashMap<>();
+ if (config.getGranularitySpec() != null &&
config.getGranularitySpec().getSegmentGranularity() != null) {
+ Map<Interval, Set<DataSegment>> intervalToPartitionMap = new
HashMap<>();
+ configuredSegmentGranularity =
config.getGranularitySpec().getSegmentGranularity();
+ // Create a new timeline to hold segments in the new configured
segment granularity
+ VersionedIntervalTimeline<String, DataSegment>
timelineWithConfiguredSegmentGranularity = new
VersionedIntervalTimeline<>(Comparator.naturalOrder());
+ Set<DataSegment> segments =
timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY,
Partitions.ONLY_COMPLETE);
+ for (DataSegment segment : segments) {
+ // Convert original segmentGranularity to new granularities bucket
by configuredSegmentGranularity
+ // For example, if the original is interval of
2020-01-28/2020-02-03 with WEEK granularity
+ // and the configuredSegmentGranularity is MONTH, the segment will
be split to two segments
+ // of 2020-01/2020-02 and 2020-02/2020-03.
+ for (Interval interval :
configuredSegmentGranularity.getIterable(segment.getInterval())) {
+ intervalToPartitionMap.computeIfAbsent(interval, k -> new
HashSet<>()).add(segment);
+ }
+ }
+ for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval :
intervalToPartitionMap.entrySet()) {
+ Interval interval = partitionsPerInterval.getKey();
+ int partitionNum = 0;
+ Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
+ int partitions = segmentSet.size();
+ for (DataSegment segment : segmentSet) {
+ DataSegment segmentsForCompact = segment.withShardSpec(new
NumberedShardSpec(partitionNum, partitions));
+ // PartitionHolder can only holds chucks of one partition space
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]