abhishekagarwal87 commented on a change in pull request #11190:
URL: https://github.com/apache/druid/pull/11190#discussion_r625731629



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -154,6 +160,17 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
           }
         }
 
+        // Skip all the locked intervals
+        LOG.debug(
+            "Skipping the following intervals for Compaction as they are currently locked: %s",
+            taskToLockedIntervals
+        );
+        taskToLockedIntervals.forEach(
+            (taskId, datasourceIntervals) -> compactionTaskIntervals
+                .computeIfAbsent(datasourceIntervals.getDatasource(), ds -> new ArrayList<>())
+                .addAll(datasourceIntervals.getIntervals())
+        );
+
         final CompactionSegmentIterator iterator =
             policy.reset(compactionConfigs, dataSources, compactionTaskIntervals);

Review comment:
       Maybe `compactionTaskIntervals` should be renamed now, since it can 
also include intervals for which there is no lock. 
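
       For illustration only, a minimal sketch of what the merge could look 
like under a more general name. The name `intervalsToSkip`, the class 
`SkipIntervalsSketch`, and the simplified `DatasourceIntervals` stand-in 
(with intervals as plain strings instead of Joda `Interval`) are all 
assumptions made for this example, not code from the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SkipIntervalsSketch
{
  // Hypothetical, simplified stand-in for the PR's DatasourceIntervals.
  // Intervals are plain strings here to keep the sketch dependency-free.
  static final class DatasourceIntervals
  {
    private final String datasource;
    private final List<String> intervals;

    DatasourceIntervals(String datasource, List<String> intervals)
    {
      this.datasource = datasource;
      this.intervals = intervals;
    }

    String getDatasource()
    {
      return datasource;
    }

    List<String> getIntervals()
    {
      return intervals;
    }
  }

  // Adds every interval locked by a task to the per-datasource skip list.
  // "intervalsToSkip" is a hypothetical replacement name for
  // compactionTaskIntervals.
  static void addLockedIntervals(
      Map<String, List<String>> intervalsToSkip,
      Map<String, DatasourceIntervals> taskToLockedIntervals
  )
  {
    taskToLockedIntervals.forEach(
        (taskId, datasourceIntervals) -> intervalsToSkip
            .computeIfAbsent(datasourceIntervals.getDatasource(), ds -> new ArrayList<>())
            .addAll(datasourceIntervals.getIntervals())
    );
  }

  public static void main(String[] args)
  {
    // Interval already being skipped, e.g. because a compaction task covers it
    Map<String, List<String>> intervalsToSkip = new HashMap<>();
    intervalsToSkip.put("wiki", new ArrayList<>(Arrays.asList("2021-01-01/2021-01-02")));

    // Interval locked by some other (made-up) task
    Map<String, DatasourceIntervals> locked = new HashMap<>();
    locked.put("task1", new DatasourceIntervals("wiki", Arrays.asList("2021-02-01/2021-02-02")));

    addLockedIntervals(intervalsToSkip, locked);
    System.out.println(intervalsToSkip);
  }
}
```

       The logic is the same as in the diff above; only the name changes, 
so that it no longer suggests the map holds compaction task intervals 
alone.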

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
##########
@@ -674,6 +675,64 @@ public TaskLock apply(TaskLockPosse taskLockPosse)
     }
   }
 
+  /**
+   * Gets a Map containing intervals locked by active tasks. Intervals locked
+   * by revoked TaskLocks are not included in the returned Map.
+   *
+   * @return Map from Task Id to locked intervals.
+   */
+  public Map<String, DatasourceIntervals> getLockedIntervals()
+  {
+    final Map<String, List<Interval>> taskToIntervals = new HashMap<>();
+    final Map<String, String> taskToDatasource = new HashMap<>();
+
+    // Take a lock and populate the maps
+    giant.lock();
+    try {
+      running.forEach(
+          (datasource, datasourceLocks) -> datasourceLocks.forEach(
+              (startTime, startTimeLocks) -> startTimeLocks.forEach(
+                  (interval, taskLockPosses) -> taskLockPosses.forEach(
+                      taskLockPosse -> taskLockPosse.taskIds.forEach(taskId -> {
+                        // Do not proceed if the lock is revoked
+                        if (taskLockPosse.getTaskLock().isRevoked()) {

Review comment:
       What is the rationale behind skipping revoked locks here? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


