This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ed9881df88a Cleanup logic from handoff API (#16457)
ed9881df88a is described below
commit ed9881df88ac936c11d192a9c54bd7a0cff9d001
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu May 16 08:42:44 2024 -0700
Cleanup logic from handoff API (#16457)
* Cleanup logic from handoff API
* Fix test
* Fix checkstyle
* Update docs
---
docs/api-reference/supervisor-api.md | 12 +-----------
.../overlord/supervisor/SupervisorResource.java | 2 +-
.../supervisor/SeekableStreamSupervisor.java | 18 +++++++++---------
.../druid/indexing/overlord/supervisor/Supervisor.java | 9 ++++++---
.../apache/druid/indexing/NoopSupervisorSpecTest.java | 3 +--
5 files changed, 18 insertions(+), 26 deletions(-)
diff --git a/docs/api-reference/supervisor-api.md
b/docs/api-reference/supervisor-api.md
index 74bbe27fd1e..aca9a2bb0d7 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3594,17 +3594,7 @@ Content-Type: application/json
<details>
<summary>View the response</summary>
-
- ```json
-{
- "id": "social_media",
- "taskGroupIds": [
- 1,
- 2,
- 3
- ]
-}
- ```
+(empty response)
</details>
### Shut down a supervisor
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 0ff76e5d41c..0cf58d38512 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -422,7 +422,7 @@ public class SupervisorResource
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
- return Response.ok(ImmutableMap.of("id", id, "taskGroupIds",
taskGroupIds)).build();
+ return Response.ok().build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error",
StringUtils.format("Supervisor was not found [%s]", id)))
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 379f5fe9ae4..d413a9bec3e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -203,7 +203,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not
done by timeout, take corrective action
- boolean shutdownEarly = false; // set by
SupervisorManager.stopTaskGroupEarly
+ boolean handoffEarly = false; // set by
SupervisorManager.handoffTaskGroupsEarly
TaskGroup(
int groupId,
@@ -268,14 +268,14 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return tasks.keySet();
}
- void setShutdownEarly()
+ void setHandoffEarly()
{
- shutdownEarly = true;
+ handoffEarly = true;
}
- Boolean getShutdownEarly()
+ Boolean getHandoffEarly()
{
- return shutdownEarly;
+ return handoffEarly;
}
@VisibleForTesting
@@ -690,8 +690,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("Tried to stop task group [%d] for supervisor [%s] that
wasn't actively reading.", taskGroupId, supervisorId);
continue;
}
-
- taskGroup.setShutdownEarly();
+ log.info("Task group [%d] for supervisor [%s] will handoff early.",
taskGroupId, supervisorId);
+ taskGroup.setHandoffEarly();
}
}
@@ -3194,7 +3194,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
- if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() ||
group.getShutdownEarly()) {
+ if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() ||
group.getHandoffEarly()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the
configured stop task count.
// If handoffEarly has been set, ignore stopTaskCount since
this is a manual operator action.
@@ -3202,7 +3202,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
- < ioConfig.getMaxAllowedStops() || group.getShutdownEarly())
{
+ < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 4d88a6b8633..e92b194e3e3 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -95,9 +94,13 @@ public interface Supervisor
int getActiveTaskGroupsCount();
- /** Handoff the task group with id=taskGroupId the next time the supervisor
runs regardless of task run time*/
+ /**
+ * Marks the given task groups as ready for segment hand-off irrespective of
the task run times.
+ * In the subsequent run, the supervisor initiates segment publish and
hand-off for these task groups and rolls over their tasks.
+ * taskGroupIds that are not valid or not actively reading are simply
ignored.
+ */
default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
- throw new NotImplementedException("Supervisor does not have the feature to
handoff task groups early implemented");
+ throw new UnsupportedOperationException("Supervisor does not have the
feature to handoff task groups early implemented");
}
}
diff --git
a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index c83c4c0647d..8bd4de3b889 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing;
import com.google.common.collect.ImmutableList;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
@@ -97,7 +96,7 @@ public class NoopSupervisorSpecTest
{
NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
Supervisor noOpSupervisor = expectedSpec.createSupervisor();
- Assert.assertThrows(NotImplementedException.class,
+ Assert.assertThrows(UnsupportedOperationException.class,
() -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]