cecemei commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2885201680


##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -91,12 +96,35 @@ public List<CompactionJob> createCompactionJobs(
     // Create a job for each CompactionCandidate
     while (segmentIterator.hasNext()) {
       final CompactionCandidate candidate = segmentIterator.next();
+      final CompactionCandidateSearchPolicy.Eligibility eligibility =
+          params.getClusterCompactionConfig()
+                .getCompactionPolicy()
+                .checkEligibilityForCompaction(candidate, statusTracker.getLatestTaskStatus(candidate));
+      if (!eligibility.isEligible()) {
+        continue;
+      }
+      final CompactionCandidate finalCandidate;
+      switch (eligibility.getMode()) {
+        case ALL_SEGMENTS:
+          finalCandidate = candidate;
+          break;
+        case UNCOMPACTED_SEGMENTS_ONLY:
+          finalCandidate = CompactionCandidate.from(
+              candidate.getUncompactedSegments(),
+              null,
+              candidate.getCurrentStatus()
+          );

Review Comment:
   Yes, the candidate is now reconstructed with only the uncompacted segments.
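
   For context, the branch above condenses to roughly the following sketch, using only names visible in the hunk; the fully qualified `Mode` enum and exact signatures are assumptions:

   ```java
   // UNCOMPACTED_SEGMENTS_ONLY rebuilds the candidate from just its uncompacted
   // segments; ALL_SEGMENTS keeps the original candidate unchanged.
   final CompactionCandidate finalCandidate =
       eligibility.getMode() == Eligibility.Mode.UNCOMPACTED_SEGMENTS_ONLY
       ? CompactionCandidate.from(candidate.getUncompactedSegments(), null, candidate.getCurrentStatus())
       : candidate;
   ```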



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -572,17 +584,37 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
       return Collections.emptyMap();
     }
 
+    if (segmentProvider.minorCompaction) {
+      Iterable<DataSegment> segmentsNotCompletelyWithin =
+          Iterables.filter(timelineSegments, s -> !segmentProvider.interval.contains(s.getInterval()));
+      if (segmentsNotCompletelyWithin.iterator().hasNext()) {
+        throw new ISE(
+            "Incremental compaction doesn't allow segments not completely within interval[%s]",
+            segmentProvider.interval
+        );
+      }
+    }
+
     if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
-      Map<Interval, DataSchema> intervalDataSchemaMap = new HashMap<>();
+      Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>();
+      // if segments are already compacted in incremental compaction, they need to be upgraded directly (supported in MSQ)
+      List<DataSegment> upgradeSegments = new ArrayList<>();
 
       // original granularity
      final Map<Interval, List<DataSegment>> intervalToSegments = new TreeMap<>(
           Comparators.intervalsByStartThenEnd()
       );
 
       for (final DataSegment dataSegment : timelineSegments) {
-        intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>())
-                          .add(dataSegment);
+        if (segmentProvider.segmentsToUpgradePredicate.test(dataSegment)) {
+          upgradeSegments.add(dataSegment);
+        } else {
+          intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>())
+                            .add(dataSegment);
+        }
+      }
+      if (!upgradeSegments.isEmpty()) {
+        toolbox.getTaskActionClient().submit(new MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));

Review Comment:
   done



##########
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java:
##########
@@ -20,43 +20,67 @@
 package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.SegmentDescriptor;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * InputSpec for {@link ClientCompactionIOConfig}.
  * <p>
- * Should be synchronized with org.apache.druid.indexing.common.task.CompactionIntervalSpec.
+ * Should be synchronized with org.apache.druid.indexing.common.task.CompactionIntervalSpec and org.apache.druid.indexing.common.task.UncompactedInputSpec.
  */
 public class ClientCompactionIntervalSpec
 {
-  private static final String TYPE = "interval";
+  private static final String TYPE_ALL_SEGMENTS = "interval";
+  private static final String TYPE_UNCOMPACTED_SEGMENTS_ONLY = "uncompacted";
 
   private final Interval interval;
   @Nullable
+  private final List<SegmentDescriptor> uncompactedSegments;
+  @Nullable
   private final String sha256OfSortedSegmentIds;
 
   @JsonCreator
   public ClientCompactionIntervalSpec(
       @JsonProperty("interval") Interval interval,
+      @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor> 
uncompactedSegments,
       @JsonProperty("sha256OfSortedSegmentIds") @Nullable String 
sha256OfSortedSegmentIds
   )
   {
     if (interval != null && interval.toDurationMillis() == 0) {
       throw new IAE("Interval[%s] is empty, must specify a nonempty interval", 
interval);
     }
     this.interval = interval;
+    if (uncompactedSegments == null) {
+      // perform a full compaction
+    } else if (uncompactedSegments.isEmpty()) {
+      throw new IAE("Can not supply empty segments as input, please use either 
null or non-empty segments.");
+    } else if (interval != null) {
+      List<SegmentDescriptor> segmentsNotInInterval =
+          uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList());
+      if (!segmentsNotInInterval.isEmpty()) {
+        throw new IAE(
+            "Can not supply segments outside interval[%s], got segments[%s].",
+            interval,
+            segmentsNotInInterval
+        );
+      }
+    }
+    this.uncompactedSegments = uncompactedSegments;
     this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
   }
 
   @JsonProperty
   public String getType()
   {
-    return TYPE;
+    return (uncompactedSegments == null) ? TYPE_ALL_SEGMENTS : TYPE_UNCOMPACTED_SEGMENTS_ONLY;

Review Comment:
   What's the point of `ClientCompactionIntervalSpec` if every class needs to be mapped to the `CompactionIntervalSpec` class?
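
   For illustration, a minimal sketch of how the `type` discriminator behaves under the constructor above; the intervals and `SegmentDescriptor` arguments are made up for the example:

   ```java
   import org.apache.druid.java.util.common.Intervals;
   import org.apache.druid.query.SegmentDescriptor;
   import java.util.List;

   class IntervalSpecTypeSketch
   {
     static void sketch()
     {
       // No uncompactedSegments: getType() returns "interval", i.e. the form
       // that maps to CompactionIntervalSpec on the task side.
       ClientCompactionIntervalSpec full =
           new ClientCompactionIntervalSpec(Intervals.of("2024-01-01/2024-01-02"), null, null);

       // A non-empty segment list flips getType() to "uncompacted", which is
       // meant to map to UncompactedInputSpec instead.
       ClientCompactionIntervalSpec partial = new ClientCompactionIntervalSpec(
           Intervals.of("2024-01-01/2024-01-02"),
           List.of(new SegmentDescriptor(Intervals.of("2024-01-01T00/2024-01-01T12"), "v1", 0)),
           null
       );
     }
   }
   ```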



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2186,12 +2222,13 @@ private Map<String, String> getAppendSegmentsCommittedDuringTask(
     );
 
    ResultIterator<Pair<String, String>> resultIterator = transaction.getHandle()

Review Comment:
   done



##########
processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java:
##########
@@ -56,6 +57,15 @@
 })
 public interface ShardSpec
 {
+  /**
+   * Returns whether {@link #createChunk} returns a {@link NumberedPartitionChunk} instance.
+   * This is necessary for supporting {@link PartitionHolder#isComplete()} if updating to a new corePartitions spec.
+   */
+  default boolean isNumChunkSupported()

Review Comment:
   done



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2097,17 +2119,27 @@ private SegmentMetadata getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
     return segmentMetadata;
   }
 
+  @Override
+  public int insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock)
+  {
+    final String dataSource = verifySegmentsToCommit(segmentToReplaceLock.keySet());
+    return inReadWriteDatasourceTransaction(
+        dataSource,
+        transaction -> insertIntoUpgradeSegmentsTableDoWork(transaction, segmentToReplaceLock)
+    );
+  }
+
   /**
    * Inserts entries into the upgrade_segments table in batches of size
    * {@link #MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}.
    */
-  private void insertIntoUpgradeSegmentsTable(
+  private int insertIntoUpgradeSegmentsTableDoWork(

Review Comment:
   updated



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
               oldSegmentMetadata.getIndexingStateFingerprint()
           )
       );
+      segmentsToInsert.add(dataSegment);
     }
 
-    return upgradedSegments;
+    // update corePartitions in shard spec
+    return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {
+      Integer partitionNum = intervalToCurrentPartitionNum.get(segment.getInterval());
+      if (!segment.isTombstone()
+          && !numChunkNotSupported.contains(segment.getInterval())
+          && partitionNum != null
+          && partitionNum + 1 != segment.getShardSpec().getNumCorePartitions()) {

Review Comment:
   updated



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               .map(DataSegment::toDescriptor)
+                                                               .collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
       }
-      return intervalDataSchemaMap;
+      return inputSchemas;
     } else {
       // given segment granularity
+      List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
+          timelineSegments,
+          segmentProvider.segmentsToUpgradePredicate
+      ));
+      if (!upgradeSegments.isEmpty()) {
+        toolbox.getTaskActionClient().submit(new MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));

Review Comment:
   done



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -648,7 +697,11 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
           projections,
           needMultiValuedColumns
       );
-      return Collections.singletonMap(segmentProvider.interval, dataSchema);
+      return Map.of(segmentProvider.minorCompaction
+                    ? new MultipleSpecificSegmentSpec(StreamSupport.stream(segmentsToCompact.spliterator(), false)
+                                                                   .map(DataSegment::toDescriptor)
+                                                                   .collect(Collectors.toList()))
+                    : new MultipleIntervalSegmentSpec(List.of(segmentProvider.interval)), dataSchema);

Review Comment:
   done



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1222,11 +1274,26 @@ static class SegmentProvider
     private final CompactionInputSpec inputSpec;
     private final Interval interval;
 
+    private final boolean minorCompaction;
+    private final Predicate<DataSegment> segmentsToUpgradePredicate;
+    private final Predicate<DataSegment> segmentsToCompactPredicate;

Review Comment:
   done



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -572,17 +584,37 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
       return Collections.emptyMap();
     }
 
+    if (segmentProvider.minorCompaction) {
+      Iterable<DataSegment> segmentsNotCompletelyWithin =
+          Iterables.filter(timelineSegments, s -> !segmentProvider.interval.contains(s.getInterval()));
+      if (segmentsNotCompletelyWithin.iterator().hasNext()) {
+        throw new ISE(

Review Comment:
   timelineSegments is not user input, but the set of segments overlapping the interval. The interval can partially overlap some segments, and in that case we can't simply upgrade the overlapping segment or include it in the query. This is an edge case we discussed before.
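
   A concrete instance of that edge case, as a sketch (the intervals are made up): a segment that overlaps the compaction interval without being fully contained by it fails the `contains` check and triggers the ISE above.

   ```java
   import org.apache.druid.java.util.common.Intervals;
   import org.joda.time.Interval;

   class OverlapEdgeCaseSketch
   {
     static void sketch()
     {
       Interval compactionInterval = Intervals.of("2024-01-01/2024-01-03");
       // Overlaps the compaction interval but spills past its end, so it can be
       // neither compacted within the interval nor safely upgraded as-is.
       Interval segmentInterval = Intervals.of("2024-01-02/2024-01-04");

       boolean overlaps = compactionInterval.overlaps(segmentInterval);   // true
       boolean contained = compactionInterval.contains(segmentInterval);  // false -> ISE
     }
   }
   ```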



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -37,9 +37,11 @@
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;

Review Comment:
   done



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               .map(DataSegment::toDescriptor)
+                                                               .collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
       }
-      return intervalDataSchemaMap;
+      return inputSchemas;
     } else {
       // given segment granularity
+      List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
+          timelineSegments,
+          segmentProvider.segmentsToUpgradePredicate

Review Comment:
   done



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
             projections,
             needMultiValuedColumns
         );
-        intervalDataSchemaMap.put(interval, dataSchema);
+        inputSchemas.put(
+            segmentProvider.minorCompaction
+            ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+                                                               .map(DataSegment::toDescriptor)
+                                                               .collect(Collectors.toList()))
+            : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
       }
-      return intervalDataSchemaMap;
+      return inputSchemas;
     } else {
       // given segment granularity
+      List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(

Review Comment:
   done



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
               oldSegmentMetadata.getIndexingStateFingerprint()
           )
       );
+      segmentsToInsert.add(dataSegment);
     }
 
-    return upgradedSegments;
+    // update corePartitions in shard spec
+    return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {

Review Comment:
   updated


