AmatyaAvadhanula commented on code in PR #15281: URL: https://github.com/apache/druid/pull/15281#discussion_r1420392344
########## server/src/main/java/org/apache/druid/server/coordinator/duty/MarkEternityTombstonesAsUnused.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.base.Optional; +import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentTimeline; +import org.apache.druid.timeline.partition.TombstoneShardSpec; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Mark eternity tombstones not overshadowed by currently served segments as unused. A candidate segment must fit all + * the criteria: + * <li> It is a tombstone that starts at {@link DateTimes#MIN} or ends at {@link DateTimes#MAX} </li> + * <li> It does not overlap with any overshadowed segment in the datasource </li> + * <li> It has has 0 core partitions i.e., {@link TombstoneShardSpec#getNumCorePartitions()} == 0</li> + * + * <p> + * Only infinite-interval tombstones are considered as candidate segments in this duty because they + * don't honor the preferred segment granularity specified at ingest time to cover an underlying segment with + * {@link Granularities#ALL} as it can generate too many segments per time chunk and cause an OOM. The infinite-interval + * tombstones make it hard to append data on the end of a data set that started out with an {@link Granularities#ALL} eternity and then + * moved to actual time grains, so the compromise is that the coordinator will remove these segments as long as it doesn't overlap any other + * segment. + * </p> + * <p> + * The overlapping condition is necessary as a candidate segment can overlap with an overshadowed segment, and the latter + * needs to be marked as unused first by {@link MarkOvershadowedSegmentsAsUnused} duty before the tombstone candidate + * can be marked as unused by {@link MarkEternityTombstonesAsUnused} duty. + *</p> + * <p> + * Only tombstones with 0 core partitions is considered as candidate segments. Earlier generation tombstones with 1 core + * partition (i.e., {@link TombstoneShardSpec#getNumCorePartitions()} == 1) are ignored by this duty because it can potentially + * cause data loss in a concurrent append and replace scenario and needs to be manually cleaned up. See this + * <a href="https://github.com/apache/druid/pull/15379">for details</a>. + * </p> + */ +public class MarkEternityTombstonesAsUnused implements CoordinatorDuty +{ + private static final Logger log = new Logger(MarkEternityTombstonesAsUnused.class); + + private final SegmentDeleteHandler deleteHandler; + + public MarkEternityTombstonesAsUnused(final SegmentDeleteHandler deleteHandler) + { + this.deleteHandler = deleteHandler; + } + + @Override + public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams params) + { + DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot(); + + final Map<String, Set<SegmentId>> datasourceToEternityTombstones = determineCandidateTombstones(dataSourcesSnapshot); + + if (datasourceToEternityTombstones.size() == 0) { + log.debug("No non-overlapping eternity tombstones found."); + return params; + } + + log.debug("Found [%d] datasource containing non-overlapping eternity tombstones[%s]", + datasourceToEternityTombstones.size(), datasourceToEternityTombstones + ); + + final CoordinatorRunStats stats = params.getCoordinatorStats(); + datasourceToEternityTombstones.forEach((datasource, unusedSegments) -> { + RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource); + stats.add(Stats.Segments.ETERNITY_TOMBSTONE, datasourceKey, unusedSegments.size()); + int unusedCount = deleteHandler.markSegmentsAsUnused(unusedSegments); + log.info( + "Successfully marked [%d] non-overlapping eternity tombstones of datasource[%s] as unused.", Review Comment: Would it possible to have the tombstone ids logged as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
