cecemei commented on code in PR #19059:
URL: https://github.com/apache/druid/pull/19059#discussion_r2885201680
##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java:
##########
@@ -91,12 +96,35 @@ public List<CompactionJob> createCompactionJobs(
// Create a job for each CompactionCandidate
while (segmentIterator.hasNext()) {
final CompactionCandidate candidate = segmentIterator.next();
+ final CompactionCandidateSearchPolicy.Eligibility eligibility =
+ params.getClusterCompactionConfig()
+ .getCompactionPolicy()
+ .checkEligibilityForCompaction(candidate,
statusTracker.getLatestTaskStatus(candidate));
+ if (!eligibility.isEligible()) {
+ continue;
+ }
+ final CompactionCandidate finalCandidate;
+ switch (eligibility.getMode()) {
+ case ALL_SEGMENTS:
+ finalCandidate = candidate;
+ break;
+ case UNCOMPACTED_SEGMENTS_ONLY:
+ finalCandidate = CompactionCandidate.from(
+ candidate.getUncompactedSegments(),
+ null,
+ candidate.getCurrentStatus()
+ );
Review Comment:
Yes — the candidate is now reconstructed with only the uncompacted segments.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -572,17 +584,37 @@ static Map<Interval, DataSchema>
createDataSchemasForIntervals(
return Collections.emptyMap();
}
+ if (segmentProvider.minorCompaction) {
+ Iterable<DataSegment> segmentsNotCompletelyWithinin =
+ Iterables.filter(timelineSegments, s ->
!segmentProvider.interval.contains(s.getInterval()));
+ if (segmentsNotCompletelyWithinin.iterator().hasNext()) {
+ throw new ISE(
+ "Incremental compaction doesn't allow segments not completely
within interval[%s]",
+ segmentProvider.interval
+ );
+ }
+ }
+
if (granularitySpec == null || granularitySpec.getSegmentGranularity() ==
null) {
- Map<Interval, DataSchema> intervalDataSchemaMap = new HashMap<>();
+ Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>();
+ // if segment is already compacted in incremental compaction, they need
to be upgraded directly, supported in MSQ
+ List<DataSegment> upgradeSegments = new ArrayList<>();
// original granularity
final Map<Interval, List<DataSegment>> intervalToSegments = new
TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
for (final DataSegment dataSegment : timelineSegments) {
- intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new
ArrayList<>())
- .add(dataSegment);
+ if (segmentProvider.segmentsToUpgradePredicate.test(dataSegment)) {
+ upgradeSegments.add(dataSegment);
+ } else {
+ intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k ->
new ArrayList<>())
+ .add(dataSegment);
+ }
+ }
+ if (!upgradeSegments.isEmpty()) {
+ toolbox.getTaskActionClient().submit(new
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));
Review Comment:
done
##########
server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java:
##########
@@ -20,43 +20,67 @@
package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* InputSpec for {@link ClientCompactionIOConfig}.
* <p>
- * Should be synchronized with
org.apache.druid.indexing.common.task.CompactionIntervalSpec.
+ * Should be synchronized with
org.apache.druid.indexing.common.task.CompactionIntervalSpec and
org.apache.druid.indexing.common.task.UncompactedInputSpec.
*/
public class ClientCompactionIntervalSpec
{
- private static final String TYPE = "interval";
+ private static final String TYPE_ALL_SEGMENTS = "interval";
+ private static final String TYPE_UNCOMPACTED_SEGMENTS_ONLY = "uncompacted";
private final Interval interval;
@Nullable
+ private final List<SegmentDescriptor> uncompactedSegments;
+ @Nullable
private final String sha256OfSortedSegmentIds;
@JsonCreator
public ClientCompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
+ @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor>
uncompactedSegments,
@JsonProperty("sha256OfSortedSegmentIds") @Nullable String
sha256OfSortedSegmentIds
)
{
if (interval != null && interval.toDurationMillis() == 0) {
throw new IAE("Interval[%s] is empty, must specify a nonempty interval",
interval);
}
this.interval = interval;
+ if (uncompactedSegments == null) {
+ // perform a full compaction
+ } else if (uncompactedSegments.isEmpty()) {
+ throw new IAE("Can not supply empty segments as input, please use either
null or non-empty segments.");
+ } else if (interval != null) {
+ List<SegmentDescriptor> segmentsNotInInterval =
+ uncompactedSegments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
+ if (!segmentsNotInInterval.isEmpty()) {
+ throw new IAE(
+ "Can not supply segments outside interval[%s], got segments[%s].",
+ interval,
+ segmentsNotInInterval
+ );
+ }
+ }
+ this.uncompactedSegments = uncompactedSegments;
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
}
@JsonProperty
public String getType()
{
- return TYPE;
+ return (uncompactedSegments == null) ? TYPE_ALL_SEGMENTS :
TYPE_UNCOMPACTED_SEGMENTS_ONLY;
Review Comment:
What's the point of `ClientCompactionIntervalSpec` if every class needs to be
mapped to the `CompactionIntervalSpec` class?
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2186,12 +2222,13 @@ private Map<String, String>
getAppendSegmentsCommittedDuringTask(
);
ResultIterator<Pair<String, String>> resultIterator =
transaction.getHandle()
Review Comment:
done
##########
processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java:
##########
@@ -56,6 +57,15 @@
})
public interface ShardSpec
{
+ /**
+ * Returns whether {@link #createChunk} returns a {@link
NumberedPartitionChunk} instance.
+ * This is necessary for supporting {@link PartitionHolder#isComplete()} if
updating to a new corePartitions spec.
+ */
+ default boolean isNumChunkSupported()
Review Comment:
done
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2097,17 +2119,27 @@ private SegmentMetadata
getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
return segmentMetadata;
}
+ @Override
+ public int insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock>
segmentToReplaceLock)
+ {
+ final String dataSource =
verifySegmentsToCommit(segmentToReplaceLock.keySet());
+ return inReadWriteDatasourceTransaction(
+ dataSource,
+ transaction -> insertIntoUpgradeSegmentsTableDoWork(transaction,
segmentToReplaceLock)
+ );
+ }
+
/**
* Inserts entries into the upgrade_segments table in batches of size
* {@link #MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}.
*/
- private void insertIntoUpgradeSegmentsTable(
+ private int insertIntoUpgradeSegmentsTableDoWork(
Review Comment:
updated
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus>
createNewIdsOfAppendSegmentsAfterReplace(
oldSegmentMetadata.getIndexingStateFingerprint()
)
);
+ segmentsToInsert.add(dataSegment);
}
- return upgradedSegments;
+ // update corePartitions in shard spec
+ return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {
+ Integer partitionNum =
intervalToCurrentPartitionNum.get(segment.getInterval());
+ if (!segment.isTombstone()
+ && !numChunkNotSupported.contains(segment.getInterval())
+ && partitionNum != null
+ && partitionNum + 1 !=
segment.getShardSpec().getNumCorePartitions()) {
Review Comment:
updated
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema>
createDataSchemasForIntervals(
projections,
needMultiValuedColumns
);
- intervalDataSchemaMap.put(interval, dataSchema);
+ inputSchemas.put(
+ segmentProvider.minorCompaction
+ ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+
.map(DataSegment::toDescriptor)
+
.collect(Collectors.toList()))
+ : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
}
- return intervalDataSchemaMap;
+ return inputSchemas;
} else {
// given segment granularity
+ List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
+ timelineSegments,
+ segmentProvider.segmentsToUpgradePredicate
+ ));
+ if (!upgradeSegments.isEmpty()) {
+ toolbox.getTaskActionClient().submit(new
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments));
Review Comment:
done
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -648,7 +697,11 @@ static Map<Interval, DataSchema>
createDataSchemasForIntervals(
projections,
needMultiValuedColumns
);
- return Collections.singletonMap(segmentProvider.interval, dataSchema);
+ return Map.of(segmentProvider.minorCompaction
+ ? new
MultipleSpecificSegmentSpec(StreamSupport.stream(segmentsToCompact.spliterator(),
false)
+
.map(DataSegment::toDescriptor)
+
.collect(Collectors.toList()))
+ : new
MultipleIntervalSegmentSpec(List.of(segmentProvider.interval)), dataSchema);
Review Comment:
done
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1222,11 +1274,26 @@ static class SegmentProvider
private final CompactionInputSpec inputSpec;
private final Interval interval;
+ private final boolean minorCompaction;
+ private final Predicate<DataSegment> segmentsToUpgradePredicate;
+ private final Predicate<DataSegment> segmentsToCompactPredicate;
Review Comment:
done
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -572,17 +584,37 @@ static Map<Interval, DataSchema>
createDataSchemasForIntervals(
return Collections.emptyMap();
}
+ if (segmentProvider.minorCompaction) {
+ Iterable<DataSegment> segmentsNotCompletelyWithinin =
+ Iterables.filter(timelineSegments, s ->
!segmentProvider.interval.contains(s.getInterval()));
+ if (segmentsNotCompletelyWithinin.iterator().hasNext()) {
+ throw new ISE(
Review Comment:
`timelineSegments` is not user input; it is the set of segments overlapping with the
interval. The interval can overlap only partially with some segments, and in that case we can't
simply upgrade the overlapping segment or include it in the query. This is an
edge case we discussed before.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java:
##########
@@ -37,9 +37,11 @@
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
Review Comment:
done
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema>
createDataSchemasForIntervals(
projections,
needMultiValuedColumns
);
- intervalDataSchemaMap.put(interval, dataSchema);
+ inputSchemas.put(
+ segmentProvider.minorCompaction
+ ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+
.map(DataSegment::toDescriptor)
+
.collect(Collectors.toList()))
+ : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
}
- return intervalDataSchemaMap;
+ return inputSchemas;
} else {
// given segment granularity
+ List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
+ timelineSegments,
+ segmentProvider.segmentsToUpgradePredicate
Review Comment:
done
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -627,18 +659,35 @@ static Map<Interval, DataSchema>
createDataSchemasForIntervals(
projections,
needMultiValuedColumns
);
- intervalDataSchemaMap.put(interval, dataSchema);
+ inputSchemas.put(
+ segmentProvider.minorCompaction
+ ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
+
.map(DataSegment::toDescriptor)
+
.collect(Collectors.toList()))
+ : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema);
}
- return intervalDataSchemaMap;
+ return inputSchemas;
} else {
// given segment granularity
+ List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter(
Review Comment:
done
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus>
createNewIdsOfAppendSegmentsAfterReplace(
oldSegmentMetadata.getIndexingStateFingerprint()
)
);
+ segmentsToInsert.add(dataSegment);
}
- return upgradedSegments;
+ // update corePartitions in shard spec
+ return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {
Review Comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]