This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e9548d  refactor SeekableStreamSupervisor usage of RecordSupplier 
(#9819)
2e9548d is described below

commit 2e9548d93d5a27e824cc93293e1de55af63d158f
Author: Clint Wylie <[email protected]>
AuthorDate: Sat May 16 14:09:39 2020 -0700

    refactor SeekableStreamSupervisor usage of RecordSupplier (#9819)
    
    * refactor SeekableStreamSupervisor usage of RecordSupplier to reduce 
contention between background threads and main thread, refactor 
KinesisRecordSupplier, refactor Kinesis lag metric collection and emitting
    
    * fix style and test
    
    * cleanup, refactor, javadocs, test
    
    * fixes
    
    * keep collecting current offsets and lag if unhealthy in background 
reporting thread
    
    * review stuffs
    
    * add comment
---
 .../java/org/apache/druid/indexer/TaskIdUtils.java |   8 +
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  44 +-
 .../druid/indexing/kinesis/KinesisIndexTask.java   |  10 +-
 .../indexing/kinesis/KinesisRecordSupplier.java    | 647 +++++++++++----------
 .../druid/indexing/kinesis/KinesisSamplerSpec.java |   3 +-
 .../indexing/kinesis/KinesisSequenceNumber.java    |  28 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |  43 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   2 +-
 .../kinesis/KinesisRecordSupplierTest.java         |  64 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  | 166 +++---
 .../supervisor/SeekableStreamSupervisor.java       | 164 +++---
 .../SeekableStreamSupervisorStateTest.java         |  20 +-
 .../supervisor/SupervisorStateManager.java         |   7 +-
 13 files changed, 669 insertions(+), 537 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java 
b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
index a88341b..76317d9 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexer;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.druid.java.util.common.StringUtils;
@@ -31,6 +32,8 @@ public class TaskIdUtils
 {
   private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S 
].*");
 
+  private static final Joiner UNDERSCORE_JOINER = Joiner.on("_");
+
   public static void validateId(String thingToValidate, String 
stringToValidate)
   {
     Preconditions.checkArgument(
@@ -60,4 +63,9 @@ public class TaskIdUtils
     }
     return suffix.toString();
   }
+
+  public static String getRandomIdWithPrefix(String prefix)
+  {
+    return UNDERSCORE_JOINER.join(prefix, TaskIdUtils.getRandomId());
+  }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 6c88c5a..b2352fe 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
@@ -46,6 +45,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@@ -219,7 +219,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<Integer, Long>
 
     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, 
TaskIdUtils.getRandomId());
+      String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
       taskList.add(new KafkaIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
@@ -334,16 +334,38 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<Integer, Long>
   }
 
   @Override
-  protected void updateLatestSequenceFromStream(
-      RecordSupplier<Integer, Long> recordSupplier,
-      Set<StreamPartition<Integer>> partitions
-  )
+  protected void updatePartitionLagFromStream()
   {
-    latestSequenceFromStream = partitions.stream()
-                                         .collect(Collectors.toMap(
-                                             StreamPartition::getPartitionId,
-                                             recordSupplier::getPosition
-                                         ));
+    getRecordSupplierLock().lock();
+    try {
+      Set<Integer> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      Set<StreamPartition<Integer>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the 
latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, 
recordSupplier::getPosition));
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
   }
 
   @Override
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 5d5a307..bfd3758 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.inject.name.Named;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
@@ -97,8 +98,12 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String>
     KinesisIndexTaskTuningConfig tuningConfig = 
((KinesisIndexTaskTuningConfig) super.tuningConfig);
     int fetchThreads = tuningConfig.getFetchThreads() != null
                        ? tuningConfig.getFetchThreads()
