This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ad1b5f28bfe fix native compaction oom issue (#19121)
ad1b5f28bfe is described below

commit ad1b5f28bfecfd7b3cbd7f86d0d03c0af61df9b8
Author: Cece Mei <[email protected]>
AuthorDate: Tue Mar 10 20:16:34 2026 -0700

    fix native compaction oom issue (#19121)
---
 .../druid/indexing/common/task/CompactionTask.java |   2 +-
 .../common/task/CompactionTaskParallelRunTest.java |   2 +-
 .../common/task/CompactionTaskRunBase.java         | 117 +++++++++++++++++----
 .../common/task/NativeCompactionTaskRunTest.java   |   2 +-
 .../druid/msq/exec/MSQCompactionTaskRunTest.java   |   2 +-
 5 files changed, 99 insertions(+), 26 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 5c431648169..d7a224b0585 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -636,7 +636,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
           toolbox.getEmitter(),
           metricBuilder,
           segmentProvider.dataSource,
-          umbrellaInterval(timelineSegments, segmentProvider),
+          JodaUtils.umbrellaInterval(Iterables.transform(timelineSegments, 
DataSegment::getInterval)),
           lazyFetchSegments(
               timelineSegments,
               toolbox.getSegmentCacheManager()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 75d04659388..6b12ad84461 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -372,7 +372,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
               Granularities.HOUR,
               Granularities.MINUTE,
               true,
-              ImmutableList.of(INTERVAL_TO_INDEX)
+              
ImmutableList.of(Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z"))
           ),
           null
       );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
index 50b53e61488..b8f14ae1a3d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
@@ -170,6 +170,7 @@ public abstract class CompactionTaskRunBase
 
   protected static final Interval TEST_INTERVAL_DAY = 
Intervals.of("2014-01-01/2014-01-02");
   protected static final Interval TEST_INTERVAL = 
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T06:00:00Z");
+  protected static final Interval TEST_ACTUAL_INTERVAL = 
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z");
   protected static final List<String> TEST_ROWS = ImmutableList.of(
       "2014-01-01T00:00:10Z,a,1\n",
       "2014-01-01T00:00:10Z,b,2\n",
@@ -304,7 +305,13 @@ public abstract class CompactionTaskRunBase
     final List<DataSegment> segments = new 
ArrayList<>(dataSegmentsWithSchemas.getSegments());
     List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments);
     Assert.assertEquals(TEST_ROWS, rowsFromSegment);
-    verifyCompactedSegment(segments, segmentGranularity, DEFAULT_QUERY_GRAN, 
false);
+    verifyCompactedSegment(
+        compactionTask.getCompactionRunner(),
+        segments,
+        segmentGranularity,
+        DEFAULT_QUERY_GRAN,
+        false
+    );
   }
 
   @Test
@@ -364,14 +371,44 @@ public abstract class CompactionTaskRunBase
 
     final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = 
runTask(compactionTask1);
     verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS);
-    verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()), 
segmentGranularity, DEFAULT_QUERY_GRAN, false);
+    verifyCompactedSegment(
+        compactionTask1.getCompactionRunner(),
+        List.copyOf(resultPair1.rhs.getSegments()),
+        segmentGranularity,
+        DEFAULT_QUERY_GRAN,
+        false
+    );
 
     final CompactionTask compactionTask2 =
         compactionTaskBuilder(segmentGranularity).interval(inputInterval, 
true).build();
 
     final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair2 = 
runTask(compactionTask2);
     verifyTaskSuccessRowsAndSchemaMatch(resultPair2, TOTAL_TEST_ROWS);
