jon-wei commented on a change in pull request #10843:
URL: https://github.com/apache/druid/pull/10843#discussion_r572460287



##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -75,6 +85,10 @@
   // searchIntervals keeps track of the current state of which interval should 
be considered to search segments to
   // compact.
   private final Map<String, CompactibleTimelineObjectHolderCursor> 
timelineIterators;
+  // This is needed for datasource that has segmentGranularity configured
+  // If configured segmentGranularity is finer than current 
segmentGranularity, the same set of segments
+  // can belong to multiple intervals in the timeline. We keep track of the
+  private final Map<String, Set<Interval>> intervalCompactedForDatasource = 
new HashMap<>();

Review comment:
       Did you mean to add more to this comment? It ends mid-sentence ("We keep track of the").

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
##########
@@ -93,12 +107,53 @@
 
     dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, 
DataSegment> timeline) -> {
       final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSource);
-
+      Granularity configuredSegmentGranularity = null;
       if (config != null && !timeline.isEmpty()) {
+        Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new 
HashMap<>();
+        if (config.getGranularitySpec() != null && 
config.getGranularitySpec().getSegmentGranularity() != null) {
+          Map<Interval, Set<DataSegment>> intervalToPartitionMap = new 
HashMap<>();
+          configuredSegmentGranularity = 
config.getGranularitySpec().getSegmentGranularity();
+          // Create a new timeline to hold segments in the new configured 
segment granularity
+          VersionedIntervalTimeline<String, DataSegment> 
timelineWithConfiguredSegmentGranularity = new 
VersionedIntervalTimeline<>(Comparator.naturalOrder());
+          Set<DataSegment> segments = 
timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, 
Partitions.ONLY_COMPLETE);
+          for (DataSegment segment : segments) {
+            // Convert original segmentGranularity to new granularities bucket 
by configuredSegmentGranularity
+            // For example, if the original is interval of 
2020-01-28/2020-02-03 with WEEK granularity
+            // and the configuredSegmentGranularity is MONTH, the segment will 
be split to two segments
+            // of 2020-01/2020-02 and 2020-02/2020-03.
+            for (Interval interval : 
configuredSegmentGranularity.getIterable(segment.getInterval())) {
+              intervalToPartitionMap.computeIfAbsent(interval, k -> new 
HashSet<>()).add(segment);
+            }
+          }
+          for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval : 
intervalToPartitionMap.entrySet()) {
+            Interval interval = partitionsPerInterval.getKey();
+            int partitionNum = 0;
+            Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
+            int partitions = segmentSet.size();
+            for (DataSegment segment : segmentSet) {
+              DataSegment segmentsForCompact = segment.withShardSpec(new 
NumberedShardSpec(partitionNum, partitions));
+              // PartitionHolder can only holds chucks of one partition space

Review comment:
       Typo: `chucks` -> `chunks` (also "can only holds" -> "can only hold").




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to