-                       : Math.max(1, 
ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
+                       : Runtime.getRuntime().availableProcessors() * 2;
 
+    Preconditions.checkArgument(
+        fetchThreads > 0,
+        "Must have at least one background fetch thread for the record 
supplier"
+    );
     return new KinesisRecordSupplier(
         KinesisRecordSupplier.getAmazonKinesisClient(
             ioConfig.getEndpoint(),
@@ -114,7 +119,8 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String>
         tuningConfig.getRecordBufferOfferTimeout(),
         tuningConfig.getRecordBufferFullWait(),
         tuningConfig.getFetchSequenceNumberTimeout(),
-        tuningConfig.getMaxRecordsPerPoll()
+        tuningConfig.getMaxRecordsPerPoll(),
+        false
     );
   }
 
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index f4b7830..b32ee14 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -54,6 +54,7 @@ import 
org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -82,7 +83,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -95,6 +98,14 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000;
   private static final long EXCEPTION_RETRY_DELAY_MS = 10000;
 
+  /**
+   * We call getRecords with limit 1000 to make sure that we can find the 
first (earliest) record in the shard.
+   * In the case where the shard is constantly removing records that are past 
their retention period, it is possible
+   * that we never find the first record in the shard if we use a limit of 1.
+   */
+  private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
+  private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
+
   private static boolean isServiceExceptionRecoverable(AmazonServiceException 
ex)
   {
     final boolean isIOException = ex.getCause() instanceof IOException;
@@ -102,6 +113,37 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
     return isIOException || isTimeout;
   }
 
+  /**
+   * Returns an array with the content between the position and limit of 
"buffer". This may be the buffer's backing
+   * array itself. Does not modify position or limit of the buffer.
+   */
+  private static byte[] toByteArray(final ByteBuffer buffer)
+  {
+    if (buffer.hasArray()
+        && buffer.arrayOffset() == 0
+        && buffer.position() == 0
+        && buffer.array().length == buffer.limit()) {
+      return buffer.array();
+    } else {
+      final byte[] retVal = new byte[buffer.remaining()];
+      buffer.duplicate().get(retVal);
+      return retVal;
+    }
+  }
+
+  /**
+   * Catch any exception and wrap it in a {@link StreamException}
+   */
+  private static <T> T wrapExceptions(Callable<T> callable)
+  {
+    try {
+      return callable.call();
+    }
+    catch (Exception e) {
+      throw new StreamException(e);
+    }
+  }
+
   private class PartitionResource
   {
     private final StreamPartition<String> streamPartition;
@@ -112,101 +154,80 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
     // to indicate that this shard has no more records to read
     @Nullable
     private volatile String shardIterator;
-    private volatile boolean started;
-    private volatile boolean stopRequested;
-
     private volatile long currentLagMillis;
 
-    PartitionResource(StreamPartition<String> streamPartition)
+    private final AtomicBoolean fetchStarted = new AtomicBoolean();
+    private ScheduledFuture<?> currentFetch;
+
+    private PartitionResource(StreamPartition<String> streamPartition)
     {
       this.streamPartition = streamPartition;
     }
 
-    void startBackgroundFetch()
+    private void startBackgroundFetch()
     {
-      if (started) {
+      if (!backgroundFetchEnabled) {
         return;
       }
+      // skip background fetch if seek has not yet been called for this partition
+      if (shardIterator == null) {
+        log.warn(
+            "Skipping background fetch for stream[%s] partition[%s] since seek 
has not been called for this partition",
+            streamPartition.getStream(),
+            streamPartition.getPartitionId()
+        );
+        return;
+      }
+      if (fetchStarted.compareAndSet(false, true)) {
+        log.debug(
+            "Starting scheduled fetch for stream[%s] partition[%s]",
+            streamPartition.getStream(),
+            streamPartition.getPartitionId()
+        );
 
-      log.info(
-          "Starting scheduled fetch runnable for stream[%s] partition[%s]",
-          streamPartition.getStream(),
-          streamPartition.getPartitionId()
-      );
-
-      stopRequested = false;
-      started = true;
-
-      rescheduleRunnable(fetchDelayMillis);
-    }
-
-    void stopBackgroundFetch()
-    {
-      log.info(
-          "Stopping scheduled fetch runnable for stream[%s] partition[%s]",
-          streamPartition.getStream(),
-          streamPartition.getPartitionId()
-      );
-      stopRequested = true;
+        scheduleBackgroundFetch(fetchDelayMillis);
+      }
     }
 
-    long getPartitionTimeLag()
+    private void stopBackgroundFetch()
     {
-      return currentLagMillis;
+      if (fetchStarted.compareAndSet(true, false)) {
+        log.debug(
+            "Stopping scheduled fetch for stream[%s] partition[%s]",
+            streamPartition.getStream(),
+            streamPartition.getPartitionId()
+        );
+        if (currentFetch != null && !currentFetch.isDone()) {
+          currentFetch.cancel(true);
+        }
+      }
     }
 
-    long getPartitionTimeLag(String offset)
+    private void scheduleBackgroundFetch(long delayMillis)
     {
-      // if not started (fetching records in background), fetch lag ourself 
with a throw-away iterator
-      if (!started) {
+      if (fetchStarted.get()) {
         try {
-          final String iteratorType;
-          final String offsetToUse;
-          if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) {
-            // this should probably check if will start processing earliest or 
latest rather than assuming earliest
-            // if latest we could skip this because latest will not be behind 
latest so lag is 0.
-            iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
-            offsetToUse = null;
-          } else {
-            iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
-            offsetToUse = offset;
-          }
-          String shardIterator = kinesis.getShardIterator(
-              streamPartition.getStream(),
-              streamPartition.getPartitionId(),
-              iteratorType,
-              offsetToUse
-          ).getShardIterator();
-
-          GetRecordsResult recordsResult = kinesis.getRecords(
-              new 
GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch)
-          );
-
-          currentLagMillis = recordsResult.getMillisBehindLatest();
-          return currentLagMillis;
+          currentFetch = scheduledExec.schedule(fetchRecords(), delayMillis, 
TimeUnit.MILLISECONDS);
         }
-        catch (Exception ex) {
-          // eat it
+        catch (RejectedExecutionException e) {
           log.warn(
-              ex,
-              "Failed to determine partition lag for partition %s of stream 
%s",
-              streamPartition.getPartitionId(),
-              streamPartition.getStream()
+              e,
+              "Caught RejectedExecutionException, KinesisRecordSupplier for 
partition[%s] has likely temporarily shutdown the ExecutorService. "
+              + "This is expected behavior after calling seek(), 
seekToEarliest() and seekToLatest()",
+              streamPartition.getPartitionId()
           );
+
         }
+      } else {
+        log.debug("Worker for partition[%s] is already stopped", 
streamPartition.getPartitionId());
       }
-      return currentLagMillis;
     }
 
-    private Runnable getRecordRunnable()
+    private Runnable fetchRecords()
     {
       return () -> {
-
-        if (stopRequested) {
-          started = false;
-          stopRequested = false;
-
-          log.info("Worker for partition[%s] has been stopped", 
streamPartition.getPartitionId());
+        if (!fetchStarted.get()) {
+          log.debug("Worker for partition[%s] has been stopped", 
streamPartition.getPartitionId());
           return;
         }
 
@@ -231,7 +252,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
 
             if (!records.offer(currRecord, recordBufferOfferTimeout, 
TimeUnit.MILLISECONDS)) {
               log.warn("OrderedPartitionableRecord buffer full, retrying in 
[%,dms]", recordBufferFullWait);
-              rescheduleRunnable(recordBufferFullWait);
+              scheduleBackgroundFetch(recordBufferFullWait);
             }
 
             return;
@@ -298,14 +319,14 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
                   currRecord.getSequenceNumber()
               ).getShardIterator();
 
-              rescheduleRunnable(recordBufferFullWait);
+              scheduleBackgroundFetch(recordBufferFullWait);
               return;
             }
           }
 
           shardIterator = recordsResult.getNextShardIterator(); // will be 
null if the shard has been closed
 
-          rescheduleRunnable(fetchDelayMillis);
+          scheduleBackgroundFetch(fetchDelayMillis);
         }
         catch (ProvisionedThroughputExceededException e) {
           log.warn(
@@ -315,7 +336,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
               + "the available throughput. Reduce the frequency or size of 
your requests."
           );
           long retryMs = Math.max(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS, 
fetchDelayMillis);
-          rescheduleRunnable(retryMs);
+          scheduleBackgroundFetch(retryMs);
         }
         catch (InterruptedException e) {
           // may happen if interrupted while BlockingQueue.offer() is waiting
@@ -324,7 +345,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
               "Interrupted while waiting to add record to buffer, retrying in 
[%,dms]",
               EXCEPTION_RETRY_DELAY_MS
           );
-          rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
+          scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
         }
         catch (ExpiredIteratorException e) {
           log.warn(
@@ -334,7 +355,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
           );
           if (recordsResult != null) {
             shardIterator = recordsResult.getNextShardIterator(); // will be 
null if the shard has been closed
-            rescheduleRunnable(fetchDelayMillis);
+            scheduleBackgroundFetch(fetchDelayMillis);
           } else {
             throw new ISE("can't reschedule fetch records runnable, 
recordsResult is null??");
           }
@@ -347,7 +368,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
         catch (AmazonServiceException e) {
           if (isServiceExceptionRecoverable(e)) {
             log.warn(e, "encounted unknown recoverable AWS exception, retrying 
in [%,dms]", EXCEPTION_RETRY_DELAY_MS);
-            rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS);
+            scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS);
           } else {
             log.warn(e, "encounted unknown unrecoverable AWS exception, will 
not retry");
             throw new RuntimeException(e);
@@ -355,31 +376,32 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
         }
         catch (Throwable e) {
           // non transient errors
-          log.error(e, "unknown getRecordRunnable exception, will not retry");
+          log.error(e, "unknown fetchRecords exception, will not retry");
           throw new RuntimeException(e);
         }
 
       };
     }
 
-    private void rescheduleRunnable(long delayMillis)
+    private void seek(ShardIteratorType iteratorEnum, String sequenceNumber)
     {
-      if (started && !stopRequested) {
-        try {
-          scheduledExec.schedule(getRecordRunnable(), delayMillis, 
TimeUnit.MILLISECONDS);
-        }
-        catch (RejectedExecutionException e) {
-          log.warn(
-              e,
-              "Caught RejectedExecutionException, KinesisRecordSupplier for 
partition[%s] has likely temporarily shutdown the ExecutorService. "
-              + "This is expected behavior after calling seek(), 
seekToEarliest() and seekToLatest()",
-              streamPartition.getPartitionId()
-          );
+      log.debug(
+          "Seeking partition [%s] to [%s]",
+          streamPartition.getPartitionId(),
+          sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
+      );
 
-        }
-      } else {
-        log.info("Worker for partition[%s] has been stopped", 
streamPartition.getPartitionId());
-      }
+      shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
+          streamPartition.getStream(),
+          streamPartition.getPartitionId(),
+          iteratorEnum.toString(),
+          sequenceNumber
+      ).getShardIterator());
+    }
+
+    private long getPartitionTimeLag()
+    {
+      return currentLagMillis;
     }
   }
 
@@ -398,6 +420,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   private final int maxRecordsPerPoll;
   private final int fetchThreads;
   private final int recordBufferSize;
+  private final boolean useEarliestSequenceNumber;
 
   private ScheduledExecutorService scheduledExec;
 
@@ -405,8 +428,9 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
       new ConcurrentHashMap<>();
   private BlockingQueue<OrderedPartitionableRecord<String, String>> records;
 
-  private volatile boolean checkPartitionsStarted = false;
+  private final boolean backgroundFetchEnabled;
   private volatile boolean closed = false;
+  private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
 
   public KinesisRecordSupplier(
       AmazonKinesis amazonKinesis,
@@ -418,7 +442,8 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
       int recordBufferOfferTimeout,
       int recordBufferFullWait,
       int fetchSequenceNumberTimeout,
-      int maxRecordsPerPoll
+      int maxRecordsPerPoll,
+      boolean useEarliestSequenceNumber
   )
   {
     Preconditions.checkNotNull(amazonKinesis);
@@ -432,6 +457,8 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
     this.maxRecordsPerPoll = maxRecordsPerPoll;
     this.fetchThreads = fetchThreads;
     this.recordBufferSize = recordBufferSize;
+    this.useEarliestSequenceNumber = useEarliestSequenceNumber;
+    this.backgroundFetchEnabled = fetchThreads > 0;
 
     // the deaggregate function is implemented by the amazon-kinesis-client, 
whose license is not compatible with Apache.
     // The work around here is to use reflection to find the deaggregate 
function in the classpath. See details on the
@@ -459,16 +486,18 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
       getDataHandle = null;
     }
 
