This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f8f4526  Add suspend|resume|terminate all supervisors endpoints. 
(#6272)
f8f4526 is described below

commit f8f4526b168568b770b0fd309d3bc2c59dfe4e27
Author: QiuMM <[email protected]>
AuthorDate: Thu Oct 11 12:41:59 2018 +0800

    Add suspend|resume|terminate all supervisors endpoints. (#6272)
    
    * ability to shut down all supervisors
    
    * add doc
    
    * address comments
    
    * fix code style
    
    * address comments
    
    * change ternary assignment to if statement
    
    * better docs
---
 .../development/extensions-core/kafka-ingestion.md | 24 +++++-
 .../overlord/supervisor/SupervisorManager.java     | 53 ++++++++++++--
 .../overlord/supervisor/SupervisorResource.java    | 68 +++++++++++++----
 .../supervisor/SupervisorResourceTest.java         | 85 ++++++++++++++--------
 4 files changed, 175 insertions(+), 55 deletions(-)

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md 
b/docs/content/development/extensions-core/kafka-ingestion.md
index ebc240a..568fc94 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -206,13 +206,27 @@ Suspend indexing tasks associated with a supervisor. Note 
that the supervisor it
 operating and emitting logs and metrics, it will just ensure that no indexing 
tasks are running until the supervisor
 is resumed. Responds with updated SupervisorSpec.
 
-#### Resume Supervisor 
+#### Suspend All Supervisors
+
+```
+POST /druid/indexer/v1/supervisor/suspendAll
+```
+Suspend all supervisors at once.
+
+#### Resume Supervisor
 
 ```
 POST /druid/indexer/v1/supervisor/<supervisorId>/resume
 ```
 Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
 
+#### Resume All Supervisors
+
+```
+POST /druid/indexer/v1/supervisor/resumeAll
+```
+Resume all supervisors at once.
+
 #### Reset Supervisor
 ```
 POST /druid/indexer/v1/supervisor/<supervisorId>/reset
@@ -241,7 +255,13 @@ with the supervisor history api, but will not be listed in 
the 'get supervisors'
 or status report be retrieved. The only way this supervisor can start again is 
by submitting a functioning supervisor
 spec to the create api.
 
-#### Shutdown Supervisor 
+#### Terminate All Supervisors
+```
+POST /druid/indexer/v1/supervisor/terminateAll
+```
+Terminate all supervisors at once.
+
+#### Shutdown Supervisor
 _Deprecated: use the equivalent 'terminate' instead_
 ```
 POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 22fd827..cfca0de 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -94,13 +94,31 @@ public class SupervisorManager
   public boolean suspendOrResumeSupervisor(String id, boolean suspend)
   {
     Preconditions.checkState(started, "SupervisorManager not started");
-    Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
-    Preconditions.checkNotNull(pair.rhs, "spec");
+    Preconditions.checkNotNull(id, "id");
+
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      return possiblySuspendOrResumeSupervisorInternal(id, suspend);
+    }
+  }
+
+  public void stopAndRemoveAllSupervisors()
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
-      SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : 
pair.rhs.createRunningSpec();
-      possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
-      return createAndStartSupervisorInternal(nextState, true);
+      supervisors.keySet().forEach(id -> 
possiblyStopAndRemoveSupervisorInternal(id, true));
+    }
+  }
+
+  public void suspendOrResumeAllSupervisors(boolean suspend)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      supervisors.keySet().forEach(id -> 
possiblySuspendOrResumeSupervisorInternal(id, suspend));
     }
   }
 
@@ -206,7 +224,7 @@ public class SupervisorManager
    * Stops a supervisor with a given id and then removes it from the list.
    * <p/>
    * Caller should have acquired [lock] before invoking this method to avoid 
contention with other threads that may be
-   * starting and stopping supervisors.
+   * starting, stopping, suspending and resuming supervisors.
    *
    * @return true if a supervisor was stopped, false if there was no 
supervisor with this id
    */
