kfaraz commented on code in PR #19016:
URL: https://github.com/apache/druid/pull/19016#discussion_r2971226997
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -99,9 +104,14 @@ public CompactionConfigValidationResult validateCompactionTask(
           "Virtual columns in filter rules are not supported by the Native compaction engine. Use MSQ compaction engine instead."
       );
     }
+
     if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) {
-      return CompactionConfigValidationResult.failure(
-          "Minor compaction is not supported by Native compaction engine. Use MSQ compaction engine instead.");
+      boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS);
Review Comment:
Using the default value here will allow this code to work even when we
change the default value to `true` in the future:
```suggestion
      boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS, Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
```
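The point of the suggested two-argument overload can be illustrated with a self-contained sketch. The `Map`-backed context and the method below are hypothetical stand-ins for illustration, not Druid's actual task-context API:

```java
import java.util.HashMap;
import java.util.Map;

public class ContextDefaults {
    // Hypothetical stand-in for getContextValue(key, defaultValue)
    static boolean getContextValue(Map<String, Object> context, String key, boolean defaultValue) {
        Object value = context.get(key);
        return value == null ? defaultValue : (Boolean) value;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        // Key absent: the caller-supplied default wins, so a future flip of the
        // default to true is picked up automatically by this call site
        System.out.println(getContextValue(context, "useConcurrentLocks", true));
        // Key present: the explicit context value wins over the default
        context.put("useConcurrentLocks", false);
        System.out.println(getContextValue(context, "useConcurrentLocks", true));
    }
}
```

With the one-argument form, a call site that assumes `false` would silently go stale if the project-wide default ever changes; passing the shared default constant keeps the two in sync.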
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -99,9 +104,14 @@ public CompactionConfigValidationResult validateCompactionTask(
           "Virtual columns in filter rules are not supported by the Native compaction engine. Use MSQ compaction engine instead."
       );
     }
+
     if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) {
-      return CompactionConfigValidationResult.failure(
-          "Minor compaction is not supported by Native compaction engine. Use MSQ compaction engine instead.");
+      boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS);
Review Comment:
I think this validation should happen in `CompactionTask` constructor
itself, since it is valid for MSQ minor compaction as well.
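A minimal sketch of what constructor-side validation might look like. All names here are hypothetical; in the PR the real check would live in `CompactionTask`'s constructor and consult the task context:

```java
// Hypothetical sketch: validating minor-compaction preconditions in the task
// constructor so both the Native and MSQ engines get the same check.
public class CompactionTaskSketch {
    private final boolean isMinorCompaction;
    private final boolean useConcurrentLocks;

    CompactionTaskSketch(boolean isMinorCompaction, boolean useConcurrentLocks) {
        if (isMinorCompaction && !useConcurrentLocks) {
            // Fail at construction time, before any engine-specific code runs
            throw new IllegalArgumentException("Minor compaction requires concurrent locks to be enabled");
        }
        this.isMinorCompaction = isMinorCompaction;
        this.useConcurrentLocks = useConcurrentLocks;
    }

    public static void main(String[] args) {
        try {
            new CompactionTaskSketch(true, false);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Validating once in the constructor avoids duplicating the rule in every `CompactionRunner` implementation.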
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -190,6 +213,73 @@ private static ParallelIndexIOConfig createIoConfig(
     );
   }
+  private static ParallelIndexIOConfig createMinorCompactionIoConfig(
+      TaskToolbox toolbox,
+      DataSchema dataSchema,
+      Interval interval,
+      CoordinatorClient coordinatorClient,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
+      CompactionIOConfig compactionIOConfig
+  )
+  {
+    final List<WindowedSegmentId> segmentIds = resolveSegmentIdsForMinorCompaction(
+        (MinorCompactionInputSpec) compactionIOConfig.getInputSpec(),
+        dataSchema.getDataSource(),
+        interval
+    );
+
+    if (segmentIds.isEmpty()) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
Review Comment:
Is this check necessary? I think it would already be happening in `MinorCompactionInputSpec` itself.
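The pattern the reviewer is alluding to can be sketched as follows. The class and message below are hypothetical stand-ins for `MinorCompactionInputSpec`, which (per the test added later in this PR) already throws from its constructor:

```java
import java.util.List;

// Hypothetical sketch: rejecting an empty segment list in the input-spec
// constructor, so callers like NativeCompactionRunner need no separate check.
public class InputSpecSketch {
    private final List<String> segmentIds;

    InputSpecSketch(List<String> segmentIds) {
        if (segmentIds == null || segmentIds.isEmpty()) {
            // Fail fast at construction time
            throw new IllegalArgumentException("segmentIds must be non-empty for minor compaction");
        }
        this.segmentIds = segmentIds;
    }

    public static void main(String[] args) {
        try {
            new InputSpecSketch(List.of());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the spec guarantees non-emptiness at construction, the `segmentIds.isEmpty()` branch in `createMinorCompactionIoConfig` is unreachable and could be dropped.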
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java:
##########
@@ -213,6 +228,51 @@ public void testPartialCompactRangeAndDynamicPartitionedSegments()
     }
   }
+  /**
+   * End-to-end minor compaction: compact a subset of segments with useConcurrentLocks (TIME_CHUNK).
+   * Non-compacted segments in the interval are upgraded via MarkSegmentToUpgradeAction.
+   */
+  @Test
+  public void testMinorCompactionUpgradesNonCompactedSegments()
+  {
+    DataSegmentsWithSchemas dataSegmentsWithSchemas =
+        runTestTask(
+            new HashedPartitionsSpec(null, 4, null),
+            TaskState.SUCCESS,
+            false
+        );
+    verifySchema(dataSegmentsWithSchemas);
+    final Map<Interval, List<DataSegment>> hashPartitionedSegments =
+        SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments());
+
+    hashPartitionedSegments.values().forEach(
+        segmentsInInterval -> segmentsInInterval.sort(
+            Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
+        )
+    );
+    // Pick a subset to compact (e.g. first 2 of each interval)
+    final List<DataSegment> segmentsToCompact = new ArrayList<>();
+    for (List<DataSegment> segmentsInInterval : hashPartitionedSegments.values()) {
+      segmentsToCompact.addAll(segmentsInInterval.subList(0, Math.min(2, segmentsInInterval.size())));
+    }
+    final CompactionTask compactionTask = newCompactionTaskBuilder()
+        .inputSpec(
+            new MinorCompactionInputSpec(
+                INTERVAL_TO_INDEX,
+                segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+            ), true
+        )
+        .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
+        .context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true))
+        .build();
+    dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
+    verifySchema(dataSegmentsWithSchemas);
+    // After minor compaction: compacted subset produces new segments; non-compacted segments were upgraded
+    final Map<Interval, List<DataSegment>> resultSegments =
+        SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments());
+    Assert.assertFalse("Compaction should produce segments", resultSegments.isEmpty());
Review Comment:
Nit: Should we also verify that the non-compacted segments were actually upgraded (by looking at their `upgradedFromSegmentId` field)?
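One way the suggested assertion could look, sketched here with a hypothetical minimal `Segment` record standing in for Druid's `DataSegment`:

```java
import java.util.List;

public class UpgradeCheck {
    // Hypothetical stand-in for DataSegment, keeping only the field relevant here
    record Segment(String id, String upgradedFromSegmentId) {}

    public static void main(String[] args) {
        // Segments left out of the compaction, as the upgrade action would leave them
        List<Segment> nonCompacted = List.of(
                new Segment("seg3_v2", "seg3_v1"),
                new Segment("seg4_v2", "seg4_v1")
        );
        // Each non-compacted segment should record the segment it was upgraded from
        boolean allUpgraded = nonCompacted.stream()
                .allMatch(s -> s.upgradedFromSegmentId() != null);
        System.out.println(allUpgraded);
    }
}
```

In the real test, the equivalent would be to filter the result segments down to those not in `segmentsToCompact` and assert the upgrade linkage on each, rather than only checking that the result is non-empty.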
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -223,6 +223,15 @@ public CompactionTask(
       //noinspection ConstantConditions
       this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
     }
+
+    if (ioConfig != null && ioConfig.getInputSpec() != null && ioConfig.getInputSpec() instanceof MinorCompactionInputSpec) {
+      if (computeCompactionIngestionMode(ioConfig) != IngestionMode.REPLACE) {
Review Comment:
Thanks for adding this!
We should also add a validation to check the lock type, which must be `REPLACE` in case of minor compaction.
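A hedged sketch of the suggested lock-type check. The enum here mirrors the idea of Druid's `TaskLockType` but is defined locally for illustration; the real check would resolve the lock type from the task context:

```java
public class LockValidation {
    // Local illustration enum, not Druid's actual TaskLockType
    enum TaskLockType { APPEND, REPLACE }

    static void validateMinorCompactionLock(TaskLockType lockType) {
        if (lockType != TaskLockType.REPLACE) {
            throw new IllegalArgumentException("Minor compaction requires a REPLACE lock, got: " + lockType);
        }
    }

    public static void main(String[] args) {
        validateMinorCompactionLock(TaskLockType.REPLACE); // passes silently
        try {
            validateMinorCompactionLock(TaskLockType.APPEND);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Pairing this with the ingestion-mode check above would reject misconfigured minor-compaction tasks at submission time instead of at lock-acquisition time.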
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java:
##########
@@ -2032,6 +2037,236 @@ public void drop(DataSegment segment)
         .build();
   }
+  @Test
+  public void testMinorCompactionChecksIfSegmentsToCompactIsEmpty()
+  {
+    Assert.assertThrows(
+        DruidException.class,
+        () -> new MinorCompactionInputSpec(COMPACTION_INTERVAL, List.of())
+    );
+  }
+
+  @Test
+  public void testMinorCompactionShouldAlwaysUseIngestionMode()
Review Comment:
```suggestion
public void testMinorCompactionShouldAlwaysUseReplaceIngestionMode()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]