-    log.info(
-        "Creating fetch thread pool of size [%d] 
(Runtime.availableProcessors=%d)",
-        fetchThreads,
-        Runtime.getRuntime().availableProcessors()
-    );
+    if (backgroundFetchEnabled) {
+      log.info(
+          "Creating fetch thread pool of size [%d] 
(Runtime.availableProcessors=%d)",
+          fetchThreads,
+          Runtime.getRuntime().availableProcessors()
+      );
 
-    scheduledExec = Executors.newScheduledThreadPool(
-        fetchThreads,
-        Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
-    );
+      scheduledExec = Executors.newScheduledThreadPool(
+          fetchThreads,
+          Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
+      );
+    }
 
     records = new LinkedBlockingQueue<>(recordBufferSize);
   }
@@ -517,13 +546,36 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   public void start()
   {
     checkIfClosed();
-    if (checkPartitionsStarted) {
+    if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(false, 
true)) {
       
partitionResources.values().forEach(PartitionResource::startBackgroundFetch);
-      checkPartitionsStarted = false;
     }
   }
 
   @Override
+  public void close()
+  {
+    if (this.closed) {
+      return;
+    }
+
+    assign(ImmutableSet.of());
+
+    scheduledExec.shutdown();
+
+    try {
+      if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, 
TimeUnit.MILLISECONDS)) {
+        scheduledExec.shutdownNow();
+      }
+    }
+    catch (InterruptedException e) {
+      log.warn(e, "InterruptedException while shutting down");
+      throw new RuntimeException(e);
+    }
+
+    this.closed = true;
+  }
+
+  @Override
   public void assign(Set<StreamPartition<String>> collection)
   {
     checkIfClosed();
@@ -535,56 +587,55 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
         )
     );
 
-    for (Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> i = 
partitionResources.entrySet()
-                                                                               
                .iterator(); i.hasNext(); ) {
+    Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> i = 
partitionResources.entrySet().iterator();
+    while (i.hasNext()) {
       Map.Entry<StreamPartition<String>, PartitionResource> entry = i.next();
       if (!collection.contains(entry.getKey())) {
         i.remove();
         entry.getValue().stopBackgroundFetch();
       }
     }
+  }
 
+  @Override
+  public Collection<StreamPartition<String>> getAssignment()
+  {
+    return partitionResources.keySet();
   }
 
   @Override
   public void seek(StreamPartition<String> partition, String sequenceNumber) 
throws InterruptedException
   {
-    checkIfClosed();
-    filterBufferAndResetFetchRunnable(ImmutableSet.of(partition));
-    seekInternal(partition, sequenceNumber, 
ShardIteratorType.AT_SEQUENCE_NUMBER);
+    filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
+    partitionSeek(partition, sequenceNumber, 
ShardIteratorType.AT_SEQUENCE_NUMBER);
   }
 
   @Override
   public void seekToEarliest(Set<StreamPartition<String>> partitions) throws 
InterruptedException
   {
-    checkIfClosed();
-    filterBufferAndResetFetchRunnable(partitions);
-    partitions.forEach(partition -> seekInternal(partition, null, 
ShardIteratorType.TRIM_HORIZON));
+    filterBufferAndResetBackgroundFetch(partitions);
+    partitions.forEach(partition -> partitionSeek(partition, null, 
ShardIteratorType.TRIM_HORIZON));
   }
 
   @Override
   public void seekToLatest(Set<StreamPartition<String>> partitions) throws 
InterruptedException
   {
-    checkIfClosed();
-    filterBufferAndResetFetchRunnable(partitions);
-    partitions.forEach(partition -> seekInternal(partition, null, 
ShardIteratorType.LATEST));
+    filterBufferAndResetBackgroundFetch(partitions);
+    partitions.forEach(partition -> partitionSeek(partition, null, 
ShardIteratorType.LATEST));
   }
 
+  @Nullable
   @Override
-  public Collection<StreamPartition<String>> getAssignment()
+  public String getPosition(StreamPartition<String> partition)
   {
-    return partitionResources.keySet();
+    throw new UnsupportedOperationException("getPosition() is not supported in 
Kinesis");
   }
 
   @Nonnull
   @Override
   public List<OrderedPartitionableRecord<String, String>> poll(long timeout)
   {
-    checkIfClosed();
-    if (checkPartitionsStarted) {
-      
partitionResources.values().forEach(PartitionResource::startBackgroundFetch);
-      checkPartitionsStarted = false;
-    }
+    start();
 
     try {
       int expectedSize = Math.min(Math.max(records.size(), 1), 
maxRecordsPerPoll);
@@ -616,23 +667,14 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   @Override
   public String getLatestSequenceNumber(StreamPartition<String> partition)
   {
-    checkIfClosed();
-    return getSequenceNumberInternal(partition, ShardIteratorType.LATEST);
+    return getSequenceNumber(partition, ShardIteratorType.LATEST);
   }
 
   @Nullable
   @Override
   public String getEarliestSequenceNumber(StreamPartition<String> partition)
   {
-    checkIfClosed();
-    return getSequenceNumberInternal(partition, 
ShardIteratorType.TRIM_HORIZON);
-  }
-
-  @Nullable
-  @Override
-  public String getPosition(StreamPartition<String> partition)
-  {
-    throw new UnsupportedOperationException("getPosition() is not supported in 
Kinesis");
+    return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
   }
 
   @Override
@@ -665,180 +707,179 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
     );
   }
 
-  @Override
-  public void close()
+  /**
+   * Fetch the partition lag, given a stream and set of current partition 
offsets. This operates independently from
+   * the {@link PartitionResource}s which have been assigned to this record 
supplier.
+   */
+  public Map<String, Long> getPartitionsTimeLag(String stream, Map<String, 
String> currentOffsets)
   {
-    if (this.closed) {
-      return;
-    }
-
-    assign(ImmutableSet.of());
-
-    scheduledExec.shutdown();
-
-    try {
-      if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, 
TimeUnit.MILLISECONDS)) {
-        scheduledExec.shutdownNow();
-      }
-    }
-    catch (InterruptedException e) {
-      log.warn(e, "InterruptedException while shutting down");
-      throw new RuntimeException(e);
+    Map<String, Long> partitionLag = 
Maps.newHashMapWithExpectedSize(currentOffsets.size());
+    for (Map.Entry<String, String> partitionOffset : 
currentOffsets.entrySet()) {
+      StreamPartition<String> partition = new StreamPartition<>(stream, 
partitionOffset.getKey());
+      long currentLag = getPartitionTimeLag(partition, 
partitionOffset.getValue());
+      partitionLag.put(partitionOffset.getKey(), currentLag);
     }
-
-    this.closed = true;
+    return partitionLag;
   }
 
-  // this is only used for tests
+  /**
+   * This method is only used for tests to verify that {@link 
PartitionResource} in fact tracks its current lag
+   * as it is polled for records. This isn't currently used in production at 
all, but could be one day if we were
+   * to prefer to get the lag from the running tasks in the same API call 
which fetches the current task offsets,
+   * instead of directly calling the AWS Kinesis API with the offsets returned 
from those tasks
+   * (see {@link #getPartitionsTimeLag}, which accepts a map of current 
partition offsets).
+   */
   @VisibleForTesting
-  Map<String, Long> getPartitionTimeLag()
+  Map<String, Long> getPartitionResourcesTimeLag()
   {
     return partitionResources.entrySet()
                              .stream()
                              .collect(
-                                 Collectors.toMap(k -> 
k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag())
+                                 Collectors.toMap(
+                                     k -> k.getKey().getPartitionId(),
+                                     k -> k.getValue().getPartitionTimeLag()
+                                 )
                              );
   }
 
-  public Map<String, Long> getPartitionTimeLag(Map<String, String> 
currentOffsets)
+  @VisibleForTesting
+  public int bufferSize()
   {
-    Map<String, Long> partitionLag = 
Maps.newHashMapWithExpectedSize(currentOffsets.size());
-    for (Map.Entry<StreamPartition<String>, PartitionResource> partition : 
partitionResources.entrySet()) {
-      final String partitionId = partition.getKey().getPartitionId();
-      partitionLag.put(partitionId, 
partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId)));
-    }
-    return partitionLag;
+    return records.size();
+  }
+
+  @VisibleForTesting
+  public boolean isBackgroundFetchRunning()
+  {
+    return partitionsFetchStarted.get();
   }
 