@@ -227,10 +245,31 @@ public class SupervisorManager
   }
 
   /**
+   * Suspend or resume a supervisor with a given id.
+   * <p/>
+   * Caller should have acquired [lock] before invoking this method to avoid 
contention with other threads that may be
+   * starting, stopping, suspending and resuming supervisors.
+   *
+   * @return true if a supervisor was suspended or resumed, false if there was 
no supervisor with this id
+   * or suspend a suspended supervisor or resume a running supervisor
+   */
+  private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean 
suspend)
+  {
+    Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
+    if (pair == null || pair.rhs.isSuspended() == suspend) {
+      return false;
+    }
+
+    SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : 
pair.rhs.createRunningSpec();
+    possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
+    return createAndStartSupervisorInternal(nextState, true);
+  }
+
+  /**
    * Creates a supervisor from the provided spec and starts it if there is not 
already a supervisor with that id.
    * <p/>
    * Caller should have acquired [lock] before invoking this method to avoid 
contention with other threads that may be
-   * starting and stopping supervisors.
+   * starting, stopping, suspending and resuming supervisors.
    *
    * @return true if a new supervisor was created, false if there was already 
an existing supervisor with this id
    */
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index d3e19bb..97e0580 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -260,6 +260,35 @@ public class SupervisorResource
     );
   }
 
+  @POST
+  @Path("/suspendAll")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response suspendAll()
+  {
+    return suspendOrResumeAll(true);
+  }
+
+  @POST
+  @Path("/resumeAll")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response resumeAll()
+  {
+    return suspendOrResumeAll(false);
+  }
+
+  @POST
+  @Path("/terminateAll")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response terminateAll()
+  {
+    return asLeaderWithSupervisorManager(
+        manager -> {
+          manager.stopAndRemoveAllSupervisors();
+          return Response.ok(ImmutableMap.of("status", "success")).build();
+        }
+    );
+  }
+
   @GET
   @Path("/history")
   @Produces(MediaType.APPLICATION_JSON)
@@ -378,23 +407,34 @@ public class SupervisorResource
   {
     return asLeaderWithSupervisorManager(
         manager -> {
-          Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
-          if (!spec.isPresent()) {
-            return Response.status(Response.Status.NOT_FOUND)
-                           .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
-                           .build();
-          }
-
-          if (spec.get().isSuspended() == suspend) {
-            final String errMsg =
-                StringUtils.format("[%s] is already %s", id, suspend ? 
"suspended" : "running");
-            return Response.status(Response.Status.BAD_REQUEST)
+          if (manager.suspendOrResumeSupervisor(id, suspend)) {
+            Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+            return Response.ok(spec.get()).build();
+          } else {
+            Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+            Response.Status status;
+            String errMsg;
+            if (spec.isPresent()) {
+              status = Response.Status.BAD_REQUEST;
+              errMsg = StringUtils.format("[%s] is already %s", id, suspend ? 
"suspended" : "running");
+            } else {
+              status = Response.Status.NOT_FOUND;
+              errMsg = StringUtils.format("[%s] does not exist", id);
+            }
+            return Response.status(status)
                            .entity(ImmutableMap.of("error", errMsg))
                            .build();
           }
-          manager.suspendOrResumeSupervisor(id, suspend);
-          spec = manager.getSupervisorSpec(id);
-          return Response.ok(spec.get()).build();
+        }
+    );
+  }
+
+  private Response suspendOrResumeAll(boolean suspend)
+  {
+    return asLeaderWithSupervisorManager(
+        manager -> {
+          manager.suspendOrResumeAllSupervisors(suspend);
+          return Response.ok(ImmutableMap.of("status", "success")).build();
         }
     );
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 9d6eab3..d893898 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -292,14 +292,6 @@ public class SupervisorResourceTest extends EasyMockSupport
   @Test
   public void testSpecSuspend()
   {
-
-    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, 
false) {
-      @Override
-      public List<String> getDataSources()
-      {
-        return Collections.singletonList("datasource1");
-      }
-    };
     TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, 
true) {
       @Override
       public List<String> getDataSources()
@@ -309,11 +301,8 @@ public class SupervisorResourceTest extends EasyMockSupport
     };
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
-            .andReturn(Optional.of(running)).times(1)
-            .andReturn(Optional.of(suspended)).times(1);
     EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", 
true)).andReturn(true);
-    EasyMock.expectLastCall().anyTimes();
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
     replayAll();
 
     Response response = supervisorResource.specSuspend("my-id");
