This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c5e2f82752d Minor cleanup of SeekableStreamSupervisor classes (#17733)
c5e2f82752d is described below

commit c5e2f82752db0b11fd1bbe4fd35e5c1f0efdb980
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Feb 18 17:39:51 2025 +0530

    Minor cleanup of SeekableStreamSupervisor classes (#17733)
    
    Changes:
    - Change fields minimumMessageTime and maximumMessageTime from Optional to 
plain nullable fields
    - Clean up other logs and comments
---
 .../RabbitStreamIndexTaskIOConfigTest.java         |  30 ++--
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  21 +--
 .../druid/indexing/kafka/KafkaIOConfigTest.java    |  15 +-
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   3 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |  92 ++++++-------
 .../indexing/kinesis/KinesisRecordSupplier.java    |  17 +--
 .../indexing/kinesis/KinesisIOConfigTest.java      |  24 ++--
 .../supervisor/KinesisSupervisorIOConfigTest.java  |   3 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  46 +++----
 .../SeekableStreamIndexTaskIOConfig.java           |  31 ++---
 .../SeekableStreamIndexTaskRunner.java             |   5 +-
 .../supervisor/SeekableStreamSupervisor.java       | 152 +++++++++++----------
 .../SeekableStreamIndexTaskRunnerTest.java         |   9 +-
 .../SeekableStreamSupervisorStateTest.java         |  96 ++++++-------
 14 files changed, 260 insertions(+), 284 deletions(-)

diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
index ef77c58163c..7783e148777 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java
@@ -19,15 +19,12 @@
 
 package org.apache.druid.indexing.rabbitstream;
 
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.indexing.IOConfig;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
 
@@ -38,12 +35,9 @@ public class RabbitStreamIndexTaskIOConfigTest
   public RabbitStreamIndexTaskIOConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
RabbitStreamIndexTaskModule().getJacksonModules());
+    mapper.registerModules(new 
RabbitStreamIndexTaskModule().getJacksonModules());
   }
 
-  @Rule
-  public final ExpectedException exception = ExpectedException.none();
-
   @Test
   public void testSerdeWithDefaults() throws Exception
   {
@@ -56,31 +50,29 @@ public class RabbitStreamIndexTaskIOConfigTest
         + "}";
 
     RabbitStreamIndexTaskIOConfig config = (RabbitStreamIndexTaskIOConfig) 
mapper.readValue(
-        mapper.writeValueAsString(
-            mapper.readValue(
-                jsonStr,
-                IOConfig.class)),
-        IOConfig.class);
+        mapper.writeValueAsString(mapper.readValue(jsonStr, IOConfig.class)),
+        IOConfig.class
+    );
 
     Assert.assertNull(config.getTaskGroupId());
     Assert.assertEquals("my-sequence-name", config.getBaseSequenceName());
 
     Assert.assertEquals("mystream", 
config.getStartSequenceNumbers().getStream());
 
-    Assert.assertEquals(Long.class,
-        
config.getStartSequenceNumbers().getPartitionSequenceNumberMap().get("stream-1").getClass());
     Assert.assertEquals(
-        ImmutableMap.of("stream-0", Long.valueOf(1), "stream-1", 
Long.valueOf(10)),
-        config.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+        ImmutableMap.of("stream-0", 1L, "stream-1", 10L),
+        config.getStartSequenceNumbers().getPartitionSequenceNumberMap()
+    );
 
     Assert.assertEquals("mystream", 
config.getEndSequenceNumbers().getStream());
 
     Assert.assertEquals(
-        ImmutableMap.of("stream-0", Long.valueOf(15L), "stream-1", 
Long.valueOf(200L)),
-        config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
+        ImmutableMap.of("stream-0", 15L, "stream-1", 200L),
+        config.getEndSequenceNumbers().getPartitionSequenceNumberMap()
+    );
 
     Assert.assertTrue(config.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
config.getMinimumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
     Assert.assertEquals(config.getUri(), "rabbitmq-stream://localhost:5552");
     Assert.assertEquals(Collections.emptySet(), 
config.getStartSequenceNumbers().getExclusivePartitions());
   }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f449c158789..a0c7494e3bb 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -55,8 +55,6 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.joda.time.DateTime;
 
@@ -91,12 +89,9 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   private static final Long NOT_SET = -1L;
   private static final Long END_OF_PARTITION = Long.MAX_VALUE;
 
-  private final ServiceEmitter emitter;
-  private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
   private final Pattern pattern;
   private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
 
-
   private final KafkaSupervisorSpec spec;
 
   public KafkaSupervisor(
@@ -122,8 +117,6 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
     );
 
     this.spec = spec;
-    this.emitter = spec.getEmitter();
-    this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
     this.pattern = getIoConfig().isMultiTopic() ? 
Pattern.compile(getIoConfig().getStream()) : null;
   }
 
@@ -267,7 +260,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
 
     if 
(!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
       log.warn(
-          "Lag metric: Kafka partitions %s do not match task partitions %s",
+          "Kafka partitions[%s] do not match task partitions[%s]",
           latestSequenceFromStream.keySet(),
           highestCurrentOffsets.keySet()
       );
@@ -307,9 +300,6 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   }
 
   @Override
-  // suppress use of CollectionUtils.mapValues() since the valueMapper 
function is dependent on map key here
-  @SuppressWarnings("SSBasedInspection")
-  // Used while generating Supervisor lag reports per task
   protected Map<KafkaTopicPartition, Long> 
getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
     if (latestSequenceFromStream == null || currentOffsets == null) {
@@ -333,6 +323,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   @Override
   protected Map<KafkaTopicPartition, Long> 
getTimeLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
+    // Currently not supported
     return null;
   }
 
@@ -389,6 +380,11 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * Fetches the latest offsets from the Kafka stream and updates the map
+   * {@link #latestSequenceFromStream}. The actual lag is computed lazily in
+   * {@link #getPartitionRecordLag}.
+   */
   @Override
   protected void updatePartitionLagFromStream()
   {
@@ -410,9 +406,6 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
 
       recordSupplier.seekToLatest(partitions);
 
-      // this method isn't actually computing the lag, just fetching the 
latests offsets from the stream. This is
-      // because we currently only have record lag for kafka, which can be 
lazily computed by subtracting the highest
-      // task offsets from the latest offsets from the stream when it is needed
       latestSequenceFromStream =
           
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, 
recordSupplier::getPosition));
     }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
