AmatyaAvadhanula commented on code in PR #16422:
URL: https://github.com/apache/druid/pull/16422#discussion_r1595239042


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1271,33 +1215,63 @@ public void remove(final Task task)
     }
   }
 
-  /**
-   * Return the currently-active lock posses for some task.
-   *
-   * @param task task for which to locate locks
-   */
-  private List<TaskLockPosse> findLockPossesForTask(final Task task)
+  @GuardedBy("giant")
+  private void cleanupUpgradeAndPendingSegments(Task task)
   {
-    giant.lock();
-
     try {
-      // Scan through all locks for this datasource
-      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> 
dsRunning = running.get(task.getDataSource());
-      if (dsRunning == null) {
-        return ImmutableList.of();
-      } else {
-        return dsRunning.values().stream()
-                        .flatMap(map -> map.values().stream())
-                        .flatMap(Collection::stream)
-                        .filter(taskLockPosse -> 
taskLockPosse.containsTask(task))
-                        .collect(Collectors.toList());
+      // Clean up upgrade segment entries associated with a REPLACE task
+      if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == 
TaskLockType.REPLACE)) {
+        final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+        log.info(
+            "Deleted [%d] entries from upgradeSegments table for task[%s] with 
REPLACE locks.",
+            upgradeSegmentsDeleted, task.getId()
+        );
+      }
+
+      // Clean up pending segments associated with an APPEND task
+      if (task instanceof PendingSegmentAllocatingTask) {
+        final String taskAllocatorId = ((PendingSegmentAllocatingTask) 
task).getTaskAllocatorId();
+        if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+          final Set<String> taskIdsForSameAllocator = 
activeAllocatorIdToTaskIds.get(taskAllocatorId);
+          taskIdsForSameAllocator.remove(task.getId());
+
+          if (taskIdsForSameAllocator.isEmpty()) {
+            final int pendingSegmentsDeleted = metadataStorageCoordinator
+                .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), 
taskAllocatorId);
+            log.info(
+                "Deleted [%d] entries from pendingSegments table for 
taskAllocatorId[%s].",
+                pendingSegmentsDeleted, taskAllocatorId
+            );
+          }
+          activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+        }
       }
     }
-    finally {
-      giant.unlock();
+    catch (Exception e) {
+      log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments 
tables.");
     }
   }
 
+  /**
+   * Finds the currently-active lock posses for the task.

Review Comment:
   "currently-active" can be ambiguous as the method returns posses for both 
active and revoked locks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to