kfaraz commented on code in PR #19059: URL: https://github.com/apache/druid/pull/19059#discussion_r2871815005
########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Specifies uncompacted segments to compact within an interval. + * Used for minor compaction to compact only uncompacted segments while leaving compacted segments untouched. 
+ */ +public class UncompactedInputSpec implements CompactionInputSpec +{ + public static final String TYPE = "uncompacted"; + + private final Interval interval; + private final List<SegmentDescriptor> uncompactedSegments; + + @JsonCreator + public UncompactedInputSpec( + @JsonProperty("interval") Interval interval, + @JsonProperty("uncompactedSegments") List<SegmentDescriptor> uncompactedSegments + ) + { + if (interval == null) { + throw new IAE("Uncompacted interval must not be null"); + } + if (interval.toDurationMillis() == 0) { + throw new IAE("Uncompacted interval[%s] is empty, must specify a nonempty interval", interval); + } + if (uncompactedSegments == null || uncompactedSegments.isEmpty()) { + throw new IAE("Uncompacted segments must not be null or empty"); + } + + // Validate that all segments are within the interval + List<SegmentDescriptor> segmentsNotInInterval = + uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList()); + if (!segmentsNotInInterval.isEmpty()) { + throw new IAE( + "All uncompacted segments must be within interval[%s], got segments outside interval: %s", + interval, + segmentsNotInInterval + ); + } Review Comment: Nit: move this to a separate private method ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Specifies uncompacted segments to compact within an interval. + * Used for minor compaction to compact only uncompacted segments while leaving compacted segments untouched. + */ +public class UncompactedInputSpec implements CompactionInputSpec Review Comment: Maybe call it `MinorCompactionInputSpec`?
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.timeline.DataSegment; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Task action that records segments as being upgraded in the metadata store. + * <p> + * This action is used during compaction to track which segments are being replaced. + * It validates that all segments to be upgraded are covered by + * {@link ReplaceTaskLock}s before inserting them into the upgrade segments table. + * <p> + * The action will fail if any of the upgrade segments do not have a corresponding + * replace lock, ensuring that only properly locked segments can be marked for upgrade. 
+ * + * @return the number of segments successfully inserted into the upgrade segments table + */ +public class MarkSegmentToUpgradeAction implements TaskAction<Integer> +{ + private final String dataSource; + private final List<DataSegment> upgradeSegments; + + /** + * @param dataSource the datasource containing the segments to upgrade + * @param upgradeSegments the list of segments to be recorded as upgraded + */ + @JsonCreator + public MarkSegmentToUpgradeAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("upgradeSegments") List<DataSegment> upgradeSegments + ) + { + this.dataSource = dataSource; + this.upgradeSegments = upgradeSegments; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public List<DataSegment> getUpgradeSegments() + { + return upgradeSegments; + } + + @Override + public TypeReference<Integer> getReturnTypeReference() + { + return new TypeReference<>() + { + }; + } + + @Override + public Integer perform(Task task, TaskActionToolbox toolbox) + { + final String datasource = task.getDataSource(); + final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock + = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), Set.copyOf(upgradeSegments)); + + if (segmentToReplaceLock.size() < upgradeSegments.size()) { + throw InvalidInput.exception( + "Not all segments are hold by a replace lock, only [%d] segments out of total segments[%d] are hold by repalce lock", Review Comment: ```suggestion "Segments to upgrade must be covered by a REPLACE lock. Only [%d] out of [%d] segments are covered.", ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentToUpgradeAction.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.timeline.DataSegment; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Task action that records segments as being upgraded in the metadata store. + * <p> + * This action is used during compaction to track which segments are being replaced. + * It validates that all segments to be upgraded are covered by + * {@link ReplaceTaskLock}s before inserting them into the upgrade segments table. + * <p> + * The action will fail if any of the upgrade segments do not have a corresponding + * replace lock, ensuring that only properly locked segments can be marked for upgrade. 
+ * + * @return the number of segments successfully inserted into the upgrade segments table + */ +public class MarkSegmentToUpgradeAction implements TaskAction<Integer> +{ + private final String dataSource; + private final List<DataSegment> upgradeSegments; + + /** + * @param dataSource the datasource containing the segments to upgrade + * @param upgradeSegments the list of segments to be recorded as upgraded + */ + @JsonCreator + public MarkSegmentToUpgradeAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("upgradeSegments") List<DataSegment> upgradeSegments + ) + { + this.dataSource = dataSource; + this.upgradeSegments = upgradeSegments; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public List<DataSegment> getUpgradeSegments() + { + return upgradeSegments; + } + + @Override + public TypeReference<Integer> getReturnTypeReference() + { + return new TypeReference<>() + { + }; + } + + @Override + public Integer perform(Task task, TaskActionToolbox toolbox) + { + final String datasource = task.getDataSource(); + final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock + = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), Set.copyOf(upgradeSegments)); Review Comment: Since we use a set here, I wonder if the constructor of this class should also accept a `Set` instead of `List`. Otherwise, it is possible for users to pass in duplicate segments and then they would get an exception `Not all segments are hold by replace lock`, which might be confusing.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Specifies uncompacted segments to compact within an interval. + * Used for minor compaction to compact only uncompacted segments while leaving compacted segments untouched. + */ +public class UncompactedInputSpec implements CompactionInputSpec +{ + public static final String TYPE = "uncompacted"; + + private final Interval interval; + private final List<SegmentDescriptor> uncompactedSegments; + + @JsonCreator + public UncompactedInputSpec( + @JsonProperty("interval") Interval interval, + @JsonProperty("uncompactedSegments") List<SegmentDescriptor> uncompactedSegments + ) + { + if (interval == null) { + throw new IAE("Uncompacted interval must not be null"); Review Comment: Please use `InvalidInput` exceptions instead. 
########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Specifies uncompacted segments to compact within an interval. + * Used for minor compaction to compact only uncompacted segments while leaving compacted segments untouched. Review Comment: ```suggestion * Used for MSQ-based minor compaction to compact only uncompacted segments while upgrading compacted segments (i.e. no change to physical segment files). 
``` ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2186,12 +2222,13 @@ private Map<String, String> getAppendSegmentsCommittedDuringTask( ); ResultIterator<Pair<String, String>> resultIterator = transaction.getHandle() Review Comment: Nit: Move the `.getHandle()` to the next line to make the reformatting changes smaller and the code cleaner (less indented). ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java: ########## @@ -96,12 +97,12 @@ public CompactionSupervisor createSupervisor() /** * @return {@link CompactionJobTemplate} used to create jobs for the supervisor. */ - public CompactionJobTemplate getTemplate() + public CompactionJobTemplate getTemplate(CompactionStatusTracker statusTracker) Review Comment: Please don't make this change. This breaks the clean template APIs already in place. Please follow the steps outlined here instead: https://github.com/apache/druid/pull/19059#discussion_r2864067544 ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -572,17 +584,37 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( return Collections.emptyMap(); } + if (segmentProvider.minorCompaction) { + Iterable<DataSegment> segmentsNotCompletelyWithinin = + Iterables.filter(timelineSegments, s -> !segmentProvider.interval.contains(s.getInterval())); + if (segmentsNotCompletelyWithinin.iterator().hasNext()) { + throw new ISE( Review Comment: Please use `DruidException`. Is this case possible? Isn't this validation already done in the new `CompactionInputSpec` class? 
########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java: ########## @@ -91,12 +96,35 @@ public List<CompactionJob> createCompactionJobs( // Create a job for each CompactionCandidate while (segmentIterator.hasNext()) { final CompactionCandidate candidate = segmentIterator.next(); + final CompactionCandidateSearchPolicy.Eligibility eligibility = + params.getClusterCompactionConfig() + .getCompactionPolicy() + .checkEligibilityForCompaction(candidate, statusTracker.getLatestTaskStatus(candidate)); + if (!eligibility.isEligible()) { + continue; + } + final CompactionCandidate finalCandidate; + switch (eligibility.getMode()) { + case ALL_SEGMENTS: + finalCandidate = candidate; + break; + case UNCOMPACTED_SEGMENTS_ONLY: + finalCandidate = CompactionCandidate.from( + candidate.getUncompactedSegments(), + null, + candidate.getCurrentStatus() + ); Review Comment: Why is this change needed? Is the candidate being changed in any way? ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( projections, needMultiValuedColumns ); - intervalDataSchemaMap.put(interval, dataSchema); + inputSchemas.put( + segmentProvider.minorCompaction + ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream() + .map(DataSegment::toDescriptor) + .collect(Collectors.toList())) + : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema); } - return intervalDataSchemaMap; + return inputSchemas; } else { // given segment granularity + List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter( + timelineSegments, + segmentProvider.segmentsToUpgradePredicate + )); + if (!upgradeSegments.isEmpty()) { + toolbox.getTaskActionClient().submit(new MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments)); Review Comment: Please add a log line here. 
########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -648,7 +697,11 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( projections, needMultiValuedColumns ); - return Collections.singletonMap(segmentProvider.interval, dataSchema); + return Map.of(segmentProvider.minorCompaction + ? new MultipleSpecificSegmentSpec(StreamSupport.stream(segmentsToCompact.spliterator(), false) + .map(DataSegment::toDescriptor) + .collect(Collectors.toList())) + : new MultipleIntervalSegmentSpec(List.of(segmentProvider.interval)), dataSchema); Review Comment: Current code is difficult to follow. Please use `if-else` and a separate variable to clean it up. ```suggestion final QuerySegmentSpec segmentSpec; if (segmentProvider.isMinorCompaction) { segmentSpec = new MultipleSpecificSegmentSpec( StreamSupport.stream(segmentsToCompact.spliterator(), false) .map(DataSegment::toDescriptor) .collect(Collectors.toList()) ); } else { segmentSpec = new MultipleIntervalSegmentSpec(List.of(segmentProvider.interval)); } return Map.of(segmentSpec, dataSchema); ``` ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace( oldSegmentMetadata.getIndexingStateFingerprint() ) ); + segmentsToInsert.add(dataSegment); } - return upgradedSegments; + // update corePartitions in shard spec + return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> { + Integer partitionNum = intervalToCurrentPartitionNum.get(segment.getInterval()); + if (!segment.isTombstone() + && !numChunkNotSupported.contains(segment.getInterval()) + && partitionNum != null + && partitionNum + 1 != segment.getShardSpec().getNumCorePartitions()) { Review Comment: Why this condition? Is it possible that existing compacted segments in an interval were range partitioned and had say 10 core partitions. 
And the segments created by the minor compaction are also range partitioned and have say 5 core partitions. In this case, none of the segments have core partition = 16, so will their shard spec remain unchanged? ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -1222,11 +1274,26 @@ static class SegmentProvider private final CompactionInputSpec inputSpec; private final Interval interval; + private final boolean minorCompaction; + private final Predicate<DataSegment> segmentsToUpgradePredicate; + private final Predicate<DataSegment> segmentsToCompactPredicate; Review Comment: Nit: Instead of predicates, please add private methods `shouldUpgradeSegment(DataSegment)` and `shouldCompactSegment(DataSegment)` to the `SegmentProvider` class. Use of predicates does not really seem warranted here. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1852,9 +1853,14 @@ protected Set<DataSegment> insertSegments( } /** - * Creates new versions of segments appended while a "REPLACE" task was in progress. + * Retrieves segments from the upgrade segments table and creates upgraded versions with new intervals, + * versions, and partition numbers. Combines upgraded segments with replace segments and updates shard + * specs with correct core partition counts. + * + * @return pair of (upgraded segments for metadata tracking, segments to insert into segment table) + * @throws DruidException if a replace interval partially overlaps a segment being upgraded */ - private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace( + private Pair<Set<DataSegmentPlus>, Set<DataSegment>> createNewSegmentsAfterReplace( Review Comment: This method should just return a `Set<DataSegmentPlus>`. Each `DataSegmentPlus` already contains a `DataSegment` inside it. 
The `DataSegmentPlus.getUpgradedFromSegmentId()` should be used to distinguish if the segment is an upgraded one or not. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1925,15 +1933,16 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace( // but a (revoked) REPLACE lock covers this segment newInterval = oldInterval; } + if (!oldSegment.getShardSpec().isNumChunkSupported()) { + numChunkNotSupported.add(newInterval); + } // Compute shard spec of the upgraded segment final int partitionNum = intervalToCurrentPartitionNum.compute( newInterval, (i, value) -> value == null ? 0 : value + 1 ); - final int numCorePartitions = intervalToNumCorePartitions.get(newInterval); - ShardSpec shardSpec = new NumberedShardSpec(partitionNum, numCorePartitions); - + final ShardSpec shardSpec = oldSegment.getShardSpec().withPartitionNum(partitionNum); // Create upgraded segment with the correct interval, version and shard spec String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString()); DataSegment dataSegment = DataSegment.builder(oldSegment) Review Comment: Use the final shardSpec to build this `DataSegment`. ########## server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java: ########## @@ -20,43 +20,67 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * InputSpec for {@link ClientCompactionIOConfig}. * <p> - * Should be synchronized with org.apache.druid.indexing.common.task.CompactionIntervalSpec. 
+ * Should be synchronized with org.apache.druid.indexing.common.task.CompactionIntervalSpec and org.apache.druid.indexing.common.task.UncompactedInputSpec. */ public class ClientCompactionIntervalSpec { - private static final String TYPE = "interval"; + private static final String TYPE_ALL_SEGMENTS = "interval"; + private static final String TYPE_UNCOMPACTED_SEGMENTS_ONLY = "uncompacted"; private final Interval interval; @Nullable + private final List<SegmentDescriptor> uncompactedSegments; + @Nullable private final String sha256OfSortedSegmentIds; @JsonCreator public ClientCompactionIntervalSpec( @JsonProperty("interval") Interval interval, + @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor> uncompactedSegments, @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds ) { if (interval != null && interval.toDurationMillis() == 0) { throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval); } this.interval = interval; + if (uncompactedSegments == null) { + // perform a full compaction + } else if (uncompactedSegments.isEmpty()) { + throw new IAE("Can not supply empty segments as input, please use either null or non-empty segments."); + } else if (interval != null) { + List<SegmentDescriptor> segmentsNotInInterval = + uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList()); + if (!segmentsNotInInterval.isEmpty()) { + throw new IAE( + "Can not supply segments outside interval[%s], got segments[%s].", + interval, + segmentsNotInInterval + ); + } + } + this.uncompactedSegments = uncompactedSegments; this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds; } @JsonProperty public String getType() { - return TYPE; + return (uncompactedSegments == null) ? TYPE_ALL_SEGMENTS : TYPE_UNCOMPACTED_SEGMENTS_ONLY; Review Comment: This seems unclean. Please add a separate class instead. 
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java: ########## @@ -202,6 +229,133 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY verifyCompactedSegmentsHaveFingerprints(yearGranConfig); } + @Test + public void test_minorCompactionWithMSQ() throws Exception + { + configureCompaction( + CompactionEngine.MSQ, + new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null) + ); + KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON + .get() + .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)) + .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())) + .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1)) + .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2)); + + // Set up first topic and supervisor + final String topic1 = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic1, 1); + final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1); + cluster.callApi().postSupervisor(supervisor1); + + final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false); + waitUntilPublishedRecordsAreIngested(totalRowCount); + + // Before compaction + Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR)); + + PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false); Review Comment: Please add a test case for dynamic partitioning too. ########## processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java: ########## @@ -56,6 +57,15 @@ }) public interface ShardSpec { + /** + * Returns whether {@link #createChunk} returns a {@link NumberedPartitionChunk} instance. 
+ * This is necessary for supporting {@link PartitionHolder#isComplete()} if updating to a new corePartitions spec. + */ + default boolean isNumChunkSupported() Review Comment: ```suggestion default boolean canCreateNumberedPartitionChunk() ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( projections, needMultiValuedColumns ); - intervalDataSchemaMap.put(interval, dataSchema); + inputSchemas.put( + segmentProvider.minorCompaction + ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream() + .map(DataSegment::toDescriptor) + .collect(Collectors.toList())) + : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema); } - return intervalDataSchemaMap; + return inputSchemas; } else { // given segment granularity + List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter( + timelineSegments, + segmentProvider.segmentsToUpgradePredicate Review Comment: ```suggestion segmentProvider::shouldUpgradeSegment ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( projections, needMultiValuedColumns ); - intervalDataSchemaMap.put(interval, dataSchema); + inputSchemas.put( + segmentProvider.minorCompaction + ? new MultipleSpecificSegmentSpec(segmentsToCompact.stream() + .map(DataSegment::toDescriptor) + .collect(Collectors.toList())) + : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema); Review Comment: Compute the value for the key `QuerySegmentSpec` in a separate line rather than computing it inside the `put`. Current code is difficult to follow. 
########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java: ########## @@ -110,7 +113,8 @@ public CascadingReindexingTemplate( @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("skipOffsetFromNow") @Nullable Period skipOffsetFromNow, - @JsonProperty("defaultSegmentGranularity") Granularity defaultSegmentGranularity + @JsonProperty("defaultSegmentGranularity") Granularity defaultSegmentGranularity, + @JacksonInject CompactionStatusTracker statusTracker Review Comment: The status tracker should not be passed here. It would be passed via `CompactionJobParams` in a manner similar to the `IndexingStateFingerprintMapper` interface. Please refer to the other comments regarding the `CompactionStatusTracker`. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -627,18 +659,35 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( projections, needMultiValuedColumns ); - intervalDataSchemaMap.put(interval, dataSchema); + inputSchemas.put( + segmentProvider.minorCompaction + ? 
new MultipleSpecificSegmentSpec(segmentsToCompact.stream() + .map(DataSegment::toDescriptor) + .collect(Collectors.toList())) + : new MultipleIntervalSegmentSpec(List.of(interval)), dataSchema); } - return intervalDataSchemaMap; + return inputSchemas; } else { // given segment granularity + List<DataSegment> upgradeSegments = Lists.newArrayList(Iterables.filter( Review Comment: ```suggestion List<DataSegment> segmentsToUpgrade = Lists.newArrayList(Iterables.filter( ``` ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2097,17 +2119,27 @@ private SegmentMetadata getSegmentMetadataFromSchemaMappingOrUpgradeMetadata( return segmentMetadata; } + @Override + public int insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock) + { + final String dataSource = verifySegmentsToCommit(segmentToReplaceLock.keySet()); + return inReadWriteDatasourceTransaction( + dataSource, + transaction -> insertIntoUpgradeSegmentsTableDoWork(transaction, segmentToReplaceLock) + ); + } + /** * Inserts entries into the upgrade_segments table in batches of size * {@link #MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}. */ - private void insertIntoUpgradeSegmentsTable( + private int insertIntoUpgradeSegmentsTableDoWork( Review Comment: Please do not rename this method if the intention is only to distinguish it from the new `insertIntoUpgradeSegmentsTable(map)` method. It is okay for two methods to have the same name if they serve a similar purpose but have different args. 
########## indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java: ########## @@ -259,6 +259,12 @@ public boolean isRunning() return started.get(); } + @Override + public CompactionStatusTracker getCompactionStatusTracker() Review Comment: this shouldn't be required ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java: ########## @@ -572,17 +584,37 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( return Collections.emptyMap(); } + if (segmentProvider.minorCompaction) { + Iterable<DataSegment> segmentsNotCompletelyWithinin = + Iterables.filter(timelineSegments, s -> !segmentProvider.interval.contains(s.getInterval())); + if (segmentsNotCompletelyWithinin.iterator().hasNext()) { + throw new ISE( + "Incremental compaction doesn't allow segments not completely within interval[%s]", + segmentProvider.interval + ); + } + } + if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) { - Map<Interval, DataSchema> intervalDataSchemaMap = new HashMap<>(); + Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>(); + // if segment is already compacted in incremental compaction, they need to be upgraded directly, supported in MSQ + List<DataSegment> upgradeSegments = new ArrayList<>(); // original granularity final Map<Interval, List<DataSegment>> intervalToSegments = new TreeMap<>( Comparators.intervalsByStartThenEnd() ); for (final DataSegment dataSegment : timelineSegments) { - intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>()) - .add(dataSegment); + if (segmentProvider.segmentsToUpgradePredicate.test(dataSegment)) { + upgradeSegments.add(dataSegment); + } else { + intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>()) + .add(dataSegment); + } + } + if (!upgradeSegments.isEmpty()) { + toolbox.getTaskActionClient().submit(new 
MarkSegmentToUpgradeAction(segmentProvider.dataSource, upgradeSegments)); Review Comment: A log line here would be helpful. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1959,9 +1968,21 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace( oldSegmentMetadata.getIndexingStateFingerprint() ) ); + segmentsToInsert.add(dataSegment); } - return upgradedSegments; + // update corePartitions in shard spec + return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> { Review Comment: Please break up this statement to separate out the return and the computation. ########## indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java: ########## @@ -37,9 +37,11 @@ import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.JodaUtils; Review Comment: The validation in `NativeCompactionRunner.validateCompactionTask()` should return a `failure` if the `CompactionInputSpec` is of the new type `Uncompacted` (aka minor compaction). 
########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java: ########## @@ -202,6 +229,133 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY verifyCompactedSegmentsHaveFingerprints(yearGranConfig); } + @Test + public void test_minorCompactionWithMSQ() throws Exception + { + configureCompaction( + CompactionEngine.MSQ, + new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null) + ); + KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON + .get() + .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)) + .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())) + .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1)) + .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2)); + + // Set up first topic and supervisor + final String topic1 = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic1, 1); + final KafkaSupervisorSpec supervisor1 = kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1); + cluster.callApi().postSupervisor(supervisor1); + + final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false); + waitUntilPublishedRecordsAreIngested(totalRowCount); + + // Before compaction + Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR)); + + PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false); + // Create a compaction config with DAY granularity + InlineSchemaDataSourceCompactionConfig dayGranularityConfig = + InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withSkipOffsetFromLatest(Period.seconds(0)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false)) + .withDimensionsSpec(new 
UserCompactionTaskDimensionsConfig( + WikipediaStreamEventStreamGenerator.dimensions() + .stream() + .map(StringDimensionSchema::new) + .collect(Collectors.toUnmodifiableList()))) + .withTaskContext(Map.of("useConcurrentLocks", true)) + .withIoConfig(new UserCompactionTaskIOConfig(true)) + .withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build()) + .build(); + + runCompactionWithSpec(dayGranularityConfig); + waitForAllCompactionTasksToFinish(); + + pauseCompaction(dayGranularityConfig); + Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR)); + Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY)); + + verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig); + + // Set up another topic and supervisor Review Comment: Why do we need a separate topic and supervisor? ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java: ########## @@ -202,6 +229,133 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToY verifyCompactedSegmentsHaveFingerprints(yearGranConfig); } + @Test + public void test_minorCompactionWithMSQ() throws Exception + { + configureCompaction( + CompactionEngine.MSQ, + new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null) + ); + KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = MoreResources.Supervisor.KAFKA_JSON + .get() + .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)) + .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())) + .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1)) + .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2)); + + // Set up first topic and supervisor + final String topic1 = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic1, 1); + final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1); + cluster.callApi().postSupervisor(supervisor1); + + final int totalRowCount = publish1kRecords(topic1, true) + publish1kRecords(topic1, false); + waitUntilPublishedRecordsAreIngested(totalRowCount); + + // Before compaction + Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR)); + + PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false); + // Create a compaction config with DAY granularity + InlineSchemaDataSourceCompactionConfig dayGranularityConfig = + InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withSkipOffsetFromLatest(Period.seconds(0)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false)) + .withDimensionsSpec(new UserCompactionTaskDimensionsConfig( + WikipediaStreamEventStreamGenerator.dimensions() + .stream() + .map(StringDimensionSchema::new) + .collect(Collectors.toUnmodifiableList()))) + .withTaskContext(Map.of("useConcurrentLocks", true)) + .withIoConfig(new UserCompactionTaskIOConfig(true)) + .withTuningConfig(UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(partitionsSpec).build()) + .build(); + + runCompactionWithSpec(dayGranularityConfig); + waitForAllCompactionTasksToFinish(); + + pauseCompaction(dayGranularityConfig); + Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR)); + Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY)); + + verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig); + + // Set up another topic and supervisor + final String topic2 = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic2, 1); + final KafkaSupervisorSpec supervisor2 = kafkaSupervisorSpecBuilder.withId(topic2).build(dataSource, topic2); + cluster.callApi().postSupervisor(supervisor2); + + // published another 1k + final int appendedRowCount = publish1kRecords(topic2, true); + 
indexer.latchableEmitter().flush(); + waitUntilPublishedRecordsAreIngested(appendedRowCount); + + // Tear down both topics and supervisors + kafkaServer.deleteTopic(topic1); + cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec()); + kafkaServer.deleteTopic(topic2); + cluster.callApi().postSupervisor(supervisor2.createSuspendedSpec()); + long totalUsed = overlord.latchableEmitter().getMetricValues( + "segment/metadataCache/used/count", + Map.of(DruidMetrics.DATASOURCE, dataSource) + ).stream().reduce((first, second) -> second).orElse(0).longValue(); + + Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR)); + // 1 compacted segment + 2 appended segment + Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY)); + + runCompactionWithSpec(dayGranularityConfig); + waitForAllCompactionTasksToFinish(); + + // wait for new segments have been updated to the cache + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName("segment/metadataCache/used/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasValueMatching(Matchers.greaterThan(totalUsed))); + + // performed incremental compaction: 1 previously compacted segment + 1 incrementally compacted segment + Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY)); Review Comment: Please also add assertions to verify the results of some queries on the final compacted data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