index d7e50fe13c0..187c13df9ff 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
@@ -42,7 +41,7 @@ public class KafkaIOConfigTest
   public KafkaIOConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KafkaIndexTaskModule().getJacksonModules());
+    mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
   }
 
   @Rule
@@ -78,8 +77,8 @@ public class KafkaIOConfigTest
                                         200L), 
config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", 
"localhost:9092"), config.getConsumerProperties());
     Assert.assertTrue(config.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
config.getMinimumMessageTime().isPresent());
-    Assert.assertFalse("maximumMessageTime", 
config.getMaximumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
+    Assert.assertNull("maximumMessageTime", config.getMaximumMessageTime());
     Assert.assertEquals(Collections.emptySet(), 
config.getStartSequenceNumbers().getExclusivePartitions());
   }
 
@@ -113,8 +112,8 @@ public class KafkaIOConfigTest
                                         200L), 
config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", 
"localhost:9092"), config.getConsumerProperties());
     Assert.assertTrue(config.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
config.getMinimumMessageTime().isPresent());
-    Assert.assertFalse("maximumMessageTime", 
config.getMaximumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
+    Assert.assertNull("maximumMessageTime", config.getMaximumMessageTime());
     Assert.assertEquals(Collections.emptySet(), 
config.getStartSequenceNumbers().getExclusivePartitions());
   }
 
@@ -151,8 +150,8 @@ public class KafkaIOConfigTest
                                         200L), 
config.getEndSequenceNumbers().getPartitionSequenceNumberMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", 
"localhost:9092"), config.getConsumerProperties());
     Assert.assertFalse(config.isUseTransaction());
-    Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), 
config.getMinimumMessageTime().get());
-    Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), 
config.getMaximumMessageTime().get());
+    Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), 
config.getMinimumMessageTime());
+    Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), 
config.getMaximumMessageTime());
     Assert.assertEquals(Collections.emptySet(), 
config.getStartSequenceNumbers().getExclusivePartitions());
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index c2aee78b3cb..64ac05865d6 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka.supervisor;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.error.DruidException;
@@ -51,7 +50,7 @@ public class KafkaSupervisorIOConfigTest
   public KafkaSupervisorIOConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KafkaIndexTaskModule().getJacksonModules());
+    mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
   }
 
   @Rule
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c24ba243d6f..1151bec626c 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -423,8 +423,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("myCustomValue", 
taskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
taskConfig.getMinimumMessageTime().isPresent());
-    Assert.assertFalse("maximumMessageTime", 
taskConfig.getMaximumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", 
taskConfig.getMinimumMessageTime());
+    Assert.assertNull("maximumMessageTime", 
taskConfig.getMaximumMessageTime());
 
     Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
@@ -563,8 +563,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("myCustomValue", 
taskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
taskConfig.getMinimumMessageTime().isPresent());
-    Assert.assertFalse("maximumMessageTime", 
taskConfig.getMaximumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", 
taskConfig.getMinimumMessageTime());
+    Assert.assertNull("maximumMessageTime", 
taskConfig.getMaximumMessageTime());
 
     Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
@@ -816,15 +816,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     Assert.assertTrue(
         "minimumMessageTime",
-        
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow()
+        
task1.getIOConfig().getMinimumMessageTime().plusMinutes(59).isBeforeNow()
     );
     Assert.assertTrue(
         "minimumMessageTime",
-        
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow()
+        
task1.getIOConfig().getMinimumMessageTime().plusMinutes(61).isAfterNow()
     );
     Assert.assertEquals(
-        task1.getIOConfig().getMinimumMessageTime().get(),
-        task2.getIOConfig().getMinimumMessageTime().get()
+        task1.getIOConfig().getMinimumMessageTime(),
+        task2.getIOConfig().getMinimumMessageTime()
     );
   }
 
@@ -855,15 +855,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     Assert.assertTrue(
         "maximumMessageTime",
-        task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 + 
60).isAfterNow()
+        task1.getIOConfig().getMaximumMessageTime().minusMinutes(59 + 
60).isAfterNow()
     );
     Assert.assertTrue(
         "maximumMessageTime",
-        task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 + 
60).isBeforeNow()
+        task1.getIOConfig().getMaximumMessageTime().minusMinutes(61 + 
60).isBeforeNow()
     );
     Assert.assertEquals(
-        task1.getIOConfig().getMaximumMessageTime().get(),
-        task2.getIOConfig().getMaximumMessageTime().get()
+        task1.getIOConfig().getMaximumMessageTime(),
+        task2.getIOConfig().getMaximumMessageTime()
     );
   }
 
@@ -1527,8 +1527,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             .times(1);
 
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
-    taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, 
killing task", "id4");
-    taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, 
killing task", "id5");
+    taskQueue.shutdown("id4", "Failed to stop in a timely manner");
+    taskQueue.shutdown("id5", "Failed to stop in a timely manner");
     replayAll();
 
     supervisor.start();
@@ -1682,7 +1682,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
 
     // check that replica tasks are created with the same minimumMessageTime 
as tasks inherited from another supervisor
-    Assert.assertEquals(now, ((KafkaIndexTask) 
captured.getValue()).getIOConfig().getMinimumMessageTime().get());
+    Assert.assertEquals(now, ((KafkaIndexTask) 
captured.getValue()).getIOConfig().getMinimumMessageTime());
 
     // test that a task failing causes a new task to be re-queued with the 
same parameters
     String runningTaskId = captured.getValue().getId();
@@ -1728,10 +1728,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     // check that failed tasks are recreated with the same minimumMessageTime 
as the task it replaced, even if that
     // task came from another supervisor