-  private void seekInternal(StreamPartition<String> partition, String 
sequenceNumber, ShardIteratorType iteratorEnum)
+  /**
+   * Check that a {@link PartitionResource} has been assigned to this record 
supplier, and if so call
+   * {@link PartitionResource#seek} to move it to the latest offsets. Note 
that this method does not restart background
+   * fetch, which should have been stopped prior to calling this method by a 
call to
+   * {@link #filterBufferAndResetBackgroundFetch}.
+   */
+  private void partitionSeek(StreamPartition<String> partition, String 
sequenceNumber, ShardIteratorType iteratorEnum)
   {
     PartitionResource resource = partitionResources.get(partition);
     if (resource == null) {
       throw new ISE("Partition [%s] has not been assigned", partition);
     }
-
-    log.debug(
-        "Seeking partition [%s] to [%s]",
-        partition.getPartitionId(),
-        sequenceNumber != null ? sequenceNumber : iteratorEnum.toString()
-    );
-
-    resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator(
-        partition.getStream(),
-        partition.getPartitionId(),
-        iteratorEnum.toString(),
-        sequenceNumber
-    ).getShardIterator());
-
-    checkPartitionsStarted = true;
+    resource.seek(iteratorEnum, sequenceNumber);
   }
 
-  private void filterBufferAndResetFetchRunnable(Set<StreamPartition<String>> 
partitions) throws InterruptedException
+  /**
+   * Given a partition and a {@link ShardIteratorType}, create a shard 
iterator and fetch
+   * {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first 
sequence number from the result set.
+   * This method is thread safe as it does not depend on the internal state of 
the supplier (it doesn't use the
+   * {@link PartitionResource} which have been assigned to the supplier), and 
the Kinesis client is thread safe.
+   */
+  @Nullable
+  private String getSequenceNumber(StreamPartition<String> partition, 
ShardIteratorType iteratorEnum)
   {
-    scheduledExec.shutdown();
-
-    try {
-      if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, 
TimeUnit.MILLISECONDS)) {
-        scheduledExec.shutdownNow();
-      }
-    }
-    catch (InterruptedException e) {
-      log.warn(e, "InterruptedException while shutting down");
-      throw e;
-    }
+    return wrapExceptions(() -> {
+      String shardIterator =
+          kinesis.getShardIterator(partition.getStream(), 
partition.getPartitionId(), iteratorEnum.toString())
+                 .getShardIterator();
+      long timeoutMillis = System.currentTimeMillis() + 
fetchSequenceNumberTimeout;
+      GetRecordsResult recordsResult = null;
+
+      while (shardIterator != null && System.currentTimeMillis() < 
timeoutMillis) {
+
+        if (closed) {
+          log.info("KinesisRecordSupplier closed while fetching 
sequenceNumber");
+          return null;
+        }
+        final String currentShardIterator = shardIterator;
+        final GetRecordsRequest request = new 
GetRecordsRequest().withShardIterator(currentShardIterator)
+                                                                 
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
+        recordsResult = RetryUtils.retry(
+            () -> kinesis.getRecords(request),
+            (throwable) -> {
+              if (throwable instanceof ProvisionedThroughputExceededException) 
{
+                log.warn(
+                    throwable,
+                    "encountered ProvisionedThroughputExceededException while 
fetching records, this means "
+                    + "that the request rate for the stream is too high, or 
the requested data is too large for "
+                    + "the available throughput. Reduce the frequency or size 
of your requests. Consider increasing "
+                    + "the number of shards to increase throughput."
+                );
+                return true;
+              }
+              return false;
+            },
+            GET_SEQUENCE_NUMBER_RETRY_COUNT
+        );
 
-    scheduledExec = Executors.newScheduledThreadPool(
-        fetchThreads,
-        Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
-    );
+        List<Record> records = recordsResult.getRecords();
 
-    // filter records in buffer and only retain ones whose partition was not 
seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new 
LinkedBlockingQueue<>(recordBufferSize);
+        if (!records.isEmpty()) {
+          return records.get(0).getSequenceNumber();
+        }
 
-    records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        shardIterator = recordsResult.getNextShardIterator();
+      }
 
-    records = newQ;
+      if (shardIterator == null) {
+        log.info("Partition[%s] returned a null shard iterator, is the shard 
closed?", partition.getPartitionId());
+        return KinesisSequenceNumber.END_OF_SHARD_MARKER;
+      }
 
-    // restart fetching threads
-    partitionResources.values().forEach(x -> x.started = false);
-    checkPartitionsStarted = true;
-  }
 
-  @Nullable
-  private String getSequenceNumberInternal(StreamPartition<String> partition, 
ShardIteratorType iteratorEnum)
-  {
-    return wrapExceptions(() -> getSequenceNumberInternal(
-        partition,
-        kinesis.getShardIterator(partition.getStream(), 
partition.getPartitionId(), iteratorEnum.toString())
-               .getShardIterator()
-    ));
+      // if we reach here, it usually means either the shard has no more 
records, or records have not been
+      // added to this shard
+      log.warn(
+          "timed out while trying to fetch position for shard[%s], 
millisBehindLatest is [%s], likely no more records in shard",
+          partition.getPartitionId(),
+          recordsResult != null ? recordsResult.getMillisBehindLatest() : 
"UNKNOWN"
+      );
+      return null;
+    });
   }
 
-  @Nullable
-  private String getSequenceNumberInternal(StreamPartition<String> partition, 
String shardIterator)
+  /**
+   * Given a {@link StreamPartition} and an offset, create a 'shard iterator' 
for the offset and fetch a single record
+   * in order to get the lag: {@link 
GetRecordsResult#getMillisBehindLatest()}. This method is thread safe as it does
+   * not depend on the internal state of the supplier (it doesn't use the 
{@link PartitionResource} which have been
+   * assigned to the supplier), and the Kinesis client is thread safe.
+   */
+  private Long getPartitionTimeLag(StreamPartition<String> partition, String 
offset)
   {
-    long timeoutMillis = System.currentTimeMillis() + 
fetchSequenceNumberTimeout;
-    GetRecordsResult recordsResult = null;
-
-    while (shardIterator != null && System.currentTimeMillis() < 
timeoutMillis) {
-
-      if (closed) {
-        log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
-        return null;
-      }
-      try {
-        // we call getRecords with limit 1000 to make sure that we can find 
the first (earliest) record in the shard.
-        // In the case where the shard is constantly removing records that are 
past their retention period, it is possible
-        // that we never find the first record in the shard if we use a limit 
of 1.
-        recordsResult = kinesis.getRecords(new 
GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000));
-      }
-      catch (ProvisionedThroughputExceededException e) {
-        log.warn(
-            e,
-            "encountered ProvisionedThroughputExceededException while fetching 
records, this means "
-            + "that the request rate for the stream is too high, or the 
requested data is too large for "
-            + "the available throughput. Reduce the frequency or size of your 
requests. Consider increasing "
-            + "the number of shards to increase throughput."
-        );
-        try {
-          Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS);
-          continue;
-        }
-        catch (InterruptedException e1) {
-          log.warn(e1, "Thread interrupted!");
-          Thread.currentThread().interrupt();
-          break;
+    return wrapExceptions(() -> {
+      final String iteratorType;
+      final String offsetToUse;
+      if (offset == null || KinesisSupervisor.OFFSET_NOT_SET.equals(offset)) {
+        if (useEarliestSequenceNumber) {
+          iteratorType = ShardIteratorType.TRIM_HORIZON.toString();
+          offsetToUse = null;
+        } else {
+          // if offset is not set and not using earliest, it means we will 
start reading from latest,
+          // so lag will be 0 and we have nothing to do here
+          return 0L;
         }
+      } else {
+        iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
+        offsetToUse = offset;
       }
+      String shardIterator = kinesis.getShardIterator(
+          partition.getStream(),
+          partition.getPartitionId(),
+          iteratorType,
+          offsetToUse
+      ).getShardIterator();
+
+      GetRecordsResult recordsResult = kinesis.getRecords(
+          new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
+      );
 
-      List<Record> records = recordsResult.getRecords();
-
-      if (!records.isEmpty()) {
-        return records.get(0).getSequenceNumber();
-      }
-
-      shardIterator = recordsResult.getNextShardIterator();
-    }
-
-    if (shardIterator == null) {
-      log.info("Partition[%s] returned a null shard iterator, is the shard 
closed?", partition.getPartitionId());
-      return KinesisSequenceNumber.END_OF_SHARD_MARKER;
-    }
-
-
-    // if we reach here, it usually means either the shard has no more 
records, or records have not been
-    // added to this shard
-    log.warn(
-        "timed out while trying to fetch position for shard[%s], 
millisBehindLatest is [%s], likely no more records in shard",
-        partition.getPartitionId(),
-        recordsResult != null ? recordsResult.getMillisBehindLatest() : 
"UNKNOWN"
-    );
-    return null;
-
+      return recordsResult.getMillisBehindLatest();
+    });
   }
 
+  /**
+   * Explode if {@link #close()} has been called on the supplier.
+   */
   private void checkIfClosed()
   {
     if (closed) {
@@ -847,36 +888,46 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String>
   }
 
   /**
-   * Returns an array with the content between the position and limit of 
"buffer". This may be the buffer's backing
-   * array itself. Does not modify position or limit of the buffer.
+   * This method must be called before a seek operation ({@link #seek}, {@link 
#seekToLatest}, or
+   * {@link #seekToEarliest}).
+   *
+   * When called, it will nuke the {@link #scheduledExec} that is shared by 
all {@link PartitionResource}, filter
+   * records from the buffer for partitions which will have a seek operation 
performed, and stop background fetch for
+   * each {@link PartitionResource} to prepare for the seek. If background 
fetch is not currently running, the
    */
-  private static byte[] toByteArray(final ByteBuffer buffer)
+  private void 
filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> partitions) 
throws InterruptedException
   {
-    if (buffer.hasArray()
-        && buffer.arrayOffset() == 0
-        && buffer.position() == 0
-        && buffer.array().length == buffer.limit()) {
-      return buffer.array();
-    } else {
-      final byte[] retVal = new byte[buffer.remaining()];
-      buffer.duplicate().get(retVal);
-      return retVal;
-    }
-  }
+    checkIfClosed();
+    if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(true, 
false)) {
+      scheduledExec.shutdown();
 
-  private static <T> T wrapExceptions(Callable<T> callable)
-  {
-    try {
-      return callable.call();
-    }
-    catch (Exception e) {
-      throw new StreamException(e);
+      try {
+        if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, 
TimeUnit.MILLISECONDS)) {
+          scheduledExec.shutdownNow();
+        }
+      }
+      catch (InterruptedException e) {
+        log.warn(e, "InterruptedException while shutting down");
+        throw e;
+      }
+
+      scheduledExec = Executors.newScheduledThreadPool(
+          fetchThreads,
+          Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d")
+      );
     }
-  }
 
