kfaraz commented on code in PR #19016:
URL: https://github.com/apache/druid/pull/19016#discussion_r2958021150


##########
docs/data-management/manual-compaction.md:
##########
@@ -117,25 +117,65 @@ The compaction `ioConfig` requires specifying `inputSpec` 
as follows:
 |Field|Description|Default|Required|
 |-----|-----------|-------|--------|
 |`type`|Task type. Set the value to `compact`.|none|Yes|
-|`inputSpec`|Specification of the target [interval](#interval-inputspec) or 
[segments](#segments-inputspec).|none|Yes|
+|`inputSpec`|Specification of the target [interval](#interval-inputspec) or 
[uncompacted](#uncompacted-inputspec).|none|Yes|
 |`dropExisting`|If `true`, the task replaces all existing segments fully 
contained by either of the following:<br />- the `interval` in the `interval` 
type `inputSpec`.<br />- the umbrella interval of the `segments` in the 
`segment` type `inputSpec`.<br />If compaction fails, Druid does not change any 
of the existing segments.<br />**WARNING**: `dropExisting` in `ioConfig` is a 
beta feature. |false|No|
-|`allowNonAlignedInterval`|If `true`, the task allows an explicit 
[`segmentGranularity`](#compaction-granularity-spec) that is not aligned with 
the provided [interval](#interval-inputspec) or 
[segments](#segments-inputspec). This parameter is only used if 
[`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.<br 
/><br />This parameter is provided for backwards compatibility. In most 
scenarios it should not be set, as it can lead to data being accidentally 
overshadowed. This parameter may be removed in a future release.|false|No|
+|`allowNonAlignedInterval`|If `true`, the task allows an explicit 
[`segmentGranularity`](#compaction-granularity-spec) that is not aligned with 
the provided [interval](#interval-inputspec) or 
[uncompacted](#uncompacted-inputspec). This parameter is only used if 
[`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.<br 
/><br />This parameter is provided for backwards compatibility. In most 
scenarios it should not be set, as it can lead to data being accidentally 
overshadowed. This parameter may be removed in a future release.|false|No|
 
 The compaction task has two kinds of `inputSpec`:
 
 ### Interval `inputSpec`
 
 |Field|Description|Required|
 |-----|-----------|--------|
-|`type`|Task type. Set the value to `interval`.|Yes|
+|`type`|Task type. Set the value to `interval` to trigger major 
compaction.|Yes|
 |`interval`|Interval to compact.|Yes|
 
-### Segments `inputSpec`
+### Uncompacted `inputSpec`
 
 |Field|Description|Required|
 |-----|-----------|--------|
-|`type`|Task type. Set the value to `segments`.|Yes|
-|`segments`|A list of segment IDs.|Yes|
+|`type`|Task type. Set the value to `uncompacted` to trigger minor 
compaction.|Yes|
+|`interval`|Interval to compact.|Yes|
+|`uncompactedSegments`|A list of segment descriptors.|Yes|
+
+The required segment descriptor fields can be retrieved from the "Segments" 
section in the web console.
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`itvl`|Interval of segment to compact.|Yes|
+|`ver`|Version of the segment.|Yes|
+|`part`|Partition number of the segment.|Yes|
+
+#### Example uncompacted inputSpec
+
+```json
+{
+  "type": "uncompacted",
+  "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+  "uncompactedSegments": [
+    {
+      "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "ver": "2020-01-01T00:07:18.186Z",
+      "part": 0
+    },
+    {
+      "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "ver": "2020-01-01T00:07:18.186Z",
+      "part": 1
+    }
+  ]
+}
+```
+
+When using the uncompacted `inputSpec`, the task compacts only the specified 
segments. Segments in the same interval that are not in the spec are upgraded 
in place rather than compacted. This allows compacting a subset of segments 
while preserving others.
+
+There are some requirements when triggering a minor compaction:
+- Set `useConcurrentLocks: true` in the task context. Minor compaction uses 
REPLACE locks over the entire interval.
+- `dropExisting: true` is allowed with segments `inputSpec`; the task replaces 
only the compacted segments.
+
+### Segment `inputSpec`
+
+The segment `inputSpec` is deprecated, instructions for usage will no longer 
be documented. Please use the above 2 `inputSpec` instead.

Review Comment:
   ```suggestion
   The segment `inputSpec` is deprecated, instructions for usage will no longer 
be documented. Please use the types `interval` or `uncompacted` instead.
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.joda.time.Interval;
+import org.junit.Test;

Review Comment:
   Since this is a new test, maybe use JUnit 5 instead?



##########
docs/data-management/manual-compaction.md:
##########
@@ -117,25 +117,65 @@ The compaction `ioConfig` requires specifying `inputSpec` 
as follows:
 |Field|Description|Default|Required|
 |-----|-----------|-------|--------|
 |`type`|Task type. Set the value to `compact`.|none|Yes|
-|`inputSpec`|Specification of the target [interval](#interval-inputspec) or 
[segments](#segments-inputspec).|none|Yes|
+|`inputSpec`|Specification of the target [interval](#interval-inputspec) or 
[uncompacted](#uncompacted-inputspec).|none|Yes|

Review Comment:
   I wonder if we should change the type name from `uncompacted` to `minor`.
   Since this feature has not been released yet, I think we still have time to 
fix it up.
   If we do this, the field name can be changed from `uncompactedSegments` to 
`segmentsToCompact`.
   
   `uncompacted` is slightly incorrect because there may be segments that were 
appended (by a concurrent APPEND job) after the minor compaction task started. 
The minor compaction task would not compact these segments (since they are not 
present in the `uncompactedSegments` passed to the spec); they would simply be 
upgraded instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -139,6 +140,39 @@ static List<ParallelIndexIngestionSpec> 
createIngestionSpecs(
     ).collect(Collectors.toList());
   }
 
+  /**
+   * When using {@link MinorCompactionInputSpec}, resolves uncompacted segment 
descriptors that belong
+   * to the given interval and returns them as {@link WindowedSegmentId} 
objects.
+   * Returns null for interval-based compaction.
+   */
+  @Nullable
+  private static List<WindowedSegmentId> resolveSegmentIdsForInterval(
+      CompactionInputSpec inputSpec,
+      String dataSource,
+      Interval interval
+  )
+  {
+    if (!(inputSpec instanceof MinorCompactionInputSpec)) {
+      return null;
+    }
+    final List<WindowedSegmentId> segmentIds = new ArrayList<>();
+    for (SegmentDescriptor desc : ((MinorCompactionInputSpec) 
inputSpec).getUncompactedSegments()) {
+      if (interval.contains(desc.getInterval())) {
+        final SegmentId segmentId = SegmentId.of(
+            dataSource,
+            desc.getInterval(),
+            desc.getVersion(),
+            desc.getPartitionNumber()
+        );
+        segmentIds.add(new WindowedSegmentId(
+            segmentId.toString(),
+            Collections.singletonList(desc.getInterval())

Review Comment:
   Nit:
   ```suggestion
               List.of(desc.getInterval())
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -139,6 +140,39 @@ static List<ParallelIndexIngestionSpec> 
createIngestionSpecs(
     ).collect(Collectors.toList());
   }
 
+  /**
+   * When using {@link MinorCompactionInputSpec}, resolves uncompacted segment 
descriptors that belong
+   * to the given interval and returns them as {@link WindowedSegmentId} 
objects.
+   * Returns null for interval-based compaction.
+   */
+  @Nullable
+  private static List<WindowedSegmentId> resolveSegmentIdsForInterval(

Review Comment:
   ```suggestion
     private static List<WindowedSegmentId> resolveSegmentIdsForMinorCompaction(
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java:
##########
@@ -2032,6 +2039,247 @@ public void drop(DataSegment segment)
         .build();
   }
 
+  @Test
+  public void testFindSegmentsToLockReturnsAllSegmentsForMinorCompaction() 
throws Exception
+  {
+    final Interval testInterval = 
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+    final String version = "2024-11-17T23:49:06.823Z";
+
+    final List<DataSegment> allSegmentsInInterval = new ArrayList<>();
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 0));
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 2));
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 4));
+    final DataSegment segment6 = createSegmentWithPartition(testInterval, 
version, 6);
+    final DataSegment segment7 = createSegmentWithPartition(testInterval, 
version, 7);
+    final DataSegment segment8 = createSegmentWithPartition(testInterval, 
version, 8);
+    allSegmentsInInterval.add(segment6);
+    allSegmentsInInterval.add(segment7);
+    allSegmentsInInterval.add(segment8);
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 10));
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 12));
+
+    final MinorCompactionInputSpec minorSpec = new MinorCompactionInputSpec(
+        testInterval,
+        ImmutableList.of(segment6.toDescriptor(), segment7.toDescriptor(), 
segment8.toDescriptor())

Review Comment:
   Nit: Use `List.of()`, `Map.of()` and `Set.of()` for brevity.
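
To illustrate the nit, here is a minimal sketch of the JDK factory methods (Java 9+). The class name and the string and integer values are hypothetical stand-ins for the descriptors and context built in the test; only the `useConcurrentLocks` key comes from the docs change in this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

class CollectionFactoriesSketch
{
  // Hypothetical stand-ins for the segment descriptors passed to the spec.
  static List<String> descriptors()
  {
    return List.of("segment6", "segment7", "segment8");
  }

  // Task context with the key named in the docs change.
  static Map<String, Object> context()
  {
    return Map.of("useConcurrentLocks", true);
  }

  // Partition numbers of the segments to compact (illustrative values).
  static Set<Integer> partitions()
  {
    return Set.of(6, 7, 8);
  }

  public static void main(String[] args)
  {
    System.out.println(descriptors());
    System.out.println(context());
    System.out.println(partitions().size());
  }
}
```

One caveat when swapping these in for the Guava factories: `Map.of()` and `Set.of()` do not guarantee iteration order and throw `IllegalArgumentException` on duplicate keys or elements, so tests that depend on ordering may need `List.of()` or an explicit ordered collection.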



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java:
##########
@@ -2032,6 +2039,247 @@ public void drop(DataSegment segment)
         .build();
   }
 
+  @Test
+  public void testFindSegmentsToLockReturnsAllSegmentsForMinorCompaction() 
throws Exception
+  {
+    final Interval testInterval = 
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+    final String version = "2024-11-17T23:49:06.823Z";
+
+    final List<DataSegment> allSegmentsInInterval = new ArrayList<>();
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 0));
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 2));
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 4));
+    final DataSegment segment6 = createSegmentWithPartition(testInterval, 
version, 6);
+    final DataSegment segment7 = createSegmentWithPartition(testInterval, 
version, 7);
+    final DataSegment segment8 = createSegmentWithPartition(testInterval, 
version, 8);
+    allSegmentsInInterval.add(segment6);
+    allSegmentsInInterval.add(segment7);
+    allSegmentsInInterval.add(segment8);
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 10));
+    allSegmentsInInterval.add(createSegmentWithPartition(testInterval, 
version, 12));
+
+    final MinorCompactionInputSpec minorSpec = new MinorCompactionInputSpec(
+        testInterval,
+        ImmutableList.of(segment6.toDescriptor(), segment7.toDescriptor(), 
segment8.toDescriptor())
+    );
+
+    final CompactionTask compactionTask = new Builder(DATA_SOURCE, 
segmentCacheManagerFactory)
+        .inputSpec(minorSpec)
+        .context(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true))
+        .build();
+
+    final TestTaskActionClient taskActionClient = new 
TestTaskActionClient(allSegmentsInInterval);
+
+    // Verify findSegmentsToLock() returns ALL segments in interval (no 
filtering)
+    final List<DataSegment> segmentsToLock = compactionTask.findSegmentsToLock(

Review Comment:
   IIUC, the `findSegmentsToLock()` method is invoked only when we go down the 
segment locking code flow.
   Ideally, `CompactionTask.isReady()` would call 
`determineLockGranularityAndTryLock()`, which would short-circuit to return a 
TIME_CHUNK lock, and we would never go down the path that involves the methods 
`determineSegmentGranularity()` or `findSegmentsToLock()`.
   
   If needed, we can perform validations in the `CompactionTask` constructor to 
ensure that minor compaction is used only with `ingestionMode == REPLACE`. We 
need not support `REPLACE_LEGACY` (which uses `isDropExisting: false`) since it 
is a legacy mode and has been deprecated for a while. I don't think that mode 
makes sense in the context of compaction either.
   
   Let me know what you think.
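
A rough sketch of what such a constructor-time check could look like. Everything here is an assumption for illustration: the class, the enum, and the `validate` helper are stand-ins and not the actual `CompactionTask` code, and `IllegalArgumentException` stands in for whatever exception type the project prefers.

```java
class MinorCompactionModeCheck
{
  // Hypothetical stand-in for the task's ingestion mode enum.
  enum IngestionMode
  {
    REPLACE,
    REPLACE_LEGACY,
    APPEND
  }

  /**
   * Rejects minor compaction unless the ingestion mode is REPLACE.
   * Other input specs are left unrestricted in this sketch.
   */
  static void validate(boolean isMinorCompaction, IngestionMode mode)
  {
    if (isMinorCompaction && mode != IngestionMode.REPLACE) {
      throw new IllegalArgumentException(
          "Minor compaction requires ingestionMode REPLACE but got " + mode
      );
    }
  }

  public static void main(String[] args)
  {
    validate(true, IngestionMode.REPLACE);
    try {
      validate(true, IngestionMode.REPLACE_LEGACY);
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Failing at construction time means the task is rejected at submission, before any lock is requested.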



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -318,6 +370,11 @@ Map<String, Object> createContextForSubtask(CompactionTask 
compactionTask)
     newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, 
STORE_COMPACTION_STATE);
     // Set the priority of the compaction task.
     newContext.put(Tasks.PRIORITY_KEY, compactionTask.getPriority());
+    // Native minor compaction uses REPLACE ingestion mode, which uses time 
chunk lock.
+    if (compactionTask.getIoConfig().getInputSpec() instanceof 
MinorCompactionInputSpec) {
+      newContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+      newContext.put(Tasks.USE_CONCURRENT_LOCKS, true);

Review Comment:
   Why should we force `useConcurrentLocks` here?
   
   If `useConcurrentLocks` is a requirement for minor compaction, I think we 
should validate that in the `CompactionTask` constructor instead so that users 
are aware of the requirement.
   
   Also, I wonder if there is any real benefit to forcing these context 
parameters here since IIUC, the locks would have already been acquired at this 
point.
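
For illustration, validating the context up front instead of overwriting it could be sketched as below. The class and method names are hypothetical, the context is modelled as a plain `Map<String, Object>`, and the check accepts only a Boolean `true` (a real implementation might also need to accept string values parsed from JSON).

```java
import java.util.Map;

class MinorCompactionContextCheck
{
  // Context key named in the docs change for this PR.
  static final String USE_CONCURRENT_LOCKS = "useConcurrentLocks";

  /**
   * Fails fast when a minor compaction is submitted without
   * useConcurrentLocks set to true, instead of silently forcing the flag.
   */
  static void validateContext(boolean isMinorCompaction, Map<String, Object> context)
  {
    if (!isMinorCompaction) {
      return;
    }
    final Object value = context.get(USE_CONCURRENT_LOCKS);
    if (!Boolean.TRUE.equals(value)) {
      throw new IllegalArgumentException(
          "Minor compaction requires context parameter '" + USE_CONCURRENT_LOCKS + "' to be true"
      );
    }
  }

  public static void main(String[] args)
  {
    validateContext(true, Map.of(USE_CONCURRENT_LOCKS, true));
    try {
      validateContext(true, Map.of());
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```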



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TaskLockHelperTest
+{
+  private static final String DATA_SOURCE = "test_datasource";
+  private static final Interval TEST_INTERVAL = 
Intervals.of("2017-01-01/2017-01-02");
+  private static final String TEST_VERSION = DateTimes.nowUtc().toString();
+
+  @Test(expected = ISE.class)
+  public void testVerifyNonConsecutiveSegmentsInInputFails()
+  {
+    // Test that non-consecutive segments within the input list fail.
+    // Compacting segments {0, 1, 3} should fail because root partition 2 is 
missing.
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1),  // rootPartitionRange 
[0, 1)
+        createSegment(1, 1, 2, (short) 1, (short) 1),   // rootPartitionRange 
[1, 2)
+        createSegment(3, 3, 4, (short) 1, (short) 1)    // rootPartitionRange 
[3, 4)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyConsecutiveSegmentsSucceedEvenIfOtherSegmentsMissing()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(3, 3, 4, (short) 1, (short) 1),  // rootPartitionRange 
[3, 4)
+        createSegment(4, 4, 5, (short) 1, (short) 1),   // rootPartitionRange 
[4, 5)
+        createSegment(5, 5, 6, (short) 1, (short) 1)   // rootPartitionRange 
[5, 6)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyConsecutiveSegmentsStillWorks()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1),
+        createSegment(1, 1, 2, (short) 1, (short) 1),
+        createSegment(2, 2, 3, (short) 1, (short) 1)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test(expected = ISE.class)
+  public void testVerifyLargeGapSegmentsFails()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1),
+        createSegment(1, 1, 2, (short) 1, (short) 1),
+        createSegment(10, 10, 11, (short) 1, (short) 1)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyAtomicUpdateGroupValidationStillWorks()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 2),
+        createSegment(1, 0, 1, (short) 1, (short) 2)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test(expected = ISE.class)
+  public void testVerifyAtomicUpdateGroupIncompleteFails()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 3),
+        createSegment(1, 0, 1, (short) 1, (short) 3)
+    );
+
+    // Should throw ISE because atomicUpdateGroupSize is 3 but we only have 2 
segments
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test(expected = ISE.class)
+  public void testVerifyDifferentMinorVersionsFail()
+  {
+    // Test that segments with same root partition range but different minor 
versions fail
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 2),
+        createSegment(1, 0, 1, (short) 2, (short) 2) // Different minor version
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test(expected = ISE.class)

Review Comment:
   Use `Assertions.assertThrows` to verify the thrown exception here and in the 
other methods.
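
As a sketch of the pattern: JUnit 5's `Assertions.assertThrows(Class, Executable)` returns the thrown exception, so the test can pin down which call throws and inspect the message. To keep the example runnable without the JUnit dependency, the code below uses a small local stand-in with the same shape (it takes a `Runnable` rather than JUnit's `Executable`), and `verifyConsecutive` is a hypothetical simplification of the check under test that throws `IllegalStateException`.

```java
import java.util.List;

class AssertThrowsSketch
{
  /** Local stand-in shaped like JUnit 5's Assertions.assertThrows. */
  static <T extends Throwable> T assertThrows(Class<T> expected, Runnable body)
  {
    try {
      body.run();
    }
    catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  /** Hypothetical check: partition ids must be consecutive. */
  static void verifyConsecutive(List<Integer> partitionIds)
  {
    for (int i = 1; i < partitionIds.size(); i++) {
      if (partitionIds.get(i) != partitionIds.get(i - 1) + 1) {
        throw new IllegalStateException("Gap before partition " + partitionIds.get(i));
      }
    }
  }

  public static void main(String[] args)
  {
    // Only the lambda is expected to throw; test setup outside it is not covered.
    final IllegalStateException e = assertThrows(
        IllegalStateException.class,
        () -> verifyConsecutive(List.of(0, 1, 3))
    );
    System.out.println(e.getMessage());
  }
}
```

Compared with `@Test(expected = ...)`, this form does not pass if the exception comes from setup code rather than the call being verified.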



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

