kfaraz commented on code in PR #19016:
URL: https://github.com/apache/druid/pull/19016#discussion_r2971226997


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -99,9 +104,14 @@ public CompactionConfigValidationResult validateCompactionTask(
           "Virtual columns in filter rules are not supported by the Native compaction engine. Use MSQ compaction engine instead."
       );
     }
+
     if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) {
-      return CompactionConfigValidationResult.failure(
-          "Minor compaction is not supported by Native  compaction engine. Use MSQ compaction engine instead.");
+      boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS);

Review Comment:
   Using the default value here will allow this code to work even when we change the default value to `true` in the future:
   
   ```suggestion
         boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS, Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
   ```
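   The pattern behind this suggestion can be sketched in isolation. The class and constants below are hypothetical stand-ins for `CompactionTask` and `Tasks`, not actual Druid code; the point is that passing the shared default into the lookup keeps every call site in sync if `DEFAULT_USE_CONCURRENT_LOCKS` ever flips.

   ```java
   import java.util.Map;

   // Minimal sketch with stand-ins for Tasks.USE_CONCURRENT_LOCKS and
   // Tasks.DEFAULT_USE_CONCURRENT_LOCKS (hypothetical, not Druid's classes).
   public class ContextDefaultSketch
   {
     static final String USE_CONCURRENT_LOCKS = "useConcurrentLocks";
     static final boolean DEFAULT_USE_CONCURRENT_LOCKS = false;

     // Stand-in for the two-argument getContextValue(key, defaultValue) overload
     static boolean getContextValue(Map<String, Object> context, String key, boolean defaultValue)
     {
       final Object value = context.get(key);
       return value == null ? defaultValue : (Boolean) value;
     }

     public static void main(String[] args)
     {
       // Flag absent from the context: the shared default is used, so a future
       // change of the default propagates here automatically.
       System.out.println(getContextValue(Map.of(), USE_CONCURRENT_LOCKS, DEFAULT_USE_CONCURRENT_LOCKS));
       // Flag explicitly set in the context: the context value wins.
       System.out.println(getContextValue(Map.of(USE_CONCURRENT_LOCKS, true), USE_CONCURRENT_LOCKS, DEFAULT_USE_CONCURRENT_LOCKS));
     }
   }
   ```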



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -99,9 +104,14 @@ public CompactionConfigValidationResult validateCompactionTask(
           "Virtual columns in filter rules are not supported by the Native compaction engine. Use MSQ compaction engine instead."
       );
     }
+
     if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) {
-      return CompactionConfigValidationResult.failure(
-          "Minor compaction is not supported by Native  compaction engine. Use MSQ compaction engine instead.");
+      boolean usingConcurrentLocks = compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS);

Review Comment:
   I think this validation should happen in the `CompactionTask` constructor itself, since it is valid for MSQ minor compaction as well.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -190,6 +213,73 @@ private static ParallelIndexIOConfig createIoConfig(
     );
   }
 
+  private static ParallelIndexIOConfig createMinorCompactionIoConfig(
+      TaskToolbox toolbox,
+      DataSchema dataSchema,
+      Interval interval,
+      CoordinatorClient coordinatorClient,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
+      CompactionIOConfig compactionIOConfig
+  )
+  {
+    final List<WindowedSegmentId> segmentIds = resolveSegmentIdsForMinorCompaction(
+        (MinorCompactionInputSpec) compactionIOConfig.getInputSpec(),
+        dataSchema.getDataSource(),
+        interval
+    );
+
+    if (segmentIds.isEmpty()) {
+      throw DruidException.forPersona(DruidException.Persona.USER)

Review Comment:
   Is this check necessary? I think it would already be happening in `MinorCompactionInputSpec` itself.
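   The reviewer's point can be sketched as follows; the class below is a hypothetical stand-in for `MinorCompactionInputSpec`, not the actual Druid class. If the constructor rejects an empty segment list, the later `isEmpty()` check in `createMinorCompactionIoConfig` can never fire.

   ```java
   import java.util.List;

   // Hypothetical stand-in: validate at construction so downstream code can
   // assume a non-empty segment list. List<String> stands in for the real
   // List<SegmentDescriptor>.
   public class MinorInputSpecSketch
   {
     private final List<String> segmentDescriptors;

     public MinorInputSpecSketch(List<String> segmentDescriptors)
     {
       if (segmentDescriptors == null || segmentDescriptors.isEmpty()) {
         // Fail fast here; no later empty-check is needed.
         throw new IllegalArgumentException("Minor compaction requires a non-empty list of segments to compact");
       }
       this.segmentDescriptors = segmentDescriptors;
     }

     public List<String> getSegmentDescriptors()
     {
       return segmentDescriptors;
     }
   }
   ```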



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java:
##########
@@ -213,6 +228,51 @@ public void testPartialCompactRangeAndDynamicPartitionedSegments()
     }
   }
 
+  /**
+   * End-to-end minor compaction: compact a subset of segments with useConcurrentLocks (TIME_CHUNK).
+   * Non-compacted segments in the interval are upgraded via MarkSegmentToUpgradeAction.
+   */
+  @Test
+  public void testMinorCompactionUpgradesNonCompactedSegments()
+  {
+    DataSegmentsWithSchemas dataSegmentsWithSchemas =
+        runTestTask(
+            new HashedPartitionsSpec(null, 4, null),
+            TaskState.SUCCESS,
+            false
+        );
+    verifySchema(dataSegmentsWithSchemas);
+    final Map<Interval, List<DataSegment>> hashPartitionedSegments =
+        SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments());
+
+    hashPartitionedSegments.values().forEach(
+        segmentsInInterval -> segmentsInInterval.sort(
+            Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
+        )
+    );
+    // Pick a subset to compact (e.g. first 2 of each interval)
+    final List<DataSegment> segmentsToCompact = new ArrayList<>();
+    for (List<DataSegment> segmentsInInterval : hashPartitionedSegments.values()) {
+      segmentsToCompact.addAll(segmentsInInterval.subList(0, Math.min(2, segmentsInInterval.size())));
+    }
+    final CompactionTask compactionTask = newCompactionTaskBuilder()
+        .inputSpec(
+            new MinorCompactionInputSpec(
+                INTERVAL_TO_INDEX,
+                segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+            ), true
+        )
+        .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
+        .context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true))
+        .build();
+    dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
+    verifySchema(dataSegmentsWithSchemas);
+    // After minor compaction: compacted subset produces new segments; non-compacted segments were upgraded
+    final Map<Interval, List<DataSegment>> resultSegments =
+        SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments());
+    Assert.assertFalse("Compaction should produce segments", resultSegments.isEmpty());

Review Comment:
   Nit: Should we also verify that the non-compacted segments were actually upgraded (by looking at their `upgradedFromSegmentId` field)?
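   The suggested assertion could be sketched like this; `SegmentInfo` is a hypothetical stand-in for `DataSegment`, and the field name follows the `upgradedFromSegmentId` mentioned above. The real test would apply the check to every result segment that was not part of the compacted subset.

   ```java
   import java.util.List;

   // Hedged sketch: check that each non-compacted segment carries an
   // upgraded-from id. SegmentInfo is a stand-in, not Druid's DataSegment.
   public class UpgradeAssertionSketch
   {
     record SegmentInfo(String id, String upgradedFromSegmentId) {}

     static boolean wasUpgraded(SegmentInfo segment)
     {
       return segment.upgradedFromSegmentId() != null;
     }

     public static void main(String[] args)
     {
       // Hypothetical non-compacted segments that the task should have upgraded.
       final List<SegmentInfo> nonCompacted = List.of(
           new SegmentInfo("ds_2023_v2_3", "ds_2023_v1_3"),
           new SegmentInfo("ds_2023_v2_4", "ds_2023_v1_4")
       );
       System.out.println(nonCompacted.stream().allMatch(UpgradeAssertionSketch::wasUpgraded));
     }
   }
   ```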



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -223,6 +223,15 @@ public CompactionTask(
       //noinspection ConstantConditions
      this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
    }
+
+    if (ioConfig != null && ioConfig.getInputSpec() != null && ioConfig.getInputSpec() instanceof MinorCompactionInputSpec) {
+      if (computeCompactionIngestionMode(ioConfig) != IngestionMode.REPLACE) {

Review Comment:
   Thanks for adding this!
   We should also add a validation to check the lock type, which must be `REPLACE` in case of minor compaction.
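   A minimal sketch of the requested check; the enum and method names below are hypothetical stand-ins, not Druid's actual API. The idea is that the constructor fails fast unless the computed ingestion mode, and therefore the acquired lock type, resolves to `REPLACE`.

   ```java
   // Hypothetical validation sketch: minor compaction must run with REPLACE
   // semantics, so anything else is rejected up front.
   public class LockTypeValidationSketch
   {
     enum IngestionMode { REPLACE, APPEND }

     static void validateMinorCompaction(IngestionMode mode)
     {
       if (mode != IngestionMode.REPLACE) {
         throw new IllegalArgumentException(
             "Minor compaction requires REPLACE ingestion mode (and REPLACE locks), but got: " + mode
         );
       }
     }

     public static void main(String[] args)
     {
       validateMinorCompaction(IngestionMode.REPLACE);  // accepted, no exception
       System.out.println("REPLACE mode accepted");
     }
   }
   ```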



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java:
##########
@@ -2032,6 +2037,236 @@ public void drop(DataSegment segment)
         .build();
   }
 
+  @Test
+  public void testMinorCompactionChecksIfSegmentsToCompactIsEmpty()
+  {
+    Assert.assertThrows(
+        DruidException.class,
+        () -> new MinorCompactionInputSpec(COMPACTION_INTERVAL, List.of())
+    );
+  }
+
+  @Test
+  public void testMinorCompactionShouldAlwaysUseIngestionMode()

Review Comment:
   ```suggestion
     public void testMinorCompactionShouldAlwaysUseReplaceIngestionMode()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

