This is an automated email from the ASF dual-hosted git repository.

gwphua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d22f28277fa feat: Native Minor compaction (#19016)
d22f28277fa is described below

commit d22f28277fa79a46969e4ad8ad91bff3c756d28d
Author: Virushade <[email protected]>
AuthorDate: Thu Mar 26 15:11:59 2026 +0800

    feat: Native Minor compaction (#19016)
    
    * Test Driven Dev
    
    CompactionTaskTest
    
    TaskLockHelperTest
    
    * Minor Compaction Impl
    
    * Deprecated fixes
    
    * Check minor compaction
    
    * Minor compaction impl
    
    * Documentations
    
    * Tidy up segmentProvider
    
    * We no longer need the context key
    
    * Bug fixes
    
    * Trim up unnecessary code
    
    * Docs and stylecheck
    
    * Checkstyle
    
    * Documentations first
    
    * Integrate minor compaction spec
    
    * Checkstyle
    
    * Docs
    
    * Tidy up changes
    
    * Revert changes to SpecificSegmentsSpec
    
    * Spell check
    
    * Force time chunk lock only for minor compaction
    
    * Docs
    
    * Minor compaction test
    
    * Native minor compaction add in default values
    
    * Junit5 for TaskLockHelperTest
    
    * ImmutableXXX.of() -> XXX.of() methods in Test
    
    * Validate minor compaction task configs
    
    * Compaction IO Config creation refactor
    
    * Validations
    
    * Test fixes
    
    * Remove test asserting finding segments to lock
    
    * Validation tests
    
    * Rename uncompacted -> minor compaction
    
    * Checkstyle
    
    * Solve validation errors in InputSpecTest
    
    * Change name to use 'minor' for compaction
    
    * Minor compaction input spec fix
    
    * Revert unintended changes in Druid.xml
    
    * Remove uncompacted spellcheck exclusion
    
    * Minor compaction constructor should validate the lock
    
    * Nitpicked test name change
    
    * Remove unnecessary validation
    
    * Partial Compaction Testing
    
    * Checkstyle
    
    * Apply suggestion from @kfaraz
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * Change ImmutableMap.of moments to Map.of
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 docs/data-management/manual-compaction.md          |  54 +++-
 .../druid/indexing/common/task/CompactionTask.java |  30 ++-
 .../common/task/MinorCompactionInputSpec.java      |  30 +--
 .../common/task/NativeCompactionRunner.java        | 103 +++++++-
 .../indexing/common/task/SpecificSegmentsSpec.java |   4 +
 .../indexing/common/task/CompactionTaskTest.java   | 294 +++++++++++++++++++--
 .../common/task/MinorCompactionInputSpecTest.java  |  10 +-
 .../indexing/common/task/TaskLockHelperTest.java   | 224 ++++++++++++++++
 .../task/batch/parallel/PartialCompactionTest.java | 156 ++++++++++-
 .../indexing/ClientMinorCompactionInputSpec.java   |  24 +-
 .../indexing/ClientCompactionIntervalSpecTest.java |   6 +-
 11 files changed, 849 insertions(+), 86 deletions(-)

diff --git a/docs/data-management/manual-compaction.md 
b/docs/data-management/manual-compaction.md
index e6e34dba82e..4c13d4b40f2 100644
--- a/docs/data-management/manual-compaction.md
+++ b/docs/data-management/manual-compaction.md
@@ -117,9 +117,9 @@ The compaction `ioConfig` requires specifying `inputSpec` 
as follows:
 |Field|Description|Default|Required|
 |-----|-----------|-------|--------|
 |`type`|Task type. Set the value to `compact`.|none|Yes|
-|`inputSpec`|Specification of the target [interval](#interval-inputspec) or 
[segments](#segments-inputspec).|none|Yes|
-|`dropExisting`|If `true`, the task replaces all existing segments fully 
contained by either of the following:<br />- the `interval` in the `interval` 
type `inputSpec`.<br />- the umbrella interval of the `segments` in the 
`segment` type `inputSpec`.<br />If compaction fails, Druid does not change any 
of the existing segments.<br />**WARNING**: `dropExisting` in `ioConfig` is a 
beta feature. |false|No|
-|`allowNonAlignedInterval`|If `true`, the task allows an explicit 
[`segmentGranularity`](#compaction-granularity-spec) that is not aligned with 
the provided [interval](#interval-inputspec) or 
[segments](#segments-inputspec). This parameter is only used if 
[`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.<br 
/><br />This parameter is provided for backwards compatibility. In most 
scenarios it should not be set, as it can lead to data being accidentally 
overshadow [...]
+|`inputSpec`|Specification of the target [interval](#interval-inputspec) or [minor](#minor-inputspec) segments.|none|Yes|
+|`dropExisting`|If `true`, the task replaces all existing segments fully contained by the `interval` in the `interval` type `inputSpec`. Minor compaction requires `dropExisting` to be `true`. If compaction fails, Druid does not change any of the existing segments.<br />**WARNING**: `dropExisting` in `ioConfig` is a beta feature.|false|No|
+|`allowNonAlignedInterval`|If `true`, the task allows an explicit 
[`segmentGranularity`](#compaction-granularity-spec) that is not aligned with 
the provided [interval](#interval-inputspec) or [minor](#minor-inputspec). This 
parameter is only used if [`segmentGranularity`](#compaction-granularity-spec) 
is explicitly provided.<br /><br />This parameter is provided for backwards 
compatibility. In most scenarios it should not be set, as it can lead to data 
being accidentally overshadowed. Th [...]
 
 The compaction task has two kinds of `inputSpec`:
 
@@ -127,15 +127,55 @@ The compaction task has two kinds of `inputSpec`:
 
 |Field|Description|Required|
 |-----|-----------|--------|
-|`type`|Task type. Set the value to `interval`.|Yes|
+|`type`|Task type. Set the value to `interval` to trigger major 
compaction.|Yes|
 |`interval`|Interval to compact.|Yes|
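+
+#### Example interval inputSpec
+
+A minimal sketch of an interval spec; the interval value is a placeholder:
+
+```json
+{
+  "type": "interval",
+  "interval": "2020-01-01/2020-02-01"
+}
+```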
 
-### Segments `inputSpec`
+### Minor `inputSpec`
 
 |Field|Description|Required|
 |-----|-----------|--------|
-|`type`|Task type. Set the value to `segments`.|Yes|
-|`segments`|A list of segment IDs.|Yes|
+|`type`|Task type. Set the value to `minor` to trigger minor compaction.|Yes|
+|`interval`|Interval to compact.|Yes|
+|`segments`|A list of segment descriptors.|Yes|
+
+Each entry in `segments` is a segment descriptor. You can retrieve the descriptor fields from the **Segments** view in the web console.
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`itvl`|Interval of the segment to compact.|Yes|
+|`ver`|Version of the segment.|Yes|
+|`part`|Partition number of the segment.|Yes|
+
+#### Example minor inputSpec
+
+```json
+{
+  "type": "minor",
+  "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+  "segments": [
+    {
+      "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "ver": "2020-01-01T00:07:18.186Z",
+      "part": 0
+    },
+    {
+      "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "ver": "2020-01-01T00:07:18.186Z",
+      "part": 1
+    }
+  ]
+}
+```
+
+When using the minor `inputSpec`, the task compacts only the specified segments. Segments in the same interval that are not listed in the spec are upgraded in place rather than compacted, which lets you compact a subset of segments while leaving the rest intact.
+
+There are some requirements when triggering a minor compaction (a full example task payload follows this list):
+- Set `useConcurrentLocks: true` in the task context. Minor compaction uses REPLACE locks over the entire interval.
+- Set `dropExisting: true` in the `ioConfig`.
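+
+#### Example minor compaction task
+
+The following is a minimal sketch of a full task payload that satisfies these requirements; the datasource name `wikipedia` and the segment values are placeholders:
+
+```json
+{
+  "type": "compact",
+  "dataSource": "wikipedia",
+  "ioConfig": {
+    "type": "compact",
+    "inputSpec": {
+      "type": "minor",
+      "interval": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+      "segments": [
+        {
+          "itvl": "2020-01-01T00:00:00.000Z/2020-01-01T01:00:00.000Z",
+          "ver": "2020-01-01T00:07:18.186Z",
+          "part": 0
+        }
+      ]
+    },
+    "dropExisting": true
+  },
+  "context": {
+    "useConcurrentLocks": true
+  }
+}
+```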
+
+### Segments `inputSpec`
+
+This spec is deprecated and no longer documented. Use the [`interval`](#interval-inputspec) or [`minor`](#minor-inputspec) spec instead; a migration sketch follows.
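+
+For example, a deprecated `segments` spec such as the following (the segment ID is illustrative):
+
+```json
+{
+  "type": "segments",
+  "segments": ["wikipedia_2020-01-01T00:00:00.000Z_2020-01-01T01:00:00.000Z_2020-01-01T00:07:18.186Z"]
+}
+```
+
+can be expressed as a `minor` spec by supplying the umbrella interval plus the `itvl`, `ver`, and `part` fields of each segment, as shown in the example above.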
 
 ## Compaction dimensions spec
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 6cbdb1dc6a4..82310272b36 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -223,6 +223,23 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
       //noinspection ConstantConditions
       this.ioConfig = new 
CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
     }
+
+    if (ioConfig != null && ioConfig.getInputSpec() instanceof MinorCompactionInputSpec) {
+      if (computeCompactionIngestionMode(ioConfig) != IngestionMode.REPLACE) {
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.INVALID_INPUT)
+                            .build("Minor compaction can only be used with REPLACE ingestion mode. Please set ioConfig[dropExisting] to true.");
+      }
+
+      boolean usingConcurrentLocks = this.getContextValue(Tasks.USE_CONCURRENT_LOCKS, Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
+
+      if (!usingConcurrentLocks) {
+        throw DruidException.forPersona(DruidException.Persona.USER)
+                            .ofCategory(DruidException.Category.INVALID_INPUT)
+                            .build("Minor compaction requires concurrent locks. Please set task context[%s] to true.", Tasks.USE_CONCURRENT_LOCKS);
+      }
+    }
+
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
     this.transformSpec = transformSpec;
     this.metricsSpec = metricsSpec;
@@ -1284,6 +1301,11 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
     }
   }
 
+  /**
+   * Provides segment discovery and validation for compaction.
+   * For minor compaction ({@link MinorCompactionInputSpec}), finds all segments in the interval and
+   * uses {@link #shouldUpgradeSegment} to partition them into segments to compact and segments whose
+   * metadata is only upgraded in place.
+   */
   @VisibleForTesting
   static class SegmentProvider
   {
@@ -1292,7 +1314,7 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
     private final Interval interval;
 
     private final boolean minorCompaction;
-    private final Set<SegmentDescriptor> uncompactedSegments;
+    private final Set<SegmentDescriptor> segmentsToCompact;
 
     SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
     {
@@ -1301,17 +1323,17 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
       this.interval = inputSpec.findInterval(dataSource);
       if (inputSpec instanceof MinorCompactionInputSpec) {
         minorCompaction = true;
-        uncompactedSegments = Set.copyOf(((MinorCompactionInputSpec) 
inputSpec).getUncompactedSegments());
+        segmentsToCompact = Set.copyOf(((MinorCompactionInputSpec) 
inputSpec).getSegments());
       } else {
         minorCompaction = false;
-        uncompactedSegments = null;
+        segmentsToCompact = null;
       }
     }
 
     private boolean shouldUpgradeSegment(DataSegment s)
     {
       if (minorCompaction) {
-        return !uncompactedSegments.contains(s.toDescriptor()) && 
this.interval.contains(s.getInterval());
+        return !segmentsToCompact.contains(s.toDescriptor()) && 
this.interval.contains(s.getInterval());
       } else {
         return false;
       }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
index 0176386bf07..16fbcd4e5ca 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java
@@ -38,40 +38,40 @@ import java.util.stream.Collectors;
  */
 public class MinorCompactionInputSpec implements CompactionInputSpec
 {
-  public static final String TYPE = "uncompacted";
+  public static final String TYPE = "minor";
 
   private final Interval interval;
-  private final List<SegmentDescriptor> uncompactedSegments;
+  private final List<SegmentDescriptor> segments;
 
   @JsonCreator
   public MinorCompactionInputSpec(
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("uncompactedSegments") List<SegmentDescriptor> 
uncompactedSegments
+      @JsonProperty("segments") List<SegmentDescriptor> segments
   )
   {
-    InvalidInput.conditionalException(interval != null, "Uncompacted interval 
must not be null");
+    InvalidInput.conditionalException(interval != null, "Minor compaction 
interval must not be null");
     InvalidInput.conditionalException(
         interval.toDurationMillis() > 0,
-        "Uncompacted interval[%s] is empty, must specify a nonempty interval",
+        "Minor compaction interval[%s] is empty, must specify a nonempty 
interval",
         interval
     );
     InvalidInput.conditionalException(
-        uncompactedSegments != null && !uncompactedSegments.isEmpty(),
-        "Uncompacted segments must not be null or empty"
+        segments != null && !segments.isEmpty(),
+        "Minor compaction specified segments must not be null or empty"
     );
 
     // Validate that all segments are within the interval
     List<SegmentDescriptor> segmentsNotInInterval =
-        uncompactedSegments.stream().filter(s -> 
!interval.contains(s.getInterval())).collect(Collectors.toList());
+        segments.stream().filter(s -> 
!interval.contains(s.getInterval())).collect(Collectors.toList());
     InvalidInput.conditionalException(
         segmentsNotInInterval.isEmpty(),
-        "All uncompacted segments must be within interval[%s], got segments 
outside interval: %s",
+        "All segments must be within interval[%s], got segments outside 
interval: %s",
         interval,
         segmentsNotInInterval
     );
 
     this.interval = interval;
-    this.uncompactedSegments = uncompactedSegments;
+    this.segments = segments;
   }
 
   @JsonProperty
@@ -81,9 +81,9 @@ public class MinorCompactionInputSpec implements 
CompactionInputSpec
   }
 
   @JsonProperty
-  public List<SegmentDescriptor> getUncompactedSegments()
+  public List<SegmentDescriptor> getSegments()
   {
-    return uncompactedSegments;
+    return segments;
   }
 
   @Override
@@ -112,13 +112,13 @@ public class MinorCompactionInputSpec implements 
CompactionInputSpec
     }
     MinorCompactionInputSpec that = (MinorCompactionInputSpec) o;
     return Objects.equals(interval, that.interval) &&
-           Objects.equals(uncompactedSegments, that.uncompactedSegments);
+           Objects.equals(segments, that.segments);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(interval, uncompactedSegments);
+    return Objects.hash(interval, segments);
   }
 
   @Override
@@ -126,7 +126,7 @@ public class MinorCompactionInputSpec implements 
CompactionInputSpec
   {
     return "MinorCompactionInputSpec{" +
            "interval=" + interval +
-           ", uncompactedSegments=" + uncompactedSegments +
+           ", segments=" + segments +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index afde45cdb13..74a9b0aaef4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -35,20 +35,24 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfi
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.indexing.input.WindowedSegmentId;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -99,9 +103,14 @@ public class NativeCompactionRunner implements 
CompactionRunner
           "Virtual columns in filter rules are not supported by the Native 
compaction engine. Use MSQ compaction engine instead."
       );
     }
+
     if (compactionTask.getIoConfig().getInputSpec() instanceof 
MinorCompactionInputSpec) {
-      return CompactionConfigValidationResult.failure(
-          "Minor compaction is not supported by Native  compaction engine. Use 
MSQ compaction engine instead.");
+      boolean usingConcurrentLocks = 
compactionTask.getContextValue(Tasks.USE_CONCURRENT_LOCKS, 
Tasks.DEFAULT_USE_CONCURRENT_LOCKS);
+      if (!usingConcurrentLocks) {
+        return CompactionConfigValidationResult.failure(
+            "Task context[%s] must be true when using native minor 
compaction", Tasks.USE_CONCURRENT_LOCKS
+        );
+      }
     }
     return CompactionConfigValidationResult.success();
   }
@@ -153,19 +162,32 @@ public class NativeCompactionRunner implements 
CompactionRunner
       SegmentCacheManagerFactory segmentCacheManagerFactory,
       CompactionIOConfig compactionIOConfig
   )
+  {
+    return (compactionIOConfig.getInputSpec() instanceof 
MinorCompactionInputSpec)
+           ? createMinorCompactionIoConfig(toolbox, dataSchema, interval, 
coordinatorClient, segmentCacheManagerFactory, compactionIOConfig)
+           : createMajorCompactionIoConfig(toolbox, dataSchema, interval, 
coordinatorClient, segmentCacheManagerFactory, compactionIOConfig);
+  }
+
+  private static ParallelIndexIOConfig createMajorCompactionIoConfig(
+      TaskToolbox toolbox,
+      DataSchema dataSchema,
+      Interval inputInterval,
+      CoordinatorClient coordinatorClient,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
+      CompactionIOConfig compactionIOConfig
+  )
   {
     if (!compactionIOConfig.isAllowNonAlignedInterval()) {
-      // Validate interval alignment.
       final Granularity segmentGranularity = 
dataSchema.getGranularitySpec().getSegmentGranularity();
       final Interval widenedInterval = Intervals.utc(
-          segmentGranularity.bucketStart(interval.getStart()).getMillis(),
-          segmentGranularity.bucketEnd(interval.getEnd().minus(1)).getMillis()
+          segmentGranularity.bucketStart(inputInterval.getStart()).getMillis(),
+          
segmentGranularity.bucketEnd(inputInterval.getEnd().minus(1)).getMillis()
       );
 
-      if (!interval.equals(widenedInterval)) {
+      if (!inputInterval.equals(widenedInterval)) {
         throw new IAE(
             "Interval[%s] to compact is not aligned with 
segmentGranularity[%s]",
-            interval,
+            inputInterval,
             segmentGranularity
         );
       }
@@ -174,7 +196,7 @@ public class NativeCompactionRunner implements 
CompactionRunner
     return new ParallelIndexIOConfig(
         new DruidInputSource(
             dataSchema.getDataSource(),
-            interval,
+            inputInterval,
             null,
             null,
             null,
@@ -190,6 +212,67 @@ public class NativeCompactionRunner implements 
CompactionRunner
     );
   }
 
+  private static ParallelIndexIOConfig createMinorCompactionIoConfig(
+      TaskToolbox toolbox,
+      DataSchema dataSchema,
+      Interval interval,
+      CoordinatorClient coordinatorClient,
+      SegmentCacheManagerFactory segmentCacheManagerFactory,
+      CompactionIOConfig compactionIOConfig
+  )
+  {
+    final List<WindowedSegmentId> segmentIds = 
resolveSegmentIdsForMinorCompaction(
+        (MinorCompactionInputSpec) compactionIOConfig.getInputSpec(),
+        dataSchema.getDataSource(),
+        interval
+    );
+
+    return new ParallelIndexIOConfig(
+        new DruidInputSource(
+            dataSchema.getDataSource(),
+            null,  // no interval; the explicit segment list below is used instead
+            segmentIds,
+            null,
+            null,
+            null,
+            toolbox.getIndexIO(),
+            coordinatorClient,
+            segmentCacheManagerFactory,
+            toolbox.getConfig()
+        ).withTaskToolbox(toolbox),
+        null,
+        false,
+        compactionIOConfig.isDropExisting()
+    );
+  }
+
+  /**
+   * Resolves the segment descriptors of a {@link MinorCompactionInputSpec} that fall within the
+   * given interval and returns them as {@link WindowedSegmentId} objects.
+   */
+  private static List<WindowedSegmentId> resolveSegmentIdsForMinorCompaction(
+      MinorCompactionInputSpec inputSpec,
+      String dataSource,
+      Interval interval
+  )
+  {
+    final List<WindowedSegmentId> segmentIds = new ArrayList<>();
+    for (SegmentDescriptor desc : inputSpec.getSegments()) {
+      if (interval.contains(desc.getInterval())) {
+        final SegmentId segmentId = SegmentId.of(
+            dataSource,
+            desc.getInterval(),
+            desc.getVersion(),
+            desc.getPartitionNumber()
+        );
+        segmentIds.add(
+            new WindowedSegmentId(segmentId.toString(), 
List.of(desc.getInterval()))
+        );
+      }
+    }
+    return segmentIds;
+  }
+
   @Override
   public TaskStatus runCompactionTasks(
       CompactionTask compactionTask,
@@ -318,6 +401,10 @@ public class NativeCompactionRunner implements 
CompactionRunner
     newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, 
STORE_COMPACTION_STATE);
     // Set the priority of the compaction task.
     newContext.put(Tasks.PRIORITY_KEY, compactionTask.getPriority());
+    // Native minor compaction uses REPLACE ingestion mode, which uses time 
chunk lock.
+    if (compactionTask.getIoConfig().getInputSpec() instanceof 
MinorCompactionInputSpec) {
+      newContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+    }
     return newContext;
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
index 7b26c22a4a5..93f181d0dca 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/SpecificSegmentsSpec.java
@@ -33,6 +33,10 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+/**
+ * @deprecated Use {@link MinorCompactionInputSpec} for minor compaction 
instead.
+ */
+@Deprecated
 public class SpecificSegmentsSpec implements CompactionInputSpec
 {
   public static final String TYPE = "segments";
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 3a51d5b9fe2..10c27c0b268 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -29,9 +29,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -50,6 +47,7 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
 import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
@@ -62,9 +60,12 @@ import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
+import org.apache.druid.indexing.common.actions.MarkSegmentToUpgradeAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionTestKit;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.task.CompactionTask.Builder;
@@ -73,6 +74,7 @@ import 
org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import 
org.apache.druid.indexing.common.task.NativeCompactionRunner.PartitionConfigurationManager;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -97,6 +99,7 @@ import 
org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFa
 import 
org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.AutoTypeColumnSchema;
 import org.apache.druid.segment.IndexIO;
@@ -186,7 +189,7 @@ public class CompactionTaskTest
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
   private static final Interval COMPACTION_INTERVAL = 
Intervals.of("2017-01-01/2017-07-01");
-  private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
+  private static final List<Interval> SEGMENT_INTERVALS = List.of(
       Intervals.of("2017-01-01/2017-02-01"),
       Intervals.of("2017-02-01/2017-03-01"),
       Intervals.of("2017-03-01/2017-04-01"),
@@ -271,7 +274,7 @@ public class CompactionTaskTest
               DATA_SOURCE,
               SEGMENT_INTERVALS.get(i),
               "version_" + i,
-              ImmutableMap.of(),
+              Map.of(),
               findDimensions(i, SEGMENT_INTERVALS.get(i)),
               
AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()),
               new NumberedShardSpec(0, 1),
@@ -299,7 +302,7 @@ public class CompactionTaskTest
     );
     GuiceInjectableValues injectableValues = new GuiceInjectableValues(
         GuiceInjectors.makeStartupInjectorWithModules(
-            ImmutableList.of(
+            List.of(
                 binder -> {
                   
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
                   binder.bind(ChatHandlerProvider.class).toInstance(new 
NoopChatHandlerProvider());
@@ -359,6 +362,9 @@ public class CompactionTaskTest
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  @Rule
+  public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
+
   private StubServiceEmitter emitter;
 
   @Before
@@ -520,7 +526,7 @@ public class CompactionTaskTest
             new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS))
         )
         .tuningConfig(createTuningConfig())
-        .context(ImmutableMap.of("testKey", "testContext"))
+        .context(Map.of("testKey", "testContext"))
         .build();
 
     final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
@@ -538,7 +544,7 @@ public class CompactionTaskTest
     final CompactionTask task = builder
         .segments(SEGMENTS)
         .tuningConfig(createTuningConfig())
-        .context(ImmutableMap.of("testKey", "testContext"))
+        .context(Map.of("testKey", "testContext"))
         .build();
 
     final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
@@ -558,7 +564,7 @@ public class CompactionTaskTest
         .segments(SEGMENTS)
         .dimensionsSpec(
             new DimensionsSpec(
-                ImmutableList.of(
+                List.of(
                     new StringDimensionSchema("dim1"),
                     new StringDimensionSchema("dim2"),
                     new StringDimensionSchema("dim3")
@@ -566,7 +572,7 @@ public class CompactionTaskTest
             )
         )
         .tuningConfig(createTuningConfig())
-        .context(ImmutableMap.of("testKey", "testVal"))
+        .context(Map.of("testKey", "testVal"))
         .build();
 
     final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
@@ -675,7 +681,7 @@ public class CompactionTaskTest
             new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS))
         )
         .tuningConfig(createTuningConfig())
-        .context(ImmutableMap.of("testKey", "testContext"))
+        .context(Map.of("testKey", "testContext"))
         .build();
 
     Assert.assertTrue(task.getInputSourceResources().isEmpty());
@@ -783,7 +789,7 @@ public class CompactionTaskTest
     expectedException.expectMessage(
         "No segments found for compaction. Please check that datasource name 
and interval are correct."
     );
-    provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
+    provider.checkSegments(LockGranularity.TIME_CHUNK, List.of());
   }
 
   @Test
@@ -1233,11 +1239,10 @@ public class CompactionTaskTest
 
     final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO();
     
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(),
 null));
-    final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
     final Map<QuerySegmentSpec, DataSchema> inputSchemas = 
CompactionTask.createInputDataSchemas(
         toolbox,
         LockGranularity.TIME_CHUNK,
-        new SegmentProvider(DATA_SOURCE, 
SpecificSegmentsSpec.fromSegments(segments)),
+        new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
         null,
         null,
         null,
@@ -1298,7 +1303,7 @@ public class CompactionTaskTest
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory
     );
-    final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
+    final List<DimensionsSpec> expectedDimensionsSpec = List.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
     );
 
@@ -1393,7 +1398,7 @@ public class CompactionTaskTest
     );
 
 
-    final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
+    final List<DimensionsSpec> expectedDimensionsSpec = List.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
     );
 
@@ -1626,7 +1631,7 @@ public class CompactionTaskTest
                                                 new 
DimensionRangePartitionsSpec(
                                                     3,
                                                     null,
-                                                    ImmutableList.of(
+                                                    List.of(
                                                         "string_dim_1"),
                                                     false
                                                 ))
@@ -1647,7 +1652,7 @@ public class CompactionTaskTest
     builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, 
null, true));
 
     DimensionSchema stringDim = new StringDimensionSchema("string_dim_1", 
null, null);
-    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+    builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim)));
     final CompactionTask compactionTask = builder.build();
     // A string dimension with rollup=true should need MVD info
     Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
@@ -1670,12 +1675,12 @@ public class CompactionTaskTest
                                                 new 
DimensionRangePartitionsSpec(
                                                     3,
                                                     null,
-                                                    ImmutableList.of(
+                                                    List.of(
                                                         stringDim.getName()),
                                                     false
                                                 ))
                                             .build());
-    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+    builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim)));
     CompactionTask compactionTask = builder.build();
     Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
   }
@@ -1702,7 +1707,7 @@ public class CompactionTaskTest
                                                 )
                                             )
                                             .build());
-    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+    builder.dimensionsSpec(new DimensionsSpec(List.of(stringDim)));
     CompactionTask compactionTask = builder.build();
     Assert.assertFalse(compactionTask.identifyMultiValuedDimensions());
   }
@@ -1727,7 +1732,7 @@ public class CompactionTaskTest
   @Test
   public void testChooseFinestGranularityNone()
   {
-    List<Granularity> input = ImmutableList.of(
+    List<Granularity> input = List.of(
         Granularities.DAY,
         Granularities.SECOND,
         Granularities.MINUTE,
@@ -1792,7 +1797,7 @@ public class CompactionTaskTest
 
   private static List<DimensionsSpec> 
getExpectedDimensionsSpecForAutoGeneration()
   {
-    return ImmutableList.of(
+    return List.of(
         new DimensionsSpec(getDimensionSchema(new 
StringDimensionSchema("string_to_double", 
DimensionSchema.MultiValueHandling.ARRAY, null))),
         new DimensionsSpec(getDimensionSchema(new 
StringDimensionSchema("string_to_double", 
DimensionSchema.MultiValueHandling.ARRAY, null))),
         new DimensionsSpec(getDimensionSchema(new 
StringDimensionSchema("string_to_double", 
DimensionSchema.MultiValueHandling.ARRAY, null))),
@@ -1955,7 +1960,7 @@ public class CompactionTaskTest
         List<Interval> intervals
     )
     {
-      return 
Futures.immediateFuture(ImmutableList.copyOf(segmentMap.keySet()));
+      return Futures.immediateFuture(List.copyOf(segmentMap.keySet()));
     }
   }
 
@@ -2032,6 +2037,236 @@ public class CompactionTaskTest
         .build();
   }
 
+  @Test
+  public void testMinorCompactionChecksIfSegmentsToCompactIsEmpty()
+  {
+    Assert.assertThrows(
+        DruidException.class,
+        () -> new MinorCompactionInputSpec(COMPACTION_INTERVAL, List.of())
+    );
+  }
+
+  @Test
+  public void testMinorCompactionShouldAlwaysUseReplaceIngestionMode()
+  {
+    final Interval testInterval = 
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+    final String version = "2024-11-17T23:49:06.823Z";
+    final DataSegment segment = createSegmentWithPartition(testInterval, 
version, 1);
+
+    final MinorCompactionInputSpec minorSpec = new MinorCompactionInputSpec(
+        testInterval,
+        List.of(segment.toDescriptor())
+    );
+
+    Assert.assertThrows(
+        DruidException.class,
+        // Setting dropExisting == false disables REPLACE mode.
+        () -> new Builder(DATA_SOURCE, segmentCacheManagerFactory)
+            .inputSpec(minorSpec, false)
+            .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true)).build()
+    );
+  }
+
+  @Test
+  public void testMinorCompactionUsesTimeChunkLockWithConcurrentLocks() throws 
Exception
+  {
+    final Interval testInterval = 
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+    final List<DataSegment> segments = List.of(
+        createSegmentWithPartition(testInterval, "v1", 0),
+        createSegmentWithPartition(testInterval, "v1", 1)
+    );
+    final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+        testInterval,
+        
segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+    );
+
+    final CompactionTask task = new Builder(DATA_SOURCE, 
segmentCacheManagerFactory)
+        .inputSpec(spec, true)
+        .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
+        .build();
+
+    taskActionTestKit.getTaskLockbox().add(task);
+    final TaskActionClient taskActionClient = new LocalTaskActionClient(
+        task,
+        taskActionTestKit.getTaskActionToolbox()
+    );
+    // Wrap the action client so that RetrieveUsedSegmentsAction returns the test segments.
+    final TaskActionClient segmentAwareClient = new TaskActionClient()
+    {
+      @Override
+      public <RetType> RetType submit(TaskAction<RetType> action) throws 
IOException
+      {
+        if (action instanceof RetrieveUsedSegmentsAction) {
+          @SuppressWarnings("unchecked")
+          RetType retVal = (RetType) segments;
+          return retVal;
+        }
+        return taskActionClient.submit(action);
+      }
+    };
+
+    task.determineLockGranularityAndTryLock(segmentAwareClient, 
List.of(testInterval));
+    Assert.assertEquals(LockGranularity.TIME_CHUNK, 
task.getTaskLockHelper().getLockGranularityToUse());
+  }
+
+  @Test
+  public void testNativeMinorCompactionSubtaskUsesTimeChunkLock() throws 
Exception
+  {
+    final Interval testInterval = 
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+    final List<DataSegment> segments = List.of(
+        createSegmentWithPartition(testInterval, "v1", 0),
+        createSegmentWithPartition(testInterval, "v1", 1)
+    );
+    final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+        testInterval,
+        
segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+    );
+
+    final CompactionTask compactionTask = new Builder(DATA_SOURCE, 
segmentCacheManagerFactory)
+        .inputSpec(spec, true)
+        .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
+        .build();
+
+    final NativeCompactionRunner runner = new 
NativeCompactionRunner(segmentCacheManagerFactory);
+    final Map<String, Object> subtaskContext = 
runner.createContextForSubtask(compactionTask);
+
+    final DataSchema dataSchema = DataSchema.builder()
+        .withDataSource(DATA_SOURCE)
+        .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+        .withDimensions(
+            new DimensionsSpec(List.of(new StringDimensionSchema("dim1"), new 
StringDimensionSchema("dim2")))
+        )
+        .withGranularity(
+            new UniformGranularitySpec(Granularities.DAY, Granularities.HOUR, 
false, List.of(testInterval))
+        )
+        .build();
+
+    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
+        Map.of(new MultipleIntervalSegmentSpec(List.of(testInterval)), 
dataSchema),
+        toolbox,
+        new CompactionIOConfig(spec, false, null),
+        new PartitionConfigurationManager(null),
+        COORDINATOR_CLIENT,
+        segmentCacheManagerFactory
+    );
+
+    final ParallelIndexSupervisorTask subtask = new 
ParallelIndexSupervisorTask(
+        "test_subtask",
+        "test_group",
+        null,
+        ingestionSpecs.get(0),
+        "base_0",
+        subtaskContext,
+        true
+    );
+
+    taskActionTestKit.getTaskLockbox().add(subtask);
+    final TaskActionClient taskActionClient = new LocalTaskActionClient(
+        subtask,
+        taskActionTestKit.getTaskActionToolbox()
+    );
+    final TaskActionClient segmentAwareClient = new TaskActionClient()
+    {
+      @Override
+      public <RetType> RetType submit(TaskAction<RetType> action) throws 
IOException
+      {
+        if (action instanceof RetrieveUsedSegmentsAction) {
+          @SuppressWarnings("unchecked")
+          RetType retVal = (RetType) segments;
+          return retVal;
+        }
+        return taskActionClient.submit(action);
+      }
+    };
+    subtask.determineLockGranularityAndTryLock(segmentAwareClient, 
List.of(testInterval));
+
+    Assert.assertEquals(
+        LockGranularity.TIME_CHUNK,
+        subtask.getTaskLockHelper().getLockGranularityToUse()
+    );
+  }
+
+  @Test
+  public void testSegmentProviderCheckSegmentsAllowsSubsetForTimeChunk() 
throws Exception
+  {
+    final Interval testInterval = 
Intervals.of("2024-11-18T00:00:00.000Z/2024-11-25T00:00:00.000Z");
+    final List<DataSegment> allSegments = List.of(
+        createSegmentWithPartition(testInterval, "v1", 0),
+        createSegmentWithPartition(testInterval, "v1", 1),
+        createSegmentWithPartition(testInterval, "v1", 2)
+    );
+    final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+        testInterval,
+        List.of(allSegments.get(0).toDescriptor(), 
allSegments.get(1).toDescriptor())
+    );
+
+    final SegmentProvider provider = new SegmentProvider(DATA_SOURCE, spec);
+    final TestTaskActionClient client = new TestTaskActionClient(allSegments);
+    provider.findSegments(client);
+
+    // Should not throw: specified segments (0,1) exist; segment 2 is not in 
spec but is in interval (will be upgraded)
+    provider.checkSegments(LockGranularity.TIME_CHUNK, allSegments);
+  }
+
+  @Test
+  public void testDruidInputSourceReceivesSegmentIdsForMinorCompaction()
+  {
+    final Interval interval = Intervals.of("2024-01-01/2024-01-02");
+    final List<DataSegment> segments = List.of(
+        createSegmentWithPartition(interval, "v1", 0),
+        createSegmentWithPartition(interval, "v1", 1)
+    );
+    final MinorCompactionInputSpec spec = new MinorCompactionInputSpec(
+        interval,
+        
segments.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+    );
+
+    final DataSchema dataSchema = DataSchema.builder()
+        .withDataSource(DATA_SOURCE)
+        .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+        .withDimensions(
+            new DimensionsSpec(
+                List.of(
+                    new StringDimensionSchema("dim1"),
+                    new StringDimensionSchema("dim2")
+                )
+            )
+        )
+        .withGranularity(
+            new UniformGranularitySpec(
+                Granularities.DAY,
+                Granularities.HOUR,
+                false,
+                List.of(interval)
+            )
+        )
+        .build();
+
+    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
+        Map.of(new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema),
+        toolbox,
+        new CompactionIOConfig(spec, false, null),
+        new PartitionConfigurationManager(null),
+        COORDINATOR_CLIENT,
+        segmentCacheManagerFactory
+    );
+
+    Assert.assertEquals(1, ingestionSpecs.size());
+    final InputSource inputSource = 
ingestionSpecs.get(0).getIOConfig().getInputSource();
+    Assert.assertTrue(inputSource instanceof DruidInputSource);
+    final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
+    Assert.assertNotNull(druidInputSource.getSegmentIds());
+    Assert.assertEquals(2, druidInputSource.getSegmentIds().size());
+  }
+
+  private DataSegment createSegmentWithPartition(Interval interval, String 
version, int partitionNum)
+  {
+    return DataSegment.builder(SegmentId.of(DATA_SOURCE, interval, version, 
partitionNum))
+        .shardSpec(new NumberedShardSpec(partitionNum, 0))
+        .size(100)
+        .build();
+  }
+
   private static class TestTaskActionClient implements TaskActionClient
   {
     private final List<DataSegment> segments;
@@ -2045,10 +2280,13 @@ public class CompactionTaskTest
     @Override
     public <RetType> RetType submit(TaskAction<RetType> taskAction)
     {
-      if (!(taskAction instanceof RetrieveUsedSegmentsAction)) {
-        throw new ISE("action[%s] is not supported", taskAction);
+      if (taskAction instanceof RetrieveUsedSegmentsAction) {
+        return (RetType) segments;
+      }
+      if (taskAction instanceof MarkSegmentToUpgradeAction) {
+        return (RetType) Integer.valueOf(0);
       }
-      return (RetType) segments;
+      throw new ISE("action[%s] is not supported", taskAction);
     }
   }
 
@@ -2315,7 +2553,7 @@ public class CompactionTaskTest
     @Override
     public Set<ResourceAction> getInputSourceResources()
     {
-      return ImmutableSet.of();
+      return Set.of();
     }
 
     @JsonProperty
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
index c5a6f2fda03..71c59bc4a4a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpecTest.java
@@ -47,7 +47,7 @@ public class MinorCompactionInputSpecTest
 
     Assert.assertEquals(spec, deserialized);
     Assert.assertEquals(interval, deserialized.getInterval());
-    Assert.assertEquals(segments, deserialized.getUncompactedSegments());
+    Assert.assertEquals(segments, deserialized.getSegments());
   }
 
   @Test
@@ -55,18 +55,18 @@ public class MinorCompactionInputSpecTest
   {
     ObjectMapper mapper = new DefaultObjectMapper();
     String clientJson = "{"
-                        + "\"type\":\"uncompacted\","
+                        + "\"type\":\"minor\","
                         + "\"interval\":\"2015-04-11/2015-04-12\","
-                        + 
"\"uncompactedSegments\":[{\"itvl\":\"2015-04-11/2015-04-12\",\"ver\":\"v1\",\"part\":0}]"
+                        + 
"\"segments\":[{\"itvl\":\"2015-04-11/2015-04-12\",\"ver\":\"v1\",\"part\":0}]"
                         + "}";
 
     MinorCompactionInputSpec deserialized = mapper.readValue(clientJson, 
MinorCompactionInputSpec.class);
 
     Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"), 
deserialized.getInterval());
-    Assert.assertEquals(1, deserialized.getUncompactedSegments().size());
+    Assert.assertEquals(1, deserialized.getSegments().size());
     Assert.assertEquals(
         new SegmentDescriptor(Intervals.of("2015-04-11/2015-04-12"), "v1", 0),
-        deserialized.getUncompactedSegments().get(0)
+        deserialized.getSegments().get(0)
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java
new file mode 100644
index 00000000000..3a47c5a0ab7
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskLockHelperTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.joda.time.Interval;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TaskLockHelperTest
+{
+  private static final String DATA_SOURCE = "test_datasource";
+  private static final Interval TEST_INTERVAL = 
Intervals.of("2017-01-01/2017-01-02");
+  private static final String TEST_VERSION = DateTimes.nowUtc().toString();
+
+  @Test
+  public void testVerifyNonConsecutiveSegmentsInInputFails()
+  {
+    // Test that non-consecutive segments within the input list fail.
+    // Compacting segments {0, 1, 3} should fail because root partition 2 is 
missing.
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1),  // rootPartitionRange 
[0, 1)
+        createSegment(1, 1, 2, (short) 1, (short) 1),   // rootPartitionRange 
[1, 2)
+        createSegment(3, 3, 4, (short) 1, (short) 1)    // rootPartitionRange 
[3, 4)
+    );
+
+    Assertions.assertThrows(
+        ISE.class,
+        () -> 
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+    );
+  }
+
+  @Test
+  public void testVerifyConsecutiveSegmentsSucceedEvenIfOtherSegmentsMissing()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(3, 3, 4, (short) 1, (short) 1),  // rootPartitionRange 
[3, 4)
+        createSegment(4, 4, 5, (short) 1, (short) 1),   // rootPartitionRange 
[4, 5)
+        createSegment(5, 5, 6, (short) 1, (short) 1)   // rootPartitionRange 
[5, 6)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyConsecutiveSegmentsStillWorks()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1),
+        createSegment(1, 1, 2, (short) 1, (short) 1),
+        createSegment(2, 2, 3, (short) 1, (short) 1)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyLargeGapSegmentsFails()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1),
+        createSegment(1, 1, 2, (short) 1, (short) 1),
+        createSegment(10, 10, 11, (short) 1, (short) 1)
+    );
+
+    Assertions.assertThrows(
+        ISE.class,
+        () -> 
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+    );
+  }
+
+  @Test
+  public void testVerifyAtomicUpdateGroupValidationStillWorks()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 2),
+        createSegment(1, 0, 1, (short) 1, (short) 2)
+    );
+
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyAtomicUpdateGroupIncompleteFails()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 3),
+        createSegment(1, 0, 1, (short) 1, (short) 3)
+    );
+
+    // Should throw ISE because atomicUpdateGroupSize is 3 but we only have 2 
segments
+    Assertions.assertThrows(
+        ISE.class,
+        () -> 
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+    );
+  }
+
+  @Test
+  public void testVerifyDifferentMinorVersionsFail()
+  {
+    // Test that segments with same root partition range but different minor 
versions fail
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 2),
+        createSegment(1, 0, 1, (short) 2, (short) 2) // Different minor version
+    );
+    Assertions.assertThrows(
+        ISE.class,
+        () -> 
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+    );
+  }
+
+  @Test
+  public void testVerifyDifferentAtomicUpdateGroupSizesFail()
+  {
+    // Test that segments with same root partition range but different 
atomicUpdateGroupSize fail
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 2),
+        createSegment(1, 0, 1, (short) 1, (short) 3)
+    );
+    Assertions.assertThrows(
+        ISE.class,
+        () -> 
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+    );
+  }
+
+  @Test
+  public void testVerifyEmptySegmentsList()
+  {
+    final List<DataSegment> segments = Collections.emptyList();
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifySingleSegment()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        createSegment(0, 0, 1, (short) 1, (short) 1)
+    );
+    
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments);
+  }
+
+  @Test
+  public void testVerifyDifferentIntervalsFail()
+  {
+    final Interval interval1 = Intervals.of("2017-01-01/2017-01-02");
+    final Interval interval2 = Intervals.of("2017-01-02/2017-01-03");
+    final List<DataSegment> segments = ImmutableList.of(
+        DataSegment.builder(SegmentId.of(DATA_SOURCE, interval1, TEST_VERSION, 
0))
+            .shardSpec(new NumberedOverwriteShardSpec(
+                PartitionIds.NON_ROOT_GEN_START_PARTITION_ID,
+                0,
+                1,
+                (short) 1,
+                (short) 1
+            ))
+            .size(0)
+            .build(),
+        DataSegment.builder(SegmentId.of(DATA_SOURCE, interval2, TEST_VERSION, 
0))
+            .shardSpec(new NumberedOverwriteShardSpec(
+                PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 1,
+                1,
+                2,
+                (short) 1,
+                (short) 1
+            ))
+            .size(0)
+            .build()
+    );
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
TaskLockHelper.verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(segments)
+    );
+  }
+
+  /**
+   * Helper method to create a test segment with NumberedOverwriteShardSpec
+   */
+  private DataSegment createSegment(
+      int partitionId,
+      int startRootPartitionId,
+      int endRootPartitionId,
+      short minorVersion,
+      short atomicUpdateGroupSize
+  )
+  {
+    return DataSegment.builder(SegmentId.of(DATA_SOURCE, TEST_INTERVAL, 
TEST_VERSION, partitionId))
+        .shardSpec(new NumberedOverwriteShardSpec(
+            PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + partitionId,
+            startRootPartitionId,
+            endRootPartitionId,
+            minorVersion,
+            atomicUpdateGroupSize
+        ))
+        .size(0)
+        .build();
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
index 9ef316e8aaa..882c1c73bb1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
@@ -31,7 +31,8 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.CompactionTask.Builder;
-import org.apache.druid.indexing.common.task.SpecificSegmentsSpec;
+import org.apache.druid.indexing.common.task.MinorCompactionInputSpec;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.DataSegmentsWithSchemas;
@@ -50,8 +51,11 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTest
 {
@@ -73,7 +77,7 @@ public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTes
 
   public PartialCompactionTest()
   {
-    super(LockGranularity.SEGMENT, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
+    super(LockGranularity.TIME_CHUNK, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
   }
 
   @Before
@@ -137,8 +141,14 @@ public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTes
       );
     }
     final CompactionTask compactionTask = newCompactionTaskBuilder()
-        .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact))
+        .inputSpec(
+            new MinorCompactionInputSpec(
+                INTERVAL_TO_INDEX,
+                segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+            ), true
+        )
         .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
+        .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
         .build();
     dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
     verifySchema(dataSegmentsWithSchemas);
@@ -197,8 +207,14 @@ public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTes
       );
     }
     final CompactionTask compactionTask = newCompactionTaskBuilder()
-        .inputSpec(SpecificSegmentsSpec.fromSegments(segmentsToCompact))
+        .inputSpec(
+            new MinorCompactionInputSpec(
+                INTERVAL_TO_INDEX,
+                segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+            ), true
+        )
         .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
+        .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
         .build();
 
     dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
@@ -213,6 +229,138 @@ public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTes
     }
   }
 
+  @Test
+  public void testMinorCompactionUpgradesNonCompactedSegments()
+  {
+    DataSegmentsWithSchemas dataSegmentsWithSchemas = runTestTask(
+        new HashedPartitionsSpec(null, 4, null),
+        TaskState.SUCCESS,
+        false
+    );
+    verifySchema(dataSegmentsWithSchemas);
+    final Map<Interval, List<DataSegment>> hashPartitionedSegments =
+        SegmentUtils.groupSegmentsByInterval(dataSegmentsWithSchemas.getSegments());
+    hashPartitionedSegments.values().forEach(
+        segmentsInInterval -> segmentsInInterval.sort(
+            Comparator.comparing(segment -> segment.getShardSpec().getPartitionNum())
+        )
+    );
+
+    final List<DataSegment> segmentsToCompact = new ArrayList<>();
+    for (List<DataSegment> segmentsInInterval : hashPartitionedSegments.values()) {
+      segmentsToCompact.addAll(segmentsInInterval.subList(0, Math.min(2, segmentsInInterval.size())));
+    }
+    final Set<DataSegment> originalSegments = dataSegmentsWithSchemas.getSegments();
+    final Set<String> compactedSegmentIds = segmentsToCompact.stream()
+                                                             .map(segment -> segment.getId().toString())
+                                                             .collect(Collectors.toSet());
+    final Set<String> nonCompactedSegmentIds =
+        originalSegments.stream()
+                        .map(segment -> segment.getId().toString())
+                        .filter(segmentId -> !compactedSegmentIds.contains(segmentId))
+                        .collect(Collectors.toSet());
+                        .collect(Collectors.toSet());
+    Assert.assertFalse(nonCompactedSegmentIds.isEmpty());
+    final Set<String> originalSegmentIds = new HashSet<>(compactedSegmentIds);
+    originalSegmentIds.addAll(nonCompactedSegmentIds);
+
+    final CompactionTask compactionTask = newCompactionTaskBuilder()
+        .inputSpec(
+            new MinorCompactionInputSpec(
+                INTERVAL_TO_INDEX,
+                segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList())
+            ), true
+        )
+        .tuningConfig(newTuningConfig(new DynamicPartitionsSpec(20, null), 2, false))
+        .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
+        .build();
+    dataSegmentsWithSchemas = runTask(compactionTask, TaskState.SUCCESS);
+    verifySchema(dataSegmentsWithSchemas);
+
+    // Check published segment set after compaction
+    final Set<DataSegment> publishedAfterCompaction = dataSegmentsWithSchemas.getSegments();
+    Assert.assertFalse(SegmentUtils.groupSegmentsByInterval(publishedAfterCompaction).isEmpty());
+
+    final Set<String> finalSegmentIds = publishedAfterCompaction.stream()
+                                                                .map(segment -> segment.getId().toString())
+                                                                .collect(Collectors.toSet());
+
+    final Map<String, String> upgradedFromSegmentIdMap =
+        getStorageCoordinator().retrieveUpgradedFromSegmentIds(DATASOURCE, finalSegmentIds);
+    Assert.assertFalse(upgradedFromSegmentIdMap.isEmpty());
+    Assert.assertTrue(upgradedFromSegmentIdMap.values().stream().noneMatch(compactedSegmentIds::contains));
+    Assert.assertTrue(originalSegmentIds.containsAll(upgradedFromSegmentIdMap.values()));
+    for (final String successorSegmentId : upgradedFromSegmentIdMap.keySet()) {
+      Assert.assertTrue(finalSegmentIds.contains(successorSegmentId));
+    }
+
+    // Validate new segment ids (replacements and/or upgraded replicas)
+    final Set<String> newPublishedSegmentIds = new HashSet<>(finalSegmentIds);
+    newPublishedSegmentIds.removeAll(originalSegmentIds);
+    Assert.assertFalse(newPublishedSegmentIds.isEmpty());
+    Assert.assertTrue(
+        newPublishedSegmentIds.stream().anyMatch(id -> !upgradedFromSegmentIdMap.containsKey(id))
+    );
+
+    // Index newly published ids by day for compacted-source checks below.
+    final Map<Interval, Set<String>> newSegmentIdsByInterval =
+        publishedAfterCompaction.stream()
+                                .filter(segment -> !segment.isTombstone()
+                                               && newPublishedSegmentIds.contains(segment.getId().toString())
+                                )
+                                .collect(Collectors.groupingBy(
+                                    DataSegment::getInterval,
+                                    Collectors.mapping(
+                                        segment -> segment.getId().toString(),
+                                        Collectors.toCollection(HashSet::new)
+                                    )
+                                ));
+
+    // Verify non-compacted segments are being replaced
+    for (final String parentSegmentId : nonCompactedSegmentIds) {
+      final List<String> successorSegmentIds = upgradedFromSegmentIdMap.entrySet()
+                                                                       .stream()
+                                                                       .filter(e -> parentSegmentId.equals(e.getValue()))
+                                                                       .map(Map.Entry::getKey)
+                                                                       .toList();
+      if (finalSegmentIds.contains(parentSegmentId)) {
+        Assert.assertTrue(successorSegmentIds.isEmpty());
+      } else if (!successorSegmentIds.isEmpty()) {
+        Assert.assertEquals(1, successorSegmentIds.size());
+        Assert.assertTrue(finalSegmentIds.contains(successorSegmentIds.get(0)));
+      }
+    }
+
+    // Verify compacted segments have new published ID
+    for (final DataSegment compactedSource : segmentsToCompact) {
+      final String compactedSourceId = compactedSource.getId().toString();
+      Assert.assertFalse(finalSegmentIds.contains(compactedSourceId));
+      final Set<String> newIdsInSameInterval = newSegmentIdsByInterval.getOrDefault(compactedSource.getInterval(), Set.of());
+      Assert.assertFalse(newIdsInSameInterval.isEmpty());
+    }
+
+    // Non-compacted parents removed from the published set should match retrieveUpgradedToSegmentIds
+    final Set<String> removedNonCompactedParentIds =
+        nonCompactedSegmentIds.stream().filter(id -> !finalSegmentIds.contains(id)).collect(Collectors.toSet());
+    if (!removedNonCompactedParentIds.isEmpty()) {
+      final Map<String, Set<String>> upgradedToSegmentIdsByParent =
+          getStorageCoordinator().retrieveUpgradedToSegmentIds(DATASOURCE, removedNonCompactedParentIds);
+      for (final String parentSegmentId : removedNonCompactedParentIds) {
+        final Set<String> expectedSuccessorIds = upgradedFromSegmentIdMap.entrySet()
+                                                                         .stream()
+                                                                         .filter(e -> parentSegmentId.equals(e.getValue()))
+                                                                         .map(Map.Entry::getKey)
+                                                                         .collect(Collectors.toSet());
+        if (expectedSuccessorIds.isEmpty()) {
+          continue;
+        }
+        final Set<String> coordinatorSuccessorIds =
+            new HashSet<>(upgradedToSegmentIdsByParent.getOrDefault(parentSegmentId, Set.of()));
+        coordinatorSuccessorIds.remove(parentSegmentId);
+        Assert.assertTrue(coordinatorSuccessorIds.containsAll(expectedSuccessorIds));
+      }
+    }
+  }
+
   private DataSegmentsWithSchemas runTestTask(
       PartitionsSpec partitionsSpec,
       TaskState expectedTaskState,
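
Read together, the three call sites above amount to a small recipe for pointing a compaction task at specific segments. A hedged sketch, assuming a CompactionTask.Builder already configured the way newCompactionTaskBuilder() configures it in this test; the helper name buildMinorCompactionTask is hypothetical, and the boolean passed to inputSpec is taken to be the builder's existing allowNonAlignedInterval flag:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.druid.indexing.common.task.CompactionTask;
    import org.apache.druid.indexing.common.task.MinorCompactionInputSpec;
    import org.apache.druid.indexing.common.task.Tasks;
    import org.apache.druid.query.SegmentDescriptor;
    import org.apache.druid.timeline.DataSegment;
    import org.joda.time.Interval;

    /**
     * Illustrative helper (not part of the commit): targets the given segments
     * with a minor compaction task. The builder is assumed pre-configured with
     * datasource, tuning config, etc.
     */
    static CompactionTask buildMinorCompactionTask(
        CompactionTask.Builder builder,
        Interval interval,
        List<DataSegment> segmentsToCompact
    )
    {
      // Minor compaction takes segment descriptors, not full DataSegments.
      final List<SegmentDescriptor> descriptors =
          segmentsToCompact.stream().map(DataSegment::toDescriptor).collect(Collectors.toList());
      return builder
          // true = allowNonAlignedInterval, as in the test above (assumption)
          .inputSpec(new MinorCompactionInputSpec(interval, descriptors), true)
          // the tests run minor compaction under concurrent locking
          .context(Map.of(Tasks.USE_CONCURRENT_LOCKS, true))
          .build();
    }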
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
index 751ec8d462a..50e5dd44bd5 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
@@ -35,22 +35,22 @@ import java.util.stream.Collectors;
  */
 public class ClientMinorCompactionInputSpec extends ClientCompactionIntervalSpec
 {
-  public static final String TYPE = "uncompacted";
+  public static final String TYPE = "minor";
 
-  private final List<SegmentDescriptor> uncompactedSegments;
+  private final List<SegmentDescriptor> segments;
 
   @JsonCreator
   public ClientMinorCompactionInputSpec(
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("uncompactedSegments") List<SegmentDescriptor> 
uncompactedSegments
+      @JsonProperty("segments") List<SegmentDescriptor> segments
   )
   {
     super(interval, null);
-    if (uncompactedSegments == null || uncompactedSegments.isEmpty()) {
-      throw InvalidInput.exception("'uncompactedSegments' must be non-empty.");
+    if (segments == null || segments.isEmpty()) {
+      throw InvalidInput.exception("'segments' must be non-empty.");
     } else if (interval != null) {
       List<SegmentDescriptor> segmentsNotInInterval =
-          uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList());
+          segments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList());
       if (!segmentsNotInInterval.isEmpty()) {
         throw new IAE(
             "Can not supply segments outside interval[%s], got segments[%s].",
@@ -59,13 +59,13 @@ public class ClientMinorCompactionInputSpec extends ClientCompactionIntervalSpec
         );
       }
     }
-    this.uncompactedSegments = uncompactedSegments;
+    this.segments = segments;
   }
 
   @JsonProperty
-  public List<SegmentDescriptor> getUncompactedSegments()
+  public List<SegmentDescriptor> getSegments()
   {
-    return uncompactedSegments;
+    return segments;
   }
 
   @Override
@@ -81,13 +81,13 @@ public class ClientMinorCompactionInputSpec extends ClientCompactionIntervalSpec
       return false;
     }
     ClientMinorCompactionInputSpec that = (ClientMinorCompactionInputSpec) object;
-    return Objects.equals(uncompactedSegments, that.uncompactedSegments);
+    return Objects.equals(segments, that.segments);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), uncompactedSegments);
+    return Objects.hash(super.hashCode(), segments);
   }
 
   @Override
@@ -95,7 +95,7 @@ public class ClientMinorCompactionInputSpec extends ClientCompactionIntervalSpec
   {
     return "ClientMinorCompactionInputSpec{" +
            "interval=" + getInterval() +
-           ",uncompactedSegments=" + uncompactedSegments +
+           ",segments=" + segments +
            '}';
   }
 }
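
A brief usage sketch of the constructor validation above; the interval and descriptor values are arbitrary placeholders, and the assembled fragment is illustrative only:

    import java.util.List;
    import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.query.SegmentDescriptor;
    import org.joda.time.Interval;

    final Interval interval = Intervals.of("2015-04-11/2015-04-12");

    // Accepted: every segment lies inside the spec interval.
    final ClientMinorCompactionInputSpec valid = new ClientMinorCompactionInputSpec(
        interval,
        List.of(new SegmentDescriptor(interval, "v1", 0))
    );

    // Rejected via InvalidInput.exception: 'segments' must be non-empty.
    // new ClientMinorCompactionInputSpec(interval, List.of());

    // Rejected with IAE: segment falls outside the spec interval.
    // new ClientMinorCompactionInputSpec(
    //     interval,
    //     List.of(new SegmentDescriptor(Intervals.of("2015-04-13/2015-04-14"), "v1", 0))
    // );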
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
index 7e116a271eb..c6a41d7fe81 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
@@ -136,7 +136,7 @@ public class ClientCompactionIntervalSpecTest
     ObjectMapper mapper = new DefaultObjectMapper();
     Interval interval = Intervals.of("2015-04-11/2015-04-12");
 
-    // Test without uncompactedSegments (full compaction)
+    // Test without segments (full compaction)
     ClientCompactionIntervalSpec withoutSegments = new ClientCompactionIntervalSpec(interval, null);
     String json2 = mapper.writeValueAsString(withoutSegments);
     ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2, ClientCompactionIntervalSpec.class);
@@ -152,12 +152,12 @@ public class ClientCompactionIntervalSpecTest
         new SegmentDescriptor(Intervals.of("2015-04-11/2015-04-12"), "v1", 0)
     );
 
-    // Test with uncompactedSegments (minor compaction)
+    // Test with segments (minor compaction)
     ClientCompactionInputSpec withSegments = new ClientMinorCompactionInputSpec(interval, segments);
     String json1 = mapper.writeValueAsString(withSegments);
     ClientCompactionInputSpec deserialized1 = mapper.readValue(json1, ClientCompactionIntervalSpec.class);
     Assert.assertTrue(deserialized1 instanceof ClientMinorCompactionInputSpec);
     Assert.assertEquals(withSegments, deserialized1);
-    Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec) deserialized1).getUncompactedSegments());
+    Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec) deserialized1).getSegments());
   }
 }
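
For the round trip above to resolve the subtype when reading the base class, ClientCompactionIntervalSpec presumably carries Jackson polymorphic typing keyed on the "type" property. The wiring sketched below is an assumption for illustration only; the actual annotations live elsewhere in the Druid source and are not shown in this diff:

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;

    // Hypothetical wiring; a defaultImpl would explain why a payload without
    // a "type" field still deserializes as plain ClientCompactionIntervalSpec.
    @JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type",
        defaultImpl = ClientCompactionIntervalSpec.class
    )
    @JsonSubTypes({
        @JsonSubTypes.Type(name = ClientMinorCompactionInputSpec.TYPE, value = ClientMinorCompactionInputSpec.class)
    })
    public class ClientCompactionIntervalSpec
    {
      // ...
    }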


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
