This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ad1b5f28bfe fix native compaction oom issue (#19121)
ad1b5f28bfe is described below
commit ad1b5f28bfecfd7b3cbd7f86d0d03c0af61df9b8
Author: Cece Mei <[email protected]>
AuthorDate: Tue Mar 10 20:16:34 2026 -0700
fix native compaction oom issue (#19121)
---
.../druid/indexing/common/task/CompactionTask.java | 2 +-
.../common/task/CompactionTaskParallelRunTest.java | 2 +-
.../common/task/CompactionTaskRunBase.java | 117 +++++++++++++++++----
.../common/task/NativeCompactionTaskRunTest.java | 2 +-
.../druid/msq/exec/MSQCompactionTaskRunTest.java | 2 +-
5 files changed, 99 insertions(+), 26 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 5c431648169..d7a224b0585 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -636,7 +636,7 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
- umbrellaInterval(timelineSegments, segmentProvider),
+ JodaUtils.umbrellaInterval(Iterables.transform(timelineSegments,
DataSegment::getInterval)),
lazyFetchSegments(
timelineSegments,
toolbox.getSegmentCacheManager()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 75d04659388..6b12ad84461 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -372,7 +372,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
Granularities.HOUR,
Granularities.MINUTE,
true,
- ImmutableList.of(INTERVAL_TO_INDEX)
+
ImmutableList.of(Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z"))
),
null
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
index 50b53e61488..b8f14ae1a3d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java
@@ -170,6 +170,7 @@ public abstract class CompactionTaskRunBase
protected static final Interval TEST_INTERVAL_DAY =
Intervals.of("2014-01-01/2014-01-02");
protected static final Interval TEST_INTERVAL =
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T06:00:00Z");
+ protected static final Interval TEST_ACTUAL_INTERVAL =
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z");
protected static final List<String> TEST_ROWS = ImmutableList.of(
"2014-01-01T00:00:10Z,a,1\n",
"2014-01-01T00:00:10Z,b,2\n",
@@ -304,7 +305,13 @@ public abstract class CompactionTaskRunBase
final List<DataSegment> segments = new
ArrayList<>(dataSegmentsWithSchemas.getSegments());
List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments);
Assert.assertEquals(TEST_ROWS, rowsFromSegment);
- verifyCompactedSegment(segments, segmentGranularity, DEFAULT_QUERY_GRAN,
false);
+ verifyCompactedSegment(
+ compactionTask.getCompactionRunner(),
+ segments,
+ segmentGranularity,
+ DEFAULT_QUERY_GRAN,
+ false
+ );
}
@Test
@@ -364,14 +371,44 @@ public abstract class CompactionTaskRunBase
final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 =
runTask(compactionTask1);
verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS);
- verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()),
segmentGranularity, DEFAULT_QUERY_GRAN, false);
+ verifyCompactedSegment(
+ compactionTask1.getCompactionRunner(),
+ List.copyOf(resultPair1.rhs.getSegments()),
+ segmentGranularity,
+ DEFAULT_QUERY_GRAN,
+ false
+ );
final CompactionTask compactionTask2 =
compactionTaskBuilder(segmentGranularity).interval(inputInterval,
true).build();
final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair2 =
runTask(compactionTask2);
verifyTaskSuccessRowsAndSchemaMatch(resultPair2, TOTAL_TEST_ROWS);
- verifyCompactedSegment(List.copyOf(resultPair2.rhs.getSegments()),
segmentGranularity, DEFAULT_QUERY_GRAN, false);
+ if (segmentGranularity == null ||
segmentGranularity.equals(Granularities.HOUR)) {
+ verifyCompactedSegment(
+ compactionTask1.getCompactionRunner(),
+ List.copyOf(resultPair2.rhs.getSegments()),
+ segmentGranularity,
+ DEFAULT_QUERY_GRAN,
+ false
+ );
+ } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) {
+ Set<DataSegment> compactedSegments = resultPair2.rhs.getSegments();
+ Assert.assertEquals(1, compactedSegments.size());
+
+ DataSegment compactedSegment =
Iterables.getOnlyElement(compactedSegments);
+ Assert.assertEquals(TEST_INTERVAL, compactedSegment.getInterval());
+
+ // compact interval is always the SIX_HOUR interval; since the previous
compaction has generated a new SIX_HOUR segment, the compaction
state in the second compaction is different from the first one.
+ Assert.assertEquals(
+ getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN,
List.of(TEST_INTERVAL)),
+ compactedSegment.getLastCompactionState()
+ );
+ Assert.assertEquals(new NumberedShardSpec(0, 1),
compactedSegment.getShardSpec());
+
+ } else {
+ throw new RE("Gran[%s] is not supported", segmentGranularity);
+ }
}
@Test
@@ -385,7 +422,13 @@ public abstract class CompactionTaskRunBase
final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 =
runTask(compactionTask1);
verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS);
- verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()),
segmentGranularity, DEFAULT_QUERY_GRAN, true);
+ verifyCompactedSegment(
+ compactionTask1.getCompactionRunner(),
+ List.copyOf(resultPair1.rhs.getSegments()),
+ segmentGranularity,
+ DEFAULT_QUERY_GRAN,
+ true
+ );
final CompactionTask compactionTask2 =
compactionTaskBuilder(segmentGranularity).interval(inputInterval,
false).build();
@@ -399,9 +442,9 @@ public abstract class CompactionTaskRunBase
for (int i = 0; i < 3; i++) {
Interval interval =
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
Assert.assertEquals(interval, segments.get(i).getInterval());
- Interval inputInterval = segmentGranularity == null ? interval :
this.inputInterval;
+ Interval compactInterval = segmentGranularity == null ? interval :
TEST_ACTUAL_INTERVAL;
Assert.assertEquals(
- getDefaultCompactionState(DEFAULT_SEGMENT_GRAN,
DEFAULT_QUERY_GRAN, List.of(inputInterval)),
+ getDefaultCompactionState(DEFAULT_SEGMENT_GRAN,
DEFAULT_QUERY_GRAN, List.of(compactInterval)),
segments.get(i).getLastCompactionState()
);
// overwrite shard starts at NON_ROOT_GEN_START_PARTITION_ID + 1, and
minor version 2 for the second compaction
@@ -416,8 +459,9 @@ public abstract class CompactionTaskRunBase
} else if (segmentGranularity.equals(Granularities.SIX_HOUR)) {
Assert.assertEquals(1, segments.size());
Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
+ // compact interval is always the SIX_HOUR interval; since the previous
compaction has generated a new SIX_HOUR segment, the compaction
state in the second compaction is different from the first one.
Assert.assertEquals(
- getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN,
List.of(inputInterval)),
+ getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN,
List.of(TEST_INTERVAL)),
segments.get(0).getLastCompactionState()
);
// use overwrite shard for the second compaction
@@ -436,7 +480,11 @@ public abstract class CompactionTaskRunBase
@Test
public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws
Exception
{
- Assume.assumeTrue("Use 3 hr interval to compact",
TEST_INTERVAL.equals(inputInterval));
+ Assume.assumeTrue(
+ "test with defined segment granularity and interval in this test",
+ Granularities.SIX_HOUR.equals(segmentGranularity) &&
TEST_INTERVAL.equals(inputInterval)
+ && lockGranularity != LockGranularity.SEGMENT
+ );
verifyTaskSuccessRowsAndSchemaMatch(runIndexTask(), TOTAL_TEST_ROWS);
final CompactionTask compactionTask =
@@ -487,6 +535,7 @@ public abstract class CompactionTaskRunBase
Pair<TaskStatus, DataSegmentsWithSchemas> compactionResult =
compactionFuture.get();
verifyTaskSuccessRowsAndSchemaMatch(compactionResult, TOTAL_TEST_ROWS);
verifyCompactedSegment(
+ compactionTask.getCompactionRunner(),
List.copyOf(compactionResult.rhs.getSegments()),
segmentGranularity,
DEFAULT_QUERY_GRAN,
@@ -539,11 +588,7 @@ public abstract class CompactionTaskRunBase
Assert.assertEquals(Intervals.of("2013-12-30/2014-01-06"),
segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(0).getShardSpec());
Assert.assertEquals(
- getDefaultCompactionState(
- Granularities.WEEK,
- DEFAULT_QUERY_GRAN,
- List.of(inputInterval)
- ),
+ getDefaultCompactionState(Granularities.WEEK, DEFAULT_QUERY_GRAN,
List.of(TEST_ACTUAL_INTERVAL)),
segments.get(0).getLastCompactionState()
);
}
@@ -604,10 +649,14 @@ public abstract class CompactionTaskRunBase
Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(0).getShardSpec());
+ // native runner uses interval from actual seen segments, while msq runner
uses interval from input
+ Interval compactInterval = compactionTask.getCompactionRunner() instanceof
NativeCompactionRunner
+ ? TEST_ACTUAL_INTERVAL
+ : inputInterval;
CompactionState expectedCompactionState = getDefaultCompactionState(
segmentGranularity,
DEFAULT_QUERY_GRAN,
- List.of(inputInterval)
+ List.of(compactInterval)
).toBuilder().transformSpec(new CompactionTransformSpec(new
SelectorDimFilter("dim", "a", null), null)).build();
Assert.assertEquals(expectedCompactionState,
segments.get(0).getLastCompactionState());
}
@@ -663,7 +712,7 @@ public abstract class CompactionTaskRunBase
AggregatorFactory expectedCountMetric = new CountAggregatorFactory("cnt");
AggregatorFactory expectedLongSumMetric = new
LongSumAggregatorFactory("val", "val");
CompactionState expectedCompactionState =
- getDefaultCompactionState(segmentGranularity, Granularities.MINUTE,
List.of(inputInterval))
+ getDefaultCompactionState(segmentGranularity, Granularities.MINUTE,
List.of(TEST_ACTUAL_INTERVAL))
.toBuilder()
.metricsSpec(List.of(expectedCountMetric, expectedLongSumMetric))
.build();
@@ -685,7 +734,13 @@ public abstract class CompactionTaskRunBase
verifyTaskSuccessRowsAndSchemaMatch(resultPair, TOTAL_TEST_ROWS);
List<DataSegment> segments = new ArrayList<>(resultPair.rhs.getSegments());
- verifyCompactedSegment(segments, segmentGranularity, Granularities.SECOND,
false);
+ verifyCompactedSegment(
+ compactionTask1.getCompactionRunner(),
+ segments,
+ segmentGranularity,
+ Granularities.SECOND,
+ false
+ );
}
@Test
@@ -709,8 +764,12 @@ public abstract class CompactionTaskRunBase
List<DataSegment> segments = List.copyOf(resultPair.rhs.getSegments());
Assert.assertEquals(1, segments.size());
Assert.assertEquals(TEST_INTERVAL_DAY, segments.get(0).getInterval());
+ // native runner uses interval from actual seen segments, while msq runner
uses interval from input
+ Interval interval = compactionTask1.getCompactionRunner() instanceof
NativeCompactionRunner
+ ? TEST_ACTUAL_INTERVAL
+ : TEST_INTERVAL_DAY;
Assert.assertEquals(
- getDefaultCompactionState(Granularities.DAY, Granularities.DAY,
List.of(TEST_INTERVAL_DAY)),
+ getDefaultCompactionState(Granularities.DAY, Granularities.DAY,
List.of(interval)),
segments.get(0).getLastCompactionState()
);
Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(0).getShardSpec());
@@ -1181,10 +1240,14 @@ public abstract class CompactionTaskRunBase
Assert.assertEquals(1, segments.size());
Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
+ // native runner uses interval from actual seen segments, while msq runner
uses interval from input
+ Interval compactInterval = compactionTask.getCompactionRunner() instanceof
NativeCompactionRunner
+ ?
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z")
+ : inputInterval;
CompactionState defaultCompactionState = getDefaultCompactionState(
Granularities.SIX_HOUR,
Granularities.MINUTE,
- List.of(TEST_INTERVAL)
+ List.of(compactInterval)
);
CompactionState newCompactionState =
defaultCompactionState.toBuilder().dimensionsSpec(
@@ -1291,11 +1354,15 @@ public abstract class CompactionTaskRunBase
final List<String> dimensionExclusions =
compactionTask.getCompactionRunner() instanceof NativeCompactionRunner
? List.of() : List.of("__time", "val");
+ // native runner uses interval from actual seen segments, while msq runner
uses interval from input
+ final Interval compactInterval = compactionTask.getCompactionRunner()
instanceof NativeCompactionRunner
+ ?
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z")
+ : TEST_INTERVAL;
CompactionState expectedState =
getDefaultCompactionState(
Granularities.SIX_HOUR,
Granularities.MINUTE,
- List.of(TEST_INTERVAL)
+ List.of(compactInterval)
).toBuilder()
.dimensionsSpec(
new DimensionsSpec(Arrays.asList(/* check explicitly specified
types are preserved */
@@ -1728,6 +1795,7 @@ public abstract class CompactionTaskRunBase
}
protected void verifyCompactedSegment(
+ CompactionRunner compactionRunner,
List<DataSegment> segments,
Granularity gran,
Granularity queryGran,
@@ -1740,9 +1808,10 @@ public abstract class CompactionTaskRunBase
for (int i = 0; i < 3; i++) {
Interval interval =
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
Assert.assertEquals(interval, segments.get(i).getInterval());
- Interval inputInterval = gran == null ? interval : this.inputInterval;
+ // native runner uses interval from actual seen segments, while msq
runner never uses this gran
+ Interval compactInterval = gran == null ? interval :
TEST_ACTUAL_INTERVAL;
Assert.assertEquals(
- getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran,
List.of(inputInterval)),
+ getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran,
List.of(compactInterval)),
segments.get(i).getLastCompactionState()
);
if (useOverwriteShard) {
@@ -1757,8 +1826,12 @@ public abstract class CompactionTaskRunBase
} else if (gran.equals(Granularities.SIX_HOUR)) {
Assert.assertEquals(1, segments.size());
Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval());
+ // native runner uses interval from actual seen segments, while msq
runner uses interval from input
+ Interval compactInterval = compactionRunner instanceof
NativeCompactionRunner
+ ? TEST_ACTUAL_INTERVAL
+ : inputInterval;
Assert.assertEquals(
- getDefaultCompactionState(gran, queryGran, List.of(inputInterval)),
+ getDefaultCompactionState(gran, queryGran, List.of(compactInterval)),
segments.get(0).getLastCompactionState()
);
Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(0).getShardSpec());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
index d3430d90b23..3e306e4a9a4 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java
@@ -35,7 +35,7 @@ import java.util.List;
@RunWith(Parameterized.class)
public class NativeCompactionTaskRunTest extends CompactionTaskRunBase
{
- @Parameterized.Parameters(name = "name={0}, inputInterval={5},
segmentGran={6}")
+ @Parameterized.Parameters(name = "name={0}, inputInterval={6},
segmentGran={7}")
public static Iterable<Object[]> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 51235417799..758f27a741f 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -126,7 +126,7 @@ public class MSQCompactionTaskRunTest extends
CompactionTaskRunBase
private final ConcurrentHashMap<String, TaskActionClient> taskActionClients
= new ConcurrentHashMap<>();
private Injector injector;
- @Parameterized.Parameters(name = "name: {0}, inputInterval={5},
segmentGran={6}")
+ @Parameterized.Parameters(name = "name: {0}, inputInterval={6},
segmentGran={7}")
public static Iterable<Object[]> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]