-    Assert.assertEquals(now, ((KafkaIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get());
+    Assert.assertEquals(now, ((KafkaIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime());
     Assert.assertEquals(
         maxi,
-        ((KafkaIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get()
+        ((KafkaIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime()
     );
   }
 
@@ -2427,16 +2427,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         0,
         singlePartitionMap(topic, 0, 0L, 2, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("id1"),
         ImmutableSet.of()
     );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         1,
         singlePartitionMap(topic, 1, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("id2"),
         ImmutableSet.of()
     );
@@ -2581,16 +2581,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         0,
         singlePartitionMap(topic, 0, 0L, 2, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("id1"),
         ImmutableSet.of()
     );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         1,
         singlePartitionMap(topic, 1, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("id2"),
         ImmutableSet.of()
     );
@@ -2917,16 +2917,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         0,
         singlePartitionMap(topic, 0, 0L, 2, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("id1"),
         ImmutableSet.of()
     );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         1,
         singlePartitionMap(topic, 1, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("id2"),
         ImmutableSet.of()
     );
@@ -4310,8 +4310,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("myCustomValue", 
taskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
taskConfig.getMinimumMessageTime().isPresent());
-    Assert.assertFalse("maximumMessageTime", 
taskConfig.getMaximumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", 
taskConfig.getMinimumMessageTime());
+    Assert.assertNull("maximumMessageTime", 
taskConfig.getMaximumMessageTime());
 
     Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
@@ -4367,8 +4367,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false, 
topic, 0)),
         singlePartitionMap(topic, 0, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -4376,8 +4376,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false, 
topic, 1)),
         singlePartitionMap(topic, 0, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -4409,8 +4409,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false, 
topic, 0)),
         singlePartitionMap(topic, 0, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -4418,8 +4418,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false, 
topic, 1)),
         singlePartitionMap(topic, 0, 0L),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -4657,8 +4657,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         42,
         singlePartitionMap(topic, 0, 0L, 2, 0L),
-        Optional.of(minMessageTime),
-        Optional.of(maxMessageTime),
+        minMessageTime,
+        maxMessageTime,
         ImmutableSet.of("id1", "id2", "id3", "id4"),
         ImmutableSet.of()
     );
@@ -4873,8 +4873,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         0,
         task1.getIOConfig().getStartSequenceNumbers(),
         task1.getIOConfig().getEndSequenceNumbers(),
-        task1.getIOConfig().getMinimumMessageTime().get(),
-        task1.getIOConfig().getMaximumMessageTime().get(),
+        task1.getIOConfig().getMinimumMessageTime(),
+        task1.getIOConfig().getMaximumMessageTime(),
         dataSchema,
         supervisor.getTuningConfig()
     );
@@ -5075,7 +5075,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
 
-    // Only the active i.e non-publishing tasks are resumed
+    // Only the active i.e. non-publishing tasks are resumed
     
EasyMock.expect(taskClient.getStartTimeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(startTime));
     
EasyMock.expect(taskClient.resumeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(true));
 
@@ -5804,8 +5804,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     @Override
     public String generateSequenceName(
         Map<KafkaTopicPartition, Long> startPartitions,
-        Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime,
+        DateTime minimumMessageTime,
+        DateTime maximumMessageTime,
         DataSchema dataSchema,
         SeekableStreamIndexTaskTuningConfig tuningConfig
     )
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 07a0da32a95..b649927c276 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -225,8 +225,6 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
                 null
             );
 