-    verifyCompactedSegment(List.copyOf(resultPair2.rhs.getSegments()), 
segmentGranularity, DEFAULT_QUERY_GRAN, false);
+    if (segmentGranularity == null || 
segmentGranularity.equals(Granularities.HOUR)) {
+      verifyCompactedSegment(
+          compactionTask1.getCompactionRunner(),
+          List.copyOf(resultPair2.rhs.getSegments()),
+          segmentGranularity,
+          DEFAULT_QUERY_GRAN,
+          false
+      );
+    } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) {
+      Set<DataSegment> compactedSegments = resultPair2.rhs.getSegments();
+      Assert.assertEquals(1, compactedSegments.size());
+
+      DataSegment compactedSegment = 
Iterables.getOnlyElement(compactedSegments);
+      Assert.assertEquals(TEST_INTERVAL, compactedSegment.getInterval());
+
+      // The compact interval is always the SIX_HOUR interval: the previous compaction generated a new SIX_HOUR segment, so the compaction state in the second compaction differs from the first one.
+      Assert.assertEquals(
+          getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, 
List.of(TEST_INTERVAL)),
+          compactedSegment.getLastCompactionState()
+      );
+      Assert.assertEquals(new NumberedShardSpec(0, 1), 
compactedSegment.getShardSpec());
+
+    } else {
+      throw new RE("Gran[%s] is not supported", segmentGranularity);
+    }
   }
 
   @Test
@@ -385,7 +422,13 @@ public abstract class CompactionTaskRunBase
 
     final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = 
runTask(compactionTask1);
     verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS);
-    verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()), 
segmentGranularity, DEFAULT_QUERY_GRAN, true);
+    verifyCompactedSegment(
+        compactionTask1.getCompactionRunner(),
+        List.copyOf(resultPair1.rhs.getSegments()),
+        segmentGranularity,
+        DEFAULT_QUERY_GRAN,
+        true
+    );
 
     final CompactionTask compactionTask2 =
         compactionTaskBuilder(segmentGranularity).interval(inputInterval, 
false).build();
@@ -399,9 +442,9 @@ public abstract class CompactionTaskRunBase
       for (int i = 0; i < 3; i++) {
         Interval interval = 
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
         Assert.assertEquals(interval, segments.get(i).getInterval());
-        Interval inputInterval = segmentGranularity == null ? interval : 
this.inputInterval;
+        Interval compactInterval = segmentGranularity == null ? interval : 
TEST_ACTUAL_INTERVAL;
         Assert.assertEquals(
-            getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, 
DEFAULT_QUERY_GRAN, List.of(inputInterval)),
+            getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, 
DEFAULT_QUERY_GRAN, List.of(compactInterval)),
             segments.get(i).getLastCompactionState()
         );
         // overwrite shard starts at NON_ROOT_GEN_START_PARTITION_ID + 1, and 
minor version 2 for the second compaction
@@ -416,8 +459,9 @@ public abstract class CompactionTaskRunBase
     } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) {
       Assert.assertEquals(1, segments.size());
       Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
+      // The compact interval is always the SIX_HOUR interval: the previous compaction generated a new SIX_HOUR segment, so the compaction state in the second compaction differs from the first one.
       Assert.assertEquals(
-          getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, 
List.of(inputInterval)),
+          getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, 
List.of(TEST_INTERVAL)),
           segments.get(0).getLastCompactionState()
       );
       // use overwrite shard for the second compaction
@@ -436,7 +480,11 @@ public abstract class CompactionTaskRunBase
   @Test
   public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws 
