jon-wei commented on a change in pull request #7547: Add support minor
compaction with segment locking
URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r306595976
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -110,60 +80,143 @@ protected AbstractBatchIndexTask(
)
{
super(id, groupId, taskResource, dataSource, context);
+ segmentLockHelper = new SegmentLockHelper(dataSource);
}
- public abstract boolean requireLockInputSegments();
+ /**
+ * Return true if this task can overwrite existing segments.
+ */
+ public abstract boolean requireLockExistingSegments();
- public abstract List<DataSegment> findInputSegments(TaskActionClient
taskActionClient, List<Interval> intervals)
+ /**
+ * Find segments to lock in the given intervals.
+ * If this task is intend to overwrite only some segments in those
intervals, this method should return only those
+ * segments instead of entire segments in those intervals.
+ */
+ // TODO: remove this
+ public abstract List<DataSegment> findSegmentsToLock(TaskActionClient
taskActionClient, List<Interval> intervals)
throws IOException;
- public abstract boolean checkIfChangeSegmentGranularity(List<Interval>
intervalOfExistingSegments);
-
public abstract boolean isPerfectRollup();
- /**
- * Returns the segmentGranularity for the given interval. Usually tasks are
supposed to return its segmentGranularity
- * if exists. The compactionTask can return different segmentGranularity
depending on its configuration and the input
- * interval.
- *
- * @return segmentGranularity or null if it doesn't support it.
- */
+ public boolean isUseSegmentLock()
+ {
+ return useSegmentLock;
+ }
+
@Nullable
- public abstract Granularity getSegmentGranularity(Interval interval);
+ public abstract Granularity getSegmentGranularity();
- protected boolean tryLockWithIntervals(TaskActionClient client,
List<Interval> intervals, boolean isInitialRequest)
+ public boolean determineLockGranularityAndTryLock(
+ TaskActionClient client,
+ GranularitySpec granularitySpec
+ ) throws IOException
+ {
+ final List<Interval> intervals =
granularitySpec.bucketIntervals().isPresent()
+ ? new
ArrayList<>(granularitySpec.bucketIntervals().get())
+ : Collections.emptyList();
+ return determineLockGranularityandTryLock(client, intervals);
+ }
+
+ public SegmentLockHelper getNonNullSegmentLockHelper()
+ {
+ return Preconditions.checkNotNull(segmentLockHelper, "segmentLockHelper");
+ }
+
+ protected boolean determineLockGranularityandTryLock(TaskActionClient
client, List<Interval> intervals)
throws IOException
{
- if (requireLockInputSegments()) {
- if (isPerfectRollup()) {
+ final boolean forceTimeChunkLock = getContextValue(
+ Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
+ Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
+ );
+ if (forceTimeChunkLock) {
+ log.info("[%s] is set to true in task context. Use timeChunk lock",
Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
+ useSegmentLock = false;
+ if (!intervals.isEmpty()) {
return tryTimeChunkLock(client, intervals);
+ } else {
+ return true;
+ }
+ } else {
+ if (!intervals.isEmpty()) {
+ final LockGranularityDeterminResult result =
determineSegmentGranularity(client, intervals);
+ useSegmentLock = result.lockGranularity == LockGranularity.SEGMENT;
+ return tryLockWithDetermineResult(client, result);
+ } else {
+ return true;
+ }
+ }
+ }
+
+ protected boolean
determineLockGranularityandTryLockWithSegments(TaskActionClient client,
List<DataSegment> segments)
+ throws IOException
+ {
+ final boolean forceTimeChunkLock = getContextValue(
+ Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
+ Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
+ );
+ if (forceTimeChunkLock) {
+ log.info("[%s] is set to true in task context. Use timeChunk lock",
Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
+ useSegmentLock = false;
+ return tryTimeChunkLock(
+ client,
+ new
ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()))
+ );
+ } else {
+ final LockGranularityDeterminResult result =
determineSegmentGranularity(segments);
+ useSegmentLock = result.lockGranularity == LockGranularity.SEGMENT;
+ return tryLockWithDetermineResult(client, result);
+ }
+ }
+
+ private boolean tryLockWithDetermineResult(TaskActionClient client,
LockGranularityDeterminResult result)
+ throws IOException
+ {
+ if (result.lockGranularity == LockGranularity.TIME_CHUNK) {
+ return tryTimeChunkLock(client,
Preconditions.checkNotNull(result.intervals, "intervals"));
+ } else {
+ final boolean isReady = segmentLockHelper.verifyAndLockExistingSegments(
+ client,
+ Preconditions.checkNotNull(result.segments, "segments")
+ );
+ return isReady;
+ }
+ }
+
+ protected LockGranularityDeterminResult determineSegmentGranularity(
+ TaskActionClient client,
+ List<Interval> intervals
+ ) throws IOException
+ {
+ if (requireLockExistingSegments()) {
+ if (isPerfectRollup()) {
+ log.info("Using timeChunk lock for perfrect rollup");
Review comment:
Typo in the log message: "perfrect" -> "perfect"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]