@@ -326,7 +315,8 @@ public class SupervisorResourceTest extends EasyMockSupport
     resetAll();
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
+    EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", 
true)).andReturn(false);
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
     replayAll();
 
     response = supervisorResource.specSuspend("my-id");
@@ -336,18 +326,9 @@ public class SupervisorResourceTest extends EasyMockSupport
     Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already 
suspended"), response.getEntity());
   }
 
-
-
   @Test
   public void testSpecResume()
   {
-    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, 
true) {
-      @Override
-      public List<String> getDataSources()
-      {
-        return Collections.singletonList("datasource1");
-      }
-    };
     TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, 
false) {
       @Override
       public List<String> getDataSources()
@@ -357,11 +338,8 @@ public class SupervisorResourceTest extends EasyMockSupport
     };
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
-            .andReturn(Optional.of(suspended)).times(1)
-            .andReturn(Optional.of(running)).times(1);
     EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", 
false)).andReturn(true);
-    EasyMock.expectLastCall().anyTimes();
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
     replayAll();
 
     Response response = supervisorResource.specResume("my-id");
@@ -374,7 +352,8 @@ public class SupervisorResourceTest extends EasyMockSupport
     resetAll();
 
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
-    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
+    EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", 
false)).andReturn(false);
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
     replayAll();
 
     response = supervisorResource.specResume("my-id");
@@ -385,19 +364,19 @@ public class SupervisorResourceTest extends 
EasyMockSupport
   }
 
   @Test
-  public void testShutdown()
+  public void testTerminate()
   {
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
     
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true);
     
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false);
     replayAll();
 
-    Response response = supervisorResource.shutdown("my-id");
+    Response response = supervisorResource.terminate("my-id");
 
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
 
-    response = supervisorResource.shutdown("my-id-2");
+    response = supervisorResource.terminate("my-id-2");
 
     Assert.assertEquals(404, response.getStatus());
     verifyAll();
@@ -407,13 +386,55 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.shutdown("my-id");
+    response = supervisorResource.terminate("my-id");
     verifyAll();
 
     Assert.assertEquals(503, response.getStatus());
   }
 
   @Test
+  public void testSuspendAll()
+  {
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    supervisorManager.suspendOrResumeAllSupervisors(true);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    Response response = supervisorResource.suspendAll();
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("status", "success"), 
response.getEntity());
+    verifyAll();
+  }
+
+  @Test
+  public void testResumeAll()
+  {
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    supervisorManager.suspendOrResumeAllSupervisors(false);
+    EasyMock.expectLastCall();
+    replayAll();
+
+    Response response = supervisorResource.resumeAll();
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("status", "success"), 
response.getEntity());
+    verifyAll();
+  }
+
+  @Test
+  public void testTerminateAll()
+  {
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    supervisorManager.stopAndRemoveAllSupervisors();
+    EasyMock.expectLastCall();
+    replayAll();
+
+    Response response = supervisorResource.terminateAll();
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("status", "success"), 
response.getEntity());
+    verifyAll();
+  }
+
+  @Test
   public void testSpecGetAllHistory()
   {
     List<VersionedSupervisorSpec> versions1 = ImmutableList.of(
@@ -872,7 +893,7 @@ public class SupervisorResourceTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.shutdown("my-id");
+    response = supervisorResource.terminate("my-id");
 
     Assert.assertEquals(503, response.getStatus());
     verifyAll();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to