Exception
   {
-    Assume.assumeTrue("Use 3 hr interval to compact", 
TEST_INTERVAL.equals(inputInterval));
+    Assume.assumeTrue(
+        "test with defined segment granularity and interval in this test",
+        Granularities.SIX_HOUR.equals(segmentGranularity) && 
TEST_INTERVAL.equals(inputInterval)
+        && lockGranularity != LockGranularity.SEGMENT
+    );
     verifyTaskSuccessRowsAndSchemaMatch(runIndexTask(), TOTAL_TEST_ROWS);
 
     final CompactionTask compactionTask =
@@ -487,6 +535,7 @@ public abstract class CompactionTaskRunBase
     Pair<TaskStatus, DataSegmentsWithSchemas> compactionResult = 
compactionFuture.get();
     verifyTaskSuccessRowsAndSchemaMatch(compactionResult, TOTAL_TEST_ROWS);
     verifyCompactedSegment(
+        compactionTask.getCompactionRunner(),
         List.copyOf(compactionResult.rhs.getSegments()),
         segmentGranularity,
         DEFAULT_QUERY_GRAN,
@@ -539,11 +588,7 @@ public abstract class CompactionTaskRunBase
     Assert.assertEquals(Intervals.of("2013-12-30/2014-01-06"), 
segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
     Assert.assertEquals(
-        getDefaultCompactionState(
-            Granularities.WEEK,
-            DEFAULT_QUERY_GRAN,
-            List.of(inputInterval)
-        ),
+        getDefaultCompactionState(Granularities.WEEK, DEFAULT_QUERY_GRAN, 
List.of(TEST_ACTUAL_INTERVAL)),
         segments.get(0).getLastCompactionState()
     );
   }
@@ -604,10 +649,14 @@ public abstract class CompactionTaskRunBase
     Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
 
+    // The native runner uses the interval of the segments it actually sees, while the MSQ runner uses the input interval.
+    Interval compactInterval = compactionTask.getCompactionRunner() instanceof 
NativeCompactionRunner
+                               ? TEST_ACTUAL_INTERVAL
+                               : inputInterval;
     CompactionState expectedCompactionState = getDefaultCompactionState(
         segmentGranularity,
         DEFAULT_QUERY_GRAN,
-        List.of(inputInterval)
+        List.of(compactInterval)
     ).toBuilder().transformSpec(new CompactionTransformSpec(new 
SelectorDimFilter("dim", "a", null), null)).build();
     Assert.assertEquals(expectedCompactionState, 
segments.get(0).getLastCompactionState());
   }
@@ -663,7 +712,7 @@ public abstract class CompactionTaskRunBase
     AggregatorFactory expectedCountMetric = new CountAggregatorFactory("cnt");
     AggregatorFactory expectedLongSumMetric = new 
LongSumAggregatorFactory("val", "val");
     CompactionState expectedCompactionState =
-        getDefaultCompactionState(segmentGranularity, Granularities.MINUTE, 
List.of(inputInterval))
+        getDefaultCompactionState(segmentGranularity, Granularities.MINUTE, 
List.of(TEST_ACTUAL_INTERVAL))
             .toBuilder()
             .metricsSpec(List.of(expectedCountMetric, expectedLongSumMetric))
             .build();
@@ -685,7 +734,13 @@ public abstract class CompactionTaskRunBase
     verifyTaskSuccessRowsAndSchemaMatch(resultPair, TOTAL_TEST_ROWS);
 
     List<DataSegment> segments = new ArrayList<>(resultPair.rhs.getSegments());
-    verifyCompactedSegment(segments, segmentGranularity, Granularities.SECOND, 
false);
+    verifyCompactedSegment(
+        compactionTask1.getCompactionRunner(),
+        segments,
+        segmentGranularity,
+        Granularities.SECOND,
+        false
+    );
   }
 
   @Test
@@ -709,8 +764,12 @@ public abstract class CompactionTaskRunBase
     List<DataSegment> segments = List.copyOf(resultPair.rhs.getSegments());
     Assert.assertEquals(1, segments.size());
     Assert.assertEquals(TEST_INTERVAL_DAY, segments.get(0).getInterval());
+    // The native runner uses the interval of the segments it actually sees, while the MSQ runner uses the input interval.
+    Interval interval = compactionTask1.getCompactionRunner() instanceof 
NativeCompactionRunner
+                        ? TEST_ACTUAL_INTERVAL
+                        : TEST_INTERVAL_DAY;
     Assert.assertEquals(
-        getDefaultCompactionState(Granularities.DAY, Granularities.DAY, 
List.of(TEST_INTERVAL_DAY)),
+        getDefaultCompactionState(Granularities.DAY, Granularities.DAY, 
List.of(interval)),
         segments.get(0).getLastCompactionState()
     );
     Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
@@ -1181,10 +1240,14 @@ public abstract class CompactionTaskRunBase
     Assert.assertEquals(1, segments.size());
 
     Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
