This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9881df88a Cleanup logic from handoff API (#16457)
ed9881df88a is described below

commit ed9881df88ac936c11d192a9c54bd7a0cff9d001
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu May 16 08:42:44 2024 -0700

    Cleanup logic from handoff API (#16457)
    
    * Cleanup logic from handoff API
    
    * Fix test
    
    * Fix checkstyle
    
    * Update docs
---
 docs/api-reference/supervisor-api.md                   | 12 +-----------
 .../overlord/supervisor/SupervisorResource.java        |  2 +-
 .../supervisor/SeekableStreamSupervisor.java           | 18 +++++++++---------
 .../druid/indexing/overlord/supervisor/Supervisor.java |  9 ++++++---
 .../apache/druid/indexing/NoopSupervisorSpecTest.java  |  3 +--
 5 files changed, 18 insertions(+), 26 deletions(-)

diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md
index 74bbe27fd1e..aca9a2bb0d7 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3594,17 +3594,7 @@ Content-Type: application/json
 
 <details>
   <summary>View the response</summary>
-
-  ```json
-{
-  "id": "social_media",
-  "taskGroupIds": [
-    1,
-    2,
-    3
-  ]
-}
-  ```
+(empty response)
 </details>
 
 ### Shut down a supervisor
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 0ff76e5d41c..0cf58d38512 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -422,7 +422,7 @@ public class SupervisorResource
         manager -> {
           try {
             if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
-              return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
+              return Response.ok().build();
             } else {
               return Response.status(Response.Status.NOT_FOUND)
                   .entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 379f5fe9ae4..d413a9bec3e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -203,7 +203,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final String baseSequenceName;
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
 
-    boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly
+    boolean handoffEarly = false; // set by SupervisorManager.stopTaskGroupEarly
 
     TaskGroup(
         int groupId,
@@ -268,14 +268,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       return tasks.keySet();
     }
 
-    void setShutdownEarly()
+    void setHandoffEarly()
     {
-      shutdownEarly = true;
+      handoffEarly = true;
     }
 
-    Boolean getShutdownEarly()
+    Boolean getHandoffEarly()
     {
-      return shutdownEarly;
+      return handoffEarly;
     }
 
     @VisibleForTesting
@@ -690,8 +690,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
           continue;
         }
-
-        taskGroup.setShutdownEarly();
+        log.info("Task group [%d] for supervisor [%s] will handoff early.", taskGroupId, supervisorId);
+        taskGroup.setHandoffEarly();
       }
     }
 
@@ -3194,7 +3194,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           } else {
             DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
 
-            if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
+            if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getHandoffEarly()) {
               // if this task has run longer than the configured duration
               // as long as the pending task groups are less than the configured stop task count.
               // If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
@@ -3202,7 +3202,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                              .stream()
                                              
.mapToInt(CopyOnWriteArrayList::size)
                                              .sum() + stoppedTasks.get()
-                  < ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
+                  < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
                 log.info(
                     "Task group [%d] has run for [%s]. Stopping.",
                     groupId,
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 4d88a6b8633..e92b194e3e3 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.supervisor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -95,9 +94,13 @@ public interface Supervisor
 
   int getActiveTaskGroupsCount();
 
-  /** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
+  /**
+   * Marks the given task groups as ready for segment hand-off irrespective of the task run times.
+   * In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
+   * taskGroupIds that are not valid or not actively reading are simply ignored.
+   */
   default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
   {
-    throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");
+    throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented");
   }
 }
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index c83c4c0647d..8bd4de3b889 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
@@ -97,7 +96,7 @@ public class NoopSupervisorSpecTest
   {
     NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
     Supervisor noOpSupervisor = expectedSpec.createSupervisor();
-    Assert.assertThrows(NotImplementedException.class,
+    Assert.assertThrows(UnsupportedOperationException.class,
         () -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to