jon-wei commented on a change in pull request #7547: Add support minor 
compaction with segment locking
URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r306595976
 
 

 ##########
 File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 ##########
 @@ -110,60 +80,143 @@ protected AbstractBatchIndexTask(
   )
   {
     super(id, groupId, taskResource, dataSource, context);
+    segmentLockHelper = new SegmentLockHelper(dataSource);
   }
 
-  public abstract boolean requireLockInputSegments();
+  /**
+   * Return true if this task can overwrite existing segments.
+   */
+  public abstract boolean requireLockExistingSegments();
 
-  public abstract List<DataSegment> findInputSegments(TaskActionClient 
taskActionClient, List<Interval> intervals)
+  /**
+   * Find segments to lock in the given intervals.
+   * If this task is intend to overwrite only some segments in those 
intervals, this method should return only those
+   * segments instead of entire segments in those intervals.
+   */
+  // TODO: remove this
+  public abstract List<DataSegment> findSegmentsToLock(TaskActionClient 
taskActionClient, List<Interval> intervals)
       throws IOException;
 
-  public abstract boolean checkIfChangeSegmentGranularity(List<Interval> 
intervalOfExistingSegments);
-
   public abstract boolean isPerfectRollup();
 
-  /**
-   * Returns the segmentGranularity for the given interval. Usually tasks are 
supposed to return its segmentGranularity
-   * if exists. The compactionTask can return different segmentGranularity 
depending on its configuration and the input
-   * interval.
-   *
-   * @return segmentGranularity or null if it doesn't support it.
-   */
+  public boolean isUseSegmentLock()
+  {
+    return useSegmentLock;
+  }
+
   @Nullable
-  public abstract Granularity getSegmentGranularity(Interval interval);
+  public abstract Granularity getSegmentGranularity();
 
-  protected boolean tryLockWithIntervals(TaskActionClient client, 
List<Interval> intervals, boolean isInitialRequest)
+  public boolean determineLockGranularityAndTryLock(
+      TaskActionClient client,
+      GranularitySpec granularitySpec
+  ) throws IOException
+  {
+    final List<Interval> intervals = 
granularitySpec.bucketIntervals().isPresent()
+                                     ? new 
ArrayList<>(granularitySpec.bucketIntervals().get())
+                                     : Collections.emptyList();
+    return determineLockGranularityandTryLock(client, intervals);
+  }
+
+  public SegmentLockHelper getNonNullSegmentLockHelper()
+  {
+    return Preconditions.checkNotNull(segmentLockHelper, "segmentLockHelper");
+  }
+
+  protected boolean determineLockGranularityandTryLock(TaskActionClient 
client, List<Interval> intervals)
       throws IOException
   {
-    if (requireLockInputSegments()) {
-      if (isPerfectRollup()) {
+    final boolean forceTimeChunkLock = getContextValue(
+        Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
+        Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
+    );
+    if (forceTimeChunkLock) {
+      log.info("[%s] is set to true in task context. Use timeChunk lock", 
Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
+      useSegmentLock = false;
+      if (!intervals.isEmpty()) {
         return tryTimeChunkLock(client, intervals);
+      } else {
+        return true;
+      }
+    } else {
+      if (!intervals.isEmpty()) {
+        final LockGranularityDeterminResult result = 
determineSegmentGranularity(client, intervals);
+        useSegmentLock = result.lockGranularity == LockGranularity.SEGMENT;
+        return tryLockWithDetermineResult(client, result);
+      } else {
+        return true;
+      }
+    }
+  }
+
+  protected boolean 
determineLockGranularityandTryLockWithSegments(TaskActionClient client, 
List<DataSegment> segments)
+      throws IOException
+  {
+    final boolean forceTimeChunkLock = getContextValue(
+        Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
+        Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
+    );
+    if (forceTimeChunkLock) {
+      log.info("[%s] is set to true in task context. Use timeChunk lock", 
Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
+      useSegmentLock = false;
+      return tryTimeChunkLock(
+          client,
+          new 
ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()))
+      );
+    } else {
+      final LockGranularityDeterminResult result = 
determineSegmentGranularity(segments);
+      useSegmentLock = result.lockGranularity == LockGranularity.SEGMENT;
+      return tryLockWithDetermineResult(client, result);
+    }
+  }
+
+  private boolean tryLockWithDetermineResult(TaskActionClient client, 
LockGranularityDeterminResult result)
+      throws IOException
+  {
+    if (result.lockGranularity == LockGranularity.TIME_CHUNK) {
+      return tryTimeChunkLock(client, 
Preconditions.checkNotNull(result.intervals, "intervals"));
+    } else {
+      final boolean isReady = segmentLockHelper.verifyAndLockExistingSegments(
+          client,
+          Preconditions.checkNotNull(result.segments, "segments")
+      );
+      return isReady;
+    }
+  }
+
+  protected LockGranularityDeterminResult determineSegmentGranularity(
+      TaskActionClient client,
+      List<Interval> intervals
+  ) throws IOException
+  {
+    if (requireLockExistingSegments()) {
+      if (isPerfectRollup()) {
+        log.info("Using timeChunk lock for perfrect rollup");
 
 Review comment:
   Typo in the log message above: "perfrect" should be "perfect".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to