jihoonson commented on a change in pull request #12334:
URL: https://github.com/apache/druid/pull/12334#discussion_r827570566



##########
File path: 
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
##########
@@ -44,12 +45,27 @@
   @Nullable
   private final String sha256OfSortedSegmentIds;
 
-  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
+  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, Granularity segmentGranularity)

Review comment:
    ```suggestion
      public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, @Nullable Granularity segmentGranularity)
    ```

##########
File path: 
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
##########
@@ -44,12 +45,27 @@
   @Nullable
   private final String sha256OfSortedSegmentIds;
 
-  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
+  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, Granularity segmentGranularity)
   {
+    Interval interval = JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+    if (segmentGranularity != null) {
+      // If segmentGranularity is set, the segmentGranularity of the segments may not align with the configured segmentGranularity.
+      // We must adjust the interval of the compaction task to fully cover and align with the segmentGranularity.
+      // For example:
+      // - The umbrella interval of the segments is 2015-04-11/2015-04-12 but the configured segmentGranularity is YEAR.
+      //   If the compaction task's interval is 2015-04-11/2015-04-12, we can run into a race condition where a new segment
+      //   outside that interval (e.g. 2015-02-11/2015-02-12), created after the compaction task is submitted, would be lost,
+      //   as it is overshadowed by the compacted segment (the compacted segment has interval 2015-01-01/2016-01-01).
+      //   Hence, in this case, we must adjust the compaction task interval to 2015-01-01/2016-01-01.
+      // - The umbrella interval of the segments is 2015-02-01/2015-03-01 but the configured segmentGranularity is MONTH.
+      //   If the compaction task's interval is 2015-02-01/2015-03-01, then the compacted segments created will be
+      //   2015-01-26/2015-02-02, 2015-02-02/2015-02-09, 2015-02-09/2015-02-16, 2015-02-16/2015-02-23, 2015-02-23/2015-03-02.
+      //   The compacted segments would cause existing data from 2015-01-26 to 2015-02-01 and from 2015-03-01 to 2015-03-02 to be lost.
+      //   Hence, in this case, we must adjust the compaction task interval to 2015-01-26/2015-03-02.
+      interval = JodaUtils.umbrellaInterval(segmentGranularity.getIterable(interval));

Review comment:
       Looking at `JodaUtils.umbrellaInterval()`, I think this code can cause OOM if `segmentGranularity.getIterable()` returns lots of intervals. `JodaUtils.umbrellaInterval()` doesn't seem to need to store all `startDates` and `endDates` in memory. Instead, it can keep only one pair of `minStartDate` and `maxEndDate` which are updated in the loop. Should we fix it in this PR too?
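For illustration, the streaming alternative suggested above could look roughly like the sketch below. It uses `java.time` so it is self-contained; Druid's `JodaUtils` actually operates on Joda-Time `Interval`s, and the class and method names here are illustrative, not Druid's API.

```java
import java.time.Instant;

public final class UmbrellaSketch
{
  // Computes the umbrella (covering) interval of a stream of [start, end)
  // pairs while keeping only one running minimum start and maximum end,
  // instead of materializing all start and end dates in memory.
  public static Instant[] umbrella(Iterable<Instant[]> intervals)
  {
    Instant minStart = null;
    Instant maxEnd = null;
    for (Instant[] interval : intervals) {
      if (minStart == null || interval[0].isBefore(minStart)) {
        minStart = interval[0];
      }
      if (maxEnd == null || interval[1].isAfter(maxEnd)) {
        maxEnd = interval[1];
      }
    }
    if (minStart == null) {
      throw new IllegalArgumentException("No intervals given");
    }
    return new Instant[]{minStart, maxEnd};
  }
}
```

Since the input is consumed once and only two timestamps are retained, memory stays constant no matter how many intervals `getIterable()` yields.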

##########
File path: 
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
##########
@@ -44,12 +45,27 @@
   @Nullable
   private final String sha256OfSortedSegmentIds;
 
-  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
+  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, Granularity segmentGranularity)
   {
+    Interval interval = JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+    if (segmentGranularity != null) {
+      // If segmentGranularity is set, the segmentGranularity of the segments may not align with the configured segmentGranularity.
+      // We must adjust the interval of the compaction task to fully cover and align with the segmentGranularity.
+      // For example:
+      // - The umbrella interval of the segments is 2015-04-11/2015-04-12 but the configured segmentGranularity is YEAR.
+      //   If the compaction task's interval is 2015-04-11/2015-04-12, we can run into a race condition where a new segment
+      //   outside that interval (e.g. 2015-02-11/2015-02-12), created after the compaction task is submitted, would be lost,
+      //   as it is overshadowed by the compacted segment (the compacted segment has interval 2015-01-01/2016-01-01).
+      //   Hence, in this case, we must adjust the compaction task interval to 2015-01-01/2016-01-01.
+      // - The umbrella interval of the segments is 2015-02-01/2015-03-01 but the configured segmentGranularity is MONTH.
+      //   If the compaction task's interval is 2015-02-01/2015-03-01, then the compacted segments created will be
+      //   2015-01-26/2015-02-02, 2015-02-02/2015-02-09, 2015-02-09/2015-02-16, 2015-02-16/2015-02-23, 2015-02-23/2015-03-02.

Review comment:
       I'm not sure how the segments will have these intervals after compaction. Can you elaborate? It would be nice to add your explanation in the comment.
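For what it's worth, the interval boundaries in the example can be reproduced by aligning the umbrella interval outward to ISO week boundaries (weeks starting Monday). The sketch below uses `java.time` rather than Druid's `Granularity` API, and the class and method names are illustrative.

```java
import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;

public final class WeekAlignSketch
{
  // Expands a half-open interval [start, end) outward to ISO week boundaries
  // (weeks starting on Monday). Applied to 2015-02-01/2015-03-01 this yields
  // 2015-01-26/2015-03-02, matching the example's adjusted interval.
  public static LocalDate[] alignToWeeks(LocalDate start, LocalDate end)
  {
    LocalDate alignedStart = start.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
    // end is exclusive, so align the last covered day, then step to the next Monday
    LocalDate alignedEnd = end.minusDays(1).with(TemporalAdjusters.next(DayOfWeek.MONDAY));
    return new LocalDate[]{alignedStart, alignedEnd};
  }
}
```

Here 2015-02-01 is a Sunday, so its week starts on Monday 2015-01-26, and the last covered day 2015-02-28 is a Saturday, so its week ends before Monday 2015-03-02; the weekly segments listed in the diff are exactly the WEEK buckets tiling that aligned interval.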

##########
File path: 
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
##########
@@ -44,12 +45,27 @@
   @Nullable
   private final String sha256OfSortedSegmentIds;
 
-  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
+  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, Granularity segmentGranularity)
   {
+    Interval interval = JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+    if (segmentGranularity != null) {
+      // If segmentGranularity is set, the segmentGranularity of the segments may not align with the configured segmentGranularity.
+      // We must adjust the interval of the compaction task to fully cover and align with the segmentGranularity.
+      // For example:
+      // - The umbrella interval of the segments is 2015-04-11/2015-04-12 but the configured segmentGranularity is YEAR.
+      //   If the compaction task's interval is 2015-04-11/2015-04-12, we can run into a race condition where a new segment
+      //   outside that interval (e.g. 2015-02-11/2015-02-12), created after the compaction task is submitted, would be lost,
+      //   as it is overshadowed by the compacted segment (the compacted segment has interval 2015-01-01/2016-01-01).
+      //   Hence, in this case, we must adjust the compaction task interval to 2015-01-01/2016-01-01.
+      // - The umbrella interval of the segments is 2015-02-01/2015-03-01 but the configured segmentGranularity is MONTH.
+      //   If the compaction task's interval is 2015-02-01/2015-03-01, then the compacted segments created will be
+      //   2015-01-26/2015-02-02, 2015-02-02/2015-02-09, 2015-02-09/2015-02-16, 2015-02-16/2015-02-23, 2015-02-23/2015-03-02.
+      //   The compacted segments would cause existing data from 2015-01-26 to 2015-02-01 and from 2015-03-01 to 2015-03-02 to be lost.
+      //   Hence, in this case, we must adjust the compaction task interval to 2015-01-26/2015-03-02.
+      interval = JodaUtils.umbrellaInterval(segmentGranularity.getIterable(interval));

Review comment:
       Or should we have a guardrail so that `segmentGranularity` does not return so many intervals?
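One possible shape for such a guardrail, sketched below: walk the (lazy) iterable of granularity buckets and fail fast once a cap is exceeded, so a very fine granularity over a very wide interval cannot blow up memory or time. The class, method, and limit here are hypothetical, not existing Druid code.

```java
import java.util.Iterator;

public final class GuardrailSketch
{
  // Hypothetical guardrail: count the buckets lazily and throw as soon as
  // more than maxBuckets have been seen, without collecting them in memory.
  public static <T> void requireAtMost(Iterable<T> buckets, int maxBuckets)
  {
    int count = 0;
    Iterator<T> it = buckets.iterator();
    while (it.hasNext()) {
      it.next();
      if (++count > maxBuckets) {
        throw new IllegalStateException(
            "segmentGranularity expands the interval into more than " + maxBuckets + " buckets");
      }
    }
  }
}
```

Because the check short-circuits on the first bucket past the limit, it is cheap even when the iterable would otherwise be enormous.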

##########
File path: 
integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
##########
@@ -631,6 +635,103 @@ public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranula
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularityFinerAndNotAlignWithSegment() throws Exception
+  {
+    updateCompactionTaskSlot(1, 1, null);
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
+          false
+      );
+      // Before compaction, we have segments with the intervals 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01.
+      // We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
+      // Since the start of the week does not align with 2013-09-01 or 2013-10-01, we expect the compaction task's
+      // interval to be adjusted so that the compacted WEEK segments do not unintentionally remove data of the
+      // non-compacted 2013-08-01/2013-09-01 segment.
+      // Note that the compacted WEEK segments do not fully cover the original MONTH segment, as the MONTH segment
+      // does not have data in every week of the month.
+      forceTriggerAutoCompaction(3);
+      // Make sure that no data is lost after compaction
+      expectedResult = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      TaskResponseObject compactTask = null;
+      for (TaskResponseObject task : tasks) {
+        if (task.getType().equals("compact")) {
+          compactTask = task;
+        }
+      }
+      Assert.assertNotNull(compactTask);
+      TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
+      // Verify that the compaction task interval is adjusted to align with segmentGranularity
+      Assert.assertEquals(Intervals.of("2013-08-26T00:00:00.000Z/2013-10-07T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
+    }
+  }
+
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception
Review comment:
       Should this test week -> month instead of month -> year, if it's for the case where the new granularity does not align with the previous one?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