+    // The native runner uses the interval of the segments it actually sees, while the MSQ runner uses the input interval.
+    Interval compactInterval = compactionTask.getCompactionRunner() instanceof 
NativeCompactionRunner
+                               ? 
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z")
+                               : inputInterval;
     CompactionState defaultCompactionState = getDefaultCompactionState(
         Granularities.SIX_HOUR,
         Granularities.MINUTE,
-        List.of(TEST_INTERVAL)
+        List.of(compactInterval)
     );
     CompactionState newCompactionState =
         defaultCompactionState.toBuilder().dimensionsSpec(
@@ -1291,11 +1354,15 @@ public abstract class CompactionTaskRunBase
 
     final List<String> dimensionExclusions =
         compactionTask.getCompactionRunner() instanceof NativeCompactionRunner 
? List.of() : List.of("__time", "val");
+    // The native runner uses the interval of the segments it actually sees, while the MSQ runner uses the input interval.
+    final Interval compactInterval = compactionTask.getCompactionRunner() 
instanceof NativeCompactionRunner
+                                     ? 
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z")
+                                     : TEST_INTERVAL;
     CompactionState expectedState =
         getDefaultCompactionState(
             Granularities.SIX_HOUR,
             Granularities.MINUTE,
-            List.of(TEST_INTERVAL)
+            List.of(compactInterval)
         ).toBuilder()
          .dimensionsSpec(
              new DimensionsSpec(Arrays.asList(/* check explicitly specified 
types are preserved */
@@ -1728,6 +1795,7 @@ public abstract class CompactionTaskRunBase
   }
 
   protected void verifyCompactedSegment(
+      CompactionRunner compactionRunner,
       List<DataSegment> segments,
       Granularity gran,
       Granularity queryGran,
@@ -1740,9 +1808,10 @@ public abstract class CompactionTaskRunBase
       for (int i = 0; i < 3; i++) {
         Interval interval = 
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
         Assert.assertEquals(interval, segments.get(i).getInterval());
-        Interval inputInterval = gran == null ? interval : this.inputInterval;
+        // The native runner uses the interval of the segments it actually sees, while the MSQ runner never uses this granularity.
+        Interval compactInterval = gran == null ? interval : 
TEST_ACTUAL_INTERVAL;
         Assert.assertEquals(
-            getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran, 
List.of(inputInterval)),
+            getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran, 
List.of(compactInterval)),
             segments.get(i).getLastCompactionState()
         );
         if (useOverwriteShard) {
@@ -1757,8 +1826,12 @@ public abstract class CompactionTaskRunBase
     } else if (gran.equals(Granularities.SIX_HOUR)) {
       Assert.assertEquals(1, segments.size());
       Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
+      // The native runner uses the interval of the segments it actually sees, while the MSQ runner uses the input interval.
+      Interval compactInterval = compactionRunner instanceof 
NativeCompactionRunner
+                                 ? TEST_ACTUAL_INTERVAL
+                                 : inputInterval;
       Assert.assertEquals(
-          getDefaultCompactionState(gran, queryGran, List.of(inputInterval)),
+          getDefaultCompactionState(gran, queryGran, List.of(compactInterval)),
           segments.get(0).getLastCompactionState()
       );
       Assert.assertEquals(new NumberedShardSpec(0, 1), 
segments.get(0).getShardSpec());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
index d3430d90b23..3e306e4a9a4 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
@@ -35,7 +35,7 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class NativeCompactionTaskRunTest extends CompactionTaskRunBase
 {
-  @Parameterized.Parameters(name = "name={0}, inputInterval={5}, 
segmentGran={6}")
+  @Parameterized.Parameters(name = "name={0}, inputInterval={6}, 
segmentGran={7}")
   public static Iterable<Object[]> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 51235417799..758f27a741f 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -126,7 +126,7 @@ public class MSQCompactionTaskRunTest extends 
CompactionTaskRunBase
   private final ConcurrentHashMap<String, TaskActionClient> taskActionClients 
= new ConcurrentHashMap<>();
   private Injector injector;
 
-  @Parameterized.Parameters(name = "name: {0}, inputInterval={5}, 
segmentGran={6}")
+  @Parameterized.Parameters(name = "name: {0}, inputInterval={6}, 
segmentGran={7}")
   public static Iterable<Object[]> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to