-  @VisibleForTesting
-  public int bufferSize()
-  {
-    return records.size();
+    // filter records in buffer and only retain ones whose partition was not 
seeked
+    BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new 
LinkedBlockingQueue<>(recordBufferSize);
+
+    records.stream()
+           .filter(x -> !partitions.contains(x.getStreamPartition()))
+           .forEachOrdered(newQ::offer);
+
+    records = newQ;
+
+    // restart fetching threads
+    partitionResources.values().forEach(x -> x.stopBackgroundFetch());
   }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 664b3b5..401efe7 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -71,7 +71,8 @@ public class KinesisSamplerSpec extends 
SeekableStreamSamplerSpec
         tuningConfig.getRecordBufferOfferTimeout(),
         tuningConfig.getRecordBufferFullWait(),
         tuningConfig.getFetchSequenceNumberTimeout(),
-        tuningConfig.getMaxRecordsPerPoll()
+        tuningConfig.getMaxRecordsPerPoll(),
+        ioConfig.isUseEarliestSequenceNumber()
     );
   }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
index 4bd8fe8..ab13f5b 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
@@ -36,27 +36,33 @@ public class KinesisSequenceNumber extends 
OrderedSequenceNumber<String>
    */
   public static final String END_OF_SHARD_MARKER = "EOS";
 
-  // this special marker is used by the KinesisSupervisor to set the endOffsets
-  // of newly created indexing tasks. This is necessary because streaming 
tasks do not
-  // have endPartitionOffsets. This marker signals to the task that it should 
continue
-  // to ingest data until taskDuration has elapsed or the task was stopped or 
paused or killed
+
+  /**
+   *  This special marker is used by the KinesisSupervisor to set the 
endOffsets of newly created indexing tasks. This
+   *  is necessary because streaming tasks do not have endPartitionOffsets. 
This marker signals to the task that it
+   *  should continue to ingest data until taskDuration has elapsed or the 
task was stopped or paused or killed.
+   */
   public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER";
 
-  // this special marker is used by the KinesisSupervisor to mark that a shard 
has been expired
-  // (i.e., closed and then the retention period has passed)
+
+  /**
+   * This special marker is used by the KinesisSupervisor to mark that a shard 
has been expired
+   * (i.e., closed and then the retention period has passed)
+   */
   public static final String EXPIRED_MARKER = "EXPIRED";
 
-  // this flag is used to indicate either END_OF_SHARD_MARKER
-  // or NO_END_SEQUENCE_NUMBER so that they can be properly compared
-  // with other sequence numbers
+  /**
+   * this flag is used to indicate either END_OF_SHARD_MARKER
+   * or NO_END_SEQUENCE_NUMBER so that they can be properly compared
+   * with other sequence numbers
+   */
   private final boolean isMaxSequenceNumber;
   private final BigInteger intSequence;
 
   private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive)
   {
     super(sequenceNumber, isExclusive);
-    if (END_OF_SHARD_MARKER.equals(sequenceNumber)
-        || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
+    if (END_OF_SHARD_MARKER.equals(sequenceNumber) || 
NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) {
       isMaxSequenceNumber = true;
       this.intSequence = null;
     } else {
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 0000ee6..494e901 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.kinesis.supervisor;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.indexer.TaskIdUtils;
@@ -49,7 +48,6 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@@ -64,6 +62,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * Supervisor responsible for managing the KinesisIndexTask for a single 
dataSource. At a high level, the class accepts a
@@ -82,7 +81,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
       {
       };
 
-  public static final String NOT_SET = "-1";
+  public static final String OFFSET_NOT_SET = "-1";
   private final KinesisSupervisorSpec spec;
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
@@ -167,7 +166,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
 
     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, 
TaskIdUtils.getRandomId());
+      String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
       taskList.add(new KinesisIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
@@ -187,8 +186,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
 
 
   @Override
-  protected RecordSupplier<String, String> setupRecordSupplier()
-      throws RuntimeException
+  protected RecordSupplier<String, String> setupRecordSupplier() throws 
RuntimeException
   {
     KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
     KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();
@@ -202,13 +200,14 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
         ),
         ioConfig.getRecordsPerFetch(),
         ioConfig.getFetchDelayMillis(),
-        1,
+        0, // skip starting background fetch, it is not used
         ioConfig.isDeaggregate(),
         taskTuningConfig.getRecordBufferSize(),
         taskTuningConfig.getRecordBufferOfferTimeout(),
         taskTuningConfig.getRecordBufferFullWait(),
         taskTuningConfig.getFetchSequenceNumberTimeout(),
-        taskTuningConfig.getMaxRecordsPerPoll()
+        taskTuningConfig.getMaxRecordsPerPoll(),
+        ioConfig.isUseEarliestSequenceNumber()
     );
   }
 
@@ -296,7 +295,19 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
   @Override
   protected Map<String, Long> getTimeLagPerPartition(Map<String, String> 
currentOffsets)
   {
-    return ((KinesisRecordSupplier) 
recordSupplier).getPartitionTimeLag(currentOffsets);
+    return currentOffsets
+        .entrySet()
+        .stream()
+        .filter(e -> e.getValue() != null &&
+                     currentPartitionTimeLag != null &&
+                     currentPartitionTimeLag.get(e.getKey()) != null
+        )
+        .collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                e -> currentPartitionTimeLag.get(e.getKey())
+            )
+        );
   }
 
   @Override
