kfaraz commented on code in PR #13172:
URL: https://github.com/apache/druid/pull/13172#discussion_r996247487
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -183,20 +187,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
);
}
} else {
- throw new ISE(
- "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+ failedToReacquireLockTaskGroups.add(task.getGroupId());
+ log.error(
+ "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
- task.getId()
+ task.getId(),
+ task.getGroupId()
);
+ continue;
Review Comment:
Nit: this `continue` is probably not needed, as we are already at the end of the loop.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1258,6 +1259,39 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception
);
}
+ @Test
+ public void testFailedToReacquireTaskLock() throws Exception
+ {
+ final Task badTask0 = NoopTask.withGroupId("BadTask");
+ final Task badTask1 = NoopTask.withGroupId("BadTask");
+ final Task goodTask0 = NoopTask.withGroupId("GoodTask");
+ taskStorage.insert(badTask0, TaskStatus.running(badTask0.getId()));
+ taskStorage.insert(badTask1, TaskStatus.running(badTask1.getId()));
+ taskStorage.insert(goodTask0, TaskStatus.running(goodTask0.getId()));
+
+ TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator);
+ testLockbox.add(badTask0);
+ testLockbox.add(badTask1);
+ testLockbox.add(goodTask0);
+
+ testLockbox.tryLock(badTask0, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
Review Comment:
Nit: Style: Put each arg on a separate line (when necessary) for consistency
with the rest of Druid code.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -207,7 +230,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
* This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same
* groupId, dataSource, and priority.
*/
- private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
+ @VisibleForTesting
Review Comment:
Nit: is there a way to avoid this and still be able to test it? (without too
much hassle)
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java:
##########
@@ -270,19 +300,20 @@ public void testOverlordRun() throws Exception
Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());
// Simulate completion of task_1
- taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown();
+ taskCompletionCountDownLatches.get(taskId_1).countDown();
// Wait for taskQueue to handle success status of task_1
waitForTaskStatus(taskId_1, TaskState.SUCCESS);
// should return number of tasks which are not in running state
response = overlordResource.getCompleteTasks(null, req);
- Assert.assertEquals(2, (((List) response.getEntity()).size()));
+ Assert.assertEquals(4, (((List) response.getEntity()).size()));
Review Comment:
Why did we need to change an existing test?
I would advise adding a separate test for verifying the behaviour of such
tasks where we failed to reacquire locks.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -183,20 +187,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
);
}
} else {
- throw new ISE(
- "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+ failedToReacquireLockTaskGroups.add(task.getGroupId());
+ log.error(
+ "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
- task.getId()
+ task.getId(),
+ task.getGroupId()
);
+ continue;
+ }
+ }
+
+ Set<Task> tasksToFail = new HashSet<>();
+ for (Task task : taskStorage.getActiveTasks()) {
Review Comment:
We don't want to make another call to the storage. Use an iterator over the
`activeTasks` or `storedActiveTasks`. Either should be fine.
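
A minimal sketch of the idea, not the actual Druid code: `SketchTask`, `ReuseLoadedTasksSketch`, and `findTaskIdsToFail` are hypothetical stand-ins, and it assumes the collection already loaded in `syncFromStorage()` exposes each task's id and group id (the real element types are not shown in this hunk). It filters the in-memory list instead of calling `taskStorage.getActiveTasks()` a second time.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ReuseLoadedTasksSketch
{
  // Hypothetical stand-in for Druid's Task; only the two fields used here.
  static final class SketchTask
  {
    final String id;
    final String groupId;

    SketchTask(String id, String groupId)
    {
      this.id = id;
      this.groupId = groupId;
    }
  }

  // Selects the ids of tasks whose group failed to reacquire a lock.
  // Works on the list that was already loaded, so no extra storage read.
  static Set<String> findTaskIdsToFail(
      List<SketchTask> storedActiveTasks,
      Set<String> failedToReacquireLockTaskGroups
  )
  {
    final Set<String> taskIdsToFail = new HashSet<>();
    for (SketchTask task : storedActiveTasks) {
      if (failedToReacquireLockTaskGroups.contains(task.groupId)) {
        taskIdsToFail.add(task.id);
      }
    }
    return taskIdsToFail;
  }
}
```

Every task in a failed group is selected, including tasks that did not themselves hold the lock that could not be reacquired.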
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -173,6 +174,12 @@ public void start()
Preconditions.checkState(!active, "queue must be stopped");
active = true;
syncFromStorage();
+ // Mark these tasks as failed as they could not reacquire the lock
+ // Clean up needs to happen after tasks have been synced from storage
+ Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+ for (Task task : tasksToFail) {
+ shutdown(task.getId(), "Failed to reacquire lock.");
Review Comment:
```suggestion
shutdown(task.getId(), "Shutting down forcefully as failed to reacquire lock after becoming leader.");
```
It's a little verbose but paints a clearer picture of what happened.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -183,20 +187,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
);
}
} else {
- throw new ISE(
- "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+ failedToReacquireLockTaskGroups.add(task.getGroupId());
+ log.error(
+ "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
- task.getId()
+ task.getId(),
+ task.getGroupId()
);
+ continue;
+ }
+ }
+
+ Set<Task> tasksToFail = new HashSet<>();
+ for (Task task : taskStorage.getActiveTasks()) {
+ if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) {
+ tasksToFail.add(task);
+ activeTasks.remove(task.getId());
Review Comment:
Nit: Style: You could choose to remove all of them in one go, thus retaining
the sense of atomic update to `activeTasks`.
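
One way this could look, as a sketch under assumptions rather than the real `TaskLockbox` code: `BulkRemovalSketch`, `removeFailedTasks`, and the `taskIdToGroupId` map are hypothetical, and `activeTasks` is assumed to be a set of task ids. The ids are collected first and then removed with a single `removeAll` call.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class BulkRemovalSketch
{
  // Collects the ids of tasks in failed groups, then removes them from
  // activeTasks in one removeAll call instead of one remove per task.
  static Set<String> removeFailedTasks(
      Set<String> activeTasks,
      Map<String, String> taskIdToGroupId,
      Set<String> failedToReacquireLockTaskGroups
  )
  {
    final Set<String> taskIdsToFail = new HashSet<>();
    for (Map.Entry<String, String> entry : taskIdToGroupId.entrySet()) {
      if (failedToReacquireLockTaskGroups.contains(entry.getValue())) {
        taskIdsToFail.add(entry.getKey());
      }
    }
    // Single bulk update of the active set.
    activeTasks.removeAll(taskIdsToFail);
    return taskIdsToFail;
  }
}
```

Note that `removeAll` on a plain `HashSet` is not atomic for concurrent readers; it only keeps the update in one place, so any real atomicity would still depend on the locking around it.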
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -228,6 +235,13 @@ public ScheduledExecutors.Signal call()
}
);
requestManagement();
+ // Remove any unacquired locks from storage
Review Comment:
Thanks for adding the comment!
I am a little unclear on why there would be unacquired locks left behind. I
would imagine that `shutdown()` would take care of this. If not, please
rephrase the comments to clarify that point.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -139,6 +141,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
running.clear();
activeTasks.clear();
activeTasks.addAll(storedActiveTasks);
+ // Set of task groups in which at least one task failed to re-acquire a lock
Review Comment:
Thanks for the comments!
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1258,6 +1259,39 @@ public void testGetLockedIntervalsForRevokedLocks() throws Exception
);
}
+ @Test
+ public void testFailedToReacquireTaskLock() throws Exception
+ {
+ final Task badTask0 = NoopTask.withGroupId("BadTask");
Review Comment:
Could we use a more descriptive name that explains why the task is bad or good?