-            recordsResult = null;
-
             recordBufferOfferWaitMillis = recordBufferOfferTimeout;
             while (!records.offer(
                 new 
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, 0),
@@ -412,7 +410,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
 
   private final boolean backgroundFetchEnabled;
   private volatile boolean closed = false;
-  private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
+  private final AtomicBoolean partitionsFetchStarted = new AtomicBoolean();
 
   public KinesisRecordSupplier(
       AmazonKinesis amazonKinesis,
@@ -873,7 +871,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
    * {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first 
sequence number from the result set.
    * This method is thread safe as it does not depend on the internal state of 
the supplier (it doesn't use the
    * {@link PartitionResource} which have been assigned to the supplier), and 
the Kinesis client is thread safe.
-   *
+   * <p>
    * When there are no records at the offset corresponding to the 
ShardIteratorType,
    *    If shard is closed, return custom EOS sequence marker
    *    While getting the earliest sequence number, return a custom marker 
corresponding to TRIM_HORIZON
@@ -973,7 +971,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
 
       // If no more new data after offsetToUse, it means there is no lag for 
now.
       // So report lag points as 0L.
-      if (recordsResult.getRecords().size() == 0) {
+      if (recordsResult.getRecords().isEmpty()) {
         return 0L;
       } else {
         recordsResult = getRecordsForLag(iteratorType, offsetToUse, partition);
@@ -992,10 +990,9 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
             offsetToUse
     ).getShardIterator();
 
-    GetRecordsResult recordsResult = kinesis.getRecords(
-            new 
GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
+    return kinesis.getRecords(
+        new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
     );
-    return recordsResult;
   }
 
   /**
@@ -1011,7 +1008,7 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
   /**
    * This method must be called before a seek operation ({@link #seek}, {@link 
#seekToLatest}, or
    * {@link #seekToEarliest}).
-   *
+   * <p>
    * When called, it will nuke the {@link #scheduledExec} that is shared by 
all {@link PartitionResource}, filters
    * records from the buffer for partitions which will have a seek operation 
performed, and stops background fetch for
    * each {@link PartitionResource} to prepare for the seek. If background 
fetch is not currently running, the
@@ -1060,6 +1057,6 @@ public class KinesisRecordSupplier implements 
RecordSupplier<String, String, Kin
     records = newQ;
 
     // restart fetching threads
-    partitionResources.values().forEach(x -> x.stopBackgroundFetch());
+    
partitionResources.values().forEach(PartitionResource::stopBackgroundFetch);
   }
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index 000be2830d6..f093f1c2ec2 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -22,10 +22,8 @@ package org.apache.druid.indexing.kinesis;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -53,7 +51,7 @@ public class KinesisIOConfigTest
   public KinesisIOConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KinesisIndexingServiceModule().getJacksonModules());
+    mapper.registerModules(new 
KinesisIndexingServiceModule().getJacksonModules());
   }
 
   @Rule
@@ -92,7 +90,7 @@ public class KinesisIOConfigTest
         config.getEndSequenceNumbers().getPartitionSequenceNumberMap()
     );
     Assert.assertTrue(config.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
config.getMinimumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", config.getMinimumMessageTime());
     Assert.assertEquals(config.getEndpoint(), 
"kinesis.us-east-1.amazonaws.com");
     Assert.assertEquals(config.getFetchDelayMillis(), 0);
     Assert.assertEquals(Collections.emptySet(), 
config.getStartSequenceNumbers().getExclusivePartitions());
@@ -140,10 +138,8 @@ public class KinesisIOConfigTest
         config.getEndSequenceNumbers().getPartitionSequenceNumberMap()
     );
     Assert.assertFalse(config.isUseTransaction());
-    Assert.assertTrue("maximumMessageTime", 
config.getMaximumMessageTime().isPresent());
-    Assert.assertTrue("minimumMessageTime", 
config.getMinimumMessageTime().isPresent());
-    Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), 
config.getMinimumMessageTime().get());
-    Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), 
config.getMaximumMessageTime().get());
+    Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), 
config.getMinimumMessageTime());
+    Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), 
config.getMaximumMessageTime());
     Assert.assertEquals(config.getEndpoint(), 
"kinesis.us-east-2.amazonaws.com");
     
Assert.assertEquals(config.getStartSequenceNumbers().getExclusivePartitions(), 
ImmutableSet.of("0"));
     Assert.assertEquals(1000, config.getFetchDelayMillis());
@@ -350,8 +346,8 @@ public class KinesisIOConfigTest
     private final SeekableStreamEndSequenceNumbers<String, String> 
endPartitions;
     private final Set<String> exclusiveStartSequenceNumberPartitions;
     private final boolean useTransaction;
-    private final Optional<DateTime> minimumMessageTime;
-    private final Optional<DateTime> maximumMessageTime;
+    private final DateTime minimumMessageTime;
+    private final DateTime maximumMessageTime;
     private final String endpoint;
     private final Integer fetchDelayMillis;
     private final String awsAssumedRoleArn;
@@ -377,8 +373,8 @@ public class KinesisIOConfigTest
       this.endPartitions = endPartitions;
       this.exclusiveStartSequenceNumberPartitions = 
exclusiveStartSequenceNumberPartitions;
       this.useTransaction = useTransaction;
-      this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
-      this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+      this.minimumMessageTime = minimumMessageTime;
+      this.maximumMessageTime = maximumMessageTime;
       this.endpoint = endpoint;
       this.fetchDelayMillis = fetchDelayMillis;
       this.awsAssumedRoleArn = awsAssumedRoleArn;
@@ -416,13 +412,13 @@ public class KinesisIOConfigTest
     }
 
     @JsonProperty
-    public Optional<DateTime> getMinimumMessageTime()
+    public DateTime getMinimumMessageTime()
     {
       return minimumMessageTime;
     }
 
     @JsonProperty
-    public Optional<DateTime> getMaximumMessageTime()
+    public DateTime getMaximumMessageTime()
     {
       return maximumMessageTime;
     }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index 9f5c0bf7504..aa922b008a5 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.kinesis.supervisor;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
 import org.apache.druid.indexing.kinesis.KinesisRegion;
@@ -39,7 +38,7 @@ public class KinesisSupervisorIOConfigTest
   public KinesisSupervisorIOConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KinesisIndexingServiceModule().getJacksonModules());
+    mapper.registerModules(new 
KinesisIndexingServiceModule().getJacksonModules());
   }
 
   @Rule
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index f4913d7a8b7..4f8b3720a24 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -265,8 +265,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     KinesisIndexTaskIOConfig taskConfig = task.getIOConfig();
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("minimumMessageTime", 
taskConfig.getMinimumMessageTime().isPresent());
-    Assert.assertFalse("maximumMessageTime", 
taskConfig.getMaximumMessageTime().isPresent());
+    Assert.assertNull("minimumMessageTime", 
taskConfig.getMinimumMessageTime());
+    Assert.assertNull("maximumMessageTime", 
taskConfig.getMaximumMessageTime());
 
     Assert.assertEquals(STREAM, 
taskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
@@ -741,15 +741,15 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     Assert.assertTrue(
         "minimumMessageTime",
-        
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow()
+        
task1.getIOConfig().getMinimumMessageTime().plusMinutes(59).isBeforeNow()
     );
     Assert.assertTrue(
         "minimumMessageTime",
-        
task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow()
+        
task1.getIOConfig().getMinimumMessageTime().plusMinutes(61).isAfterNow()
     );
     Assert.assertEquals(
-        task1.getIOConfig().getMinimumMessageTime().get(),
-        task2.getIOConfig().getMinimumMessageTime().get()
+        task1.getIOConfig().getMinimumMessageTime(),
+        task2.getIOConfig().getMinimumMessageTime()
     );
   }
 
@@ -793,15 +793,15 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     Assert.assertTrue(
         "maximumMessageTime",
-        task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 + 
60).isAfterNow()
+        task1.getIOConfig().getMaximumMessageTime().minusMinutes(59 + 
60).isAfterNow()
     );
     Assert.assertTrue(
         "maximumMessageTime",
-        task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 + 
60).isBeforeNow()
+        task1.getIOConfig().getMaximumMessageTime().minusMinutes(61 + 
60).isBeforeNow()
     );
     Assert.assertEquals(
-        task1.getIOConfig().getMaximumMessageTime().get(),
-        task2.getIOConfig().getMaximumMessageTime().get()
+        task1.getIOConfig().getMaximumMessageTime(),
+        task2.getIOConfig().getMaximumMessageTime()
     );
   }
 
@@ -1292,7 +1292,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     verifyAll();
 
     // check that replica tasks are created with the same minimumMessageTime 
as tasks inherited from another supervisor
-    Assert.assertEquals(now, ((KinesisIndexTask) 
captured.getValue()).getIOConfig().getMinimumMessageTime().get());
+    Assert.assertEquals(now, ((KinesisIndexTask) 
captured.getValue()).getIOConfig().getMinimumMessageTime());
 
     // test that a task failing causes a new task to be re-queued with the 
same parameters
     String runningTaskId = captured.getValue().getId();
@@ -1340,11 +1340,11 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
     // task came from another supervisor
     Assert.assertEquals(
         now,
-        ((KinesisIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get()
+        ((KinesisIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime()
     );
     Assert.assertEquals(
         maxi,
-        ((KinesisIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get()
+        ((KinesisIndexTask) 
aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime()
     );
   }
 
@@ -3708,8 +3708,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition(SHARD_ID0),
         ImmutableMap.of("0", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -3717,8 +3717,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition(SHARD_ID1),
         ImmutableMap.of("0", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -3947,8 +3947,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         42,
         ImmutableMap.of(SHARD_ID1, "3"),
-        Optional.of(minMessageTime),
-        Optional.of(maxMessageTime),
+        minMessageTime,
+        maxMessageTime,
         ImmutableSet.of("id1", "id2", "id3", "id4"),
         ImmutableSet.of()
     );
@@ -4116,8 +4116,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         0,
         task1.getIOConfig().getStartSequenceNumbers(),
         task1.getIOConfig().getEndSequenceNumbers(),
-        task1.getIOConfig().getMinimumMessageTime().get(),
-        task1.getIOConfig().getMaximumMessageTime().get(),
+        task1.getIOConfig().getMinimumMessageTime(),
+        task1.getIOConfig().getMaximumMessageTime(),
         dataSchema
     );
 
@@ -5659,8 +5659,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     @Override
     public String generateSequenceName(
         Map<String, String> startPartitions,
-        Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime,
+        DateTime minimumMessageTime,
+        DateTime maximumMessageTime,
         DataSchema dataSchema,
         SeekableStreamIndexTaskTuningConfig tuningConfig
     )
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index 928149cdee4..8f92f76e836 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -21,8 +21,8 @@ package org.apache.druid.indexing.seekablestream;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
@@ -33,14 +33,13 @@ public abstract class 
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
 {
   private static final boolean DEFAULT_USE_TRANSACTION = true;
 
-  @Nullable
   private final Integer taskGroupId;
   private final String baseSequenceName;
   private final SeekableStreamStartSequenceNumbers<PartitionIdType, 
SequenceOffsetType> startSequenceNumbers;
   private final SeekableStreamEndSequenceNumbers<PartitionIdType, 
SequenceOffsetType> endSequenceNumbers;
   private final boolean useTransaction;
-  private final Optional<DateTime> minimumMessageTime;
-  private final Optional<DateTime> maximumMessageTime;
+  private final DateTime minimumMessageTime;
+  private final DateTime maximumMessageTime;
   private final InputFormat inputFormat;
   private final Long refreshRejectionPeriodsInMinutes;
 
@@ -49,9 +48,9 @@ public abstract class 
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
       final String baseSequenceName,
       final SeekableStreamStartSequenceNumbers<PartitionIdType, 
SequenceOffsetType> startSequenceNumbers,
       final SeekableStreamEndSequenceNumbers<PartitionIdType, 
SequenceOffsetType> endSequenceNumbers,
-      final Boolean useTransaction,
-      final DateTime minimumMessageTime,
-      final DateTime maximumMessageTime,
+      @Nullable final Boolean useTransaction,
+      @Nullable final DateTime minimumMessageTime,
+      @Nullable final DateTime maximumMessageTime,
       @Nullable final InputFormat inputFormat,
       @Nullable final Long refreshRejectionPeriodsInMinutes // can be null for 
backward compabitility
   )
@@ -60,9 +59,9 @@ public abstract class 
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, 
"baseSequenceName");
     this.startSequenceNumbers = 
Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers");
     this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers, 
"endSequenceNumbers");
-    this.useTransaction = useTransaction != null ? useTransaction : 
DEFAULT_USE_TRANSACTION;
-    this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
-    this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+    this.useTransaction = Configs.valueOrDefault(useTransaction, 
DEFAULT_USE_TRANSACTION);
+    this.minimumMessageTime = minimumMessageTime;
+    this.maximumMessageTime = maximumMessageTime;
     this.inputFormat = inputFormat;
     this.refreshRejectionPeriodsInMinutes = refreshRejectionPeriodsInMinutes;
 
@@ -73,8 +72,8 @@ public abstract class 
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
 
     Preconditions.checkArgument(
         startSequenceNumbers.getPartitionSequenceNumberMap()
-                       .keySet()
-                       
.equals(endSequenceNumbers.getPartitionSequenceNumberMap().keySet()),
+                            .keySet()
+                            
.equals(endSequenceNumbers.getPartitionSequenceNumberMap().keySet()),
         "start partition set and end partition set must match"
     );
   }
@@ -111,15 +110,15 @@ public abstract class 
SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
   }
 
   @JsonProperty
-  @JsonInclude(JsonInclude.Include.NON_ABSENT)
-  public Optional<DateTime> getMaximumMessageTime()
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public DateTime getMaximumMessageTime()
   {
     return maximumMessageTime;
   }
 
   @JsonProperty
-  @JsonInclude(JsonInclude.Include.NON_ABSENT)
-  public Optional<DateTime> getMinimumMessageTime()
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public DateTime getMinimumMessageTime()
   {
     return minimumMessageTime;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 4156b0dba90..a30d5f2368e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -274,8 +275,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     this.ingestionState = IngestionState.NOT_STARTED;
     this.lockGranularityToUse = lockGranularityToUse;
 
-    minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
-    maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);
+    minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), 
DateTimes.MIN);
+    maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), 
DateTimes.MAX);
     rejectionPeriodUpdaterExec = 
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d");
 
     if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1948e22e2aa..faa68f3d8ea 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -159,7 +159,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
 
   private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
-  private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
   private static final int MAX_INITIALIZATION_RETRIES = 20;
 
   private static final EmittingLogger log = new 
EmittingLogger(SeekableStreamSupervisor.class);
@@ -198,8 +197,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final ImmutableMap<PartitionIdType, SequenceOffsetType> 
unfilteredStartingSequencesForSequenceName;
 
     final ConcurrentHashMap<String, TaskData> tasks = new 
ConcurrentHashMap<>();
-    final Optional<DateTime> minimumMessageTime;
-    final Optional<DateTime> maximumMessageTime;
+    final DateTime minimumMessageTime;
+    final DateTime maximumMessageTime;
     final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
     final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> 
checkpointSequences = new TreeMap<>();
     final String baseSequenceName;
@@ -211,8 +210,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         int groupId,
         ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
         @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> 
unfilteredStartingSequencesForSequenceName,
-        Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime,
+        @Nullable DateTime minimumMessageTime,
+        @Nullable DateTime maximumMessageTime,
         @Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
     )
     {
@@ -239,8 +238,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         int groupId,
         ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
         @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> 
unfilteredStartingSequencesForSequenceName,
-        Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime,
+        DateTime minimumMessageTime,
+        DateTime maximumMessageTime,
         Set<PartitionIdType> exclusiveStartSequenceNumberPartitions,
         String baseSequenceName
     )
@@ -259,10 +258,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       this.baseSequenceName = baseSequenceName;
     }
 
-    int addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
+    void addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
     {
       checkpointSequences.put(checkpointSequences.lastKey() + 1, checkpoint);
-      return checkpointSequences.lastKey();
     }
 
     Set<String> taskIds()
@@ -369,16 +367,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       this.errors = errors;
     }
 
-    public String getGroupId()
-    {
-      return groupId;
-    }
-
-    public String getTaskId()
-    {
-      return taskId;
-    }
-
     public List<ParseExceptionReport> getErrors()
     {
       return errors;
@@ -1609,8 +1597,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
       int taskGroupId,
       ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
-      Optional<DateTime> minMsgTime,
-      Optional<DateTime> maxMsgTime,
+      @Nullable DateTime minMsgTime,
+      @Nullable DateTime maxMsgTime,
       Set<String> tasks,
       Set<PartitionIdType> exclusiveStartingSequencePartitions
   )
@@ -1637,8 +1625,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   public TaskGroup addTaskGroupToPendingCompletionTaskGroup(
       int taskGroupId,
       ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
-      Optional<DateTime> minMsgTime,
-      Optional<DateTime> maxMsgTime,
+      @Nullable DateTime minMsgTime,
+      @Nullable DateTime maxMsgTime,
       Set<String> tasks,
       Set<PartitionIdType> exclusiveStartingSequencePartitions
   )
@@ -2190,8 +2178,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                             final TaskData prevTaskData = 
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                             if (prevTaskData != null) {
                               throw new ISE(
-                                  "taskGroup[%s] already exists for new 
task[%s]",
-                                  prevTaskData, taskId
+                                  "Task[%s] already exists in taskGroup[%d] 
with data[%s]",
+                                  taskId, taskGroup.groupId, prevTaskData
                               );
                             }
                             
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
@@ -2459,7 +2447,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   return sequence.compareTo(latestOffset) == 0;
                 }
             ) && earliestConsistentSequenceId.compareAndSet(-1, 
sequenceCheckpoint.getKey())) || (
-                pendingCompletionTaskGroups.getOrDefault(groupId, new 
CopyOnWriteArrayList<>()).size() > 0
+                !pendingCompletionTaskGroups.getOrDefault(groupId, new 
CopyOnWriteArrayList<>()).isEmpty()
                 && earliestConsistentSequenceId.compareAndSet(-1, 
taskCheckpoints.firstKey()))) {
           final SortedMap<Integer, Map<PartitionIdType, SequenceOffsetType>> 
latestCheckpoints = new TreeMap<>(
               taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
@@ -2470,7 +2458,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           taskGroup.checkpointSequences.putAll(latestCheckpoints);
         } else {
           log.debug(
-              "Adding task [%s] to kill list, checkpoints[%s], latestoffsets 
from DB [%s]",
+              "Adding task[%s] to kill list, checkpoints[%s], latestoffsets 
from DB [%s]",
               taskId,
               taskCheckpoints,
               latestOffsetsFromDb
@@ -2485,7 +2473,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             || 
taskCheckpoints.tailMap(taskGroup.checkpointSequences.firstKey()).size()
                != taskGroup.checkpointSequences.size()) {
           log.debug(
-              "Adding task [%s] to kill list, checkpoints[%s], taskgroup 
checkpoints [%s]",
+              "Adding task[%s] to kill list, checkpoints[%s], taskgroup 
checkpoints [%s]",
               taskId,
               taskCheckpoints,
               taskGroup.checkpointSequences
@@ -2496,12 +2484,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       taskIndex++;
     }
 
-    if ((tasksToKill.size() > 0 && tasksToKill.size() == 
taskGroup.tasks.size()) ||
-        (taskGroup.tasks.size() == 0
-         && pendingCompletionTaskGroups.getOrDefault(groupId, new 
CopyOnWriteArrayList<>()).size() == 0)) {
+    if ((!tasksToKill.isEmpty() && tasksToKill.size() == 
taskGroup.tasks.size()) ||
+        (taskGroup.tasks.isEmpty()
+         && pendingCompletionTaskGroups.getOrDefault(groupId, new 
CopyOnWriteArrayList<>()).isEmpty())) {
       // killing all tasks or no task left in the group ?
       // clear state about the taskgroup so that get latest sequence 
information is fetched from metadata store
-      log.warn("Clearing task group [%d] information as no valid tasks left 
the group", groupId);
+      log.warn("Clearing task group[%d] information as no valid tasks left in 
the group", groupId);
       activelyReadingTaskGroups.remove(groupId);
       for (PartitionIdType partitionId : taskGroup.startingSequences.keySet()) 
{
         partitionOffsets.put(partitionId, getNotSetMarker());
@@ -2512,8 +2500,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         sequenceCheckpoint -> {
           killTask(
               sequenceCheckpoint.lhs,
-              "Killing task [%s], as its checkpoints [%s] are not consistent 
with group checkpoints[%s] or latest "
-              + "persisted sequences in metadata store [%s]",
+              "Killing task[%s], as its checkpoints[%s] are not consistent 
with group checkpoints[%s]"
+              + " or latest persisted sequences in metadata store[%s].",
               sequenceCheckpoint.lhs,
               sequenceCheckpoint.rhs,
               taskGroup.checkpointSequences,
@@ -2548,7 +2536,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             }
           }
           if (!isTaskGroupPresent) {
-            log.info("Creating new pending completion task group [%s] for 
discovered task [%s].", groupId, taskId);
+            log.info("Creating new pending completion task group[%s] for 
discovered task[%s].", groupId, taskId);
 
             // reading the minimumMessageTime & maximumMessageTime from the 
publishing task and setting it here is not necessary as this task cannot
             // change to a state where it will read any more events.
@@ -2557,8 +2545,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 groupId,
                 ImmutableMap.copyOf(startingPartitions),
                 null,
-                Optional.absent(),
-                Optional.absent(),
+                null,
+                null,
                 null
             );
 
@@ -2574,7 +2562,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     for (TaskGroup taskGroup : taskGroupList) {
       if (taskGroup.startingSequences.equals(startingPartitions)) {
         if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
-          log.info("Added discovered task [%s] to existing pending completion 
task group [%s]. PendingCompletionTaskGroup: %s", taskId, groupId, 
taskGroup.taskIds());
+          log.info(
+              "Added discovered task[%s] to taskGroup[%s] with pending 
completion tasks[%s].",
+              taskId, groupId, taskGroup.taskIds()
+          );
         }
         return;
       }
@@ -2625,8 +2616,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           public Void apply(@Nullable Boolean result)
           {
             if (result == null || !result) {
-              log.info("Task [%s] failed to stop in a timely manner, killing 
task", id);
-              killTask(id, "Task [%s] failed to stop in a timely manner, 
killing task", id);
+              log.info("Killing task[%s] as it failed to stop in a timely 
manner.", id);
+              killTask(id, "Failed to stop in a timely manner");
             }
             return null;
           }
@@ -2701,8 +2692,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   @VisibleForTesting
   public String generateSequenceName(
       Map<PartitionIdType, SequenceOffsetType> startPartitions,
-      Optional<DateTime> minimumMessageTime,
-      Optional<DateTime> maximumMessageTime,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
       DataSchema dataSchema,
       SeekableStreamIndexTaskTuningConfig tuningConfig
   )
@@ -2712,10 +2703,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     for (Entry<PartitionIdType, SequenceOffsetType> entry : 
startPartitions.entrySet()) {
       sb.append(StringUtils.format("+%s(%s)", entry.getKey().toString(), 
entry.getValue().toString()));
     }
-    String partitionOffsetStr = startPartitions.size() == 0 ? "" : 
sb.toString().substring(1);
+    String partitionOffsetStr = startPartitions.isEmpty() ? "" : 
sb.substring(1);
 
-    String minMsgTimeStr = (minimumMessageTime.isPresent() ? 
String.valueOf(minimumMessageTime.get().getMillis()) : "");
-    String maxMsgTimeStr = (maximumMessageTime.isPresent() ? 
String.valueOf(maximumMessageTime.get().getMillis()) : "");
+    String minMsgTimeStr = (minimumMessageTime == null ? "" : 
String.valueOf(minimumMessageTime.getMillis()));
+    String maxMsgTimeStr = (maximumMessageTime == null ? "" : 
String.valueOf(maximumMessageTime.getMillis()));
 
     String dataSchemaStr, tuningConfigStr;
     try {
@@ -2773,11 +2764,14 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       return false;
     }
 
-    log.debug("Found [%d] partitions for stream [%s]", 
partitionIdsFromSupplier.size(), ioConfig.getStream());
+    log.debug("Found [%d] partitions for stream[%s]", 
partitionIdsFromSupplier.size(), ioConfig.getStream());
 
     final int configuredTaskCount = spec.getIoConfig().getTaskCount();
     if (configuredTaskCount > partitionIdsFromSupplier.size()) {
-      log.warn("Configured task count[%s] for supervisor[%s] is greater than 
the number of partitions[%d].", configuredTaskCount, supervisorId, 
partitionIdsFromSupplier.size());
+      log.warn(
+          "Configured task count[%s] for supervisor[%s] is greater than the 
number of partitions[%d].",
+          configuredTaskCount, supervisorId, partitionIdsFromSupplier.size()
+      );
     }
     Map<PartitionIdType, SequenceOffsetType> storedMetadata = 
getOffsetsFromMetadataStorage();
     Set<PartitionIdType> storedPartitions = storedMetadata.keySet();
@@ -2809,7 +2803,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         new HashSet<>(previousPartitionIds)
     );
 
-    log.debug("active partitions from supplier: " + 
activePartitionsIdsFromSupplier);
+    log.debug("active partitions from supplier: %s", 
activePartitionsIdsFromSupplier);
 
     if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() != 
partitionIdsFromSupplier.size()) {
       // this should never happen, but we check for it and exclude the expired 
partitions if they somehow reappear
@@ -2819,9 +2813,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           partitionIdsFromSupplier
       );
     }
-    if (activePartitionsIdsFromSupplier.size() == 0) {
+    if (activePartitionsIdsFromSupplier.isEmpty()) {
       String errMsg = StringUtils.format(
-          "No active partitions found for stream [%s] after removing closed 
and previously expired partitions",
+          "No active partitions found for stream[%s] after removing closed and 
previously expired partitions",
           ioConfig.getStream()
       );
       stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
@@ -2832,7 +2826,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     boolean initialPartitionDiscovery = this.partitionIds.isEmpty();
     for (PartitionIdType partitionId : 
partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
       if (closedPartitions.contains(partitionId)) {
-        log.info("partition [%s] is closed and has no more data, skipping.", 
partitionId);
+        log.info("Skipping partition[%s] as it is closed and has no more 
data.", partitionId);
         continue;
       }
 
@@ -2869,7 +2863,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) == 
null) {
         log.debug(
-            "New partition [%s] discovered for stream [%s], added to task 
group [%d]",
+            "Discovered new partition[%s] for stream[%s], added to 
taskGroup[%d]",
             partitionId,
             ioConfig.getStream(),
             taskGroupId
@@ -2879,10 +2873,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
     }
 
-    if (newlyDiscovered.size() > 0) {
+    if (!newlyDiscovered.isEmpty()) {
       for (Int2ObjectMap.Entry<List<PartitionIdType>> taskGroupPartitions : 
newlyDiscovered.int2ObjectEntrySet()) {
         log.info(
-            "New partitions %s discovered for stream [%s], added to task group 
[%s]",
+            "Discovered new partitions[%s] for stream[%s], added to 
taskGroup[%s].",
             taskGroupPartitions.getValue(),
             ioConfig.getStream(),
             taskGroupPartitions.getIntKey()
@@ -3501,7 +3495,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
 
     log.info(
-        "Stopping all tasks in taskGroup[%s] because: [%s]",
+        "Stopping all tasks in taskGroup[%s] because [%s].",
         taskGroup.groupId,
         StringUtils.format(stopReasonFormat, args)
     );
@@ -3554,7 +3548,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           final String taskId = entry.getKey();
           final TaskData taskData = entry.getValue();
 
-          Preconditions.checkNotNull(taskData.status, "task[%s] has null 
status", taskId);
+          Preconditions.checkNotNull(taskData.status, "Task[%s] has null 
status", taskId);
 
           if (taskData.status.isFailure()) {
             stateManager.recordCompletedTaskState(TaskState.FAILED);
@@ -3569,7 +3563,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           if (taskData.status.isSuccess()) {
             // If one of the pending completion tasks was successful, stop the 
rest of the tasks in the group as
             // we no longer need them to publish their segment.
-            log.info("Task [%s] completed successfully, stopping tasks %s", 
taskId, group.taskIds());
+            log.info("Task[%s] completed successfully, stopping tasks[%s]", 
taskId, group.taskIds());
             stateManager.recordCompletedTaskState(TaskState.SUCCESS);
             futures.add(
                 stopTasksInGroup(group, "Task [%s] completed successfully, 
stopping tasks %s", taskId, group.taskIds())
@@ -3582,10 +3576,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
         if ((!foundSuccess && group.completionTimeout.isBeforeNow()) || 
entireTaskGroupFailed) {
           if (entireTaskGroupFailed) {
-            log.warn("All tasks in group [%d] failed to publish, killing all 
tasks for these partitions", groupId);
+            log.warn("All tasks in taskGroup[%d] failed to publish, killing 
all tasks for these partitions", groupId);
           } else {
             log.makeAlert(
-                "No task in [%s] for taskGroup [%d] succeeded before the 
completion timeout elapsed [%s]! "
+                "No task in [%s] for taskGroup[%d] succeeded before the 
completion timeout elapsed [%s]! "
                 + "Check metrics and logs to see if the creation, publish or 
handoff"
                 + " of any segment is taking longer than usual.",
                 group.taskIds(),
@@ -3764,18 +3758,20 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     for (Integer groupId : partitionGroups.keySet()) {
       if (!activelyReadingTaskGroups.containsKey(groupId)) {
         log.info("Creating new taskGroup[%d] for partitions[%s].", groupId, 
partitionGroups.get(groupId));
-        Optional<DateTime> minimumMessageTime;
+        final DateTime minimumMessageTime;
         if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
-          minimumMessageTime = 
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
+          minimumMessageTime = 
ioConfig.getLateMessageRejectionStartDateTime().get();
         } else {
-          minimumMessageTime = 
(ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
-              
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
-          ) : Optional.absent());
+          minimumMessageTime = 
ioConfig.getLateMessageRejectionPeriod().isPresent()
+                               ? 
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
+                               : null;
         }
 
-        Optional<DateTime> maximumMessageTime = 
(ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
-            
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
-        ) : Optional.absent());
+        final DateTime maximumMessageTime = 
ioConfig.getEarlyMessageRejectionPeriod().isPresent()
+                                            ? DateTimes.nowUtc()
+                                                       
.plus(ioConfig.getTaskDuration())
+                                                       
.plus(ioConfig.getEarlyMessageRejectionPeriod().get())
+                                            : null;
 
         final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> 
unfilteredStartingOffsets =
             generateStartingSequencesForPartitionGroup(groupId);
@@ -3848,7 +3844,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       Integer groupId = entry.getKey();
 
       if (taskGroup.startingSequences == null ||
-          taskGroup.startingSequences.size() == 0 ||
+          taskGroup.startingSequences.isEmpty() ||
           taskGroup.startingSequences.values().stream().allMatch(x -> x == 
null || isEndOfShard(x))) {
         log.debug("Nothing to read in any partition for taskGroup[%d], 
skipping task creation.", groupId);
         continue;
@@ -4047,8 +4043,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         .get(groupId)
         .exclusiveStartSequenceNumberPartitions;
 
-    DateTime minimumMessageTime = group.minimumMessageTime.orNull();
-    DateTime maximumMessageTime = group.maximumMessageTime.orNull();
+    DateTime minimumMessageTime = group.minimumMessageTime;
+    DateTime maximumMessageTime = group.maximumMessageTime;
 
     SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
         groupId,
@@ -4159,7 +4155,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   {
     Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage = 
getOffsetsFromMetadataStorage();
     if (!spec.isSuspended()) {
-      if (activelyReadingTaskGroups.size() > 0 || 
pendingCompletionTaskGroups.size() > 0) {
+      if (!activelyReadingTaskGroups.isEmpty() || 
!pendingCompletionTaskGroups.isEmpty()) {
         Map<PartitionIdType, SequenceOffsetType> currentOffsets =
             Stream.concat(
                 activelyReadingTaskGroups
@@ -4392,7 +4388,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * default implementation, schedules periodic fetch of latest offsets and 
{@link #emitLag} reporting for Kafka and Kinesis
-   * and periodic reporting of {@Link #emitNoticesQueueSize} for various data 
sources.
+   * and periodic reporting of {@link #emitNoticesQueueSize} for various data 
sources.
    */
   protected void scheduleReporting(ScheduledExecutorService reportingExec)
   {
@@ -4424,15 +4420,21 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   }
 
   /**
-   * calculate lag per partition for kafka as a measure of message count, 
kinesis implementation returns an empty
-   * map
+   * Calculates record lag per partition using the current offsets from tasks.
    *
-   * @return map of partition id -> lag
+   * @return Map from partition ID to record lag.
+   * @see #getPartitionRecordLag
    */
   protected abstract Map<PartitionIdType, Long> getRecordLagPerPartition(
       Map<PartitionIdType, SequenceOffsetType> currentOffsets
   );
 
+  /**
+   * Calculates time lag per partition using the current offsets from tasks.
+   *
+   * @return Map from partition ID to time lag.
+   * @see #getPartitionTimeLag
+   */
   protected abstract Map<PartitionIdType, Long> getTimeLagPerPartition(
       Map<PartitionIdType, SequenceOffsetType> currentOffsets
   );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index f78fd680e36..db724931b39 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.seekablestream;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -90,8 +89,8 @@ public class SeekableStreamIndexTaskRunnerTest
     DateTime now = DateTimes.nowUtc();
 
     
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L);
-    
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2)));
-    
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2)));
+    
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(DateTimes.nowUtc().plusHours(2));
+    
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(DateTimes.nowUtc().minusHours(2));
     Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
     
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
     
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
@@ -143,8 +142,8 @@ public class SeekableStreamIndexTaskRunnerTest
 
     
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
     // min max time not populated.
