clintropolis commented on a change in pull request #7547: Add support minor compaction with segment locking
URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r306677049
##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
+import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Abstract class for batch tasks like {@link IndexTask}.
+ * Provides some methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments},
+ * and {@link #determineLockGranularityandTryLock} for easily acquiring task locks.
+ */
+public abstract class AbstractBatchIndexTask extends AbstractTask
+{
+  private static final Logger log = new Logger(AbstractBatchIndexTask.class);
+
+  private final SegmentLockHelper segmentLockHelper;
+
+  /**
+   * State to indicate that this task will use segmentLock or timeChunkLock.
+   * This is automatically set when {@link #determineLockGranularityandTryLock} is called.
+   */
+  private boolean useSegmentLock;
+
+  protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
+  {
+    super(id, dataSource, context);
+    segmentLockHelper = new SegmentLockHelper();
+  }
+
+  protected AbstractBatchIndexTask(
+      String id,
+      @Nullable String groupId,
+      @Nullable TaskResource taskResource,
+      String dataSource,
+      @Nullable Map<String, Object> context
+  )
+  {
+    super(id, groupId, taskResource, dataSource, context);
+    segmentLockHelper = new SegmentLockHelper();
+  }
+
+  /**
+   * Return true if this task can overwrite existing segments.
+   */
+  public abstract boolean requireLockExistingSegments();
+
+  /**
+   * Find segments to lock in the given intervals.
+   * If this task is intend to overwrite only some segments in those intervals, this method should return only those
+   * segments instead of entire segments in those intervals.
+   */
+  public abstract List<DataSegment> findSegmentsToLock(TaskActionClient taskActionClient, List<Interval> intervals)
+      throws IOException;
+
+  /**
+   * Returns true if this task is in the perfect (guaranteed) rollup mode.
+   */
+  public abstract boolean isPerfectRollup();
+
+  /**
+   * Returns the segmentGranularity defined in the ingestion spec.
+   */
+  @Nullable
+  public abstract Granularity getSegmentGranularity();
+
+  public boolean isUseSegmentLock()
+  {
+    return useSegmentLock;
+  }
+
+  public SegmentLockHelper getSegmentLockHelper()
+  {
+    return segmentLockHelper;
+  }
+
+  /**
+   * Determine lockGranularity to use and try to acquire necessary locks.
+   * This method respects the value of 'forceTimeChunkLock' in task context.
+   * If it's set to false or missing, this method checks if this task can use segmentLock.
+   */
+  protected boolean determineLockGranularityAndTryLock(
+      TaskActionClient client,
+      GranularitySpec granularitySpec
+  ) throws IOException
+  {
+    final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
+                                     ? new ArrayList<>(granularitySpec.bucketIntervals().get())
+                                     : Collections.emptyList();
+    return determineLockGranularityandTryLock(client, intervals);
+  }
+
+  boolean determineLockGranularityandTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
+  {
+    final boolean forceTimeChunkLock = getContextValue(
+        Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
+        Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
+    );
+    // Respect task context value most.
+    if (forceTimeChunkLock) {
+      log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
+      useSegmentLock = false;
+      if (!intervals.isEmpty()) {
+        return tryTimeChunkLock(client, intervals);
+      } else {
+        return true;
+      }
+    } else {
+      if (!intervals.isEmpty()) {
+        final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals);
+        useSegmentLock = result.lockGranularity == LockGranularity.SEGMENT;
+        return tryLockWithDetermineResult(client, result);
+      } else {
+        return true;
+      }
+    }
+  }
+
+  boolean determineLockGranularityandTryLockWithSegments(TaskActionClient client, List<DataSegment> segments)
+      throws IOException
+  {
+    final boolean forceTimeChunkLock = getContextValue(
+        Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
+        Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
+    );
+    if (forceTimeChunkLock) {
+      log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
+      useSegmentLock = false;
+      return tryTimeChunkLock(
+          client,
+          new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()))
+      );
+    } else {
+      final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
+      useSegmentLock = result.lockGranularity == LockGranularity.SEGMENT;
+      return tryLockWithDetermineResult(client, result);
+    }
+  }
+
+  private LockGranularityDetermineResult determineSegmentGranularity(TaskActionClient client, List<Interval> intervals)
+      throws IOException
+  {
+    if (requireLockExistingSegments()) {
+      if (isPerfectRollup()) {
+        log.info("Using timeChunk lock for perfect rollup");
+        return new LockGranularityDetermineResult(LockGranularity.TIME_CHUNK, intervals, null);
+      } else if (!intervals.isEmpty()) {
+        // This method finds segments falling in all given intervals and then tries to lock those segments.
+        // Thus, there might be a race between calling findSegmentsToLock() and determineSegmentGranularity(),
+        // i.e., a new segment can be added to the interval or an existing segment might be removed.
+        // Removed segments should be fine because indexing tasks would do nothing with removed segments.
+        // However, tasks wouldn't know about new segments added after findSegmentsToLock() call, it may missing those
+        // segments. This is usually fine, but if you want to avoid this, you should use timeChunk lock instead.
+        return determineSegmentGranularity(findSegmentsToLock(client, intervals));
+      } else {
+        log.info("Using segment lock for empty intervals");
+        return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
+      }
+    } else {
+      log.info("Using segment lock since we don't have to lock existing segments");
+      return new LockGranularityDetermineResult(LockGranularity.SEGMENT, null, Collections.emptyList());
+    }
+  }
+
+  private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranularityDetermineResult result)
+      throws IOException
+  {
+    if (result.lockGranularity == LockGranularity.TIME_CHUNK) {
+      return tryTimeChunkLock(client, Preconditions.checkNotNull(result.intervals, "intervals"));
+    } else {
+      return segmentLockHelper.verifyAndLockExistingSegments(
+          client,
+          Preconditions.checkNotNull(result.segments, "segments")
+      );
+    }
+  }
+
+  private boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
+  {
+    // In this case, the intervals to lock must be alighed with segmentGranularity if it's defined
##########

Review comment:
   typo:
   > ... must be _aligned_ with ...