@@ -315,13 +326,11 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
   }
 
   @Override
-  protected void updateLatestSequenceFromStream(
-      RecordSupplier<String, String> recordSupplier,
-      Set<StreamPartition<String>> streamPartitions
-  )
+  protected void updatePartitionLagFromStream()
   {
     KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
-    currentPartitionTimeLag = 
supplier.getPartitionTimeLag(getHighestCurrentOffsets());
+    // this recordSupplier method is thread safe, so does not need to acquire 
the recordSupplierLock
+    currentPartitionTimeLag = 
supplier.getPartitionsTimeLag(getIoConfig().getStream(), 
getHighestCurrentOffsets());
   }
 
   @Override
@@ -345,7 +354,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
   @Override
   protected String getNotSetMarker()
   {
-    return NOT_SET;
+    return OFFSET_NOT_SET;
   }
 
   @Override
@@ -454,7 +463,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
         }
       }
 
-      newSequences = new SeekableStreamStartSequenceNumbers<String, String>(
+      newSequences = new SeekableStreamStartSequenceNumbers<>(
           old.getStream(),
           null,
           newPartitionSequenceNumberMap,
@@ -462,7 +471,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
           newExclusiveStartPartitions
       );
     } else {
-      newSequences = new SeekableStreamEndSequenceNumbers<String, String>(
+      newSequences = new SeekableStreamEndSequenceNumbers<>(
           old.getStream(),
           null,
           newPartitionSequenceNumberMap,
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index c886065..4daa2fd 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             .andReturn(Collections.emptyList())
             .anyTimes();
 
-    EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+    EasyMock.expect(recordSupplier.getPartitionsTimeLag(EasyMock.anyString(), 
EasyMock.anyObject()))
             .andReturn(null)
             .anyTimes();
 
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 4a631e8..6b2f32f 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -203,7 +203,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        5
+        5,
+        true
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -212,6 +213,9 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     Assert.assertEquals(partitions, recordSupplier.getAssignment());
     Assert.assertEquals(ImmutableSet.of(SHARD_ID1, SHARD_ID0), 
recordSupplier.getPartitionIds(STREAM));
+
+    // calling poll would start background fetch if seek was called, but will 
instead be skipped and the results
+    // empty
     Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
 
     verifyAll();
@@ -290,7 +294,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        100
+        100,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -308,7 +313,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     Assert.assertEquals(partitions, recordSupplier.getAssignment());
     Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
-    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionTimeLag());
+    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionResourcesTimeLag());
   }
 
   @Test
@@ -367,7 +372,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        100
+        100,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -386,7 +392,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
     Assert.assertEquals(9, polledRecords.size());
     Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12)));
     Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2)));
-    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionTimeLag());
+    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionResourcesTimeLag());
   }
 
 
@@ -434,7 +440,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        100
+        100,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -468,7 +475,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        5
+        5,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -530,7 +538,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        1
+        1,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -549,7 +558,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
     );
 
     // only one partition in this test. first results come from 
getRecordsResult1, which has SHARD1_LAG_MILLIS
-    Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), 
recordSupplier.getPartitionTimeLag());
+    Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), 
recordSupplier.getPartitionResourcesTimeLag());
 
     recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7");
     recordSupplier.start();
@@ -563,7 +572,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     Assert.assertEquals(ALL_RECORDS.get(9), record2);
     // only one partition in this test. second results come from 
getRecordsResult0, which has SHARD0_LAG_MILLIS
-    Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), 
recordSupplier.getPartitionTimeLag());
+    Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), 
recordSupplier.getPartitionResourcesTimeLag());
     verifyAll();
   }
 
@@ -622,7 +631,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        100
+        100,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -640,7 +650,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     Assert.assertEquals(partitions, recordSupplier.getAssignment());
     Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
-    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionTimeLag());
+    Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionResourcesTimeLag());
   }
 
   @Test
@@ -692,7 +702,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         1000,
-        100
+        100,
+        true
     );
     return recordSupplier;
   }
@@ -705,16 +716,14 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         EasyMock.eq(SHARD_ID0),
         EasyMock.anyString(),
         EasyMock.anyString()
-    )).andReturn(
-        getShardIteratorResult0).anyTimes();
+    )).andReturn(getShardIteratorResult0).anyTimes();
 
     EasyMock.expect(kinesis.getShardIterator(
         EasyMock.anyObject(),
         EasyMock.eq(SHARD_ID1),
         EasyMock.anyString(),
         EasyMock.anyString()
-    )).andReturn(
-        getShardIteratorResult1).anyTimes();
+    )).andReturn(getShardIteratorResult1).anyTimes();
 
     
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
     
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
@@ -728,8 +737,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
     
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
     
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
     
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
-    
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
-    
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
+    
EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2);
+    
EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2);
 
     replayAll();
 
@@ -738,7 +747,6 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         StreamPartition.of(STREAM, SHARD_ID1)
     );
 
-
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
         recordsPerFetch,
@@ -749,7 +757,8 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
         5000,
         5000,
         60000,
-        100
+        100,
+        true
     );
 
     recordSupplier.assign(partitions);
@@ -760,16 +769,19 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
       Thread.sleep(100);
     }
 
+    Map<String, Long> timeLag = recordSupplier.getPartitionResourcesTimeLag();
+
+
+    Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
+
     Map<String, String> offsts = ImmutableMap.of(
         SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(),
         SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber()
     );
-    Map<String, Long> timeLag = recordSupplier.getPartitionTimeLag(offsts);
-
+    Map<String, Long> independentTimeLag = 
recordSupplier.getPartitionsTimeLag(STREAM, offsts);
+    Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag);
     verifyAll();