-    
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent());
-    
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent());
+    Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
+    Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
     Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
     
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
     
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index fe03a957ad2..58c838615ac 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1850,8 +1850,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -1859,8 +1859,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -1896,8 +1896,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -1905,8 +1905,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -1956,8 +1956,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         checkpointOffsets,
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2003,24 +2003,24 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     final SeekableStreamSupervisor.TaskGroup taskGroup0 = 
supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task0"),
         ImmutableSet.of()
     );
     final SeekableStreamSupervisor.TaskGroup taskGroup1 = 
supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
     final SeekableStreamSupervisor.TaskGroup taskGroup2 = 
supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("2"),
         ImmutableMap.of("2", "100"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2097,8 +2097,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2106,8 +2106,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2115,8 +2115,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("2"),
         ImmutableMap.of("2", "100"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task3"),
         ImmutableSet.of()
     );
@@ -2168,8 +2168,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2177,8 +2177,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2186,8 +2186,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("2"),
         ImmutableMap.of("2", "100"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task3"),
         ImmutableSet.of()
     );
@@ -2243,8 +2243,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2252,8 +2252,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2310,8 +2310,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2319,8 +2319,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2353,8 +2353,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2362,8 +2362,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2392,8 +2392,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2401,8 +2401,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
@@ -2442,8 +2442,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
@@ -2451,8 +2451,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "0"),
-        Optional.absent(),
-        Optional.absent(),
+        null,
+        null,
         ImmutableSet.of("task2"),
         ImmutableSet.of()
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to