jon-wei closed pull request #6272: Add suspend|resume|terminate all supervisors
endpoints.
URL: https://github.com/apache/incubator-druid/pull/6272
This is a PR merged from a forked repository. GitHub hides the original diff on merge, and the diff of a foreign pull request (from a fork) would not otherwise be shown. It is therefore reproduced below for the sake of provenance:
diff --git a/docs/content/development/extensions-core/kafka-ingestion.md
b/docs/content/development/extensions-core/kafka-ingestion.md
index ebc240a2d16..568fc94fe30 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -206,13 +206,27 @@ Suspend indexing tasks associated with a supervisor. Note
that the supervisor it
operating and emitting logs and metrics, it will just ensure that no indexing
tasks are running until the supervisor
is resumed. Responds with updated SupervisorSpec.
-#### Resume Supervisor
+#### Suspend All Supervisors
+
+```
+POST /druid/indexer/v1/supervisor/suspendAll
+```
+Suspend all supervisors at once. Supervisors that are already suspended are left unchanged. Responds with `{"status": "success"}`.
+
+#### Resume Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/resume
```
Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
+#### Resume All Supervisors
+
+```
+POST /druid/indexer/v1/supervisor/resumeAll
+```
+Resume all supervisors at once. Supervisors that are already running are left unchanged. Responds with `{"status": "success"}`.
+
#### Reset Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/reset
@@ -241,7 +255,13 @@ with the supervisor history api, but will not be listed in
the 'get supervisors'
or status report be retrieved. The only way this supervisor can start again is
by submitting a functioning supervisor
spec to the create api.
-#### Shutdown Supervisor
+#### Terminate All Supervisors
+```
+POST /druid/indexer/v1/supervisor/terminateAll
+```
+Terminate all supervisors at once. Responds with `{"status": "success"}`.
+
+#### Shutdown Supervisor
_Deprecated: use the equivalent 'terminate' instead_
```
POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 22fd82746a7..cfca0de4b60 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -94,13 +94,31 @@ public boolean stopAndRemoveSupervisor(String id)
public boolean suspendOrResumeSupervisor(String id, boolean suspend)
{
Preconditions.checkState(started, "SupervisorManager not started");
- Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
- Preconditions.checkNotNull(pair.rhs, "spec");
+ Preconditions.checkNotNull(id, "id");
+
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ return possiblySuspendOrResumeSupervisorInternal(id, suspend);
+ }
+ }
+
+ public void stopAndRemoveAllSupervisors()
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
- SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() :
pair.rhs.createRunningSpec();
- possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
- return createAndStartSupervisorInternal(nextState, true);
+ supervisors.keySet().forEach(id ->
possiblyStopAndRemoveSupervisorInternal(id, true));
+ }
+ }
+
+ public void suspendOrResumeAllSupervisors(boolean suspend)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ supervisors.keySet().forEach(id ->
possiblySuspendOrResumeSupervisorInternal(id, suspend));
}
}
@@ -206,7 +224,7 @@ public boolean checkPointDataSourceMetadata(
* Stops a supervisor with a given id and then removes it from the list.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
- * starting and stopping supervisors.
+ * starting, stopping, suspending and resuming supervisors.
*
* @return true if a supervisor was stopped, false if there was no
supervisor with this id
*/
@@ -226,11 +244,32 @@ private boolean
possiblyStopAndRemoveSupervisorInternal(String id, boolean write
return true;
}
+ /**
+ * Suspend or resume a supervisor with a given id.
+ * <p/>
+ * Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
+ * starting, stopping, suspending and resuming supervisors.
+ *
+ * @return true if a supervisor was suspended or resumed, false if there was
no supervisor with this id
+ * or if it was already in the requested state (already suspended when suspending, or already running when resuming)
+ */
+ private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean
suspend)
+ {
+ Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
+ if (pair == null || pair.rhs.isSuspended() == suspend) {
+ return false;
+ }
+
+ SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() :
pair.rhs.createRunningSpec();
+ possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
+ return createAndStartSupervisorInternal(nextState, true);
+ }
+
/**
* Creates a supervisor from the provided spec and starts it if there is not
already a supervisor with that id.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
- * starting and stopping supervisors.
+ * starting, stopping, suspending and resuming supervisors.
*
* @return true if a new supervisor was created, false if there was already
an existing supervisor with this id
*/
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index d3e19bbb4ab..97e0580376e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -260,6 +260,35 @@ public Response terminate(@PathParam("id") final String id)
);
}
+ @POST
+ @Path("/suspendAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response suspendAll()
+ {
+ return suspendOrResumeAll(true);
+ }
+
+ @POST
+ @Path("/resumeAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response resumeAll()
+ {
+ return suspendOrResumeAll(false);
+ }
+
+ @POST
+ @Path("/terminateAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response terminateAll()
+ {
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ manager.stopAndRemoveAllSupervisors();
+ return Response.ok(ImmutableMap.of("status", "success")).build();
+ }
+ );
+ }
+
@GET
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
@@ -378,23 +407,34 @@ private Response specSuspendOrResume(final String id,
boolean suspend)
{
return asLeaderWithSupervisorManager(
manager -> {
- Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
- if (!spec.isPresent()) {
- return Response.status(Response.Status.NOT_FOUND)
- .entity(ImmutableMap.of("error",
StringUtils.format("[%s] does not exist", id)))
- .build();
- }
-
- if (spec.get().isSuspended() == suspend) {
- final String errMsg =
- StringUtils.format("[%s] is already %s", id, suspend ?
"suspended" : "running");
- return Response.status(Response.Status.BAD_REQUEST)
+ if (manager.suspendOrResumeSupervisor(id, suspend)) {
+ Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+ return Response.ok(spec.get()).build();
+ } else {
+ Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+ Response.Status status;
+ String errMsg;
+ if (spec.isPresent()) {
+ status = Response.Status.BAD_REQUEST;
+ errMsg = StringUtils.format("[%s] is already %s", id, suspend ?
"suspended" : "running");
+ } else {
+ status = Response.Status.NOT_FOUND;
+ errMsg = StringUtils.format("[%s] does not exist", id);
+ }
+ return Response.status(status)
.entity(ImmutableMap.of("error", errMsg))
.build();
}
- manager.suspendOrResumeSupervisor(id, suspend);
- spec = manager.getSupervisorSpec(id);
- return Response.ok(spec.get()).build();
+ }
+ );
+ }
+
+ private Response suspendOrResumeAll(boolean suspend)
+ {
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ manager.suspendOrResumeAllSupervisors(suspend);
+ return Response.ok(ImmutableMap.of("status", "success")).build();
}
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 9d6eab33e1e..d893898c5a1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -292,14 +292,6 @@ public void testSpecGetStatus()
@Test
public void testSpecSuspend()
{
-
- TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null,
false) {
- @Override
- public List<String> getDataSources()
- {
- return Collections.singletonList("datasource1");
- }
- };
TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null,
true) {
@Override
public List<String> getDataSources()
@@ -309,11 +301,8 @@ public void testSpecSuspend()
};
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
- EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
- .andReturn(Optional.of(running)).times(1)
- .andReturn(Optional.of(suspended)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
true)).andReturn(true);
- EasyMock.expectLastCall().anyTimes();
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();
Response response = supervisorResource.specSuspend("my-id");
@@ -326,7 +315,8 @@ public void testSpecSuspend()
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
+ EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
true)).andReturn(false);
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();
response = supervisorResource.specSuspend("my-id");
@@ -336,18 +326,9 @@ public void testSpecSuspend()
Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already
suspended"), response.getEntity());
}
-
-
@Test
public void testSpecResume()
{
- TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null,
true) {
- @Override
- public List<String> getDataSources()
- {
- return Collections.singletonList("datasource1");
- }
- };
TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null,
false) {
@Override
public List<String> getDataSources()
@@ -357,11 +338,8 @@ public void testSpecResume()
};
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
- EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
- .andReturn(Optional.of(suspended)).times(1)
- .andReturn(Optional.of(running)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
false)).andReturn(true);
- EasyMock.expectLastCall().anyTimes();
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();
Response response = supervisorResource.specResume("my-id");
@@ -374,7 +352,8 @@ public void testSpecResume()
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
+ EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
false)).andReturn(false);
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();
response = supervisorResource.specResume("my-id");
@@ -385,19 +364,19 @@ public void testSpecResume()
}
@Test
- public void testShutdown()
+ public void testTerminate()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false);
replayAll();
- Response response = supervisorResource.shutdown("my-id");
+ Response response = supervisorResource.terminate("my-id");
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
- response = supervisorResource.shutdown("my-id-2");
+ response = supervisorResource.terminate("my-id-2");
Assert.assertEquals(404, response.getStatus());
verifyAll();
@@ -407,12 +386,54 @@ public void testShutdown()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.shutdown("my-id");
+ response = supervisorResource.terminate("my-id");
verifyAll();
Assert.assertEquals(503, response.getStatus());
}
+ @Test
+ public void testSuspendAll()
+ {
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ supervisorManager.suspendOrResumeAllSupervisors(true);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ Response response = supervisorResource.suspendAll();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("status", "success"),
response.getEntity());
+ verifyAll();
+ }
+
+ @Test
+ public void testResumeAll()
+ {
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ supervisorManager.suspendOrResumeAllSupervisors(false);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ Response response = supervisorResource.resumeAll();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("status", "success"),
response.getEntity());
+ verifyAll();
+ }
+
+ @Test
+ public void testTerminateAll()
+ {
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ supervisorManager.stopAndRemoveAllSupervisors();
+ EasyMock.expectLastCall();
+ replayAll();
+
+ Response response = supervisorResource.terminateAll();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("status", "success"),
response.getEntity());
+ verifyAll();
+ }
+
@Test
public void testSpecGetAllHistory()
{
@@ -872,7 +893,7 @@ public void testReset()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.shutdown("my-id");
+ response = supervisorResource.terminate("my-id");
Assert.assertEquals(503, response.getStatus());
verifyAll();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]