kfaraz commented on code in PR #16310:
URL: https://github.com/apache/druid/pull/16310#discussion_r1600978610
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -93,4 +94,10 @@ default Boolean isHealthy()
LagStats computeLagStats();
int getActiveTaskGroupsCount();
+
+ /** Handoff the task group with id=taskGroupId the next time the supervisor
runs regardless of task run time*/
Review Comment:
Suggested rephrasing, with Druid javadoc styling:
```suggestion
  /**
   * Marks the given task groups as ready for segment hand-off irrespective of the task run times.
   * In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups
   * and rolls over their tasks.
   * taskGroupIds that are not valid or not actively reading are simply ignored.
   */
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -93,4 +94,10 @@ default Boolean isHealthy()
LagStats computeLagStats();
int getActiveTaskGroupsCount();
+
+ /** Handoff the task group with id=taskGroupId the next time the supervisor
runs regardless of task run time*/
+ default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
+ {
+ throw new NotImplementedException("Supervisor does not have the feature to
handoff task groups early implemented");
Review Comment:
Nit: Why not use `UnsupportedOperationException` instead?
The error message seems to suggest that this is more of an "unsupported"
scenario.
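A minimal sketch of what I have in mind. `SupervisorSketch` and `NoopSupervisorSketch` are hypothetical stand-ins for illustration, not the real `Supervisor` interface, and the exact error message is only a suggestion:

```java
import java.util.List;

// Hypothetical, trimmed-down stand-in for the Supervisor interface in this PR.
interface SupervisorSketch
{
  /**
   * Marks the given task groups as ready for segment hand-off irrespective of the task run times.
   */
  default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
  {
    // UnsupportedOperationException lives in java.lang, so no extra import or dependency is needed.
    throw new UnsupportedOperationException("Supervisor does not support early handoff of task groups");
  }
}

// A supervisor type that does not override the default keeps the "unsupported" behaviour.
class NoopSupervisorSketch implements SupervisorSketch
{
}
```

Supervisors that do support the feature would simply override the default method.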
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -266,6 +268,16 @@ Set<String> taskIds()
return tasks.keySet();
}
+ void setShutdownEarly()
Review Comment:
Should these methods be renamed to `setHandoffEarly()` to correspond to the
API and the method in `SupervisorManager` / `Supervisor` classes?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3143,14 +3194,15 @@ private void checkTaskDuration() throws
ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
- if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+ if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() ||
group.getShutdownEarly()) {
Review Comment:
If the task group is marked for early shutdown, we should log it.
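One possible shape for this, as a rough sketch only. It uses `java.time` instead of the Joda types in the actual method so that it stays self-contained, and `TaskDurationCheckSketch` and `handoffReason` are made-up names:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: separates "why are we handing off" from the decision itself,
// so that the caller can log the reason.
class TaskDurationCheckSketch
{
  /**
   * Returns a human-readable reason to hand off the task group,
   * or null if the group should keep reading.
   */
  static String handoffReason(
      Instant earliestTaskStart,
      Duration taskDuration,
      boolean handoffEarly,
      Instant now
  )
  {
    if (earliestTaskStart.plus(taskDuration).isBefore(now)) {
      return "task duration elapsed";
    } else if (handoffEarly) {
      return "marked for early handoff";
    }
    return null;
  }
}
```

In `checkTaskDuration()`, the caller could then log the returned reason together with the task group id before signalling the tasks to finish.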
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -395,6 +399,45 @@ public Response shutdown(@PathParam("id") final String id)
return terminate(id);
}
+ /**
+ * This method will immediately try to handoff the list of task group ids
for the given supervisor.
+ * This is a best effort API and makes no guarantees of execution, e.g. if a
non-existent task group id
+ * is passed to it, the API call will still suceced.
+ */
+ @POST
+ @Path("/{id}/taskGroups/handoff")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull
final HandoffTaskGroupsRequest handoffTaskGroupsRequest)
+ {
+ List<Integer> taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds();
+ if (taskGroupIds == null || taskGroupIds.isEmpty()) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", "List of task groups to handoff
can't be empty"))
+ .build();
+
+ }
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ try {
+ if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
+ return Response.ok(ImmutableMap.of("id", id, "taskGroupIds",
taskGroupIds)).build();
Review Comment:
In the success case, why do we return the request parameters back in the
response? We might as well just return an empty 200 OK response.
Alternatively, we could return the `taskGroupIds` which were actually marked
for early hand-off. In the case where some of the requested `taskGroupIds` are
non-existent or not actively reading, the returned set of `taskGroupIds` could
differ from the requested one, thus telling the caller which ones will actually
be handed off.
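To illustrate the second option, here is a rough sketch of collecting the ids that were actually marked. The class and method names are hypothetical. It also assumes the marking happens synchronously, whereas in this PR it goes through a notice, so the real change would need some way to get the result back to the resource:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the supervisor's TaskGroup; only the flag matters here.
class TaskGroupSketch
{
  private volatile boolean handoffEarly = false;

  void setHandoffEarly()
  {
    handoffEarly = true;
  }

  boolean isHandoffEarly()
  {
    return handoffEarly;
  }
}

class EarlyHandoffSketch
{
  // Stand-in for activelyReadingTaskGroups in SeekableStreamSupervisor.
  final Map<Integer, TaskGroupSketch> activelyReadingTaskGroups = new ConcurrentHashMap<>();

  /**
   * Marks the requested task groups and returns only the ids that were actually marked.
   * Ids that are unknown or not actively reading are left out of the result.
   */
  List<Integer> markForEarlyHandoff(List<Integer> requestedIds)
  {
    final List<Integer> marked = new ArrayList<>();
    for (Integer taskGroupId : requestedIds) {
      final TaskGroupSketch group = activelyReadingTaskGroups.get(taskGroupId);
      if (group != null) {
        group.setHandoffEarly();
        marked.add(taskGroupId);
      }
    }
    return marked;
  }
}
```

The response body could then carry the returned list instead of echoing the request.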
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -657,6 +669,39 @@ public String getType()
}
}
+ private class HandoffTaskGroupsNotice implements Notice
+ {
+ final List<Integer> taskGroupIds;
+ private static final String TYPE = "handoff_task_group_notice";
+
+ HandoffTaskGroupsNotice(
+ @Nonnull final List<Integer> taskGroupIds
+ )
+ {
+ this.taskGroupIds = taskGroupIds;
+ }
+
+ @Override
+ public void handle()
+ {
+ for (Integer taskGroupId : taskGroupIds) {
+ TaskGroup taskGroup =
activelyReadingTaskGroups.getOrDefault(taskGroupId, null);
+ if (taskGroup == null) {
+ log.info("Tried to stop task group [%d] for supervisor [%s] that
wasn't actively reading.", taskGroupId, supervisorId);
+ continue;
+ }
+
+ taskGroup.setShutdownEarly();
Review Comment:
Nit:
```suggestion
        if (taskGroup == null) {
          log.info("Tried to stop task group[%d] for supervisor[%s] but it is not actively reading.", taskGroupId, supervisorId);
        } else {
          taskGroup.setShutdownEarly();
        }
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -657,6 +669,39 @@ public String getType()
}
}
+ private class HandoffTaskGroupsNotice implements Notice
+ {
+ final List<Integer> taskGroupIds;
+ private static final String TYPE = "handoff_task_group_notice";
+
+ HandoffTaskGroupsNotice(
+ @Nonnull final List<Integer> taskGroupIds
+ )
+ {
+ this.taskGroupIds = taskGroupIds;
+ }
+
+ @Override
+ public void handle()
+ {
+ for (Integer taskGroupId : taskGroupIds) {
Review Comment:
We should add an info log line here saying that we are now going to hand off
these task groups early. Otherwise, there is no way to know that such a request
was even received by the supervisor.
We can probably also add a log line in `SupervisorManager` to indicate when
the request was received. The log here can be used to identify when the notice
is actually handled.
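Roughly, the two log points could look like the sketch below. Everything in it is a simplified stand-in: the list of strings replaces `log.info`, the queue of `Runnable`s replaces the notice queue, and the class name and message wording are made up:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of logging both when the request is received and when the notice is handled.
class HandoffLoggingSketch
{
  // Stand-in for log.info(...) so that the sketch has no logging dependency.
  final List<String> logLines = new ArrayList<>();
  // Stand-in for the supervisor's notice queue.
  final Queue<Runnable> notices = new ArrayDeque<>();

  void handoffTaskGroupsEarly(String supervisorId, List<Integer> taskGroupIds)
  {
    // First log point: the request has been received.
    logLines.add(
        String.format("Received request to hand off task groups%s early for supervisor[%s].", taskGroupIds, supervisorId)
    );
    // Second log point: emitted only when the queued notice is actually handled.
    notices.add(
        () -> logLines.add(
            String.format("Handing off task groups%s early for supervisor[%s].", taskGroupIds, supervisorId)
        )
    );
  }

  void runNextNotice()
  {
    final Runnable notice = notices.poll();
    if (notice != null) {
      notice.run();
    }
  }
}
```

The gap between the two log lines then shows how long the notice waited in the queue.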