kfaraz commented on code in PR #16310:
URL: https://github.com/apache/druid/pull/16310#discussion_r1600978610


##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -93,4 +94,10 @@ default Boolean isHealthy()
   LagStats computeLagStats();
 
   int getActiveTaskGroupsCount();
+
+  /** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/

Review Comment:
   Some rephrasing + Druid javadoc styling
   ```suggestion
     /**
      * Marks the given task groups as ready for segment hand-off irrespective of the task run times.
      * In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
      * taskGroupIds that are not valid or not actively reading are simply ignored.
      */
   ```



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -93,4 +94,10 @@ default Boolean isHealthy()
   LagStats computeLagStats();
 
   int getActiveTaskGroupsCount();
+
+  /** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
+  default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
+  {
+    throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");

Review Comment:
   Nit: Why not use `UnsupportedOperationException` instead?
   The error message seems to suggest that this is more of an "unsupported" 
scenario.
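
A minimal sketch of what the suggested change could look like. `SupervisorSketch` and `UnsupportedHandoffDemo` are hypothetical stand-ins rather than Druid classes, and the error message wording is only an illustration:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the Supervisor interface; only the method under discussion is shown.
interface SupervisorSketch
{
  default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
  {
    // JDK exception that signals "this implementation does not support the operation".
    throw new UnsupportedOperationException("Supervisor does not support early handoff of task groups");
  }
}

class UnsupportedHandoffDemo
{
  // Returns true if the supervisor rejects early handoff as unsupported.
  static boolean rejectsEarlyHandoff(SupervisorSketch supervisor)
  {
    try {
      supervisor.handoffTaskGroupsEarly(Arrays.asList(0, 1));
      return false;
    }
    catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args)
  {
    // An implementation that does not override the default method.
    SupervisorSketch defaultSupervisor = new SupervisorSketch() {};
    System.out.println(rejectsEarlyHandoff(defaultSupervisor));
  }
}
```

Since `UnsupportedOperationException` is part of `java.lang`, a change along these lines would also avoid depending on a third-party exception type for this default method.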



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -266,6 +268,16 @@ Set<String> taskIds()
       return tasks.keySet();
     }
 
+    void setShutdownEarly()

Review Comment:
   Should these methods be renamed to `setHandoffEarly()` to correspond to the 
API and the method in `SupervisorManager` / `Supervisor` classes?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3143,14 +3194,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
           } else {
             DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
 
-            if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+            if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {

Review Comment:
   If the task group is marked for early shutdown, we should log it.
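
One way to make the reason loggable is to separate the two conditions so the code knows why a group is being handed off. This is only a sketch: `TaskDurationCheckSketch` and `HandoffReason` are hypothetical names, `java.time` is used here in place of the Joda `DateTime` in the diff, and the printed message stands in for a real log call:

```java
import java.time.Duration;
import java.time.Instant;

class TaskDurationCheckSketch
{
  enum HandoffReason { NONE, DURATION_ELAPSED, EARLY_HANDOFF_REQUESTED }

  // Decides whether a task group should be handed off and, if so, why.
  static HandoffReason handoffReason(
      Instant earliestTaskStart,
      Duration taskDuration,
      boolean handoffEarly,
      Instant now
  )
  {
    if (earliestTaskStart.plus(taskDuration).isBefore(now)) {
      return HandoffReason.DURATION_ELAPSED;
    }
    return handoffEarly ? HandoffReason.EARLY_HANDOFF_REQUESTED : HandoffReason.NONE;
  }

  public static void main(String[] args)
  {
    Instant now = Instant.parse("2024-05-15T12:00:00Z");
    Instant start = now.minus(Duration.ofMinutes(10));
    HandoffReason reason = handoffReason(start, Duration.ofHours(1), true, now);
    if (reason == HandoffReason.EARLY_HANDOFF_REQUESTED) {
      // Stand-in for a log.info(...) call in the supervisor.
      System.out.println("Task group is marked for early handoff, stopping before task duration has elapsed.");
    }
  }
}
```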



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -395,6 +399,45 @@ public Response shutdown(@PathParam("id") final String id)
     return terminate(id);
   }
 
+  /**
+   * This method will immediately try to handoff the list of task group ids for the given supervisor.
+   * This is a best effort API and makes no guarantees of execution, e.g. if a non-existent task group id
+   * is passed to it, the API call will still suceced.
+   */
+  @POST
+  @Path("/{id}/taskGroups/handoff")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest)
+  {
+    List<Integer> taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds();
+    if (taskGroupIds == null || taskGroupIds.isEmpty()) {
+      return Response.status(Response.Status.BAD_REQUEST)
+          .entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty"))
+          .build();
+
+    }
+    return asLeaderWithSupervisorManager(
+        manager -> {
+          try {
+            if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
+              return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();

Review Comment:
   In the success case, why do we return the request parameters back in the response? We might as well just return an empty 200 OK response.
   
   Alternatively, we could return the `taskGroupIds` that were actually marked for early hand-off. If some of the requested `taskGroupIds` are non-existent or not actively reading, the returned set of `taskGroupIds` would differ from the requested one, thus telling the caller which ones will actually be handed off.
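
To illustrate the second option, here is a rough sketch of marking only the actively reading groups and collecting their ids for the response. `HandoffResponseSketch`, its nested `TaskGroup`, and `markForEarlyHandoff` are hypothetical names, not the actual supervisor code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HandoffResponseSketch
{
  // Hypothetical minimal task group: only tracks the early-handoff flag.
  static class TaskGroup
  {
    boolean handoffEarly = false;
  }

  // Marks every requested group that is actively reading and returns the ids that were marked.
  static List<Integer> markForEarlyHandoff(
      Map<Integer, TaskGroup> activelyReading,
      List<Integer> requestedIds
  )
  {
    List<Integer> marked = new ArrayList<>();
    for (Integer id : requestedIds) {
      TaskGroup group = activelyReading.get(id);
      if (group != null) {
        group.handoffEarly = true;
        marked.add(id);
      }
    }
    return marked;
  }

  public static void main(String[] args)
  {
    Map<Integer, TaskGroup> activelyReading = new HashMap<>();
    activelyReading.put(0, new TaskGroup());
    activelyReading.put(1, new TaskGroup());

    // Task group 7 does not exist, so it is left out of the result.
    System.out.println(markForEarlyHandoff(activelyReading, Arrays.asList(1, 7)));
  }
}
```

The returned list could then be used as the response entity in place of the echoed request parameters.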



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -657,6 +669,39 @@ public String getType()
     }
   }
 
+  private class HandoffTaskGroupsNotice implements Notice
+  {
+    final List<Integer> taskGroupIds;
+    private static final String TYPE = "handoff_task_group_notice";
+
+    HandoffTaskGroupsNotice(
+        @Nonnull final List<Integer> taskGroupIds
+    )
+    {
+      this.taskGroupIds = taskGroupIds;
+    }
+
+    @Override
+    public void handle()
+    {
+      for (Integer taskGroupId : taskGroupIds) {
+        TaskGroup taskGroup = activelyReadingTaskGroups.getOrDefault(taskGroupId, null);
+        if (taskGroup == null) {
+          log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
+          continue;
+        }
+
+        taskGroup.setShutdownEarly();

Review Comment:
   Nit:
   ```suggestion
           if (taskGroup == null) {
             log.info("Tried to stop task group[%d] for supervisor[%s] but it is not actively reading.", taskGroupId, supervisorId);
           } else {
             taskGroup.setShutdownEarly();
           }
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -657,6 +669,39 @@ public String getType()
     }
   }
 
+  private class HandoffTaskGroupsNotice implements Notice
+  {
+    final List<Integer> taskGroupIds;
+    private static final String TYPE = "handoff_task_group_notice";
+
+    HandoffTaskGroupsNotice(
+        @Nonnull final List<Integer> taskGroupIds
+    )
+    {
+      this.taskGroupIds = taskGroupIds;
+    }
+
+    @Override
+    public void handle()
+    {
+      for (Integer taskGroupId : taskGroupIds) {

Review Comment:
   We should add an info log line here saying that we are now going to hand off these task groups early. Otherwise, there is no way to know that such a request was even received by the supervisor.
   
   We can probably also add a log line in `SupervisorManager` to indicate when the request was received. The log here can be used to identify when the notice is actually handled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

