This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e13584e0e Adds Idle feature to `SeekableStreamSupervisor` for 
inactive stream (#13144)
3e13584e0e is described below

commit 3e13584e0ec18f2860e6817baf90bf44343769d9
Author: Tejaswini Bandlamudi <[email protected]>
AuthorDate: Wed Oct 12 18:31:08 2022 +0530

    Adds Idle feature to `SeekableStreamSupervisor` for inactive stream (#13144)
    
    * Idle Seekable stream supervisor changes.
    
    * nit
    
    * nit
    
    * nit
    
    * Adds unit tests
    
    * Supervisor decides its idle state instead of AutoScaler
    
    * docs update
    
    * nit
    
    * nit
    
    * docs update
    
    * Adds Kafka unit test
    
    * Adds Kafka Integration test.
    
    * Updates travis config.
    
    * Updates kafka-indexing-service dependencies.
    
    * updates previous offsets snapshot & doc
    
    * Doesn't act if supervisor is suspended.
    
    * Fixes highest current offsets fetch bug, adds new Kafka UT tests, doc 
changes.
    
    * Reverts Kinesis Supervisor idle behaviour changes.
    
    * nit
    
    * nit
    
    * Corrects SeekableStreamSupervisorSpec check on idle behaviour config, 
adds tests.
    
    * Fixes getHighestCurrentOffsets to fetch offsets of publishing tasks too
    
    * Adds Kafka Supervisor UT
    
    * Improves test coverage in druid-server
    
    * Corrects IT override config
    
    * Doc updates and Syntactic changes
    
    * nit
    
    * supervisorSpec.ioConfig.idleConfig changes
---
 .../extensions-core/kafka-supervisor-operations.md |   5 +-
 .../extensions-core/kafka-supervisor-reference.md  |  18 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  22 +-
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |   8 +-
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |   2 +
 .../supervisor/KafkaSupervisorIOConfigTest.java    |  38 ++
 .../kafka/supervisor/KafkaSupervisorTest.java      | 470 ++++++++++++++++++++-
 .../supervisor/KinesisSupervisorIOConfig.java      |   3 +-
 .../seekablestream/supervisor/IdleConfig.java      |  68 +++
 .../supervisor/SeekableStreamSupervisor.java       |  96 ++++-
 .../SeekableStreamSupervisorIOConfig.java          |  13 +-
 .../SeekableStreamSupervisorSpecTest.java          |  84 +++-
 .../SeekableStreamSupervisorStateManagerTest.java  |  37 ++
 .../SeekableStreamSupervisorStateTest.java         |  97 ++++-
 .../tests/indexer/AbstractStreamIndexingTest.java  |  83 +++-
 ...ingServiceNonTransactionalParallelizedTest.java |   6 +
 ..._with_idle_behaviour_enabled_spec_template.json |  62 +++
 .../supervisor/SupervisorStateManager.java         |  19 +-
 .../supervisor/SupervisorStateManagerTest.java     |  44 ++
 19 files changed, 1128 insertions(+), 47 deletions(-)

diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md 
b/docs/development/extensions-core/kafka-supervisor-operations.md
index ffd4135d2b..fe8d1f562b 100644
--- a/docs/development/extensions-core/kafka-supervisor-operations.md
+++ b/docs/development/extensions-core/kafka-supervisor-operations.md
@@ -56,6 +56,7 @@ The list of `detailedState` values and their corresponding 
`state` mapping is as
 |DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is 
discovering already-running tasks|
 |CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating 
tasks and discovering state|
 |RUNNING|RUNNING|The supervisor has started tasks and is waiting for 
taskDuration to elapse|
+|IDLE|IDLE|The supervisor is not creating tasks because the input stream has not 
received any new data and all existing data has been read.|
 |SUSPENDED|SUSPENDED|The supervisor has been suspended|
 |STOPPING|STOPPING|The supervisor is stopping|
 
@@ -68,14 +69,14 @@ On each iteration of the supervisor's run loop, the 
supervisor completes the fol
   4) Handle tasks that have exceeded `taskDuration` and should transition from 
the reading to publishing state.
   5) Handle tasks that have finished publishing and signal redundant replica 
tasks to stop.
   6) Handle tasks that have failed and clean up the supervisor's internal 
state.
-  7) Compare the list of healthy tasks to the requested `taskCount` and 
`replicas` configurations and create additional tasks if required.
+  7) Compare the list of healthy tasks to the requested `taskCount` and 
`replicas` configurations and, if the supervisor is not idle, create additional 
tasks as required.
 
 The `detailedState` field will show additional values (those marked with 
"first iteration only") the first time the
 supervisor executes this run loop after startup or after resuming from a 
suspension. This is intended to surface
 initialization-type issues, where the supervisor is unable to reach a stable 
state (perhaps because it can't connect to
 Kafka, it can't read from the Kafka topic, or it can't communicate with 
existing tasks). Once the supervisor is stable -
 that is, once it has completed a full execution without encountering any 
issues - `detailedState` will show a `RUNNING`
-state until it is stopped, suspended, or hits a task failure threshold and 
transitions to an unhealthy state.
+state until it is idle, stopped, suspended, or hits a task failure threshold 
and transitions to an unhealthy state.
 
 ## Getting Supervisor Ingestion Stats Report
 
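The IDLE row and step 7 above describe the condition only in prose. The following is a minimal sketch of one way such a check could work, assuming the supervisor compares the stream's latest offsets between runs and tracks total lag. `IdleTrackerSketch`, `checkIdle`, and their parameters are hypothetical names for illustration and are not taken from `SeekableStreamSupervisor`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the code added by this commit.
class IdleTrackerSketch
{
  private final long inactiveAfterMillis;
  private Map<Integer, Long> previousLatestOffsets = new HashMap<>();
  private long lastActiveMillis;

  IdleTrackerSketch(long inactiveAfterMillis, long nowMillis)
  {
    this.inactiveAfterMillis = inactiveAfterMillis;
    this.lastActiveMillis = nowMillis;
  }

  /**
   * Returns true once the stream's latest offsets have stayed unchanged and
   * the total lag has stayed at zero for at least inactiveAfterMillis.
   */
  boolean checkIdle(Map<Integer, Long> latestOffsets, long totalLag, long nowMillis)
  {
    // New data arrived if the latest offsets differ from the previous snapshot.
    boolean streamAdvanced = !latestOffsets.equals(previousLatestOffsets);
    previousLatestOffsets = new HashMap<>(latestOffsets);

    // Any new data or unread data resets the inactivity clock.
    if (streamAdvanced || totalLag > 0) {
      lastActiveMillis = nowMillis;
      return false;
    }
    return nowMillis - lastActiveMillis >= inactiveAfterMillis;
  }
}
```

Passing the clock in as `nowMillis` keeps the sketch deterministic, so it can be exercised without `Thread.sleep` calls.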
diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md 
b/docs/development/extensions-core/kafka-supervisor-reference.md
index 01a61fe403..f1153c7112 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -52,6 +52,7 @@ This topic contains configuration reference information for 
the Apache Kafka sup
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime an [...]
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageReject [...]
 |`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest 
tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no 
(default == null)|
+|`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. 
See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more 
details.|no (default == null)|
 
 ## Task Autoscaler Properties
 
@@ -79,7 +80,16 @@ This topic contains configuration reference information for 
the Apache Kafka sup
 | `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
 | `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
 
-The following example demonstrates supervisor spec with `lagBased` autoScaler 
enabled:
+## Idle Supervisor Configuration
+
+> Note that Idle state transitioning is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enabled` | If `true`, the Kafka supervisor becomes idle when the input 
stream/topic receives no new data for `inactiveAfterMillis` milliseconds. | no (default == false) |
+| `inactiveAfterMillis` | The supervisor is marked as idle if all existing data 
has been read from the input topic and no new data has been published for 
`inactiveAfterMillis` milliseconds. | no (default == 600000) |
+
+The following example demonstrates supervisor spec with `lagBased` autoScaler 
and idle config enabled:
 ```json
 {
     "type": "kafka",
@@ -114,7 +124,11 @@ The following example demonstrates supervisor spec with 
`lagBased` autoScaler en
          },
          "taskCount":1,
          "replicas":1,
-         "taskDuration":"PT1H"
+         "taskDuration":"PT1H",
+         "idleConfig": {
+           "enabled": true,
+           "inactiveAfterMillis": 600000 
+         }
       },
      "tuningConfig":{
         ...
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 9bac92429a..b5ecde675e 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -62,9 +62,11 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -274,17 +276,19 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<Integer, Long, Kaf
   @SuppressWarnings("SSBasedInspection")
   protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> 
currentOffsets)
   {
-    return currentOffsets
+    if (latestSequenceFromStream == null) {
+      return Collections.emptyMap();
+    }
+
+    return latestSequenceFromStream
         .entrySet()
         .stream()
         .collect(
             Collectors.toMap(
                 Entry::getKey,
-                e -> latestSequenceFromStream != null
-                     && latestSequenceFromStream.get(e.getKey()) != null
-                     && e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : Integer.MIN_VALUE
+                e -> e.getValue() != null
+                     ? e.getValue() - 
Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
+                     : 0
             )
         );
   }
@@ -383,6 +387,12 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<Integer, Long, Kaf
     }
   }
 
+  @Override
+  protected Map<Integer, Long> getLatestSequencesFromStream()
+  {
+    return latestSequenceFromStream != null ? latestSequenceFromStream : new 
HashMap<>();
+  }
+
   @Override
   protected String baseTaskName()
   {
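The change to `getRecordLagPerPartition` above iterates over the stream's latest offsets instead of the tasks' current offsets, and replaces the `Integer.MIN_VALUE` sentinel with a zero fallback. The following standalone sketch mirrors that logic under the assumption that both maps are plain `Map<Integer, Long>` values. `RecordLagSketch` and `lagPerPartition` are hypothetical names, not part of `KafkaSupervisor`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical standalone mirror of the revised lag computation.
class RecordLagSketch
{
  static Map<Integer, Long> lagPerPartition(
      Map<Integer, Long> latestOffsets,
      Map<Integer, Long> currentOffsets
  )
  {
    // Without a snapshot of the stream's latest offsets, no lag can be reported.
    if (latestOffsets == null) {
      return Collections.emptyMap();
    }

    return latestOffsets
        .entrySet()
        .stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey,
                // A partition with no known current offset is treated as read from 0;
                // a partition with no known latest offset reports zero lag.
                e -> e.getValue() != null
                     ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
                     : 0L
            )
        );
  }
}
```

Because the result is keyed by the partitions in `latestOffsets`, a partition that no task has reported on yet still appears in the lag map.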
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 2d6a800942..f467cc5510 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
@@ -63,7 +64,8 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
       @JsonProperty("lateMessageRejectionPeriod") Period 
lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period 
earlyMessageRejectionPeriod,
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime 
lateMessageRejectionStartDateTime,
-      @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides
+      @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
+      @JsonProperty("idleConfig") IdleConfig idleConfig
   )
   {
     super(
@@ -79,7 +81,8 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         autoScalerConfig,
-        lateMessageRejectionStartDateTime
+        lateMessageRejectionStartDateTime,
+        idleConfig
     );
 
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, 
"consumerProperties");
@@ -140,6 +143,7 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
            ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
            ", lateMessageRejectionStartDateTime=" + 
getLateMessageRejectionStartDateTime() +
            ", configOverrides=" + getConfigOverrides() +
+           ", idleConfig=" + getIdleConfig() +
            '}';
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 89b824778d..abe25b5391 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -144,6 +144,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             null
         ),
         null,
@@ -320,6 +321,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             null
         ),
         null,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5b231deb31..b6a95769b8 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -305,6 +306,7 @@ public class KafkaSupervisorIOConfigTest
         null,
         null,
         null,
+        null,
         null
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
@@ -318,4 +320,40 @@ public class KafkaSupervisorIOConfigTest
         
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
     );
   }
+
+  @Test
+  public void testIdleConfigSerde() throws JsonProcessingException
+  {
+    HashMap<String, Object> idleConfig = new HashMap<>();
+    idleConfig.put("enabled", true);
+
+    final Map<String, Object> consumerProperties = 
KafkaConsumerConfigs.getConsumerProperties();
+    consumerProperties.put("bootstrap.servers", "localhost:8082");
+
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaSupervisorIOConfig(
+        "test",
+        null,
+        1,
+        1,
+        new Period("PT1H"),
+        consumerProperties,
+        null,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        mapper.convertValue(idleConfig, IdleConfig.class)
+    );
+    String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = 
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
+
+    Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig());
+    Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled());
+    Assert.assertEquals(600000L, 
kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
+  }
 }
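`testIdleConfigSerde` above sets only `enabled` and expects `getInactiveAfterMillis()` to return 600000, the default given in the reference docs. The sketch below shows one way that defaulting could be written. It is a hypothetical stand-in that omits the Jackson annotations and is not the contents of `IdleConfig.java`, which this part of the diff does not show.

```java
// Hypothetical stand-in for illustration; the real IdleConfig may differ.
class IdleConfigSketch
{
  // Default taken from the documented value (600000 ms).
  static final long DEFAULT_INACTIVE_AFTER_MILLIS = 600_000L;

  private final boolean enabled;
  private final long inactiveAfterMillis;

  IdleConfigSketch(Boolean enabled, Long inactiveAfterMillis)
  {
    // A missing "enabled" property is treated as false, matching the documented default.
    this.enabled = enabled != null && enabled;
    this.inactiveAfterMillis = inactiveAfterMillis != null
                               ? inactiveAfterMillis
                               : DEFAULT_INACTIVE_AFTER_MILLIS;
  }

  boolean isEnabled()
  {
    return enabled;
  }

  long getInactiveAfterMillis()
  {
    return inactiveAfterMillis;
  }
}
```

Using boxed constructor parameters lets the sketch tell an omitted property (`null`) apart from an explicit value.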
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9009fd80d3..c30795cdd1 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -67,6 +67,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.St
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
@@ -273,7 +274,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     autoScalerConfig.put("scaleInThreshold", 1000000);
     autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
     autoScalerConfig.put("scaleActionStartDelayMillis", 0);
-    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("scaleActionPeriodMillis", 600);
     autoScalerConfig.put("taskCountMax", 2);
     autoScalerConfig.put("taskCountMin", 1);
     autoScalerConfig.put("scaleInStep", 1);
@@ -300,7 +301,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
-            null
+            null,
+            new IdleConfig(true, 1000L)
     );
 
     final KafkaSupervisorTuningConfig tuningConfigOri = new 
KafkaSupervisorTuningConfig(
@@ -380,7 +382,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
                     null
             )
     ).anyTimes();
-    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).anyTimes();
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     replayAll();
 
@@ -389,11 +391,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(1, taskCountBeforeScale);
     autoscaler.start();
     supervisor.runInternal();
-    Thread.sleep(1 * 1000);
+    Thread.sleep(1000);
+    supervisor.runInternal();
     verifyAll();
 
     int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScale);
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisor.getState());
 
 
     KafkaIndexTask task = captured.getValue();
@@ -1855,6 +1859,425 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
   }
 
+  @Test
+  public void testSupervisorIsIdleIfStreamInactive() throws Exception
+  {
+    supervisor = getTestableSupervisorForIdleBehaviour(
+        1,
+        2,
+        true,
+        "PT10S",
+        null,
+        null,
+        false,
+        new IdleConfig(true, 200L)
+    );
+
+    addSomeEvents(100);
+
+    final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
+    final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
+
+    Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 10L, 2, 30L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(1, 20L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(1, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    List<Task> existingTasks = ImmutableList.of(id1, id2);
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync("id1"))
+            .andReturn(Futures.immediateFuture(Status.READING))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync("id2"))
+            .andReturn(Futures.immediateFuture(Status.READING))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        0,
+        ImmutableMap.of(0, 0L, 2, 0L),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("id1"),
+        ImmutableSet.of()
+    );
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        1,
+        ImmutableMap.of(1, 0L),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("id2"),
+        ImmutableSet.of()
+    );
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 
45L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 
45L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertNotEquals(supervisor.getState(), 
SupervisorStateManager.BasicState.IDLE);
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 
101L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 
101L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisor.getState());
+  }
+
+  @Test
+  public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasks() throws 
Exception
+  {
+    supervisor = getTestableSupervisorForIdleBehaviour(
+        1,
+        2,
+        true,
+        "PT10S",
+        null,
+        null,
+        false,
+        new IdleConfig(true, 200L)
+    );
+    addSomeEvents(1);
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, 
ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
+        )
+    ).anyTimes();
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+    verifyAll();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisor.getState());
+  }
+
+  @Test
+  public void testSupervisorNotIdleIfStreamInactiveWhenSuspended() throws 
Exception
+  {
+    supervisor = getTestableSupervisorForIdleBehaviour(
+        1,
+        2,
+        true,
+        "PT10S",
+        null,
+        null,
+        true,
+        new IdleConfig(true, 200L)
+    );
+    addSomeEvents(1);
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, 
ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
+        )
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, 
supervisor.getState());
+  }
+
+  @Test
+  public void 
testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks() 
throws Exception
+  {
+    supervisor = getTestableSupervisorForIdleBehaviour(
+        1,
+        2,
+        true,
+        "PT10S",
+        null,
+        null,
+        false,
+        new IdleConfig(true, 200L)
+    );
+
+    addSomeEvents(100);
+
+    final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
+    final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
+
+    Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 10L, 2, 30L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(1, 20L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(1, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    List<Task> existingTasks = ImmutableList.of(id1, id2);
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync("id1"))
+            .andReturn(Futures.immediateFuture(Status.READING))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync("id2"))
+            .andReturn(Futures.immediateFuture(Status.READING))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        0,
+        ImmutableMap.of(0, 0L, 2, 0L),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("id1"),
+        ImmutableSet.of()
+    );
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        1,
+        ImmutableMap.of(1, 0L),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("id2"),
+        ImmutableSet.of()
+    );
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 
45L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 
101L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 
101L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisor.getState());
+
+    supervisor.moveTaskGroupToPendingCompletion(0);
+    supervisor.moveTaskGroupToPendingCompletion(1);
+
+    Assert.assertEquals(0, supervisor.getActiveTaskGroupsCount());
+
+    EasyMock.reset(taskClient);
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 
101L)));
+    
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+    EasyMock.replay(taskClient);
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisor.getState());
+  }
+
   @Test
   public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
   {
@@ -3595,7 +4018,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         false,
-        kafkaHost
+        kafkaHost,
+        null
     );
   }
 
@@ -3619,7 +4043,33 @@ public class KafkaSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         suspended,
-        kafkaHost
+        kafkaHost,
+        null
+    );
+  }
+
+  private TestableKafkaSupervisor getTestableSupervisorForIdleBehaviour(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      IdleConfig idleConfig
+  )
+  {
+    return getTestableSupervisor(
+        replicas,
+        taskCount,
+        useEarliestOffset,
+        false,
+        duration,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        suspended,
+        kafkaHost,
+        idleConfig
     );
   }
 
@@ -3632,7 +4082,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
       boolean suspended,
-      String kafkaHost
+      String kafkaHost,
+      IdleConfig idleConfig
   )
   {
     final Map<String, Object> consumerProperties = 
KafkaConsumerConfigs.getConsumerProperties();
@@ -3654,7 +4105,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         null,
-        null
+        null,
+        idleConfig
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
@@ -3767,6 +4219,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         null,
+        null,
         null
     );
 
@@ -3884,6 +4337,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         null,
+        null,
         null
     );
 
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 220c22fcf4..38d92fb880 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -90,7 +90,8 @@ public class KinesisSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         autoScalerConfig,
-        lateMessageRejectionStartDateTime
+        lateMessageRejectionStartDateTime,
+        null
     );
 
     this.endpoint = endpoint != null
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
new file mode 100644
index 0000000000..6b017399dd
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+public class IdleConfig
+{
+  private final boolean enabled;
+  private final long inactiveAfterMillis;
+
+  @JsonCreator
+  public IdleConfig(
+      @Nullable @JsonProperty("enabled") Boolean enabled,
+      @Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis
+  )
+  {
+    this.enabled = enabled != null && enabled;
+    this.inactiveAfterMillis = inactiveAfterMillis != null
+                               ? inactiveAfterMillis
+                               : 600_000L;
+
+    Preconditions.checkArgument(this.inactiveAfterMillis > 0,
+                                "inactiveAfterMillis should be a positive number");
+  }
+
+  @JsonProperty
+  public boolean isEnabled()
+  {
+    return this.enabled;
+  }
+
+  @JsonProperty
+  public long getInactiveAfterMillis()
+  {
+    return this.inactiveAfterMillis;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "idleConfig{" +
+           "enabled=" + enabled +
+           ", inactiveAfterMillis=" + inactiveAfterMillis +
+           '}';
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1093584d11..586c0cb1e7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -743,6 +743,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle 
of inactive stream check
+  private final Map<PartitionIdType, SequenceOffsetType> 
previousSequencesFromStream = new HashMap<>();
+  private long lastActiveTimeMillis;
+
   public SeekableStreamSupervisor(
       final String supervisorId,
       final TaskStorage taskStorage,
@@ -1454,6 +1458,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       checkCurrentTaskState();
 
+      checkIfStreamInactiveAndTurnSupervisorIdle();
+
       // If supervisor is already stopping, don't contend for stateChangeLock 
since the block can be skipped
       if 
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
 {
         generateAndLogReport();
@@ -1466,6 +1472,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         if 
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
 {
           // if we're already terminating, don't do anything here, the 
terminate already handles shutdown
           log.info("[%s] supervisor is already stopping.", dataSource);
+        } else if (stateManager.isIdle()) {
+          log.info("[%s] supervisor is idle.", dataSource);
         } else if (!spec.isSuspended()) {
           log.info("[%s] supervisor is running.", dataSource);
 
@@ -2502,11 +2510,19 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return true;
   }
 
+  /**
+   * gets mapping of partitions in stream to their latest offsets.
+   */
+  protected Map<PartitionIdType, SequenceOffsetType> 
getLatestSequencesFromStream()
+  {
+    return new HashMap<>();
+  }
+
   private void assignRecordSupplierToPartitionIds()
   {
     recordSupplierLock.lock();
     try {
-      final Set partitions = partitionIds.stream()
+      final Set<StreamPartition<PartitionIdType>> partitions = 
partitionIds.stream()
                                          .map(partitionId -> new 
StreamPartition<>(ioConfig.getStream(), partitionId))
                                          .collect(Collectors.toSet());
       if (!recordSupplier.getAssignment().containsAll(partitions)) {
@@ -3244,6 +3260,44 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, 
TimeUnit.SECONDS);
   }
 
+  private void checkIfStreamInactiveAndTurnSupervisorIdle()
+  {
+    IdleConfig idleConfig = spec.getIoConfig().getIdleConfig();
+    if ((idleConfig == null || !idleConfig.isEnabled()) || spec.isSuspended()) {
+      return;
+    }
+
+    Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequencesFromStream();
+    long nowTime = Instant.now().toEpochMilli();
+    boolean idle;
+    long idleTime;
+
+    if (lastActiveTimeMillis > 0
+        && previousSequencesFromStream.equals(latestSequencesFromStream)
+        && computeTotalLag() == 0) {
+      idleTime = nowTime - lastActiveTimeMillis;
+      idle = true;
+    } else {
+      idleTime = 0L;
+      lastActiveTimeMillis = nowTime;
+      idle = false;
+    }
+
+    previousSequencesFromStream.clear();
+    previousSequencesFromStream.putAll(latestSequencesFromStream);
+    if (!idle) {
+      stateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING);
+    } else if (!stateManager.isIdle() && idleTime > idleConfig.getInactiveAfterMillis()) {
+      stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE);
+    }
+  }
+
+  private long computeTotalLag()
+  {
+    LagStats lagStats = computeLagStats();
+    return lagStats != null ? lagStats.getTotalLag() : 0;
+  }
+
   /**
    * If the seekable stream system supported by this supervisor allows for 
partition expiration, expired partitions
    * should be removed from the starting offsets sent to the tasks.
@@ -3595,7 +3649,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // if we aren't in a steady state, chill out for a bit, don't worry, we'll 
get called later, but if we aren't
     // healthy go ahead and try anyway to try if possible to provide insight 
into how much time is left to fix the
     // issue for cluster operators since this feeds the lag metrics
-    if (stateManager.isSteadyState() || !stateManager.isHealthy()) {
+    if (stateManager.isIdle() || stateManager.isSteadyState() || 
!stateManager.isHealthy()) {
       try {
         updateCurrentOffsets();
         updatePartitionLagFromStream();
@@ -3646,27 +3700,41 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   @Nullable
   protected abstract Map<PartitionIdType, Long> getPartitionTimeLag();
 
+  /**
+   * Gets the highest current offsets of all tasks (actively reading and publishing) for all partitions of the stream.
+   * If no task is reading a partition, returns the offset stored in metadata storage for that partition.
+   * If there are no active or publishing task groups, returns the offsets stored in metadata storage.
+   * Used to compute lag by comparing with the latest offsets from the stream, for reporting and determining idleness.
+   */
   protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
   {
+    Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage = 
getOffsetsFromMetadataStorage();
     if (!spec.isSuspended()) {
       if (activelyReadingTaskGroups.size() > 0 || 
pendingCompletionTaskGroups.size() > 0) {
-        return activelyReadingTaskGroups
-            .values()
-            .stream()
-            .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
-            .flatMap(taskData -> 
taskData.getValue().currentSequences.entrySet().stream())
-            .collect(Collectors.toMap(
+        Map<PartitionIdType, SequenceOffsetType> currentOffsets =
+            Stream.concat(
+                activelyReadingTaskGroups
+                    .values()
+                    .stream()
+                    .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
+                    .flatMap(taskData -> 
taskData.getValue().currentSequences.entrySet().stream()),
+                pendingCompletionTaskGroups
+                    .values()
+                    .stream()
+                    .flatMap(taskGroups -> 
taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
+                    .flatMap(taskData -> 
taskData.getValue().currentSequences.entrySet().stream())
+            ).collect(Collectors.toMap(
                 Entry::getKey,
                 Entry::getValue,
                 (v1, v2) -> 
makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
             ));
+
+        partitionIds.forEach(partitionId -> 
currentOffsets.putIfAbsent(partitionId, 
offsetsFromMetadataStorage.get(partitionId)));
+        return currentOffsets;
       }
-      // nothing is running but we are not suspended, so lets just hang out in 
case we get called while things start up
-      return ImmutableMap.of();
-    } else {
-      // if supervisor is suspended, no tasks are likely running so use 
offsets in metadata, if exist
-      return getOffsetsFromMetadataStorage();
     }
+    // if supervisor is suspended, or is idle and nothing is running, use offsets in metadata storage, if they exist
+    return offsetsFromMetadataStorage;
   }
 
   private OrderedSequenceNumber<SequenceOffsetType> 
makeSequenceNumber(SequenceOffsetType seq)
@@ -3977,7 +4045,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    if (spec.isSuspended() || !(stateManager.isSteadyState() || 
stateManager.isIdle())) {
       // don't emit metrics if supervisor is suspended or not in a healthy 
running state
       // (lag should still available in status report)
       return;
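
Note on the idle check added in SeekableStreamSupervisor.java above: the
transition rule in checkIfStreamInactiveAndTurnSupervisorIdle() can be read
in isolation as a small state machine. The following is a minimal sketch
under stated assumptions, not the committed implementation. The class name
IdleTracker, its State enum, and the check() signature are hypothetical and
exist only for illustration; partitions are assumed to be Integer keys with
Long offsets, the current time is passed in rather than read from
Instant.now(), and the early return for a disabled idleConfig or a suspended
spec is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the supervisor's idle check; not a Druid class.
class IdleTracker
{
  enum State { RUNNING, IDLE }

  private final long inactiveAfterMillis;
  // Snapshot of the stream's latest offsets taken on the previous check.
  private final Map<Integer, Long> previousOffsets = new HashMap<>();
  // Zero means "no check has run yet", as in the diff above.
  private long lastActiveTimeMillis;
  private State state = State.RUNNING;

  IdleTracker(long inactiveAfterMillis)
  {
    if (inactiveAfterMillis <= 0) {
      throw new IllegalArgumentException("inactiveAfterMillis should be a positive number");
    }
    this.inactiveAfterMillis = inactiveAfterMillis;
  }

  State check(Map<Integer, Long> latestOffsets, long totalLag, long nowMillis)
  {
    // The stream counts as inactive only if the offsets did not move since
    // the previous check and the tasks have fully caught up (zero lag).
    boolean inactive = lastActiveTimeMillis > 0
                       && previousOffsets.equals(latestOffsets)
                       && totalLag == 0;

    if (!inactive) {
      // Any activity resets the idle clock and returns the state to RUNNING.
      lastActiveTimeMillis = nowMillis;
      state = State.RUNNING;
    } else if (state != State.IDLE && nowMillis - lastActiveTimeMillis > inactiveAfterMillis) {
      state = State.IDLE;
    }

    // Remember the current offsets for the next check.
    previousOffsets.clear();
    previousOffsets.putAll(latestOffsets);
    return state;
  }
}
```

In this sketch the first check always reports RUNNING, because
lastActiveTimeMillis starts at zero. That matches the behaviour exercised by
testIdleStateTransition, where the supervisor only reports IDLE once the
elapsed time since the last activity exceeds inactiveAfterMillis.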
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 6ddb018d22..700236a741 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -48,6 +48,7 @@ public abstract class SeekableStreamSupervisorIOConfig
   private final Optional<Duration> earlyMessageRejectionPeriod;
   private final Optional<DateTime> lateMessageRejectionStartDateTime;
   @Nullable private final AutoScalerConfig autoScalerConfig;
+  @Nullable private final IdleConfig idleConfig;
 
   public SeekableStreamSupervisorIOConfig(
       String stream,
@@ -62,7 +63,8 @@ public abstract class SeekableStreamSupervisorIOConfig
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
       @Nullable AutoScalerConfig autoScalerConfig,
-      DateTime lateMessageRejectionStartDateTime
+      DateTime lateMessageRejectionStartDateTime,
+      @Nullable IdleConfig idleConfig
   )
   {
     this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
@@ -97,6 +99,8 @@ public abstract class SeekableStreamSupervisorIOConfig
                 + "both properties lateMessageRejectionStartDateTime "
           + "and lateMessageRejectionPeriod.");
     }
+
+    this.idleConfig = idleConfig;
   }
 
   private static Duration defaultDuration(final Period period, final String 
theDefault)
@@ -188,4 +192,11 @@ public abstract class SeekableStreamSupervisorIOConfig
   {
     return lateMessageRejectionStartDateTime;
   }
+
+  @Nullable
+  @JsonProperty
+  public IdleConfig getIdleConfig()
+  {
+    return idleConfig;
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 7116b88137..7f67393fc9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -40,6 +40,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
@@ -77,6 +78,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -865,7 +867,10 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
         false,
         new Period("PT30M"),
         null,
-        null, null, null
+        null,
+        null,
+        null,
+        null
     )
     {
     };
@@ -902,6 +907,75 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     autoScaler.stop();
   }
 
+  @Test
+  public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled()
+  {
+    SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new 
SeekableStreamSupervisorIOConfig(
+        "stream",
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        1,
+        new Period("PT1H"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        new IdleConfig(true, null)
+    ){
+    };
+
+    
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    spec = new SeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        supervisorStateManagerConfig
+    )
+    {
+      @Override
+      public Supervisor createSupervisor()
+      {
+        return null;
+      }
+
+      @Override
+      protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+      {
+        return null;
+      }
+
+      @Override
+      public String getType()
+      {
+        return null;
+      }
+
+      @Override
+      public String getSource()
+      {
+        return null;
+      }
+    };
+
+    
Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
+    Assert.assertEquals(600000L, 
spec.getIoConfig().getIdleConfig().getInactiveAfterMillis());
+  }
+
   private static DataSchema getDataSchema()
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
@@ -937,7 +1011,9 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
           new Period("PT30M"),
           null,
           null,
-          mapper.convertValue(getScaleOutProperties(2), 
AutoScalerConfig.class), null
+          mapper.convertValue(getScaleOutProperties(2), 
AutoScalerConfig.class),
+          null,
+          null
       )
       {
       };
@@ -954,7 +1030,9 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
           new Period("PT30M"),
           null,
           null,
-          mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), 
null
+          mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
+          null,
+          null
       )
       {
       };
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
index e19dc6354d..4d4eaccb60 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
@@ -98,6 +98,43 @@ public class SeekableStreamSupervisorStateManagerTest
     Assert.assertEquals(BasicState.RUNNING, 
stateManager.getSupervisorState().getBasicState());
   }
 
+  @Test
+  public void testIdlePath()
+  {
+    Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, 
stateManager.getSupervisorState().getBasicState());
+
+    
stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, 
stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, 
stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.CREATING_TASKS);
+    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, 
stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
stateManager.getSupervisorState().getBasicState());
+
+    stateManager.markRunFinished();
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
stateManager.getSupervisorState().getBasicState());
+
+    // Emulates submitting Idle notice
+    stateManager.maybeSetState(BasicState.IDLE);
+    Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.IDLE, 
stateManager.getSupervisorState().getBasicState());
+
+    // Stay in idle state when supervisor is running until or unless it is 
specifically set to a different state
+    stateManager.markRunFinished();
+    Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.IDLE, 
stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(BasicState.RUNNING);
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
stateManager.getSupervisorState().getBasicState());
+  }
+
   @Test
   public void testStoppingPath()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 8d56b757df..ac767a4e90 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -465,6 +465,95 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testIdleStateTransition() throws Exception
+  {
+    EasyMock.reset(spec);
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(new 
SeekableStreamSupervisorIOConfig(
+        "stream",
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        1,
+        new Period("PT1H"),
+        new Period("PT1S"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        new IdleConfig(true, 200L)
+    )
+    {
+    }).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new 
DruidMonitorSchedulerConfig() {
+      @Override
+      public Duration getEmitterPeriod()
+      {
+        return new Period("PT1S").toStandardDuration();
+      }
+    }).anyTimes();
+    EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    supervisor.start();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, 
supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    Thread.sleep(100L);
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, 
supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, 
supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    Thread.sleep(100L);
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    Thread.sleep(100L);
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    verifyAll();
+  }
+
   @Test
   public void testCreatingTasksFailRecoveryFail() throws Exception
   {
@@ -908,6 +997,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         null
     )
     {
@@ -964,7 +1054,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         false,
         new Period("PT30M"),
         null,
-        null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null
+        null,
+        OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
+        null,
+        null
     )
     {
     };
@@ -1339,7 +1432,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     @Override
     public LagStats computeLagStats()
     {
-      return null;
+      return new LagStats(0, 0, 0);
     }
   }
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index c98d249803..9ea1b5c402 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -79,12 +79,16 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
   private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
   private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
   private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
+  private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
+      "supervisor_with_idle_behaviour_enabled_spec_template.json";
 
   protected static final String DATA_RESOURCE_ROOT = "/stream/data";
   protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
       String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
   protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH =
           String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE);
+  protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
+      String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);
 
   protected static final String SERIALIZER_SPEC_DIR = "serializer";
   protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
@@ -332,6 +336,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
+      String dataSource = generatedTestConfig.getFullDatasourceName();
       // Start generating half of the data
       int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
       int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
@@ -358,7 +363,10 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 
       // wait for autoScaling task numbers from 1 to 2.
       ITRetryUtil.retryUntil(
-          () -> indexer.getRunningTasks().size() == 2,
+          () -> indexer.getRunningTasks()
+                       .stream()
+                       .filter(taskResponseObject -> taskResponseObject.getId().contains(dataSource))
+                       .count() == 2,
               true,
               10000,
               50,
@@ -378,6 +386,79 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     }
   }
 
+  protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactionEnabled) throws Exception
+  {
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
+    try (
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      String dataSource = generatedTestConfig.getFullDatasourceName();
+      // Start generating half of the data
+      int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      long numWritten = streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateFirstRound,
+          FIRST_EVENT_TIME
+      );
+      // Verify supervisor is healthy (RUNNING) before it goes idle
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.IDLE.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be idle"
+      );
+
+      // wait for no more creation of indexing tasks.
+      ITRetryUtil.retryUntil(
+          () -> indexer.getRunningTasks()
+                       .stream()
+                       .noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)),
+          true,
+          10000,
+          50,
+          "wait for no more creation of indexing tasks"
+      );
+
+      // Start generating the remaining half of the data
+      numWritten += streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateRemaining,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
+
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(generatedTestConfig, numWritten);
+    }
+  }
+
   protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception
   {
     final GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 936e7a177d..e08b6c0ebe 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -62,6 +62,12 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
     doTestIndexDataWithAutoscaler(false);
   }
 
+  @Test
+  public void testIndexDataWithIdleConfigEnabled() throws Exception
+  {
+    doTestIndexDataWithIdleConfigEnabled(false);
+  }
+
   /**
    * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
    * and supervisor maintained and scoped within this test only
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json
new file mode 100644
index 0000000000..2f897d26c3
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json
@@ -0,0 +1,62 @@
+{
+  "type": "%%STREAM_TYPE%%",
+  "dataSchema": {
+    "dataSource": "%%DATASOURCE%%",
+    "parser": %%PARSER%%,
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    },
+    "metricsSpec": [
+      {
+        "type": "count",
+        "name": "count"
+      },
+      {
+        "type": "doubleSum",
+        "name": "added",
+        "fieldName": "added"
+      },
+      {
+        "type": "doubleSum",
+        "name": "deleted",
+        "fieldName": "deleted"
+      },
+      {
+        "type": "doubleSum",
+        "name": "delta",
+        "fieldName": "delta"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "MINUTE",
+      "queryGranularity": "NONE"
+    }
+  },
+  "tuningConfig": {
+    "type": "%%STREAM_TYPE%%",
+    "intermediatePersistPeriod": "PT30S",
+    "maxRowsPerSegment": 5000000,
+    "maxRowsInMemory": 500000
+  },
+  "ioConfig": {
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+    "autoScalerConfig": null,
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT120S",
+    "%%USE_EARLIEST_KEY%%": true,
+    "inputFormat" : %%INPUT_FORMAT%%,
+    "idleConfig": {
+      "enabled": true,
+      "inactiveAfterMillis": 10000
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index 346fcd20de..1ea36229c3 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -62,6 +62,7 @@ public class SupervisorStateManager
 
     PENDING(true, true),
     RUNNING(true, false),
+    IDLE(true, false),
     SUSPENDED(true, false),
     STOPPING(true, false);
 
@@ -154,10 +155,11 @@ public class SupervisorStateManager
       return;
     }
 
-    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
+    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run
     // yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM,
     // DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.)
-    if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) {
+    if ((healthySteadyState.equals(proposedState) || BasicState.IDLE.equals(proposedState))
+        && !atLeastOneSuccessfulRun) {
       return;
     }
 
@@ -196,11 +198,13 @@ public class SupervisorStateManager
     consecutiveSuccessfulRuns = currentRunSuccessful ? consecutiveSuccessfulRuns + 1 : 0;
     consecutiveFailedRuns = currentRunSuccessful ? 0 : consecutiveFailedRuns + 1;
 
-    // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun
+    // If the supervisor is not IDLE, try to set the state to RUNNING or SUSPENDED.
+    // This will be rejected if we haven't had atLeastOneSuccessfulRun
     // (in favor of the more specific states for the initial run) and will instead trigger setting the state to an
     // unhealthy one if we are now over the error thresholds.
-    maybeSetState(healthySteadyState);
-
+    if (!isIdle()) {
+      maybeSetState(healthySteadyState);
+    }
     // reset for next run
     currentRunSuccessful = true;
   }
@@ -230,6 +234,11 @@ public class SupervisorStateManager
     return atLeastOneSuccessfulRun;
   }
 
+  public boolean isIdle()
+  {
+    return SupervisorStateManager.BasicState.IDLE.equals(supervisorState);
+  }
+
   protected Deque<ExceptionEvent> getRecentEventsQueue()
   {
     return recentEventsQueue;
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
new file mode 100644
index 0000000000..01bdeb9e7d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SupervisorStateManagerTest
+{
+  @Test
+  public void testMarkRunFinishedIfSupervisorIsIdle()
+  {
+    SupervisorStateManager supervisorStateManager = new SupervisorStateManager(
+        new SupervisorStateManagerConfig(),
+        false
+    );
+
+    supervisorStateManager.markRunFinished();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisorStateManager.getSupervisorState());
+
+    supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE);
+    supervisorStateManager.markRunFinished();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisorStateManager.getSupervisorState());
+  }
+}
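
Note on the IDLE guard in SupervisorStateManager: the two hunks above can be read together as a small state machine. The following is a minimal standalone sketch of that behaviour, not the Druid class itself. The class name IdleStateSketch and its State enum are hypothetical, every run is assumed to succeed, and the error-threshold and SUSPENDED handling are left out.

```java
public class IdleStateSketch
{
  // Hypothetical, reduced set of states; the real BasicState enum has more.
  public enum State { PENDING, RUNNING, IDLE }

  private final State healthySteadyState = State.RUNNING;
  private State state = State.PENDING;
  private boolean atLeastOneSuccessfulRun = false;

  public State getState()
  {
    return state;
  }

  public boolean isIdle()
  {
    return state == State.IDLE;
  }

  public void maybeSetState(State proposed)
  {
    // Refuse RUNNING and IDLE until at least one run has succeeded.
    boolean steadyOrIdle = proposed == healthySteadyState || proposed == State.IDLE;
    if (steadyOrIdle && !atLeastOneSuccessfulRun) {
      return;
    }
    state = proposed;
  }

  public void markRunFinished()
  {
    // Assumption for this sketch: every run is successful.
    atLeastOneSuccessfulRun = true;
    // An idle supervisor stays idle; otherwise move to the steady state.
    if (!isIdle()) {
      maybeSetState(healthySteadyState);
    }
  }

  public static void main(String[] args)
  {
    IdleStateSketch manager = new IdleStateSketch();
    manager.maybeSetState(State.IDLE);   // rejected, no successful run yet
    System.out.println(manager.getState());
    manager.markRunFinished();           // first successful run
    System.out.println(manager.getState());
    manager.maybeSetState(State.IDLE);   // accepted now
    manager.markRunFinished();           // must not flip back to RUNNING
    System.out.println(manager.getState());
  }
}
```

The last two calls in main mirror testMarkRunFinishedIfSupervisorIsIdle: once the state is IDLE, finishing a run leaves it IDLE instead of resetting it to RUNNING.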

