This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a37c6  'suspend' and 'resume' support for supervisors (kafka 
indexing service, materialized views) (#6234)
91a37c6 is described below

commit 91a37c692dd8db34d6e4323cd61dba38a00145b1
Author: Clint Wylie <cjwy...@gmail.com>
AuthorDate: Thu Sep 13 14:42:18 2018 -0700

    'suspend' and 'resume' support for supervisors (kafka indexing service, 
materialized views) (#6234)
    
    * 'suspend' and 'resume' support for kafka indexing service
    changes:
    * introduces `SuspendableSupervisorSpec` interface to describe supervisors 
which support suspend/resume functionality controlled through the 
`SupervisorManager`, which will gracefully shutdown the supervisor and it's 
tasks, update it's `SupervisorSpec` with either a suspended or running state, 
and update with the toggled spec. Spec updates are provided by 
`SuspendableSupervisorSpec.createSuspendedSpec` and 
`SuspendableSupervisorSpec.createRunningSpec` respectively.
    * `KafkaSupervisorSpec` extends `SuspendableSupervisorSpec` and now 
supports suspend/resume functionality. The difference in behavior between 
'running' and 'suspended' state is whether the supervisor will attempt to 
ensure that indexing tasks are or are not running respectively. Behavior is 
identical otherwise.
    * `SupervisorResource` now provides 
`/druid/indexer/v1/supervisor/{id}/suspend` and 
`/druid/indexer/v1/supervisor/{id}/resume` which are used to suspend/resume 
suspendable supervisors
    * Deprecated `/druid/indexer/v1/supervisor/{id}/shutdown` and moved it's 
functionality to `/druid/indexer/v1/supervisor/{id}/terminate` since 'shutdown' 
is ambiguous verbage for something that effectively stops a supervisor forever
    * Added ability to get all supervisor specs from 
`/druid/indexer/v1/supervisor` by supplying the 'full' query parameter 
`/druid/indexer/v1/supervisor?full` which will return a list of json objects of 
the form `{"id":<id>, "spec":<SupervisorSpec>}`
    * Updated overlord console ui to enable suspend/resume, and changed 
'shutdown' to 'terminate'
    
    * move overlord console status to own column in supervisor table so does 
not look like garbage
    
    * spacing
    
    * padding
    
    * other kind of spacing
    
    * fix rebase fail
    
    * fix more better
    
    * all supervisors now suspendable, updated materialized view supervisor to 
support suspend, more tests
    
    * fix log
---
 .../development/extensions-core/kafka-ingestion.md |  80 ++++--
 .../MaterializedViewSupervisor.java                |  64 +++--
 .../MaterializedViewSupervisorReport.java          |   4 +-
 .../MaterializedViewSupervisorSpec.java            |  68 ++++-
 .../MaterializedViewSupervisorSpecTest.java        |  49 ++++
 .../MaterializedViewSupervisorTest.java            |  49 +++-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  59 ++--
 .../supervisor/KafkaSupervisorReportPayload.java   |  12 +-
 .../kafka/supervisor/KafkaSupervisorSpec.java      |  41 +++
 .../kafka/supervisor/KafkaSupervisorSpecTest.java  | 234 ++++++++++++++++
 .../kafka/supervisor/KafkaSupervisorTest.java      | 170 +++++++++++-
 .../overlord/supervisor/SupervisorManager.java     |  13 +
 .../overlord/supervisor/SupervisorResource.java    | 309 +++++++++++----------
 .../resources/indexer_static/js/console-0.0.1.js   |  58 +++-
 .../OverlordSecurityResourceFilterTest.java        |  18 ++
 .../overlord/supervisor/SupervisorManagerTest.java | 108 ++++++-
 .../supervisor/SupervisorResourceTest.java         | 208 ++++++++++++--
 .../overlord/supervisor/NoopSupervisorSpec.java    |  45 ++-
 .../overlord/supervisor/SupervisorSpec.java        |  16 ++
 19 files changed, 1341 insertions(+), 264 deletions(-)

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md 
b/docs/content/development/extensions-core/kafka-ingestion.md
index 7fbacf4..7b17c46 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -194,17 +194,73 @@ existing publishing tasks and will create new tasks 
starting at the offsets the
 
 Seamless schema migrations can thus be achieved by simply submitting the new 
schema using this endpoint.
 
-#### Shutdown Supervisor
+#### Suspend Supervisor 
+
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/suspend
+```
+Suspend indexing tasks associated with a supervisor. Note that the supervisor 
itself will still be
+operating and emitting logs and metrics, it will just ensure that no indexing 
tasks are running until the supervisor
+is resumed. Responds with updated SupervisorSpec.
+
+#### Resume Supervisor 
+
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/resume
+```
+Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.
+
+#### Reset Supervisor
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/reset
+```
+The indexing service keeps track of the latest persisted Kafka offsets in 
order to provide exactly-once ingestion
+guarantees across tasks. Subsequent tasks must start reading from where the 
previous task completed in order for the
+generated segments to be accepted. If the messages at the expected starting 
offsets are no longer available in Kafka
+(typically because the message retention period has elapsed or the topic was 
removed and re-created) the supervisor will
+refuse to start and in-flight tasks will fail.
+
+This endpoint can be used to clear the stored offsets which will cause the 
supervisor to start reading from
+either the earliest or latest offsets in Kafka (depending on the value of 
`useEarliestOffset`). The supervisor must be
+running for this endpoint to be available. After the stored offsets are 
cleared, the supervisor will automatically kill
+and re-create any active tasks so that tasks begin reading from valid offsets.
+
+Note that since the stored offsets are necessary to guarantee exactly-once 
ingestion, resetting them with this endpoint
+may cause some Kafka messages to be skipped or to be read twice.
+
+#### Terminate Supervisor 
+```
+POST /druid/indexer/v1/supervisor/<supervisorId>/terminate
+```
+Terminate a supervisor and cause all associated indexing tasks managed by this 
supervisor to immediately stop and begin 
+publishing their segments. This supervisor will still exist in the metadata 
store and it's history may be retrieved 
+with the supervisor history api, but will not be listed in the 'get 
supervisors' api response nor can it's configuration
+or status report be retrieved. The only way this supervisor can start again is 
by submitting a functioning supervisor
+spec to the create api.
+
+#### Shutdown Supervisor 
+_Deprecated: use the equivalent 'terminate' instead_
 ```
 POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
 ```
-Note that this will cause all indexing tasks managed by this supervisor to 
immediately stop and begin publishing their segments.
 
 #### Get Supervisor IDs
 ```
 GET /druid/indexer/v1/supervisor
 ```
-Returns a list of the currently active supervisors.
+Returns a list of strings of the currently active supervisor ids.
+
+#### Get Supervisors
+```
+GET /druid/indexer/v1/supervisor?full
+```
+Returns a list of objects of the currently active supervisors.
+
+|Field|Type|Description|
+|---|---|---|
+|`id`|String|supervisor unique identifier|
+|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor 
Configuration for details)|
+
 
 #### Get Supervisor Spec
 ```
@@ -233,24 +289,6 @@ GET /druid/indexer/v1/supervisor/<supervisorId>/history
 ```
 Returns an audit history of specs for the supervisor with the provided ID.
 
-#### Reset Supervisor
-```
-POST /druid/indexer/v1/supervisor/<supervisorId>/reset
-```
-The indexing service keeps track of the latest persisted Kafka offsets in 
order to provide exactly-once ingestion
-guarantees across tasks. Subsequent tasks must start reading from where the 
previous task completed in order for the
-generated segments to be accepted. If the messages at the expected starting 
offsets are no longer available in Kafka
-(typically because the message retention period has elapsed or the topic was 
removed and re-created) the supervisor will
-refuse to start and in-flight tasks will fail.
-
-This endpoint can be used to clear the stored offsets which will cause the 
supervisor to start reading from
-either the earliest or latest offsets in Kafka (depending on the value of 
`useEarliestOffset`). The supervisor must be
-running for this endpoint to be available. After the stored offsets are 
cleared, the supervisor will automatically kill
-and re-create any active tasks so that tasks begin reading from valid offsets.
-
-Note that since the stored offsets are necessary to guarantee exactly-once 
ingestion, resetting them with this endpoint
-may cause some Kafka messages to be skipped or to be read twice.
-
 ## Capacity Planning
 
 Kafka indexing tasks run on middle managers and are thus limited by the 
resources available in the middle manager
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 56afe3e..d4e7f08 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -135,31 +135,7 @@ public class MaterializedViewSupervisor implements 
Supervisor
       exec = 
MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId));
       final Duration delay = 
config.getTaskCheckDuration().toStandardDuration();
       future = exec.scheduleWithFixedDelay(
-          new Runnable() {
-            @Override
-            public void run()
-            {
-              try {
-                DataSourceMetadata metadata = 
metadataStorageCoordinator.getDataSourceMetadata(dataSource);
-                if (metadata instanceof DerivativeDataSourceMetadata 
-                    && 
spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) 
metadata).getBaseDataSource())
-                    && 
spec.getDimensions().equals(((DerivativeDataSourceMetadata) 
metadata).getDimensions())
-                    && 
spec.getMetrics().equals(((DerivativeDataSourceMetadata) 
metadata).getMetrics())) {
-                  checkSegmentsAndSubmitTasks();
-                } else {
-                  log.error(
-                      "Failed to start %s. Metadata in database(%s) is 
different from new dataSource metadata(%s)",
-                      supervisorId,
-                      metadata,
-                      spec
-                  );
-                }
-              }
-              catch (Exception e) {
-                log.makeAlert(e, StringUtils.format("uncaught exception in 
%s.", supervisorId)).emit();
-              }
-            }
-          },
+          MaterializedViewSupervisor.this::run,
           0,
           delay.getMillis(),
           TimeUnit.MILLISECONDS
@@ -167,7 +143,40 @@ public class MaterializedViewSupervisor implements 
Supervisor
       started = true;
     }
   }
-  
+
+  @VisibleForTesting
+  public void run()
+  {
+    try {
+      if (spec.isSuspended()) {
+        log.info(
+            "Materialized view supervisor[%s:%s] is suspended",
+            spec.getId(),
+            spec.getDataSourceName()
+        );
+        return;
+      }
+
+      DataSourceMetadata metadata = 
metadataStorageCoordinator.getDataSourceMetadata(dataSource);
+      if (metadata instanceof DerivativeDataSourceMetadata
+          && spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) 
metadata).getBaseDataSource())
+          && spec.getDimensions().equals(((DerivativeDataSourceMetadata) 
metadata).getDimensions())
+          && spec.getMetrics().equals(((DerivativeDataSourceMetadata) 
metadata).getMetrics())) {
+        checkSegmentsAndSubmitTasks();
+      } else {
+        log.error(
+            "Failed to start %s. Metadata in database(%s) is different from 
new dataSource metadata(%s)",
+            supervisorId,
+            metadata,
+            spec
+        );
+      }
+    }
+    catch (Exception e) {
+      log.makeAlert(e, StringUtils.format("uncaught exception in %s.", 
supervisorId)).emit();
+    }
+  }
+
   @Override
   public void stop(boolean stopGracefully) 
   {
@@ -207,7 +216,8 @@ public class MaterializedViewSupervisor implements 
Supervisor
   {
     return new MaterializedViewSupervisorReport(
         dataSource,
-        DateTimes.nowUtc(), 
+        DateTimes.nowUtc(),
+        spec.isSuspended(),
         spec.getBaseDataSource(),
         spec.getDimensions(),
         spec.getMetrics(),
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
index 107354f..f05f8e0 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorReport.java
@@ -29,10 +29,11 @@ import java.util.Set;
 
 public class MaterializedViewSupervisorReport extends SupervisorReport 
 {
-  
+
   public MaterializedViewSupervisorReport(
       String dataSource,
       DateTime generationTime,
+      boolean suspended,
       String baseDataSource,
       Set<String> dimensions,
       Set<String> metrics,
@@ -42,6 +43,7 @@ public class MaterializedViewSupervisorReport extends 
SupervisorReport
     super(dataSource, generationTime, "{" +
         "dataSource='" + dataSource + '\'' +
         ", baseDataSource='" + baseDataSource + '\'' +
+        ", suspended='" + suspended + "\'" +
         ", dimensions=" + dimensions +
         ", metrics=" + metrics +
         ", missTimeline" + Sets.newHashSet(missTimeline) +
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index b81d85e2..29904a4 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -81,6 +81,7 @@ public class MaterializedViewSupervisorSpec implements 
SupervisorSpec
   private final MaterializedViewTaskConfig config;
   private final AuthorizerMapper authorizerMapper;
   private final ChatHandlerProvider chatHandlerProvider;
+  private final boolean suspended;
   
   public MaterializedViewSupervisorSpec(
       @JsonProperty("baseDataSource") String baseDataSource,
@@ -92,6 +93,7 @@ public class MaterializedViewSupervisorSpec implements 
SupervisorSpec
       @JsonProperty("hadoopDependencyCoordinates") List<String> 
hadoopDependencyCoordinates,
       @JsonProperty("classpathPrefix") String classpathPrefix,
       @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("suspended") Boolean suspended,
       @JacksonInject ObjectMapper objectMapper,
       @JacksonInject TaskMaster taskMaster,
       @JacksonInject TaskStorage taskStorage,
@@ -139,7 +141,8 @@ public class MaterializedViewSupervisorSpec implements 
SupervisorSpec
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.config = config;
-    
+    this.suspended = suspended != null ? suspended : false;
+
     this.metrics = Sets.newHashSet();
     for (AggregatorFactory aggregatorFactory : aggregators) {
       metrics.add(aggregatorFactory.getName());
@@ -305,7 +308,14 @@ public class MaterializedViewSupervisorSpec implements 
SupervisorSpec
   {
     return context;
   }
-  
+
+  @Override
+  @JsonProperty("suspended")
+  public boolean isSuspended()
+  {
+    return suspended;
+  }
+
   @Override
   public String getId() 
   {
@@ -331,7 +341,59 @@ public class MaterializedViewSupervisorSpec implements 
SupervisorSpec
   {
     return ImmutableList.of(dataSourceName);
   }
-  
+
+  @Override
+  public SupervisorSpec createSuspendedSpec()
+  {
+    return new MaterializedViewSupervisorSpec(
+        baseDataSource,
+        dimensionsSpec,
+        aggregators,
+        tuningConfig,
+        dataSourceName,
+        hadoopCoordinates,
+        hadoopDependencyCoordinates,
+        classpathPrefix,
+        context,
+        true,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        segmentManager,
+        metadataStorageCoordinator,
+        config,
+        authorizerMapper,
+        chatHandlerProvider
+    );
+  }
+
+  @Override
+  public SupervisorSpec createRunningSpec()
+  {
+    return new MaterializedViewSupervisorSpec(
+        baseDataSource,
+        dimensionsSpec,
+        aggregators,
+        tuningConfig,
+        dataSourceName,
+        hadoopCoordinates,
+        hadoopDependencyCoordinates,
+        classpathPrefix,
+        context,
+        false,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        segmentManager,
+        metadataStorageCoordinator,
+        config,
+        authorizerMapper,
+        chatHandlerProvider
+    );
+  }
+
   @Override
   public String toString()
   {
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index b9c8160..40da151 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -75,6 +75,7 @@ public class MaterializedViewSupervisorSpecTest
             .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
     );
   }
+
   @Test
   public void testSupervisorSerialization() throws IOException 
   {
@@ -132,6 +133,7 @@ public class MaterializedViewSupervisorSpecTest
         null,
         null,
         null,
+        false,
         objectMapper,
         null,
         null,
@@ -151,6 +153,51 @@ public class MaterializedViewSupervisorSpecTest
   }
 
   @Test
+  public void testSuspendResuume() throws IOException
+  {
+    String supervisorStr = "{\n" +
+                           "  \"type\" : \"derivativeDataSource\",\n" +
+                           "  \"baseDataSource\": \"wikiticker\",\n" +
+                           "  \"dimensionsSpec\":{\n" +
+                           "            \"dimensions\" : [\n" +
+                           "              \"isUnpatrolled\",\n" +
+                           "              \"metroCode\",\n" +
+                           "              \"namespace\",\n" +
+                           "              \"page\",\n" +
+                           "              \"regionIsoCode\",\n" +
+                           "              \"regionName\",\n" +
+                           "              \"user\"\n" +
+                           "            ]\n" +
+                           "          },\n" +
+                           "    \"metricsSpec\" : [\n" +
+                           "        {\n" +
+                           "          \"name\" : \"count\",\n" +
+                           "          \"type\" : \"count\"\n" +
+                           "        },\n" +
+                           "        {\n" +
+                           "          \"name\" : \"added\",\n" +
+                           "          \"type\" : \"longSum\",\n" +
+                           "          \"fieldName\" : \"added\"\n" +
+                           "        }\n" +
+                           "      ],\n" +
+                           "  \"tuningConfig\": {\n" +
+                           "      \"type\" : \"hadoop\"\n" +
+                           "  }\n" +
+                           "}";
+
+    MaterializedViewSupervisorSpec spec = 
objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class);
+    Assert.assertFalse(spec.isSuspended());
+
+    String suspendedSerialized = 
objectMapper.writeValueAsString(spec.createSuspendedSpec());
+    MaterializedViewSupervisorSpec suspendedSpec = 
objectMapper.readValue(suspendedSerialized, 
MaterializedViewSupervisorSpec.class);
+    Assert.assertTrue(suspendedSpec.isSuspended());
+
+    String runningSerialized = 
objectMapper.writeValueAsString(spec.createRunningSpec());
+    MaterializedViewSupervisorSpec runningSpec = 
objectMapper.readValue(runningSerialized, MaterializedViewSupervisorSpec.class);
+    Assert.assertFalse(runningSpec.isSuspended());
+  }
+
+  @Test
   public void testEmptyBaseDataSource() throws Exception
   {
     
expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
@@ -182,6 +229,7 @@ public class MaterializedViewSupervisorSpecTest
         null,
         null,
         null,
+        false,
         objectMapper,
         null,
         null,
@@ -226,6 +274,7 @@ public class MaterializedViewSupervisorSpecTest
         null,
         null,
         null,
+        false,
         objectMapper,
         null,
         null,
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index a48f76f..4ece1f0 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -47,6 +47,9 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import static org.easymock.EasyMock.expect;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -109,6 +112,7 @@ public class MaterializedViewSupervisorTest
         null,
         null,
         null,
+        false,
         objectMapper,
         taskMaster,
         taskStorage,
@@ -121,9 +125,9 @@ public class MaterializedViewSupervisorTest
     );
     supervisor = (MaterializedViewSupervisor) spec.createSupervisor();
   }
-  
+
   @Test
-  public void testCheckSegments() throws IOException 
+  public void testCheckSegments() throws IOException
   {
     Set<DataSegment> baseSegments = Sets.newHashSet(
         new DataSegment(
@@ -156,7 +160,7 @@ public class MaterializedViewSupervisorTest
     Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> 
toBuildInterval = supervisor.checkSegments();
     Map<Interval, List<DataSegment>> expectedSegments = Maps.newHashMap();
     expectedSegments.put(
-        Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), 
+        Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
         Collections.singletonList(
             new DataSegment(
                 "base",
@@ -173,4 +177,43 @@ public class MaterializedViewSupervisorTest
     );
     Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
   }
+
+
+  @Test
+  public void testSuspendedDoesntRun() throws IOException
+  {
+    MaterializedViewSupervisorSpec suspended = new 
MaterializedViewSupervisorSpec(
+        "base",
+        new DimensionsSpec(Collections.singletonList(new 
StringDimensionSchema("dim")), null, null),
+        new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
+        HadoopTuningConfig.makeDefaultTuningConfig(),
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        objectMapper,
+        taskMaster,
+        taskStorage,
+        metadataSupervisorManager,
+        sqlMetadataSegmentManager,
+        indexerMetadataStorageCoordinator,
+        new MaterializedViewTaskConfig(),
+        createMock(AuthorizerMapper.class),
+        createMock(ChatHandlerProvider.class)
+    );
+    MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) 
suspended.createSupervisor();
+
+    // mock IndexerSQLMetadataStorageCoordinator to ensure that 
getDataSourceMetadata is not called
+    // which will be true if truly suspended, since this is the first 
operation of the 'run' method otherwise
+    IndexerSQLMetadataStorageCoordinator mock = 
createMock(IndexerSQLMetadataStorageCoordinator.class);
+    
expect(mock.getDataSourceMetadata(suspended.getDataSourceName())).andAnswer((IAnswer)
 () -> {
+      Assert.fail();
+      return null;
+    }).anyTimes();
+
+    EasyMock.replay(mock);
+    supervisor.run();
+  }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 78652a7..5087eef 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -309,14 +309,8 @@ public class KafkaSupervisor implements Supervisor
         Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
         if (taskRunner.isPresent()) {
           Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind(
-              taskRunner.get().getRunningTasks(), new 
Predicate<TaskRunnerWorkItem>()
-              {
-                @Override
-                public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem)
-                {
-                  return id.equals(taskRunnerWorkItem.getTaskId());
-                }
-              }
+              taskRunner.get().getRunningTasks(),
+              (Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem -> 
id.equals(taskRunnerWorkItem.getTaskId())
           );
 
           if (item.isPresent()) {
@@ -372,29 +366,24 @@ public class KafkaSupervisor implements Supervisor
         consumer = getKafkaConsumer();
 
         exec.submit(
-            new Runnable()
-            {
-              @Override
-              public void run()
-              {
-                try {
-                  while (!Thread.currentThread().isInterrupted()) {
-                    final Notice notice = notices.take();
-
-                    try {
-                      notice.handle();
-                    }
-                    catch (Throwable e) {
-                      log.makeAlert(e, "KafkaSupervisor[%s] failed to handle 
notice", dataSource)
-                         .addData("noticeClass", 
notice.getClass().getSimpleName())
-                         .emit();
-                    }
+            () -> {
+              try {
+                while (!Thread.currentThread().isInterrupted()) {
+                  final Notice notice = notices.take();
+
+                  try {
+                    notice.handle();
+                  }
+                  catch (Throwable e) {
+                    log.makeAlert(e, "KafkaSupervisor[%s] failed to handle 
notice", dataSource)
+                       .addData("noticeClass", 
notice.getClass().getSimpleName())
+                       .emit();
                   }
-                }
-                catch (InterruptedException e) {
-                  log.info("KafkaSupervisor[%s] interrupted, exiting", 
dataSource);
                 }
               }
+              catch (InterruptedException e) {
+                log.info("KafkaSupervisor[%s] interrupted, exiting", 
dataSource);
+              }
             }
         );
         firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
@@ -898,7 +887,16 @@ public class KafkaSupervisor implements Supervisor
     checkTaskDuration();
     checkPendingCompletionTasks();
     checkCurrentTaskState();
-    createNewTasks();
+
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    if (!spec.isSuspended()) {
+      log.info("[%s] supervisor is running.", dataSource);
+      createNewTasks();
+    } else {
+      log.info("[%s] supervisor is suspended.", dataSource);
+      gracefulShutdownInternal();
+    }
 
     if (log.isDebugEnabled()) {
       log.debug(generateReport(true).toString());
@@ -2096,7 +2094,8 @@ public class KafkaSupervisor implements Supervisor
         includeOffsets ? latestOffsetsFromKafka : null,
         includeOffsets ? partitionLag : null,
         includeOffsets ? partitionLag.values().stream().mapToLong(x -> 
Math.max(x, 0)).sum() : null,
-        includeOffsets ? offsetsLastUpdated : null
+        includeOffsets ? offsetsLastUpdated : null,
+        spec.isSuspended()
     );
     SupervisorReport<KafkaSupervisorReportPayload> report = new 
SupervisorReport<>(
         dataSource,
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index 2a5a482..d9533a3 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -42,6 +42,7 @@ public class KafkaSupervisorReportPayload
   private final Map<Integer, Long> minimumLag;
   private final Long aggregateLag;
   private final DateTime offsetsLastUpdated;
+  private final boolean suspended;
 
   public KafkaSupervisorReportPayload(
       String dataSource,
@@ -52,7 +53,8 @@ public class KafkaSupervisorReportPayload
       @Nullable Map<Integer, Long> latestOffsets,
       @Nullable Map<Integer, Long> minimumLag,
       @Nullable Long aggregateLag,
-      @Nullable DateTime offsetsLastUpdated
+      @Nullable DateTime offsetsLastUpdated,
+      boolean suspended
   )
   {
     this.dataSource = dataSource;
@@ -66,6 +68,7 @@ public class KafkaSupervisorReportPayload
     this.minimumLag = minimumLag;
     this.aggregateLag = aggregateLag;
     this.offsetsLastUpdated = offsetsLastUpdated;
+    this.suspended = suspended;
   }
 
   public void addTask(TaskReportData data)
@@ -148,6 +151,12 @@ public class KafkaSupervisorReportPayload
     return offsetsLastUpdated;
   }
 
+  @JsonProperty
+  public boolean getSuspended()
+  {
+    return suspended;
+  }
+
   @Override
   public String toString()
   {
@@ -163,6 +172,7 @@ public class KafkaSupervisorReportPayload
            (minimumLag != null ? ", minimumLag=" + minimumLag : "") +
            (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") +
            (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + 
offsetsLastUpdated : "") +
+           ", suspended=" + suspended +
            '}';
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index fc620ea..28dab02 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -55,6 +55,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
   private final ServiceEmitter emitter;
   private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
+  private final boolean suspended;
 
   @JsonCreator
   public KafkaSupervisorSpec(
@@ -62,6 +63,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
       @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig,
       @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
       @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("suspended") Boolean suspended,
       @JacksonInject TaskStorage taskStorage,
       @JacksonInject TaskMaster taskMaster,
       @JacksonInject IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator,
@@ -111,6 +113,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
     this.emitter = emitter;
     this.monitorSchedulerConfig = monitorSchedulerConfig;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+    this.suspended = suspended != null ? suspended : false;
   }
 
   @JsonProperty
@@ -137,6 +140,13 @@ public class KafkaSupervisorSpec implements SupervisorSpec
     return context;
   }
 
+  @Override
+  @JsonProperty("suspended")
+  public boolean isSuspended()
+  {
+    return suspended;
+  }
+
   public ServiceEmitter getEmitter()
   {
     return emitter;
@@ -182,4 +192,35 @@ public class KafkaSupervisorSpec implements SupervisorSpec
            ", ioConfig=" + ioConfig +
            '}';
   }
+
+  @Override
+  public KafkaSupervisorSpec createSuspendedSpec()
+  {
+    return toggleSuspend(true);
+  }
+
+  @Override
+  public KafkaSupervisorSpec createRunningSpec()
+  {
+    return toggleSuspend(false);
+  }
+
+  private KafkaSupervisorSpec toggleSuspend(boolean suspend)
+  {
+    return new KafkaSupervisorSpec(
+        dataSchema,
+        tuningConfig,
+        ioConfig,
+        context,
+        suspend,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        kafkaIndexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory
+    );
+  }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
new file mode 100644
index 0000000..b4c6dfd
--- /dev/null
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
+import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class KafkaSupervisorSpecTest
+{
+  private final ObjectMapper mapper;
+
+  public KafkaSupervisorSpecTest()
+  {
+    mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(TaskStorage.class, null)
+            .addValue(TaskMaster.class, null)
+            .addValue(IndexerMetadataStorageCoordinator.class, null)
+            .addValue(KafkaIndexTaskClientFactory.class, null)
+            .addValue(ObjectMapper.class, mapper)
+            .addValue(ServiceEmitter.class, new NoopServiceEmitter())
+            .addValue(DruidMonitorSchedulerConfig.class, null)
+            .addValue(RowIngestionMetersFactory.class, null)
+            .addValue(ExprMacroTable.class.getName(), 
LookupEnabledTestExprMacroTable.INSTANCE)
+    );
+    mapper.registerModules((Iterable<Module>) new 
KafkaIndexTaskModule().getJacksonModules());
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    String json = "{\n"
+                  + "  \"type\": \"kafka\",\n"
+                  + "  \"dataSchema\": {\n"
+                  + "    \"dataSource\": \"metrics-kafka\",\n"
+                  + "    \"parser\": {\n"
+                  + "      \"type\": \"string\",\n"
+                  + "      \"parseSpec\": {\n"
+                  + "        \"format\": \"json\",\n"
+                  + "        \"timestampSpec\": {\n"
+                  + "          \"column\": \"timestamp\",\n"
+                  + "          \"format\": \"auto\"\n"
+                  + "        },\n"
+                  + "        \"dimensionsSpec\": {\n"
+                  + "          \"dimensions\": [],\n"
+                  + "          \"dimensionExclusions\": [\n"
+                  + "            \"timestamp\",\n"
+                  + "            \"value\"\n"
+                  + "          ]\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"metricsSpec\": [\n"
+                  + "      {\n"
+                  + "        \"name\": \"count\",\n"
+                  + "        \"type\": \"count\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_sum\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleSum\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_min\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMin\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_max\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMax\"\n"
+                  + "      }\n"
+                  + "    ],\n"
+                  + "    \"granularitySpec\": {\n"
+                  + "      \"type\": \"uniform\",\n"
+                  + "      \"segmentGranularity\": \"HOUR\",\n"
+                  + "      \"queryGranularity\": \"NONE\"\n"
+                  + "    }\n"
+                  + "  },\n"
+                  + "  \"ioConfig\": {\n"
+                  + "    \"topic\": \"metrics\",\n"
+                  + "    \"consumerProperties\": {\n"
+                  + "      \"bootstrap.servers\": \"localhost:9092\"\n"
+                  + "    },\n"
+                  + "    \"taskCount\": 1\n"
+                  + "  }\n"
+                  + "}";
+    KafkaSupervisorSpec spec = mapper.readValue(json, 
KafkaSupervisorSpec.class);
+
+    Assert.assertNotNull(spec);
+    Assert.assertNotNull(spec.getDataSchema());
+    Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
+    Assert.assertNotNull(spec.getIoConfig());
+    Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
+    Assert.assertNotNull(spec.getTuningConfig());
+    Assert.assertNull(spec.getContext());
+    Assert.assertFalse(spec.isSuspended());
+    String serialized = mapper.writeValueAsString(spec);
+
+    // expect default values populated in reserialized string
+    Assert.assertTrue(serialized.contains("\"tuningConfig\":{"));
+    Assert.assertTrue(serialized.contains("\"indexSpec\":{"));
+    Assert.assertTrue(serialized.contains("\"suspended\":false"));
+
+    KafkaSupervisorSpec spec2 = mapper.readValue(serialized, 
KafkaSupervisorSpec.class);
+
+    String stable = mapper.writeValueAsString(spec2);
+
+    Assert.assertEquals(serialized, stable);
+  }
+
+  @Test
+  public void testSuspendResume() throws IOException
+  {
+    String json = "{\n"
+                  + "  \"type\": \"kafka\",\n"
+                  + "  \"dataSchema\": {\n"
+                  + "    \"dataSource\": \"metrics-kafka\",\n"
+                  + "    \"parser\": {\n"
+                  + "      \"type\": \"string\",\n"
+                  + "      \"parseSpec\": {\n"
+                  + "        \"format\": \"json\",\n"
+                  + "        \"timestampSpec\": {\n"
+                  + "          \"column\": \"timestamp\",\n"
+                  + "          \"format\": \"auto\"\n"
+                  + "        },\n"
+                  + "        \"dimensionsSpec\": {\n"
+                  + "          \"dimensions\": [],\n"
+                  + "          \"dimensionExclusions\": [\n"
+                  + "            \"timestamp\",\n"
+                  + "            \"value\"\n"
+                  + "          ]\n"
+                  + "        }\n"
+                  + "      }\n"
+                  + "    },\n"
+                  + "    \"metricsSpec\": [\n"
+                  + "      {\n"
+                  + "        \"name\": \"count\",\n"
+                  + "        \"type\": \"count\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_sum\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleSum\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_min\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMin\"\n"
+                  + "      },\n"
+                  + "      {\n"
+                  + "        \"name\": \"value_max\",\n"
+                  + "        \"fieldName\": \"value\",\n"
+                  + "        \"type\": \"doubleMax\"\n"
+                  + "      }\n"
+                  + "    ],\n"
+                  + "    \"granularitySpec\": {\n"
+                  + "      \"type\": \"uniform\",\n"
+                  + "      \"segmentGranularity\": \"HOUR\",\n"
+                  + "      \"queryGranularity\": \"NONE\"\n"
+                  + "    }\n"
+                  + "  },\n"
+                  + "  \"ioConfig\": {\n"
+                  + "    \"topic\": \"metrics\",\n"
+                  + "    \"consumerProperties\": {\n"
+                  + "      \"bootstrap.servers\": \"localhost:9092\"\n"
+                  + "    },\n"
+                  + "    \"taskCount\": 1\n"
+                  + "  }\n"
+                  + "}";
+    KafkaSupervisorSpec spec = mapper.readValue(json, 
KafkaSupervisorSpec.class);
+
+    Assert.assertNotNull(spec);
+    Assert.assertNotNull(spec.getDataSchema());
+    Assert.assertEquals(4, spec.getDataSchema().getAggregators().length);
+    Assert.assertNotNull(spec.getIoConfig());
+    Assert.assertEquals("metrics", spec.getIoConfig().getTopic());
+    Assert.assertNotNull(spec.getTuningConfig());
+    Assert.assertNull(spec.getContext());
+    Assert.assertFalse(spec.isSuspended());
+
+    String suspendedSerialized = 
mapper.writeValueAsString(spec.createSuspendedSpec());
+
+    // expect default values populated in reserialized string
+    Assert.assertTrue(suspendedSerialized.contains("\"tuningConfig\":{"));
+    Assert.assertTrue(suspendedSerialized.contains("\"indexSpec\":{"));
+    Assert.assertTrue(suspendedSerialized.contains("\"suspended\":true"));
+
+    KafkaSupervisorSpec suspendedSpec = mapper.readValue(suspendedSerialized, 
KafkaSupervisorSpec.class);
+
+    Assert.assertTrue(suspendedSpec.isSuspended());
+
+    String runningSerialized = 
mapper.writeValueAsString(spec.createRunningSpec());
+
+    KafkaSupervisorSpec runningSpec = mapper.readValue(runningSerialized, 
KafkaSupervisorSpec.class);
+
+    Assert.assertFalse(runningSpec.isSuspended());
+  }
+}
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 964f5f7..978de2f 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -79,6 +79,7 @@ import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -2293,6 +2294,149 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testSuspendedNoRunningTasks() throws Exception
+  {
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true);
+    addSomeEvents(1);
+
+    
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    // this asserts that taskQueue.add does not in fact get called because 
supervisor should be suspended
+    expect(taskQueue.add(anyObject())).andAnswer((IAnswer) () -> {
+      Assert.fail();
+      return null;
+    }).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testSuspendedRunningTasks() throws Exception
+  {
+    // graceful shutdown is expected to be called on running tasks since state 
is suspended
+
+    final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
+    final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
+    final DateTime startTime = DateTimes.nowUtc();
+
+    supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true);
+    addSomeEvents(1);
+
+    Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+    
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, 
id3)).anyTimes();
+    
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
+    
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getEndOffsets("id1")).andReturn(ImmutableMap.of(0, 10L, 
1, 20L, 2, 30L));
+
+    // getCheckpoints will not be called for id1 as it is in publishing state
+    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
+
+    expect(taskClient.pauseAsync("id2"))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 
30L)));
+    expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 
25L, 2, 30L), true))
+        .andReturn(Futures.immediateFuture(true));
+    taskQueue.shutdown("id3");
+    expectLastCall().times(2);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testResetSuspended() throws Exception
+  {
+    
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
+    replayAll();
+
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true);
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    reset(indexerMetadataStorageCoordinator);
+    
expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
+    replay(indexerMetadataStorageCoordinator);
+
+    supervisor.resetInternal(null);
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
@@ -2321,6 +2465,29 @@ public class KafkaSupervisorTest extends EasyMockSupport
       boolean skipOffsetGaps
   )
   {
+    return getSupervisor(
+        replicas,
+        taskCount,
+        useEarliestOffset,
+        duration,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        skipOffsetGaps,
+        false
+    );
+  }
+
+  private KafkaSupervisor getSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean skipOffsetGaps,
+      boolean suspended
+  )
+  {
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaSupervisorIOConfig(
         topic,
         replicas,
@@ -2368,6 +2535,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             tuningConfig,
             kafkaSupervisorIOConfig,
             null,
+            suspended,
             taskStorage,
             taskMaster,
             indexerMetadataStorageCoordinator,
@@ -2476,7 +2644,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
 
     @Override
-    public String getDataSource() 
+    public String getDataSource()
     {
       return dataSource;
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index b5c24ef..cca6fe5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -91,6 +91,19 @@ public class SupervisorManager
     }
   }
 
+  public boolean suspendOrResumeSupervisor(String id, boolean suspend)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
+    Preconditions.checkNotNull(pair.lhs, "spec");
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : 
pair.rhs.createRunningSpec();
+      possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
+      return createAndStartSupervisorInternal(nextState, true);
+    }
+  }
+
   @LifecycleStart
   public void start()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 9be9c9f..d3e19bb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -44,6 +44,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -52,6 +53,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Endpoints for submitting and starting a {@link SupervisorSpec}, getting 
running supervisors, stopping supervisors,
@@ -90,49 +92,55 @@ public class SupervisorResource
   public Response specPost(final SupervisorSpec spec, @Context final 
HttpServletRequest req)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Preconditions.checkArgument(
-                spec.getDataSources() != null && spec.getDataSources().size() 
> 0,
-                "No dataSources found to perform authorization checks"
-            );
-
-            Access authResult = AuthorizationUtils.authorizeAllResourceActions(
-                req,
-                Iterables.transform(spec.getDataSources(), 
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR),
-                authorizerMapper
-            );
+        manager -> {
+          Preconditions.checkArgument(
+              spec.getDataSources() != null && spec.getDataSources().size() > 
0,
+              "No dataSources found to perform authorization checks"
+          );
 
-            if (!authResult.isAllowed()) {
-              throw new ForbiddenException(authResult.toString());
-            }
+          Access authResult = AuthorizationUtils.authorizeAllResourceActions(
+              req,
+              Iterables.transform(spec.getDataSources(), 
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR),
+              authorizerMapper
+          );
 
-            manager.createOrUpdateAndStartSupervisor(spec);
-            return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+          if (!authResult.isAllowed()) {
+            throw new ForbiddenException(authResult.toString());
           }
+
+          manager.createOrUpdateAndStartSupervisor(spec);
+          return Response.ok(ImmutableMap.of("id", spec.getId())).build();
         }
     );
   }
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
-  public Response specGetAll(@Context final HttpServletRequest req)
+  public Response specGetAll(
+      @QueryParam("full") String full,
+      @Context final HttpServletRequest req
+  )
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(final SupervisorManager manager)
-          {
-            Set<String> authorizedSupervisorIds = 
filterAuthorizedSupervisorIds(
-                req,
-                manager,
-                manager.getSupervisorIds()
-            );
+        manager -> {
+          Set<String> authorizedSupervisorIds = filterAuthorizedSupervisorIds(
+              req,
+              manager,
+              manager.getSupervisorIds()
+          );
+
+          if (full == null) {
             return Response.ok(authorizedSupervisorIds).build();
+          } else {
+            List<Map<String, ?>> all =
+                authorizedSupervisorIds.stream()
+                                       .map(x -> ImmutableMap.<String, 
Object>builder()
+                                           .put("id", x)
+                                           .put("spec", 
manager.getSupervisorSpec(x).get())
+                                           .build()
+                                       )
+                                       .collect(Collectors.toList());
+            return Response.ok(all).build();
           }
         }
     );
@@ -145,20 +153,15 @@ public class SupervisorResource
   public Response specGet(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
-            if (!spec.isPresent()) {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
-
-            return Response.ok(spec.get()).build();
+        manager -> {
+          Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+          if (!spec.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
+
+          return Response.ok(spec.get()).build();
         }
     );
   }
@@ -170,20 +173,15 @@ public class SupervisorResource
   public Response specGetStatus(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Optional<SupervisorReport> spec = manager.getSupervisorStatus(id);
-            if (!spec.isPresent()) {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
-
-            return Response.ok(spec.get()).build();
+        manager -> {
+          Optional<SupervisorReport> spec = manager.getSupervisorStatus(id);
+          if (!spec.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
+
+          return Response.ok(spec.get()).build();
         }
     );
   }
@@ -197,49 +195,66 @@ public class SupervisorResource
   )
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Optional<Map<String, Map<String, Object>>> stats = 
manager.getSupervisorStats(id);
-            if (!stats.isPresent()) {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(
-                                 ImmutableMap.of(
-                                     "error",
-                                     StringUtils.format("[%s] does not exist", 
id)
-                                 )
-                             )
-                             .build();
-            }
-
-            return Response.ok(stats.get()).build();
+        manager -> {
+          Optional<Map<String, Map<String, Object>>> stats = 
manager.getSupervisorStats(id);
+          if (!stats.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(
+                               ImmutableMap.of(
+                                   "error",
+                                   StringUtils.format("[%s] does not exist", 
id)
+                               )
+                           )
+                           .build();
           }
+
+          return Response.ok(stats.get()).build();
         }
     );
   }
 
+  @POST
+  @Path("/{id}/resume")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response specResume(@PathParam("id") final String id)
+  {
+    return specSuspendOrResume(id, false);
+  }
+
+  @POST
+  @Path("/{id}/suspend")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response specSuspend(@PathParam("id") final String id)
+  {
+    return specSuspendOrResume(id, true);
+  }
 
+  @Deprecated
   @POST
   @Path("/{id}/shutdown")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(SupervisorResourceFilter.class)
   public Response shutdown(@PathParam("id") final String id)
   {
+    return terminate(id);
+  }
+
+  @POST
+  @Path("/{id}/terminate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(SupervisorResourceFilter.class)
+  public Response terminate(@PathParam("id") final String id)
+  {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            if (manager.stopAndRemoveSupervisor(id)) {
-              return Response.ok(ImmutableMap.of("id", id)).build();
-            } else {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
+        manager -> {
+          if (manager.stopAndRemoveSupervisor(id)) {
+            return Response.ok(ImmutableMap.of("id", id)).build();
+          } else {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
         }
     );
@@ -251,21 +266,14 @@ public class SupervisorResource
   public Response specGetAllHistory(@Context final HttpServletRequest req)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(final SupervisorManager manager)
-          {
-            return Response.ok(
-                AuthorizationUtils.filterAuthorizedResources(
-                    req,
-                    manager.getSupervisorHistory(),
-                    SPEC_DATASOURCE_READ_RA_GENERATOR,
-                    authorizerMapper
-                )
-            ).build();
-          }
-        }
+        manager -> Response.ok(
+            AuthorizationUtils.filterAuthorizedResources(
+                req,
+                manager.getSupervisorHistory(),
+                SPEC_DATASOURCE_READ_RA_GENERATOR,
+                authorizerMapper
+            )
+        ).build()
     );
   }
 
@@ -277,38 +285,33 @@ public class SupervisorResource
       @PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            Map<String, List<VersionedSupervisorSpec>> supervisorHistory = 
manager.getSupervisorHistory();
-            Iterable<VersionedSupervisorSpec> historyForId = 
supervisorHistory.get(id);
-            if (historyForId != null) {
-              final List<VersionedSupervisorSpec> authorizedHistoryForId =
-                  Lists.newArrayList(
-                      AuthorizationUtils.filterAuthorizedResources(
-                          req,
-                          historyForId,
-                          SPEC_DATASOURCE_READ_RA_GENERATOR,
-                          authorizerMapper
-                      )
-                  );
-              if (authorizedHistoryForId.size() > 0) {
-                return Response.ok(authorizedHistoryForId).build();
-              }
+        manager -> {
+          Map<String, List<VersionedSupervisorSpec>> supervisorHistory = 
manager.getSupervisorHistory();
+          Iterable<VersionedSupervisorSpec> historyForId = 
supervisorHistory.get(id);
+          if (historyForId != null) {
+            final List<VersionedSupervisorSpec> authorizedHistoryForId =
+                Lists.newArrayList(
+                    AuthorizationUtils.filterAuthorizedResources(
+                        req,
+                        historyForId,
+                        SPEC_DATASOURCE_READ_RA_GENERATOR,
+                        authorizerMapper
+                    )
+                );
+            if (authorizedHistoryForId.size() > 0) {
+              return Response.ok(authorizedHistoryForId).build();
             }
+          }
 
-            return Response.status(Response.Status.NOT_FOUND)
-                           .entity(
-                               ImmutableMap.of(
-                                   "error",
-                                   StringUtils.format("No history for [%s].", 
id)
-                               )
-                           )
-                           .build();
+          return Response.status(Response.Status.NOT_FOUND)
+                         .entity(
+                             ImmutableMap.of(
+                                 "error",
+                                 StringUtils.format("No history for [%s].", id)
+                             )
+                         )
+                         .build();
 
-          }
         }
     );
   }
@@ -320,18 +323,13 @@ public class SupervisorResource
   public Response reset(@PathParam("id") final String id)
   {
     return asLeaderWithSupervisorManager(
-        new Function<SupervisorManager, Response>()
-        {
-          @Override
-          public Response apply(SupervisorManager manager)
-          {
-            if (manager.resetSupervisor(id, null)) {
-              return Response.ok(ImmutableMap.of("id", id)).build();
-            } else {
-              return Response.status(Response.Status.NOT_FOUND)
-                             .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
-                             .build();
-            }
+        manager -> {
+          if (manager.resetSupervisor(id, null)) {
+            return Response.ok(ImmutableMap.of("id", id)).build();
+          } else {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
+                           .build();
           }
         }
     );
@@ -375,4 +373,29 @@ public class SupervisorResource
         )
     );
   }
+
+  private Response specSuspendOrResume(final String id, boolean suspend)
+  {
+    return asLeaderWithSupervisorManager(
+        manager -> {
+          Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
+          if (!spec.isPresent()) {
+            return Response.status(Response.Status.NOT_FOUND)
+                           .entity(ImmutableMap.of("error", 
StringUtils.format("[%s] does not exist", id)))
+                           .build();
+          }
+
+          if (spec.get().isSuspended() == suspend) {
+            final String errMsg =
+                StringUtils.format("[%s] is already %s", id, suspend ? 
"suspended" : "running");
+            return Response.status(Response.Status.BAD_REQUEST)
+                           .entity(ImmutableMap.of("error", errMsg))
+                           .build();
+          }
+          manager.suspendOrResumeSupervisor(id, suspend);
+          spec = manager.getSupervisorSpec(id);
+          return Response.ok(spec.get()).build();
+        }
+    );
+  }
 }
diff --git 
a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js 
b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
index 5ec87c5..a3bf8e6 100644
--- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
+++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js
@@ -6,7 +6,7 @@ var killTask = function(taskId) {
   if(confirm('Do you really want to kill: '+taskId)) {
     $.ajax({
       type:'POST',
-      url: '/druid/indexer/v1/task/'+ taskId +'/shutdown',
+      url: '/druid/indexer/v1/task/'+ taskId +'/terminate',
       data: ''
     }).done(function(data) {
       setTimeout(function() { location.reload(true) }, 750);
@@ -16,6 +16,42 @@ var killTask = function(taskId) {
   }
 }
 
+
+var suspendSupervisor = function(supervisorId) {
+  if(confirm('Do you really want to suspend: '+ supervisorId)) {
+    $.ajax({
+      type:'POST',
+      url: '/druid/indexer/v1/supervisor/' + supervisorId + '/suspend',
+      data: ''
+    }).done(function(data) {
+      setTimeout(function() { location.reload(true) }, 750);
+    }).fail(function(data) {
+      var errMsg = data && data.responseJSON && data.responseJSON.error ?
+           data.responseJSON.error :
+          'suspend request failed, please check overlord logs for details.';
+      alert(errMsg);
+    })
+  }
+}
+
+
+var resumeSupervisor = function(supervisorId) {
+  if(confirm('Do you really want to resume: '+ supervisorId)) {
+    $.ajax({
+      type:'POST',
+      url: '/druid/indexer/v1/supervisor/' + supervisorId + '/resume',
+      data: ''
+    }).done(function(data) {
+      setTimeout(function() { location.reload(true) }, 750);
+    }).fail(function(data) {
+      var errMsg = data && data.responseJSON && data.responseJSON.error ?
+       data.responseJSON.error :
+      'resume request failed, please check overlord logs for details.';
+      alert(errMsg);
+    })
+  }
+}
+
 var resetSupervisor = function(supervisorId) {
   if(confirm('Do you really want to reset: '+ supervisorId)) {
     $.ajax({
@@ -31,7 +67,7 @@ var resetSupervisor = function(supervisorId) {
 }
 
 var shutdownSupervisor = function(supervisorId) {
-  if(confirm('Do you really want to shutdown: '+ supervisorId)) {
+  if(confirm('Do you really want to terminate: '+ supervisorId)) {
     $.ajax({
       type:'POST',
       url: '/druid/indexer/v1/supervisor/' + supervisorId + '/shutdown',
@@ -39,7 +75,7 @@ var shutdownSupervisor = function(supervisorId) {
     }).done(function(data) {
       setTimeout(function() { location.reload(true) }, 750);
     }).fail(function(data) {
-      alert('Shutdown request failed, please check overlord logs for 
details.');
+      alert('Terminate request failed, please check overlord logs for 
details.');
     })
   }
 }
@@ -59,18 +95,28 @@ $(document).ready(function() {
     }
   }
 
-  $.get('/druid/indexer/v1/supervisor', function(dataList) {
+  $.get('/druid/indexer/v1/supervisor?full', function(dataList) {
+
     var data = []
     for (i = 0 ; i < dataList.length ; i++) {
-      var supervisorId = encodeURIComponent(dataList[i])
+      var supervisorId = encodeURIComponent(dataList[i].id)
+      var supervisorSpec = dataList[i].spec;
+      var statusText = supervisorSpec && supervisorSpec.suspended ?
+       '<span style="color:#FF6000">suspended</span>' :
+       '<span style="color:#08B157">running</span>';
       data[i] = {
         "dataSource" : supervisorId,
         "more" :
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + 
'">payload</a>' +
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + 
'/status">status</a>' +
           '<a href="/druid/indexer/v1/supervisor/' + supervisorId + 
'/history">history</a>' +
+          (supervisorSpec.suspended ?
+           '<a style="padding-right:5px;" onclick="resumeSupervisor(\'' + 
supervisorId + '\');">resume</a>' :
+           '<a onclick="suspendSupervisor(\'' + supervisorId + 
'\');">suspend</a>'
+          ) +
           '<a onclick="resetSupervisor(\'' + supervisorId + '\');">reset</a>' +
-          '<a onclick="shutdownSupervisor(\'' + supervisorId + 
'\');">shutdown</a>'
+          '<a onclick="shutdownSupervisor(\'' + supervisorId + 
'\');">terminate</a>',
+        "status": statusText
       }
     }
     buildTable((data), $('#supervisorsTable'));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
index abdd60a..0a97709 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
@@ -131,6 +131,24 @@ public class OverlordSecurityResourceFilterTest extends 
ResourceFilterTestHelper
         {
           return ImmutableList.of("test");
         }
+
+        @Override
+        public SupervisorSpec createSuspendedSpec()
+        {
+          return null;
+        }
+
+        @Override
+        public SupervisorSpec createRunningSpec()
+        {
+          return null;
+        }
+
+        @Override
+        public boolean isSuspended()
+        {
+          return false;
+        }
       };
       
EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString()))
               .andReturn(Optional.of(supervisorSpec))
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index f91dc1a..85cfd95 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 
 @RunWith(EasyMockRunner.class)
@@ -263,15 +265,114 @@ public class SupervisorManagerTest extends 
EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testCreateSuspendResumeAndStopSupervisor()
+  {
+    Capture<TestSupervisorSpec> capturedInsert = Capture.newInstance();
+    SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1, false, 
supervisor2);
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id3", new TestSupervisorSpec("id3", supervisor3)
+    );
+
+    // mock adding a supervisor to manager with existing supervisor then 
suspending it
+    Assert.assertTrue(manager.getSupervisorIds().isEmpty());
+
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    metadataSupervisorManager.insert("id1", spec);
+    supervisor3.start();
+    supervisor1.start();
+    replayAll();
+
+    manager.start();
+    Assert.assertEquals(1, manager.getSupervisorIds().size());
+
+    manager.createOrUpdateAndStartSupervisor(spec);
+    Assert.assertEquals(2, manager.getSupervisorIds().size());
+    Assert.assertEquals(spec, manager.getSupervisorSpec("id1").get());
+    verifyAll();
+
+    // mock suspend, which stops supervisor1 and sets suspended state in 
metadata, flipping to supervisor2
+    // in TestSupervisorSpec implementation of createSuspendedSpec
+    resetAll();
+    metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert));
+    supervisor2.start();
+    supervisor1.stop(true);
+    replayAll();
+
+    manager.suspendOrResumeSupervisor("id1", true);
+    Assert.assertEquals(2, manager.getSupervisorIds().size());
+    Assert.assertEquals(capturedInsert.getValue(), 
manager.getSupervisorSpec("id1").get());
+    Assert.assertTrue(capturedInsert.getValue().suspended);
+    verifyAll();
+
+    // mock resume, which stops supervisor2 and sets suspended to false in 
metadata, flipping to supervisor1
+    // in TestSupervisorSpec implementation of createRunningSpec
+    resetAll();
+    metadataSupervisorManager.insert(eq("id1"), capture(capturedInsert));
+    supervisor2.stop(true);
+    supervisor1.start();
+    replayAll();
+
+    manager.suspendOrResumeSupervisor("id1", false);
+    Assert.assertEquals(2, manager.getSupervisorIds().size());
+    Assert.assertEquals(capturedInsert.getValue(), 
manager.getSupervisorSpec("id1").get());
+    Assert.assertFalse(capturedInsert.getValue().suspended);
+    verifyAll();
+
+    // mock stop of suspended then resumed supervisor
+    resetAll();
+    metadataSupervisorManager.insert(eq("id1"), 
anyObject(NoopSupervisorSpec.class));
+    supervisor1.stop(true);
+    replayAll();
+
+    boolean retVal = manager.stopAndRemoveSupervisor("id1");
+    Assert.assertTrue(retVal);
+    Assert.assertEquals(1, manager.getSupervisorIds().size());
+    Assert.assertEquals(Optional.absent(), manager.getSupervisorSpec("id1"));
+    verifyAll();
+
+    // mock manager shutdown to ensure supervisor 3 stops
+    resetAll();
+    supervisor3.stop(false);
+    replayAll();
+
+    manager.stop();
+    verifyAll();
+
+    Assert.assertTrue(manager.getSupervisorIds().isEmpty());
+  }
+
+
   private static class TestSupervisorSpec implements SupervisorSpec
   {
     private final String id;
     private final Supervisor supervisor;
+    private final boolean suspended;
+    private final Supervisor suspendedSupervisor;
+
 
     public TestSupervisorSpec(String id, Supervisor supervisor)
     {
+      this(id, supervisor, false, null);
+    }
+
+    public TestSupervisorSpec(String id, Supervisor supervisor, boolean 
suspended, Supervisor suspendedSupervisor)
+    {
       this.id = id;
       this.supervisor = supervisor;
+      this.suspended = suspended;
+      this.suspendedSupervisor = suspendedSupervisor;
+    }
+    @Override
+    public SupervisorSpec createSuspendedSpec()
+    {
+      return new TestSupervisorSpec(id, suspendedSupervisor, true, supervisor);
+    }
+
+    @Override
+    public SupervisorSpec createRunningSpec()
+    {
+      return new TestSupervisorSpec(id, suspendedSupervisor, false, 
supervisor);
     }
 
     @Override
@@ -287,10 +388,15 @@ public class SupervisorManagerTest extends EasyMockSupport
     }
 
     @Override
+    public boolean isSuspended()
+    {
+      return suspended;
+    }
+
+    @Override
     public List<String> getDataSources()
     {
       return new ArrayList<>();
     }
-
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index a3434bc..9d6eab3 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -29,12 +29,10 @@ import 
org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -75,21 +73,14 @@ public class SupervisorResourceTest extends EasyMockSupport
           @Override
           public Authorizer getAuthorizer(String name)
           {
-            return new Authorizer()
-            {
-              @Override
-              public Access authorize(
-                  AuthenticationResult authenticationResult, Resource 
resource, Action action
-              )
-              {
-                if (authenticationResult.getIdentity().equals("druid")) {
-                  return Access.OK;
+            return (authenticationResult, resource, action) -> {
+              if (authenticationResult.getIdentity().equals("druid")) {
+                return Access.OK;
+              } else {
+                if (resource.getName().equals("datasource2")) {
+                  return new Access(false, "not authorized.");
                 } else {
-                  if (resource.getName().equals("datasource2")) {
-                    return new Access(false, "not authorized.");
-                  } else {
-                    return Access.OK;
-                  }
+                  return Access.OK;
                 }
               }
             };
@@ -171,7 +162,7 @@ public class SupervisorResourceTest extends EasyMockSupport
     EasyMock.expectLastCall().anyTimes();
     replayAll();
 
-    Response response = supervisorResource.specGetAll(request);
+    Response response = supervisorResource.specGetAll(null, request);
     verifyAll();
 
     Assert.assertEquals(200, response.getStatus());
@@ -181,13 +172,62 @@ public class SupervisorResourceTest extends 
EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
     replayAll();
 
-    response = supervisorResource.specGetAll(request);
+    response = supervisorResource.specGetAll(null, request);
     verifyAll();
 
     Assert.assertEquals(503, response.getStatus());
   }
 
   @Test
+  public void testSpecGetAllFull()
+  {
+    Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
+
+    SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) {
+
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource2");
+      }
+    };
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2);
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2);
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
+    
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
+        new AuthenticationResult("druid", "druid", null, null)
+    ).atLeastOnce();
+    request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specGetAll("", request);
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    List<Map<String, Object>> specs = (List<Map<String, Object>>) 
response.getEntity();
+    Assert.assertTrue(
+        specs.stream()
+             .allMatch(spec ->
+                           ("id1".equals(spec.get("id")) && 
spec1.equals(spec.get("spec"))) ||
+                           ("id2".equals(spec.get("id")) && 
spec2.equals(spec.get("spec")))
+             )
+    );
+  }
+
+  @Test
   public void testSpecGet()
   {
     SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null);
@@ -250,6 +290,101 @@ public class SupervisorResourceTest extends 
EasyMockSupport
   }
 
   @Test
+  public void testSpecSuspend()
+  {
+
+    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, 
false) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, 
true) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
+            .andReturn(Optional.of(running)).times(1)
+            .andReturn(Optional.of(suspended)).times(1);
+    EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", 
true)).andReturn(true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specSuspend("my-id");
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    TestSupervisorSpec responseSpec = (TestSupervisorSpec) 
response.getEntity();
+    Assert.assertEquals(suspended.id, responseSpec.id);
+    Assert.assertEquals(suspended.suspended, responseSpec.suspended);
+    resetAll();
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
+    replayAll();
+
+    response = supervisorResource.specSuspend("my-id");
+    verifyAll();
+
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already 
suspended"), response.getEntity());
+  }
+
+
+
+  @Test
+  public void testSpecResume()
+  {
+    TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, 
true) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+    TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, 
false) {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+    };
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
+            .andReturn(Optional.of(suspended)).times(1)
+            .andReturn(Optional.of(running)).times(1);
+    EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", 
false)).andReturn(true);
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    Response response = supervisorResource.specResume("my-id");
+    verifyAll();
+
+    Assert.assertEquals(200, response.getStatus());
+    TestSupervisorSpec responseSpec = (TestSupervisorSpec) 
response.getEntity();
+    Assert.assertEquals(running.id, responseSpec.id);
+    Assert.assertEquals(running.suspended, responseSpec.suspended);
+    resetAll();
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
+    replayAll();
+
+    response = supervisorResource.specResume("my-id");
+    verifyAll();
+
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already 
running"), response.getEntity());
+  }
+
+  @Test
   public void testShutdown()
   {
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
@@ -762,9 +897,10 @@ public class SupervisorResourceTest extends EasyMockSupport
 
   private static class TestSupervisorSpec implements SupervisorSpec
   {
-    private final String id;
-    private final Supervisor supervisor;
-    private final List<String> datasources;
+    protected final String id;
+    protected final Supervisor supervisor;
+    protected final List<String> datasources;
+    boolean suspended;
 
     public TestSupervisorSpec(String id, Supervisor supervisor, List<String> 
datasources)
     {
@@ -773,6 +909,12 @@ public class SupervisorResourceTest extends EasyMockSupport
       this.datasources = datasources;
     }
 
+    public TestSupervisorSpec(String id, Supervisor supervisor, List<String> 
datasources, boolean suspended)
+    {
+      this(id, supervisor, datasources);
+      this.suspended = suspended;
+    }
+
     @Override
     public String getId()
     {
@@ -791,6 +933,25 @@ public class SupervisorResourceTest extends EasyMockSupport
       return datasources;
     }
 
+
+    @Override
+    public SupervisorSpec createSuspendedSpec()
+    {
+      return new TestSupervisorSpec(id, supervisor, datasources, true);
+    }
+
+    @Override
+    public SupervisorSpec createRunningSpec()
+    {
+      return new TestSupervisorSpec(id, supervisor, datasources, false);
+    }
+
+    @Override
+    public boolean isSuspended()
+    {
+      return suspended;
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -809,7 +970,10 @@ public class SupervisorResourceTest extends EasyMockSupport
       if (supervisor != null ? !supervisor.equals(that.supervisor) : 
that.supervisor != null) {
         return false;
       }
-      return datasources != null ? datasources.equals(that.datasources) : 
that.datasources == null;
+      if (datasources != null ? !datasources.equals(that.datasources) : 
that.datasources != null) {
+        return false;
+      }
+      return isSuspended() == that.isSuspended();
 
     }
 
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index c32ea7a..6935a62 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 
 import javax.annotation.Nullable;
@@ -44,14 +45,28 @@ public class NoopSupervisorSpec implements SupervisorSpec
   @JsonProperty("id")
   private String id;
 
+  @JsonProperty("suspended")
+  private boolean suspended; //ignored
+
+  @VisibleForTesting
+  public NoopSupervisorSpec(
+      String id,
+      List<String> datasources
+  )
+  {
+    this(id, datasources, null);
+  }
+
   @JsonCreator
   public NoopSupervisorSpec(
       @Nullable @JsonProperty("id") String id,
-      @Nullable @JsonProperty("dataSources") List<String> datasources
+      @Nullable @JsonProperty("dataSources") List<String> datasources,
+      @Nullable @JsonProperty("suspended") Boolean suspended
   )
   {
     this.id = id;
     this.datasources = datasources == null ? new ArrayList<>() : datasources;
+    this.suspended = false; // ignore
   }
 
   @Override
@@ -61,6 +76,22 @@ public class NoopSupervisorSpec implements SupervisorSpec
     return id;
   }
 
+
+  @Override
+  @Nullable
+  @JsonProperty("dataSources")
+  public List<String> getDataSources()
+  {
+    return datasources;
+  }
+
+  @Override
+  @JsonProperty("suspended")
+  public boolean isSuspended()
+  {
+    return suspended;
+  }
+
   @Override
   public Supervisor createSupervisor()
   {
@@ -95,11 +126,15 @@ public class NoopSupervisorSpec implements SupervisorSpec
   }
 
   @Override
-  @Nullable
-  @JsonProperty("dataSources")
-  public List<String> getDataSources()
+  public SupervisorSpec createRunningSpec()
   {
-    return datasources;
+    return new NoopSupervisorSpec(id, datasources);
+  }
+
+  @Override
+  public SupervisorSpec createSuspendedSpec()
+  {
+    return new NoopSupervisorSpec(id, datasources);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 01bcb4c..5671769 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 import java.util.List;
 
@@ -41,4 +42,19 @@ public interface SupervisorSpec
   Supervisor createSupervisor();
 
   List<String> getDataSources();
+
+  default SupervisorSpec createSuspendedSpec()
+  {
+    throw new NotImplementedException();
+  }
+
+  default SupervisorSpec createRunningSpec()
+  {
+    throw new NotImplementedException();
+  }
+
+  default boolean isSuspended()
+  {
+    return false;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to