This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f8f4526 Add suspend|resume|terminate all supervisors endpoints.
(#6272)
f8f4526 is described below
commit f8f4526b168568b770b0fd309d3bc2c59dfe4e27
Author: QiuMM <[email protected]>
AuthorDate: Thu Oct 11 12:41:59 2018 +0800
Add suspend|resume|terminate all supervisors endpoints. (#6272)
* ability to shut down all supervisors
* add doc
* address comments
* fix code style
* address comments
* change ternary assignment to if statement
* better docs
---
.../development/extensions-core/kafka-ingestion.md | 24 +++++-
.../overlord/supervisor/SupervisorManager.java | 53 ++++++++++++--
.../overlord/supervisor/SupervisorResource.java | 68 +++++++++++++----
.../supervisor/SupervisorResourceTest.java | 85 ++++++++++++++--------
4 files changed, 175 insertions(+), 55 deletions(-)
diff --git a/docs/content/development/extensions-core/kafka-ingestion.md
b/docs/content/development/extensions-core/kafka-ingestion.md
index ebc240a..568fc94 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -206,13 +206,27 @@ Suspend indexing tasks associated with a supervisor. Note
that the supervisor it
operating and emitting logs and metrics, it will just ensure that no indexing
tasks are running until the supervisor
is resumed. Responds with updated SupervisorSpec.
-#### Resume Supervisor
+#### Suspend All Supervisors
+
+```
+POST /druid/indexer/v1/supervisor/suspendAll
+```
+Suspend all supervisors at once.
+
+#### Resume Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/resume
```
Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
+#### Resume All Supervisors
+
+```
+POST /druid/indexer/v1/supervisor/resumeAll
+```
+Resume all supervisors at once.
+
#### Reset Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/reset
@@ -241,7 +255,13 @@ with the supervisor history api, but will not be listed in
the 'get supervisors'
or status report be retrieved. The only way this supervisor can start again is
by submitting a functioning supervisor
spec to the create api.
-#### Shutdown Supervisor
+#### Terminate All Supervisors
+```
+POST /druid/indexer/v1/supervisor/terminateAll
+```
+Terminate all supervisors at once.
+
+#### Shutdown Supervisor
_Deprecated: use the equivalent 'terminate' instead_
```
POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 22fd827..cfca0de 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -94,13 +94,31 @@ public class SupervisorManager
public boolean suspendOrResumeSupervisor(String id, boolean suspend)
{
Preconditions.checkState(started, "SupervisorManager not started");
- Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
- Preconditions.checkNotNull(pair.rhs, "spec");
+ Preconditions.checkNotNull(id, "id");
+
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ return possiblySuspendOrResumeSupervisorInternal(id, suspend);
+ }
+ }
+
+ public void stopAndRemoveAllSupervisors()
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
- SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() :
pair.rhs.createRunningSpec();
- possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
- return createAndStartSupervisorInternal(nextState, true);
+ supervisors.keySet().forEach(id ->
possiblyStopAndRemoveSupervisorInternal(id, true));
+ }
+ }
+
+ public void suspendOrResumeAllSupervisors(boolean suspend)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ supervisors.keySet().forEach(id ->
possiblySuspendOrResumeSupervisorInternal(id, suspend));
}
}
@@ -206,7 +224,7 @@ public class SupervisorManager
* Stops a supervisor with a given id and then removes it from the list.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
- * starting and stopping supervisors.
+ * starting, stopping, suspending and resuming supervisors.
*
* @return true if a supervisor was stopped, false if there was no
supervisor with this id
*/
@@ -227,10 +245,31 @@ public class SupervisorManager
}
/**
+ * Suspend or resume a supervisor with a given id.
+ * <p/>
+ * Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
+ * starting, stopping, suspending and resuming supervisors.
+ *
+ * @return true if a supervisor was suspended or resumed, false if there was
no supervisor with this id
+ * or the supervisor was already in the requested state (already suspended or already running)
+ */
+ private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean
suspend)
+ {
+ Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
+ if (pair == null || pair.rhs.isSuspended() == suspend) {
+ return false;
+ }
+
+ SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() :
pair.rhs.createRunningSpec();
+ possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
+ return createAndStartSupervisorInternal(nextState, true);
+ }
+
+ /**
* Creates a supervisor from the provided spec and starts it if there is not
already a supervisor with that id.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid
contention with other threads that may be
- * starting and stopping supervisors.
+ * starting, stopping, suspending and resuming supervisors.
*
* @return true if a new supervisor was created, false if there was already
an existing supervisor with this id
*/
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index d3e19bb..97e0580 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -260,6 +260,35 @@ public class SupervisorResource
);
}
+ @POST
+ @Path("/suspendAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response suspendAll()
+ {
+ return suspendOrResumeAll(true);
+ }
+
+ @POST
+ @Path("/resumeAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response resumeAll()
+ {
+ return suspendOrResumeAll(false);
+ }
+
+ @POST
+ @Path("/terminateAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response terminateAll()
+ {
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ manager.stopAndRemoveAllSupervisors();
+ return Response.ok(ImmutableMap.of("status", "success")).build();
+ }
+ );
+ }
+
@GET
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
@@ -378,23 +407,34 @@ public class SupervisorResource
{
return asLeaderWithSupervisorManager(
manager -> {
- Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
- if (!spec.isPresent()) {
- return Response.status(Response.Status.NOT_FOUND)
- .entity(ImmutableMap.of("error",
StringUtils.format("[%s] does not exist", id)))
- .build();
- }
-
- if (spec.get().isSuspended() == suspend) {
- final String errMsg =
- StringUtils.format("[%s] is already %s", id, suspend ?
"suspended" : "running");
- return Response.status(Response.Status.BAD_REQUEST)
+ if (manager.suspendOrResumeSupervisor(id, suspend)) {
+ Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+ return Response.ok(spec.get()).build();
+ } else {
+ Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+ Response.Status status;
+ String errMsg;
+ if (spec.isPresent()) {
+ status = Response.Status.BAD_REQUEST;
+ errMsg = StringUtils.format("[%s] is already %s", id, suspend ?
"suspended" : "running");
+ } else {
+ status = Response.Status.NOT_FOUND;
+ errMsg = StringUtils.format("[%s] does not exist", id);
+ }
+ return Response.status(status)
.entity(ImmutableMap.of("error", errMsg))
.build();
}
- manager.suspendOrResumeSupervisor(id, suspend);
- spec = manager.getSupervisorSpec(id);
- return Response.ok(spec.get()).build();
+ }
+ );
+ }
+
+ private Response suspendOrResumeAll(boolean suspend)
+ {
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ manager.suspendOrResumeAllSupervisors(suspend);
+ return Response.ok(ImmutableMap.of("status", "success")).build();
}
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 9d6eab3..d893898 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -292,14 +292,6 @@ public class SupervisorResourceTest extends EasyMockSupport
@Test
public void testSpecSuspend()
{
-
- TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null,
false) {
- @Override
- public List<String> getDataSources()
- {
- return Collections.singletonList("datasource1");
- }
- };
TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null,
true) {
@Override
public List<String> getDataSources()
@@ -309,11 +301,8 @@ public class SupervisorResourceTest extends EasyMockSupport
};
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
- EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
- .andReturn(Optional.of(running)).times(1)
- .andReturn(Optional.of(suspended)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
true)).andReturn(true);
- EasyMock.expectLastCall().anyTimes();
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();
Response response = supervisorResource.specSuspend("my-id");
@@ -326,7 +315,8 @@ public class SupervisorResourceTest extends EasyMockSupport
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
+ EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
true)).andReturn(false);
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();
response = supervisorResource.specSuspend("my-id");
@@ -336,18 +326,9 @@ public class SupervisorResourceTest extends EasyMockSupport
Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already
suspended"), response.getEntity());
}
-
-
@Test
public void testSpecResume()
{
- TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null,
true) {
- @Override
- public List<String> getDataSources()
- {
- return Collections.singletonList("datasource1");
- }
- };
TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null,
false) {
@Override
public List<String> getDataSources()
@@ -357,11 +338,8 @@ public class SupervisorResourceTest extends EasyMockSupport
};
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
- EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
- .andReturn(Optional.of(suspended)).times(1)
- .andReturn(Optional.of(running)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
false)).andReturn(true);
- EasyMock.expectLastCall().anyTimes();
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();
Response response = supervisorResource.specResume("my-id");
@@ -374,7 +352,8 @@ public class SupervisorResourceTest extends EasyMockSupport
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
+ EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id",
false)).andReturn(false);
+
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();
response = supervisorResource.specResume("my-id");
@@ -385,19 +364,19 @@ public class SupervisorResourceTest extends
EasyMockSupport
}
@Test
- public void testShutdown()
+ public void testTerminate()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false);
replayAll();
- Response response = supervisorResource.shutdown("my-id");
+ Response response = supervisorResource.terminate("my-id");
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
- response = supervisorResource.shutdown("my-id-2");
+ response = supervisorResource.terminate("my-id-2");
Assert.assertEquals(404, response.getStatus());
verifyAll();
@@ -407,13 +386,55 @@ public class SupervisorResourceTest extends
EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.shutdown("my-id");
+ response = supervisorResource.terminate("my-id");
verifyAll();
Assert.assertEquals(503, response.getStatus());
}
@Test
+ public void testSuspendAll()
+ {
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ supervisorManager.suspendOrResumeAllSupervisors(true);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ Response response = supervisorResource.suspendAll();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("status", "success"),
response.getEntity());
+ verifyAll();
+ }
+
+ @Test
+ public void testResumeAll()
+ {
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ supervisorManager.suspendOrResumeAllSupervisors(false);
+ EasyMock.expectLastCall();
+ replayAll();
+
+ Response response = supervisorResource.resumeAll();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("status", "success"),
response.getEntity());
+ verifyAll();
+ }
+
+ @Test
+ public void testTerminateAll()
+ {
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ supervisorManager.stopAndRemoveAllSupervisors();
+ EasyMock.expectLastCall();
+ replayAll();
+
+ Response response = supervisorResource.terminateAll();
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("status", "success"),
response.getEntity());
+ verifyAll();
+ }
+
+ @Test
public void testSpecGetAllHistory()
{
List<VersionedSupervisorSpec> versions1 = ImmutableList.of(
@@ -872,7 +893,7 @@ public class SupervisorResourceTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
- response = supervisorResource.shutdown("my-id");
+ response = supervisorResource.terminate("my-id");
Assert.assertEquals(503, response.getStatus());
verifyAll();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]