-
-    Assert.assertEquals(partitions, recordSupplier.getAssignment());
-    Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag);
   }
 
   /**
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 816e0aa..f3a21d9 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -129,7 +129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
   private static final StreamPartition<String> SHARD0_PARTITION = 
StreamPartition.of(STREAM, SHARD_ID0);
   private static final StreamPartition<String> SHARD1_PARTITION = 
StreamPartition.of(STREAM, SHARD_ID1);
   private static final StreamPartition<String> SHARD2_PARTITION = 
StreamPartition.of(STREAM, SHARD_ID2);
-  private static final Map<String, Long> TIME_LAG = ImmutableMap.of(SHARD_ID1, 
9000L, SHARD_ID0, 1234L);
   private static DataSchema dataSchema;
   private KinesisRecordSupplier supervisorRecordSupplier;
 
@@ -232,9 +231,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<KinesisIndexTask> captured = Capture.newInstance();
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -285,6 +281,68 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
+  public void testRecordSupplier()
+  {
+    KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new 
KinesisSupervisorIOConfig(
+        STREAM,
+        INPUT_FORMAT,
+        "awsEndpoint",
+        null,
+        1,
+        1,
+        new Period("PT30M"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        100,
+        1000,
+        null,
+        null,
+        false
+    );
+    KinesisIndexTaskClientFactory clientFactory = new 
KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
+    KinesisSupervisor supervisor = new KinesisSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        clientFactory,
+        OBJECT_MAPPER,
+        new KinesisSupervisorSpec(
+            null,
+            dataSchema,
+            tuningConfig,
+            kinesisSupervisorIOConfig,
+            null,
+            false,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            clientFactory,
+            OBJECT_MAPPER,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory,
+            null,
+            new SupervisorStateManagerConfig()
+        ),
+        rowIngestionMetersFactory,
+        null
+    );
+
+    KinesisRecordSupplier supplier = (KinesisRecordSupplier) 
supervisor.setupRecordSupplier();
+    Assert.assertNotNull(supplier);
+    Assert.assertEquals(0, supplier.bufferSize());
+    Assert.assertEquals(Collections.emptySet(), supplier.getAssignment());
+    // background fetch should not be enabled for supervisor supplier
+    supplier.start();
+    Assert.assertFalse(supplier.isBackgroundFetchRunning());
+  }
+
+  @Test
   public void testMultiTask() throws Exception
   {
     supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
@@ -302,9 +360,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -364,9 +419,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -444,9 +496,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -499,9 +548,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -561,9 +607,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<KinesisIndexTask> captured = Capture.newInstance();
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -661,9 +704,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     // non KinesisIndexTask (don't kill)
     Task id2 = new RealtimeIndexTask(
@@ -730,9 +770,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Task id1 = createKinesisIndexTask(
         "id1",
@@ -845,9 +882,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -959,9 +993,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     DateTime now = DateTimes.nowUtc();
     DateTime maxi = now.plusMinutes(60);
@@ -1099,9 +1130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -1228,9 +1256,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     final Capture<Task> firstTasks = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -1361,7 +1386,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+    
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
 EasyMock.anyObject()))
             .andReturn(timeLag)
             .atLeastOnce();
 
@@ -1453,9 +1478,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, 
payload.getDetailedState());
     Assert.assertEquals(0, payload.getRecentErrors().size());
 
-    Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
-    Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis());
-
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
 
     Assert.assertEquals("id1", publishingReport.getId());
@@ -1525,7 +1547,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+    
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
 EasyMock.anyObject()))
             .andReturn(timeLag)
             .atLeastOnce();
 
@@ -1605,9 +1627,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     Assert.assertEquals(1, payload.getPublishingTasks().size());
     Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, 
payload.getDetailedState());
     Assert.assertEquals(0, payload.getRecentErrors().size());
-    Assert.assertEquals(timeLag, payload.getMinimumLagMillis());
-    Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis());
-
 
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
 
@@ -1681,7 +1700,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
+    
EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(),
 EasyMock.anyObject()))
             .andReturn(timeLag)
             .atLeastOnce();
     Task id1 = createKinesisIndexTask(
@@ -1859,9 +1878,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -1944,9 +1960,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -2056,10 +2069,6 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
 
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
-
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@@ -2207,9 +2216,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Task id1 = createKinesisIndexTask(
         "id1",
@@ -2618,9 +2624,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Task id1 = createKinesisIndexTask(
         "id1",
@@ -2847,9 +2850,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@@ -3005,9 +3005,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3261,9 +3258,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Task id1 = createKinesisIndexTask(
         "id1",
@@ -3482,7 +3476,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testDoNotKillCompatibleTasks() throws InterruptedException, 
EntryExistsException
+  public void testDoNotKillCompatibleTasks()
+      throws InterruptedException, EntryExistsException
   {
     // This supervisor always returns true for isTaskCurrent -> it should not 
kill its tasks
     int numReplicas = 2;
@@ -3512,9 +3507,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Task task = createKinesisIndexTask(
         "id2",
@@ -3582,7 +3574,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testKillIncompatibleTasks() throws InterruptedException, 
EntryExistsException
+  public void testKillIncompatibleTasks()
+      throws InterruptedException, EntryExistsException
   {
     // This supervisor always returns false for isTaskCurrent -> it should 
kill its tasks
     int numReplicas = 2;
@@ -3611,9 +3604,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Task task = createKinesisIndexTask(
         "id1",
@@ -3856,9 +3846,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3967,9 +3954,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
 
@@ -4145,9 +4129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
 
@@ -4311,9 +4292,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -4433,9 +4411,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> postMergeCaptured = Capture.newInstance(CaptureType.ALL);
 
@@ -4590,9 +4565,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
     EasyMock.expectLastCall().anyTimes();
-    
EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject()))
-            .andReturn(TIME_LAG)
-            .atLeastOnce();
 
     Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 247ff0e..cc4b986 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
@@ -108,6 +110,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -479,7 +482,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
   private final Object stopLock = new Object();
   private final Object stateChangeLock = new Object();
-  private final Object recordSupplierLock = new Object();
+  private final ReentrantLock recordSupplierLock = new ReentrantLock();
 
   private final boolean useExclusiveStartingSequence;
   private boolean listenerRegistered = false;
@@ -706,6 +709,11 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     notices.add(new ResetNotice(dataSourceMetadata));
   }
 
+  public ReentrantLock getRecordSupplierLock()
+  {
+    return recordSupplierLock;
+  }
+
 
   @VisibleForTesting
   public void tryInit()
@@ -1889,10 +1897,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   {
     List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
     Set<PartitionIdType> partitionIdsFromSupplier;
+    recordSupplierLock.lock();
     try {
-      synchronized (recordSupplierLock) {
-        partitionIdsFromSupplier = 
recordSupplier.getPartitionIds(ioConfig.getStream());
-      }
+      partitionIdsFromSupplier = 
recordSupplier.getPartitionIds(ioConfig.getStream());
     }
     catch (Exception e) {
       stateManager.recordThrowableEvent(e);
@@ -1900,6 +1907,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       log.debug(e, "full stack trace");
       return false;
     }
+    finally {
+      recordSupplierLock.unlock();
+    }
 
     if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() == 
0) {
       String errMsg = StringUtils.format("No partitions found for stream 
[%s]", ioConfig.getStream());
@@ -1989,6 +1999,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       );
     }
 
+    Int2ObjectMap<List<PartitionIdType>> newlyDiscovered = new 
Int2ObjectLinkedOpenHashMap<>();
     for (PartitionIdType partitionId : activePartitionsIdsFromSupplier) {
       int taskGroupId = getTaskGroupIdForPartition(partitionId);
       Set<PartitionIdType> partitionGroup = partitionGroups.computeIfAbsent(
@@ -1998,16 +2009,30 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       partitionGroup.add(partitionId);
 
       if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) == 
null) {
-        log.info(
+        log.debug(
             "New partition [%s] discovered for stream [%s], added to task 
group [%d]",
             partitionId,
             ioConfig.getStream(),
             taskGroupId
         );
+
+        newlyDiscovered.computeIfAbsent(taskGroupId, 
ArrayList::new).add(partitionId);
+      }
+    }
+
+    if (newlyDiscovered.size() > 0) {
+      for (Int2ObjectMap.Entry<List<PartitionIdType>> taskGroupPartitions : 
newlyDiscovered.int2ObjectEntrySet()) {
+        log.info(
+            "New partitions %s discovered for stream [%s], added to task group 
[%s]",
+            taskGroupPartitions.getValue(),
+            ioConfig.getStream(),
+            taskGroupPartitions.getIntKey()
+        );
       }
     }
 
     if (!partitionIds.equals(previousPartitionIds)) {
+      assignRecordSupplierToPartitionIds();
       // the set of partition IDs has changed, have any running tasks stop 
early so that we can adjust to the
       // repartitioning quickly by creating new tasks
       for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
@@ -2034,6 +2059,28 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return true;
   }
 
+  private void assignRecordSupplierToPartitionIds()
+  {
+    recordSupplierLock.lock();
+    try {
+      final Set partitions = partitionIds.stream()
+                                         .map(partitionId -> new 
StreamPartition<>(ioConfig.getStream(), partitionId))
+                                         .collect(Collectors.toSet());
+      if (!recordSupplier.getAssignment().containsAll(partitions)) {
+        recordSupplier.assign(partitions);
+        try {
+          recordSupplier.seekToEarliest(partitions);
+        }
+        catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    finally {
+      recordSupplierLock.unlock();
+    }
+  }
+
   /**
    * This method determines the set of expired partitions from the set of 
partitions currently returned by
    * the record supplier and the set of partitions previously tracked in the 
metadata.
@@ -2106,6 +2153,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       partitionIds.clear();
       partitionIds.addAll(activePartitionsIdsFromSupplier);
+      assignRecordSupplierToPartitionIds();
 
       for (Integer groupId : partitionGroups.keySet()) {
         if (newPartitionGroups.containsKey(groupId)) {
@@ -2773,8 +2821,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return startingOffsets;
   }
 
-  private void createNewTasks()
-      throws JsonProcessingException
+  private void createNewTasks() throws JsonProcessingException
   {
     // update the checkpoints in the taskGroup to latest ones so that new 
tasks do not read what is already published
     verifyAndMergeCheckpoints(
@@ -2993,7 +3040,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       if (sequence == null) {
         throw new ISE("unable to fetch sequence number for partition[%s] from 
stream", partition);
       }
-      log.info("Getting sequence number [%s] for partition [%s]", sequence, 
partition);
+      log.debug("Getting sequence number [%s] for partition [%s]", sequence, 
partition);
       return makeSequenceNumber(sequence, false);
     }
   }
@@ -3023,26 +3070,27 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return Collections.emptyMap();
   }
 
+  /**
+   * Fetches the earliest or latest offset from the stream via the {@link 
RecordSupplier}
+   */
   @Nullable
   private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType 
