kfaraz commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2871815005


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Specifies uncompacted segments to compact within an interval.
+ * Used for minor compaction to compact only uncompacted segments while 
leaving compacted segments untouched.
+ */
+public class UncompactedInputSpec implements CompactionInputSpec
+{
+  public static final String TYPE = "uncompacted";
+
+  private final Interval interval;
+  private final List<SegmentDescriptor> uncompactedSegments;
+
+  @JsonCreator
+  public UncompactedInputSpec(
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("uncompactedSegments") List<SegmentDescriptor> 
uncompactedSegments
+  )
+  {
+    if (interval == null) {
+      throw new IAE("Uncompacted interval must not be null");
+    }
+    if (interval.toDurationMillis() == 0) {
+      throw new IAE("Uncompacted interval[%s] is empty, must specify a 
nonempty interval", interval);
+    }
+    if (uncompactedSegments == null || uncompactedSegments.isEmpty()) {
+      throw new IAE("Uncompacted segments must not be null or empty");
+    }
+
+    // Validate that all segments are within the interval
+    List<SegmentDescriptor> segmentsNotInInterval =
+        uncompactedSegments.stream().filter(s -> 
!interval.contains(s.getInterval())).collect(Collectors.toList());
+    if (!segmentsNotInInterval.isEmpty()) {
+      throw new IAE(
+          "All uncompacted segments must be within interval[%s], got segments 
outside interval: %s",
+          interval,
+          segmentsNotInInterval
+      );
+    }

Review Comment:
   Nit: move this to a separate private method



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Specifies uncompacted segments to compact within an interval.
+ * Used for minor compaction to compact only uncompacted segments while 
leaving compacted segments untouched.
+ */
+public class UncompactedInputSpec implements CompactionInputSpec

Review Comment:
   May be call it `MinorCompactionInputSpec`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentToUpgradeAction.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Task action that records segments as being upgraded in the metadata store.
+ * <p>
+ * This action is used during compaction to track which segments are being 
replaced.
+ * It validates that all segments to be upgraded are covered by
+ * {@link ReplaceTaskLock}s before inserting them into the upgrade segments 
table.
+ * <p>
+ * The action will fail if any of the upgrade segments do not have a 
corresponding
+ * replace lock, ensuring that only properly locked segments can be marked for 
upgrade.
+ *
+ * @return the number of segments successfully inserted into the upgrade 
segments table
+ */
+public class MarkSegmentToUpgradeAction implements TaskAction<Integer>
+{
+  private final String dataSource;
+  private final List<DataSegment> upgradeSegments;
+
+  /**
+   * @param dataSource the datasource containing the segments to upgrade
+   * @param upgradeSegments the list of segments to be recorded as upgraded
+   */
+  @JsonCreator
+  public MarkSegmentToUpgradeAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("upgradeSegments") List<DataSegment> upgradeSegments
+  )
+  {
+    this.dataSource = dataSource;
+    this.upgradeSegments = upgradeSegments;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public List<DataSegment> getUpgradeSegments()
+  {
+    return upgradeSegments;
+  }
+
+  @Override
+  public TypeReference<Integer> getReturnTypeReference()
+  {
+    return new TypeReference<>()
+    {
+    };
+  }
+
+  @Override
+  public Integer perform(Task task, TaskActionToolbox toolbox)
+  {
+    final String datasource = task.getDataSource();
+    final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
+        = TaskLocks.findReplaceLocksCoveringSegments(datasource, 
toolbox.getTaskLockbox(), Set.copyOf(upgradeSegments));
+
+    if (segmentToReplaceLock.size() < upgradeSegments.size()) {
+      throw InvalidInput.exception(
+          "Not all segments are hold by a replace lock, only [%d] segments out 
of total segments[%d] are hold by repalce lock",

Review Comment:
   ```suggestion
             "Segments to upgrade must be covered by a REPLACE lock. Only [%d] 
out of [%d] segments are covered.",
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentToUpgradeAction.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Task action that records segments as being upgraded in the metadata store.
+ * <p>
+ * This action is used during compaction to track which segments are being 
replaced.
+ * It validates that all segments to be upgraded are covered by
+ * {@link ReplaceTaskLock}s before inserting them into the upgrade segments 
table.
+ * <p>
+ * The action will fail if any of the upgrade segments do not have a 
corresponding
+ * replace lock, ensuring that only properly locked segments can be marked for 
upgrade.
+ *
+ * @return the number of segments successfully inserted into the upgrade 
segments table
+ */
+public class MarkSegmentToUpgradeAction implements TaskAction<Integer>
+{
+  private final String dataSource;
+  private final List<DataSegment> upgradeSegments;
+
+  /**
+   * @param dataSource the datasource containing the segments to upgrade
+   * @param upgradeSegments the list of segments to be recorded as upgraded
+   */
+  @JsonCreator
+  public MarkSegmentToUpgradeAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("upgradeSegments") List<DataSegment> upgradeSegments
+  )
+  {
+    this.dataSource = dataSource;
+    this.upgradeSegments = upgradeSegments;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public List<DataSegment> getUpgradeSegments()
+  {
+    return upgradeSegments;
+  }
+
+  @Override
+  public TypeReference<Integer> getReturnTypeReference()
+  {
+    return new TypeReference<>()
+    {
+    };
+  }
+
+  @Override
+  public Integer perform(Task task, TaskActionToolbox toolbox)
+  {
+    final String datasource = task.getDataSource();
+    final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
+        = TaskLocks.findReplaceLocksCoveringSegments(datasource, 
toolbox.getTaskLockbox(), Set.copyOf(upgradeSegments));

Review Comment:
   Since we use a set here, I wonder if the constructor of this class should 
also accept a `Set` instead of `List`.
Otherwise, it is possible for users to pass in duplicate segments and then 
they would get an exception `Not all segments are hold by replace lock`, which 
might be confusing.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Specifies uncompacted segments to compact within an interval.
+ * Used for minor compaction to compact only uncompacted segments while 
leaving compacted segments untouched.
+ */
+public class UncompactedInputSpec implements CompactionInputSpec
+{
+  public static final String TYPE = "uncompacted";
+
+  private final Interval interval;
+  private final List<SegmentDescriptor> uncompactedSegments;
+
+  @JsonCreator
+  public UncompactedInputSpec(
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("uncompactedSegments") List<SegmentDescriptor> 
uncompactedSegments
+  )
+  {
+    if (interval == null) {
+      throw new IAE("Uncompacted interval must not be null");

Review Comment:
   Please use `InvalidInput` exceptions instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Specifies uncompacted segments to compact within an interval.
+ * Used for minor compaction to compact only uncompacted segments while 
leaving compacted segments untouched.

Review Comment:
   ```suggestion
    * Used for MSQ-based minor compaction to compact only uncompacted segments 
while upgrading compacted segments (i.e. no change to physical segment files).
   ```



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2186,12 +2222,13 @@ private Map<String, String> 
getAppendSegmentsCommittedDuringTask(
     );
 
     ResultIterator<Pair<String, String>> resultIterator = 
transaction.getHandle()

Review Comment:
   Nit: Move the `.getHandle()` to the next line to make the reformatting 
changes smaller and the code cleaner (less indented).



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java:
##########
@@ -96,12 +97,12 @@ public CompactionSupervisor createSupervisor()
   /**
    * @return {@link CompactionJobTemplate} used to create jobs for the 
supervisor.
    */
-  public CompactionJobTemplate getTemplate()
+  public CompactionJobTemplate getTemplate(CompactionStatusTracker 
statusTracker)

Review Comment:
   Please don't make this change. This breaks the clean template APIs already 
in place.
   Please follow the steps outlined here instead:
   https://github.com/apache/druid/pull/19059#discussion_r2864067544



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -572,17 +584,37 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
       return Collections.emptyMap();
     }
 
+    if (segmentProvider.minorCompaction) {
+      Iterable<DataSegment> segmentsNotCompletelyWithinin =
+          Iterables.filter(timelineSegments, s -> 
!segmentProvider.interval.contains(s.getInterval()));
+      if (segmentsNotCompletelyWithinin.iterator().hasNext()) {
+        throw new ISE(

Review Comment:
   Please use `DruidException`.
   
   Is this case possible? Isn't this validation already done in the new 
`CompactionInputSpec` class?



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -91,12 +96,35 @@ public List<CompactionJob> createCompactionJobs(
     // Create a job for each CompactionCandidate
     while (segmentIterator.hasNext()) {
       final CompactionCandidate candidate = segmentIterator.next();
+      final CompactionCandidateSearchPolicy.Eligibility eligibility =
+          params.getClusterCompactionConfig()
+                .getCompactionPolicy()
+                .checkEligibilityForCompaction(candidate, 
statusTracker.getLatestTaskStatus(candidate));
+      if (!eligibility.isEligible()) {
+        continue;
+      }
+      final CompactionCandidate finalCandidate;
+      switch (eligibility.getMode()) {
+        case ALL_SEGMENTS:
+          finalCandidate = candidate;
+          break;
+        case UNCOMPACTED_SEGMENTS_ONLY:
+          finalCandidate = CompactionCandidate.from(
+              candidate.getUncompactedSegments(),
+              null,
+              candidate.getCurrentStatus()
+          );

Review Comment:
   Why is this change needed? Is the candidate being changed in any way?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               
.map(DataSegment::toDescriptor)
+                                                               
.collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
       }
-      return intervalDataSchemaMap;
+      return inputSchemas;
     } else {
       // given segment granularity
+      List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
+          timelineSegments,
+          segmentProvider.segmentsToUpgradePredicate
+      ));
+      if (!upgradeSegments.isEmpty()) {
+        toolbox.getTaskActionClient().submit(new 
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));

Review Comment:
   Please add a log line here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -648,7 +697,11 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
           projections,
           needMultiValuedColumns
       );
-      return Collections.singletonMap(segmentProvider.interval, dataSchema);
+      return Map.of(segmentProvider.minorCompaction
+                    ? new 
MultipleSpecificSegmentSpec(StreamSupport.stream(segmentsToCompact.spliterator(),
 false)
+                                                                   
.map(DataSegment::toDescriptor)
+                                                                   
.collect(Collectors.toList()))
+                    : new 
MultipleIntervalSegmentSpec(List.of(segmentProvider.interval)), dataSchema);

Review Comment:
   Current code is difficult to follow. Please use `if-else` and a separate 
variable to clean it up.
   
   ```suggestion
         final QuerySegmentSpec segmentSpec;
         if (segmentProvider.isMinorCompaction) {
             segmentSpec = new MultipleSpecificSegmentSpec(
                     StreamSupport.stream(segmentsToCompact.spliterator(), 
false)
                                               .map(DataSegment::toDescriptor)
                                               .collect(Collectors.toList())
               );
         } else {
               segmentSpec = new 
MultipleIntervalSegmentSpec(List.of(segmentProvider.interval));
         }
         
         return Map.of(segmentSpec, dataSchema);
   ```



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus> 
createNewIdsOfAppendSegmentsAfterReplace(
               oldSegmentMetadata.getIndexingStateFingerprint()
           )
       );
+      segmentsToInsert.add(dataSegment);
     }
 
-    return upgradedSegments;
+    // update corePartitions in shard spec
+    return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {
+      Integer partitionNum = 
intervalToCurrentPartitionNum.get(segment.getInterval());
+      if (!segment.isTombstone()
+          && !numChunkNotSupported.contains(segment.getInterval())
+          && partitionNum != null
+          && partitionNum + 1 != 
segment.getShardSpec().getNumCorePartitions()) {

Review Comment:
   Why this condition?
   
   Is it possible that existing compacted segments in an interval were range 
partitioned and had say 10 core partitions. And the segments created by the 
minor compaction are also range partitioned and have say 5 core partitions. In 
this case, none of the segments have core partition = 16, so will their shard 
spec remain unchanged?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1222,11 +1274,26 @@ static class SegmentProvider
     private final CompactionInputSpec inputSpec;
     private final Interval interval;
 
+    private final boolean minorCompaction;
+    private final Predicate<DataSegment> segmentsToUpgradePredicate;
+    private final Predicate<DataSegment> segmentsToCompactPredicate;

Review Comment:
   Nit: Instead of predicates, please add private methods 
`shouldUpgradeSegment(DataSegment)` and `shouldCompactSegment(DataSegment)` to 
the `SegmentProvider` class.
   Use of predicates does not really seem warranted here.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1852,9 +1853,14 @@ protected Set<DataSegment> insertSegments(
   }
 
   /**
-   * Creates new versions of segments appended while a "REPLACE" task was in 
progress.
+   * Retrieves segments from the upgrade segments table and creates upgraded 
versions with new intervals,
+   * versions, and partition numbers. Combines upgraded segments with replace 
segments and updates shard
+   * specs with correct core partition counts.
+   *
+   * @return pair of (upgraded segments for metadata tracking, segments to 
insert into segment table)
+   * @throws DruidException if a replace interval partially overlaps a segment 
being upgraded
    */
-  private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
+  private Pair<Set<DataSegmentPlus>, Set<DataSegment>> 
createNewSegmentsAfterReplace(

Review Comment:
   This method should just return a `Set<DataSegmentPlus>`. Each 
`DataSegmentPlus` already contains a `DataSegment` inside it.
   The `DataSegmentPlus.getUpgradedFromSegmentId()` should be used to 
distinguish if the segment is an upgraded one or not.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1925,15 +1933,16 @@ private Set<DataSegmentPlus> 
createNewIdsOfAppendSegmentsAfterReplace(
         // but a (revoked) REPLACE lock covers this segment
         newInterval = oldInterval;
       }
+      if (!oldSegment.getShardSpec().isNumChunkSupported()) {
+        numChunkNotSupported.add(newInterval);
+      }
 
       // Compute shard spec of the upgraded segment
       final int partitionNum = intervalToCurrentPartitionNum.compute(
           newInterval,
           (i, value) -> value == null ? 0 : value + 1
       );
-      final int numCorePartitions = 
intervalToNumCorePartitions.get(newInterval);
-      ShardSpec shardSpec = new NumberedShardSpec(partitionNum, 
numCorePartitions);
-
+      final ShardSpec shardSpec = 
oldSegment.getShardSpec().withPartitionNum(partitionNum);
       // Create upgraded segment with the correct interval, version and shard 
spec
       String lockVersion = 
upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
       DataSegment dataSegment = DataSegment.builder(oldSegment)

Review Comment:
   Use the final shardSpec to build this `DataSegment`.



##########
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java:
##########
@@ -20,43 +20,67 @@
 package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * InputSpec for {@link ClientCompactionIOConfig}.
  * <p>
- * Should be synchronized with 
org.apache.druid.indexing.common.task.CompactionIntervalSpec.
+ * Should be synchronized with 
org.apache.druid.indexing.common.task.CompactionIntervalSpec and 
org.apache.druid.indexing.common.task.UncompactedInputSpec.
  */
 public class ClientCompactionIntervalSpec
 {
-  private static final String TYPE = "interval";
+  private static final String TYPE_ALL_SEGMENTS = "interval";
+  private static final String TYPE_UNCOMPACTED_SEGMENTS_ONLY = "uncompacted";
 
   private final Interval interval;
   @Nullable
+  private final List<SegmentDescriptor> uncompactedSegments;
+  @Nullable
   private final String sha256OfSortedSegmentIds;
 
   @JsonCreator
   public ClientCompactionIntervalSpec(
       @JsonProperty("interval") Interval interval,
+      @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor> 
uncompactedSegments,
       @JsonProperty("sha256OfSortedSegmentIds") @Nullable String 
sha256OfSortedSegmentIds
   )
   {
     if (interval != null && interval.toDurationMillis() == 0) {
       throw new IAE("Interval[%s] is empty, must specify a nonempty interval", 
interval);
     }
     this.interval = interval;
+    if (uncompactedSegments == null) {
+      // perform a full compaction
+    } else if (uncompactedSegments.isEmpty()) {
+      throw new IAE("Can not supply empty segments as input, please use either 
null or non-empty segments.");
+    } else if (interval != null) {
+      List<SegmentDescriptor> segmentsNotInInterval =
+          uncompactedSegments.stream().filter(s -> 
!interval.contains(s.getInterval())).collect(Collectors.toList());
+      if (!segmentsNotInInterval.isEmpty()) {
+        throw new IAE(
+            "Can not supply segments outside interval[%s], got segments[%s].",
+            interval,
+            segmentsNotInInterval
+        );
+      }
+    }
+    this.uncompactedSegments = uncompactedSegments;
     this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
   }
 
   @JsonProperty
   public String getType()
   {
-    return TYPE;
+    return (uncompactedSegments == null) ? TYPE_ALL_SEGMENTS : 
TYPE_UNCOMPACTED_SEGMENTS_ONLY;

Review Comment:
   This seems unclean. Please add a separate class instead.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -202,6 +229,133 @@ public void 
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
+  @Test
+  public void test_minorCompactionWithMSQ() throws Exception
+  {
+    configureCompaction(
+        CompactionEngine.MSQ,
+        new MostFragmentedIntervalFirstPolicy(2, new 
HumanReadableBytes("1KiB"), null, 80, null)
+    );
+    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = 
MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null))
+                                        
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(ioConfig -> 
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+    final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    final int totalRowCount = publish1kRecords(topic1, true) + 
publish1kRecords(topic1, false);
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Before compaction
+    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+    PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 
5000, List.of("page"), false);

Review Comment:
   Please add a test case for dynamic partitioning too.



##########
processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java:
##########
@@ -56,6 +57,15 @@
 })
 public interface ShardSpec
 {
+  /**
+   * Returns whether {@link #createChunk} returns a {@link 
NumberedPartitionChunk} instance.
+   * This is necessary for supporting {@link PartitionHolder#isComplete()} if 
updating to a new corePartitions spec.
+   */
+  default boolean isNumChunkSupported()

Review Comment:
   ```suggestion
     default boolean canCreateNumberedPartitionChunk()
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               
.map(DataSegment::toDescriptor)
+                                                               
.collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
       }
-      return intervalDataSchemaMap;
+      return inputSchemas;
     } else {
       // given segment granularity
+      List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
+          timelineSegments,
+          segmentProvider.segmentsToUpgradePredicate

Review Comment:
   ```suggestion
             segmentProvider::shouldUpgradeSegment
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               
.map(DataSegment::toDescriptor)
+                                                               
.collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);

Review Comment:
   Compute the value for the key `QuerySegmentSpec` on a separate line rather 
than computing it inside the `put`. The current code is difficult to follow.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java:
##########
@@ -110,7 +113,8 @@ public CascadingReindexingTemplate(
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext,
       @JsonProperty("skipOffsetFromLatest") @Nullable Period 
skipOffsetFromLatest,
       @JsonProperty("skipOffsetFromNow") @Nullable Period skipOffsetFromNow,
-      @JsonProperty("defaultSegmentGranularity") Granularity 
defaultSegmentGranularity
+      @JsonProperty("defaultSegmentGranularity") Granularity 
defaultSegmentGranularity,
+      @JacksonInject CompactionStatusTracker statusTracker

Review Comment:
   The status tracker should not be passed here. It would be passed via 
`CompactionJobParams` in a manner similar to the 
`IndexingStateFingerprintMapper` interface. Please refer to the other comments 
regarding the `CompactionStatusTracker`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               
.map(DataSegment::toDescriptor)
+                                                               
.collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
       }
-      return intervalDataSchemaMap;
+      return inputSchemas;
     } else {
       // given segment granularity
+      List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(

Review Comment:
   ```suggestion
         List<DataSegment> segmentsToUpgrade = 
Lists.newArrayList(Iterables.filter(
   ```



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2097,17 +2119,27 @@ private SegmentMetadata 
getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
     return segmentMetadata;
   }
 
+  @Override
+  public int insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock> 
segmentToReplaceLock)
+  {
+    final String dataSource = 
verifySegmentsToCommit(segmentToReplaceLock.keySet());
+    return inReadWriteDatasourceTransaction(
+        dataSource,
+        transaction -> insertIntoUpgradeSegmentsTableDoWork(transaction, 
segmentToReplaceLock)
+    );
+  }
+
   /**
    * Inserts entries into the upgrade_segments table in batches of size
    * {@link #MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}.
    */
-  private void insertIntoUpgradeSegmentsTable(
+  private int insertIntoUpgradeSegmentsTableDoWork(

Review Comment:
   Please do not rename this method if the intention is only to distinguish it 
from the new `insertIntoUpgradeSegmentsTable(map)` method. It is okay for two 
methods to have the same name if they serve a similar purpose but have 
different args.



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java:
##########
@@ -259,6 +259,12 @@ public boolean isRunning()
     return started.get();
   }
 
+  @Override
+  public CompactionStatusTracker getCompactionStatusTracker()

Review Comment:
   This shouldn't be required.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -572,17 +584,37 @@ static Map<Interval, DataSchema> 
createDataSchemasForIntervals(
       return Collections.emptyMap();
     }
 
+    if (segmentProvider.minorCompaction) {
+      Iterable<DataSegment> segmentsNotCompletelyWithinin =
+          Iterables.filter(timelineSegments, s -> 
!segmentProvider.interval.contains(s.getInterval()));
+      if (segmentsNotCompletelyWithinin.iterator().hasNext()) {
+        throw new ISE(
+            "Incremental compaction doesn't allow segments not completely 
within interval[%s]",
+            segmentProvider.interval
+        );
+      }
+    }
+
     if (granularitySpec == null || granularitySpec.getSegmentGranularity() == 
null) {
-      Map<Interval, DataSchema> intervalDataSchemaMap = new HashMap<>();
+      Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>();
+      // if segment is already compacted in incremental compaction, they need 
to be upgraded directly, supported in MSQ
+      List<DataSegment> upgradeSegments = new ArrayList<>();
 
       // original granularity
       final Map<Interval, List<DataSegment>> intervalToSegments = new 
TreeMap<>(
           Comparators.intervalsByStartThenEnd()
       );
 
       for (final DataSegment dataSegment : timelineSegments) {
-        intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new 
ArrayList<>())
-                          .add(dataSegment);
+        if (segmentProvider.segmentsToUpgradePredicate.test(dataSegment)) {
+          upgradeSegments.add(dataSegment);
+        } else {
+          intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> 
new ArrayList<>())
+                            .add(dataSegment);
+        }
+      }
+      if (!upgradeSegments.isEmpty()) {
+        toolbox.getTaskActionClient().submit(new 
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));

Review Comment:
   A log line here would be helpful.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus> 
createNewIdsOfAppendSegmentsAfterReplace(
               oldSegmentMetadata.getIndexingStateFingerprint()
           )
       );
+      segmentsToInsert.add(dataSegment);
     }
 
-    return upgradedSegments;
+    // update corePartitions in shard spec
+    return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {

Review Comment:
   Please break up this statement to separate out the return and the 
computation.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -37,9 +37,11 @@
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;

Review Comment:
   The validation in `NativeCompactionRunner.validateCompactionTask()` should 
return a `failure` if the `CompactionInputSpec` is of the new type 
`Uncompacted` (aka minor compaction). 



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -202,6 +229,133 @@ public void 
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
+  @Test
+  public void test_minorCompactionWithMSQ() throws Exception
+  {
+    configureCompaction(
+        CompactionEngine.MSQ,
+        new MostFragmentedIntervalFirstPolicy(2, new 
HumanReadableBytes("1KiB"), null, 80, null)
+    );
+    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = 
MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null))
+                                        
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(ioConfig -> 
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+    final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    final int totalRowCount = publish1kRecords(topic1, true) + 
publish1kRecords(topic1, false);
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Before compaction
+    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+    PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 
5000, List.of("page"), false);
+    // Create a compaction config with DAY granularity
+    InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
+            .withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
+                WikipediaStreamEventStreamGenerator.dimensions()
+                                                   .stream()
+                                                   
.map(StringDimensionSchema::new)
+                                                   
.collect(Collectors.toUnmodifiableList())))
+            .withTaskContext(Map.of("useConcurrentLocks", true))
+            .withIoConfig(new UserCompactionTaskIOConfig(true))
+            
.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
+            .build();
+
+    runCompactionWithSpec(dayGranularityConfig);
+    waitForAllCompactionTasksToFinish();
+
+    pauseCompaction(dayGranularityConfig);
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+    Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
+
+    verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
+
+    // Set up another topic and supervisor

Review Comment:
   Why do we need a separate topic and supervisor?



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -202,6 +229,133 @@ public void 
test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY
     verifyCompactedSegmentsHaveFingerprints(yearGranConfig);
   }
 
+  @Test
+  public void test_minorCompactionWithMSQ() throws Exception
+  {
+    configureCompaction(
+        CompactionEngine.MSQ,
+        new MostFragmentedIntervalFirstPolicy(2, new 
HumanReadableBytes("1KiB"), null, 80, null)
+    );
+    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = 
MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null))
+                                        
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
+        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(ioConfig -> 
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+    final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    final int totalRowCount = publish1kRecords(topic1, true) + 
publish1kRecords(topic1, false);
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Before compaction
+    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+
+    PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 
5000, List.of("page"), false);
+    // Create a compaction config with DAY granularity
+    InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withGranularitySpec(new 
UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
+            .withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
+                WikipediaStreamEventStreamGenerator.dimensions()
+                                                   .stream()
+                                                   
.map(StringDimensionSchema::new)
+                                                   
.collect(Collectors.toUnmodifiableList())))
+            .withTaskContext(Map.of("useConcurrentLocks", true))
+            .withIoConfig(new UserCompactionTaskIOConfig(true))
+            
.withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build())
+            .build();
+
+    runCompactionWithSpec(dayGranularityConfig);
+    waitForAllCompactionTasksToFinish();
+
+    pauseCompaction(dayGranularityConfig);
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+    Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
+
+    verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
+
+    // Set up another topic and supervisor
+    final String topic2 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic2, 1);
+    final KafkaSupervisorSpec supervisor2 = 
kafkaSupervisorSpecBuilder.withId(topic2).build(dataSource, topic2);
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // published another 1k
+    final int appendedRowCount = publish1kRecords(topic2, true);
+    indexer.latchableEmitter().flush();
+    waitUntilPublishedRecordsAreIngested(appendedRowCount);
+
+    // Tear down both topics and supervisors
+    kafkaServer.deleteTopic(topic1);
+    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+    kafkaServer.deleteTopic(topic2);
+    cluster.callApi().postSupervisor(supervisor2.createSuspendedSpec());
+    long totalUsed = overlord.latchableEmitter().getMetricValues(
+        "segment/metadataCache/used/count",
+        Map.of(DruidMetrics.DATASOURCE, dataSource)
+    ).stream().reduce((first, second) -> second).orElse(0).longValue();
+
+    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+    // 1 compacted segment + 2 appended segment
+    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+
+    runCompactionWithSpec(dayGranularityConfig);
+    waitForAllCompactionTasksToFinish();
+
+    // wait for new segments have been updated to the cache
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("segment/metadataCache/used/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+                      .hasValueMatching(Matchers.greaterThan(totalUsed)));
+
+    // performed incremental compaction: 1 previously compacted segment + 1 
incrementally compacted segment
+    Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

Review Comment:
   Please also add assertions to verify the results of some queries on the 
final compacted data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to