AmatyaAvadhanula commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1567044496


##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly(
    * @return number of deleted entries from the metadata store
    */
   int deleteUpgradeSegmentsForTask(String taskId);
+
+  /**
+   * Delete pending segment for a give task group after all the tasks 
belonging to it have completed.
+   * @param taskGroup task group
+   * @return number of pending segments deleted from the metadata store
+   */
+  int deletePendingSegmentsForTaskGroup(String taskGroup);

Review Comment:
   Done



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1176,13 +1204,35 @@ public void remove(final Task task)
     try {
       try {
         log.info("Removing task[%s] from activeTasks", task.getId());
-        if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == 
TaskLockType.REPLACE)) {
-          final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
-          log.info(
-              "Deleted [%d] entries from upgradeSegments table for task[%s] 
with REPLACE locks.",
-              upgradeSegmentsDeleted,
-              task.getId()
-          );
+        try {
+          if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() 
== TaskLockType.REPLACE)) {
+            final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+            log.info(
+                "Deleted [%d] entries from upgradeSegments table for task[%s] 
with REPLACE locks.",
+                upgradeSegmentsDeleted,
+                task.getId()

Review Comment:
   Done



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1176,13 +1204,35 @@ public void remove(final Task task)
     try {
       try {
         log.info("Removing task[%s] from activeTasks", task.getId());
-        if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == 
TaskLockType.REPLACE)) {
-          final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
-          log.info(
-              "Deleted [%d] entries from upgradeSegments table for task[%s] 
with REPLACE locks.",
-              upgradeSegmentsDeleted,
-              task.getId()
-          );
+        try {
+          if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() 
== TaskLockType.REPLACE)) {

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to