kfaraz commented on code in PR #13172:
URL: https://github.com/apache/druid/pull/13172#discussion_r996247487


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -183,20 +187,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
             );
           }
         } else {
-          throw new ISE(
-              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+          failedToReacquireLockTaskGroups.add(task.getGroupId());
+          log.error(
+              "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
               savedTaskLockWithPriority.getInterval(),
               savedTaskLockWithPriority.getVersion(),
-              task.getId()
+              task.getId(),
+              task.getGroupId()
           );
+          continue;

Review Comment:
   Nit: this `continue` is probably not needed, as we are already at the end of the loop body.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1258,6 +1259,39 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception
     );
   }
 
+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    final Task badTask0 = NoopTask.withGroupId("BadTask");
+    final Task badTask1 = NoopTask.withGroupId("BadTask");
+    final Task goodTask0 = NoopTask.withGroupId("GoodTask");
+    taskStorage.insert(badTask0, TaskStatus.running(badTask0.getId()));
+    taskStorage.insert(badTask1, TaskStatus.running(badTask1.getId()));
+    taskStorage.insert(goodTask0, TaskStatus.running(goodTask0.getId()));
+
+    TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(badTask0);
+    testLockbox.add(badTask1);
+    testLockbox.add(goodTask0);
+
+    testLockbox.tryLock(badTask0, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,

Review Comment:
   Nit: Style: Put each arg on a separate line (when necessary) for consistency 
with the rest of Druid code.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -207,7 +230,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
    * This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same
    * groupId, dataSource, and priority.
    */
-  private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
+  @VisibleForTesting

Review Comment:
   Nit: is there a way to avoid this and still be able to test it? (without too 
much hassle)



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java:
##########
@@ -270,19 +300,20 @@ public void testOverlordRun() throws Exception
     Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());
 
     // Simulate completion of task_1
-    taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown();
+    taskCompletionCountDownLatches.get(taskId_1).countDown();
     // Wait for taskQueue to handle success status of task_1
     waitForTaskStatus(taskId_1, TaskState.SUCCESS);
 
     // should return number of tasks which are not in running state
     response = overlordResource.getCompleteTasks(null, req);
-    Assert.assertEquals(2, (((List) response.getEntity()).size()));
+    Assert.assertEquals(4, (((List) response.getEntity()).size()));

Review Comment:
   Why did we need to change an existing test?
   I would advise adding a separate test for verifying the behaviour of such 
tasks where we failed to reacquire locks.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -183,20 +187,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
             );
           }
         } else {
-          throw new ISE(
-              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+          failedToReacquireLockTaskGroups.add(task.getGroupId());
+          log.error(
+              "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
               savedTaskLockWithPriority.getInterval(),
               savedTaskLockWithPriority.getVersion(),
-              task.getId()
+              task.getId(),
+              task.getGroupId()
           );
+          continue;
+        }
+      }
+
+      Set<Task> tasksToFail = new HashSet<>();
+      for (Task task : taskStorage.getActiveTasks()) {

Review Comment:
   We don't want to make another call to the storage. Use an iterator over the 
`activeTasks` or `storedActiveTasks`. Either should be fine.
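   A minimal sketch of the idea, not the actual `TaskLockbox` code: it assumes the tasks read from storage at the start of the sync are still held in memory as task objects. The class `ReuseFetchedTasksSketch`, the stand-in `SimpleTask`, and the parameter name `alreadyFetchedTasks` are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ReuseFetchedTasksSketch {
  // Hypothetical stand-in for a task; only the two fields used here.
  static final class SimpleTask {
    final String id;
    final String groupId;

    SimpleTask(String id, String groupId) {
      this.id = id;
      this.groupId = groupId;
    }
  }

  // Picks the tasks to fail from the collection that was already read from
  // storage, so no second storage call is needed.
  static List<SimpleTask> selectTasksToFail(
      List<SimpleTask> alreadyFetchedTasks,
      Set<String> failedToReacquireLockTaskGroups
  ) {
    List<SimpleTask> tasksToFail = new ArrayList<>();
    for (SimpleTask task : alreadyFetchedTasks) {
      if (failedToReacquireLockTaskGroups.contains(task.groupId)) {
        tasksToFail.add(task);
      }
    }
    return tasksToFail;
  }

  public static void main(String[] args) {
    List<SimpleTask> fetched = Arrays.asList(
        new SimpleTask("t1", "groupA"),
        new SimpleTask("t2", "groupB"),
        new SimpleTask("t3", "groupA")
    );
    Set<String> failedGroups = new HashSet<>(Arrays.asList("groupA"));
    for (SimpleTask task : selectTasksToFail(fetched, failedGroups)) {
      System.out.println(task.id); // prints t1, then t3
    }
  }
}
```

   The point is only that the group check runs against data already in memory; how that data is retained in the real method is left to the PR author.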



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -173,6 +174,12 @@ public void start()
       Preconditions.checkState(!active, "queue must be stopped");
       active = true;
       syncFromStorage();
+      // Mark these tasks as failed as they could not reacuire the lock
+      // Clean up needs to happen after tasks have been synced from storage
+      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+      for (Task task : tasksToFail) {
+        shutdown(task.getId(), "Failed to reacquire lock.");

Review Comment:
   ```suggestion
           shutdown(task.getId(), "Shutting down forcefully as failed to reacquire lock after becoming leader.");
   ```
   It's a little verbose but paints a clearer picture of what happened.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -183,20 +187,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
             );
           }
         } else {
-          throw new ISE(
-              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+          failedToReacquireLockTaskGroups.add(task.getGroupId());
+          log.error(
+              "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
               savedTaskLockWithPriority.getInterval(),
               savedTaskLockWithPriority.getVersion(),
-              task.getId()
+              task.getId(),
+              task.getGroupId()
           );
+          continue;
+        }
+      }
+
+      Set<Task> tasksToFail = new HashSet<>();
+      for (Task task : taskStorage.getActiveTasks()) {
+        if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) {
+          tasksToFail.add(task);
+          activeTasks.remove(task.getId());

Review Comment:
   Nit: Style: You could choose to remove all of them in one go, thus retaining 
the sense of atomic update to `activeTasks`.
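   One way this could look, as a sketch under assumptions rather than the real method: the ids to drop are collected first, and `activeTasks` is then modified once with `Set.removeAll`. The diff shows `activeTasks.remove(task.getId())`, so `activeTasks` is modelled here as a set of task ids; `BulkRemoveSketch`, `SimpleTask`, and `candidateTasks` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class BulkRemoveSketch {
  // Hypothetical stand-in for a task; only the two fields used here.
  static final class SimpleTask {
    final String id;
    final String groupId;

    SimpleTask(String id, String groupId) {
      this.id = id;
      this.groupId = groupId;
    }
  }

  // Gathers the ids of all tasks in a failed group, then updates activeTasks
  // with a single removeAll call instead of one remove per loop iteration.
  static Set<String> removeFailedTasks(
      Set<String> activeTasks,
      List<SimpleTask> candidateTasks,
      Set<String> failedToReacquireLockTaskGroups
  ) {
    Set<String> failedTaskIds = new HashSet<>();
    for (SimpleTask task : candidateTasks) {
      if (failedToReacquireLockTaskGroups.contains(task.groupId)) {
        failedTaskIds.add(task.id);
      }
    }
    activeTasks.removeAll(failedTaskIds);
    return failedTaskIds;
  }

  public static void main(String[] args) {
    Set<String> activeTasks = new HashSet<>(Arrays.asList("t1", "t2", "t3"));
    List<SimpleTask> candidates = Arrays.asList(
        new SimpleTask("t1", "groupA"),
        new SimpleTask("t2", "groupB"),
        new SimpleTask("t3", "groupA")
    );
    Set<String> failedGroups = new HashSet<>(Arrays.asList("groupA"));
    removeFailedTasks(activeTasks, candidates, failedGroups);
    System.out.println(activeTasks); // prints [t2]
  }
}
```

   A single `removeAll` is not atomic in the concurrency sense; it only keeps the mutation of `activeTasks` in one place, which is the readability benefit the comment is after.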



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -228,6 +235,13 @@ public ScheduledExecutors.Signal call()
           }
       );
       requestManagement();
+      // Remove any unacquired locks from storage

Review Comment:
   Thanks for adding the comment!
   I am a little unclear on why there would be unacquired locks left behind. I 
would imagine that `shutdown()` would take care of this. If not, please 
rephrase the comments to clarify that point.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -139,6 +141,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
       running.clear();
       activeTasks.clear();
       activeTasks.addAll(storedActiveTasks);
+      // Set of task groups in which at least one task failed to re-acquire a lock

Review Comment:
   Thanks for the comments!



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1258,6 +1259,39 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception
     );
   }
 
+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    final Task badTask0 = NoopTask.withGroupId("BadTask");

Review Comment:
   Nit: could these tasks have more descriptive names that explain why each one is bad or good?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