partition, boolean useEarliestOffset)
   {
-    synchronized (recordSupplierLock) {
+    recordSupplierLock.lock();
+    try {
       StreamPartition<PartitionIdType> topicPartition = new 
StreamPartition<>(ioConfig.getStream(), partition);
       if (!recordSupplier.getAssignment().contains(topicPartition)) {
-        final Set partitions = Collections.singleton(topicPartition);
-        recordSupplier.assign(partitions);
-        try {
-          recordSupplier.seekToEarliest(partitions);
-        }
-        catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
+        // this shouldn't happen, but in case it does...
+        throw new IllegalStateException("Record supplier does not match 
current known partitions");
       }
 
       return useEarliestOffset
              ? recordSupplier.getEarliestSequenceNumber(topicPartition)
              : recordSupplier.getLatestSequenceNumber(topicPartition);
     }
+    finally {
+      recordSupplierLock.unlock();
+    }
   }
 
   private void createTasksForGroup(int groupId, int replicas)
@@ -3098,16 +3146,24 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * monitoring method, fetches current partition offsets and lag in a 
background reporting thread
+   */
   @VisibleForTesting
   public void updateCurrentAndLatestOffsets()
   {
-    try {
-      updateCurrentOffsets();
-      updateLatestOffsetsFromStream();
-      sequenceLastUpdated = DateTimes.nowUtc();
-    }
-    catch (Exception e) {
-      log.warn(e, "Exception while getting current/latest sequences");
+    // if we aren't in a steady state, chill out for a bit, don't worry, we'll 
get called later, but if we aren't
+    // healthy, go ahead and try anyway, to provide insight if possible 
into how much time is left to fix the
+    // issue for cluster operators since this feeds the lag metrics
+    if (stateManager.isSteadyState() || !stateManager.isHealthy()) {
+      try {
+        updateCurrentOffsets();
+        updatePartitionLagFromStream();
+        sequenceLastUpdated = DateTimes.nowUtc();
+      }
+      catch (Exception e) {
+        log.warn(e, "Exception while getting current/latest sequences");
+      }
     }
   }
 
@@ -3136,34 +3192,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, 
TimeUnit.SECONDS);
   }
 
-  private void updateLatestOffsetsFromStream() throws InterruptedException
-  {
-    synchronized (recordSupplierLock) {
-      Set<PartitionIdType> partitionIds;
-      try {
-        partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream());
-      }
-      catch (Exception e) {
-        log.warn("Could not fetch partitions for topic/stream [%s]", 
ioConfig.getStream());
-        throw new StreamException(e);
-      }
-
-      Set<StreamPartition<PartitionIdType>> partitions = partitionIds
-          .stream()
-          .map(e -> new StreamPartition<>(ioConfig.getStream(), e))
-          .collect(Collectors.toSet());
-
-      recordSupplier.assign(partitions);
-      recordSupplier.seekToLatest(partitions);
-
-      updateLatestSequenceFromStream(recordSupplier, partitions);
-    }
-  }
-
-  protected abstract void updateLatestSequenceFromStream(
-      RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
-      Set<StreamPartition<PartitionIdType>> partitions
-  );
+  protected abstract void updatePartitionLagFromStream();
 
   /**
    * Gets 'lag' of currently processed offset behind latest offset as a 
measure of difference between offsets.
@@ -3179,17 +3208,21 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
   {
-    if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || 
pendingCompletionTaskGroups.size() > 0) {
-      return activelyReadingTaskGroups
-          .values()
-          .stream()
-          .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
-          .flatMap(taskData -> 
taskData.getValue().currentSequences.entrySet().stream())
-          .collect(Collectors.toMap(
-              Entry::getKey,
-              Entry::getValue,
-              (v1, v2) -> 
makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
-          ));
+    if (!spec.isSuspended()) {
+      if (activelyReadingTaskGroups.size() > 0 || 
pendingCompletionTaskGroups.size() > 0) {
+        return activelyReadingTaskGroups
+            .values()
+            .stream()
+            .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
+            .flatMap(taskData -> 
taskData.getValue().currentSequences.entrySet().stream())
+            .collect(Collectors.toMap(
+                Entry::getKey,
+                Entry::getValue,
+                (v1, v2) -> 
makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
+            ));
+      }
+      // nothing is running but we are not suspended, so let's just hang out in 
case we get called while things start up
+      return ImmutableMap.of();
     } else {
       // if supervisor is suspended, no tasks are likely running so use 
offsets in metadata, if exist
       return getOffsetsFromMetadataStorage();
@@ -3447,8 +3480,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   protected void emitLag()
   {
-    if (spec.isSuspended()) {
-      // don't emit metrics if supervisor is suspended (lag should still 
available in status report)
+    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+      // don't emit metrics if supervisor is suspended or not in a healthy 
running state
+      // (lag should still be available in status report)
       return;
     }
     try {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 0d0ae06..54aa3be 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -753,7 +753,6 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     spec = createMock(SeekableStreamSupervisorSpec.class);
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
-
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
     EasyMock.expect(spec.getIoConfig()).andReturn(new 
SeekableStreamSupervisorIOConfig(
         "stream",
@@ -766,13 +765,20 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         false,
         new Period("PT30M"),
         null,
-        null, null
+        null,
+        null
     )
     {
     }).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
-    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new 
DruidMonitorSchedulerConfig()).anyTimes();
+    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new 
DruidMonitorSchedulerConfig() {
+      @Override
+      public Duration getEmitterPeriod()
+      {
+        return new Period("PT1S").toStandardDuration();
+      }
+    }).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
     EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
 
@@ -986,9 +992,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     }
 
     @Override
-    protected void updateLatestSequenceFromStream(
-        RecordSupplier<String, String> recordSupplier, 
Set<StreamPartition<String>> streamPartitions
-    )
+    protected void updatePartitionLagFromStream()
     {
       // do nothing
     }
@@ -1219,7 +1223,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     protected void emitLag()
     {
       super.emitLag();
-      latch.countDown();
+      if (stateManager.isSteadyState()) {
+        latch.countDown();
+      }
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index 76cf8c6..406d5d2 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -92,7 +92,7 @@ public class SupervisorStateManager
 
   private final Deque<ExceptionEvent> recentEventsQueue = new 
ConcurrentLinkedDeque<>();
 
-  private State supervisorState = BasicState.PENDING;
+  private volatile State supervisorState = BasicState.PENDING;
 
   private boolean atLeastOneSuccessfulRun = false;
   private boolean currentRunSuccessful = true;
@@ -214,6 +214,11 @@ public class SupervisorStateManager
     return supervisorState != null && supervisorState.isHealthy();
   }
 
+  public boolean isSteadyState()
+  {
+    return healthySteadyState.equals(supervisorState);
+  }
+
   public boolean isAtLeastOneSuccessfulRun()
   {
     return atLeastOneSuccessfulRun;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to