This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 60daddedf8b SeekableStreamSupervisor: Use workerExec as the client connectExec. (#17394)
60daddedf8b is described below

commit 60daddedf8b0a9faf1edeb4603eb4b8e86fd3dc2
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Oct 22 20:21:21 2024 -0700

    SeekableStreamSupervisor: Use workerExec as the client connectExec. (#17394)
    
    * SeekableStreamSupervisor: Use workerExec as the client connectExec.
    
    This patch uses the already-existing per-supervisor workerExec as the
    connectExec for task clients, rather than using the process-wide default
    ServiceClientFactory pool.
    
    This helps prevent callbacks from backlogging on the process-wide pool.
    It's especially useful for retries, where callbacks may need to establish
    new TCP connections or perform TLS handshakes.
    
    * Fix compilation, tests.
    
    * Fix style.
---
 .../RabbitStreamIndexTaskClientFactory.java        |   6 +-
 .../kafka/KafkaIndexTaskClientFactory.java         |   6 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      | 999 +++++++++++++++------
 .../kinesis/KinesisIndexTaskClientFactory.java     |   6 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  24 +-
 .../SeekableStreamIndexTaskClientFactory.java      |  27 +-
 .../supervisor/SeekableStreamSupervisor.java       |  36 +-
 .../SeekableStreamSupervisorStateTest.java         |   2 +-
 8 files changed, 785 insertions(+), 321 deletions(-)

diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
index 83cdde8191f..088dc6b77d5 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
@@ -25,17 +25,17 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
-import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.java.util.http.client.HttpClient;
 
 @LazySingleton
 public class RabbitStreamIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory<String, Long>
 {
   @Inject
   public RabbitStreamIndexTaskClientFactory(
-      @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+      @EscalatedGlobal HttpClient httpClient,
       @Json ObjectMapper mapper)
   {
-    super(serviceClientFactory, mapper);
+    super(httpClient, mapper);
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
index 73a880b524b..10b57dd4757 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
@@ -26,18 +26,18 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
-import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.java.util.http.client.HttpClient;
 
 @LazySingleton
 public class KafkaIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory<KafkaTopicPartition, Long>
 {
   @Inject
   public KafkaIndexTaskClientFactory(
-      @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+      @EscalatedGlobal HttpClient httpClient,
       @Json ObjectMapper mapper
   )
   {
-    super(serviceClientFactory, mapper);
+    super(httpClient, mapper);
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index e436b8cd56a..7926a0568fd 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -127,6 +127,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 @RunWith(Parameterized.class)
 public class KafkaSupervisorTest extends EasyMockSupport
@@ -202,10 +203,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         zkServer.getConnectString(),
         null,
         1,
-        ImmutableMap.of("num.partitions",
-                        String.valueOf(NUM_PARTITIONS),
-                        "auto.create.topics.enable",
-                        String.valueOf(false)
+        ImmutableMap.of(
+            "num.partitions",
+            String.valueOf(NUM_PARTITIONS),
+            "auto.create.topics.enable",
+            String.valueOf(false)
         )
     );
     kafkaServer.start();
@@ -267,11 +269,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
       public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
           final String dataSource,
           final TaskInfoProvider taskInfoProvider,
-          final int maxNumTasks,
-          final SeekableStreamSupervisorTuningConfig tuningConfig
+          final SeekableStreamSupervisorTuningConfig tuningConfig,
+          final ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCountMax, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -299,53 +300,53 @@ public class KafkaSupervisorTest extends EasyMockSupport
     consumerProperties.put("bootstrap.servers", kafkaHost);
 
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaSupervisorIOConfig(
-            topic,
-            null,
-            INPUT_FORMAT,
-            replicas,
-            1,
-            new Period("PT1H"),
-            consumerProperties,
-            OBJECT_MAPPER.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class),
-            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
-            new Period("P1D"),
-            new Period("PT30S"),
-            true,
-            new Period("PT30M"),
-            null,
-            null,
-            null,
-            null,
-            new IdleConfig(true, 1000L),
-            1
+        topic,
+        null,
+        INPUT_FORMAT,
+        replicas,
+        1,
+        new Period("PT1H"),
+        consumerProperties,
+        OBJECT_MAPPER.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class),
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        new IdleConfig(true, 1000L),
+        1
     );
 
     final KafkaSupervisorTuningConfig tuningConfigOri = new 
KafkaSupervisorTuningConfig(
-            null,
-            1000,
-            null,
-            null,
-            50000,
-            null,
-            new Period("P1Y"),
-            null,
-            null,
-            null,
-            false,
-            null,
-            false,
-            null,
-            numThreads,
-            TEST_CHAT_RETRIES,
-            TEST_HTTP_TIMEOUT,
-            TEST_SHUTDOWN_TIMEOUT,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-          null
+        null,
+        1000,
+        null,
+        null,
+        50000,
+        null,
+        new Period("P1Y"),
+        null,
+        null,
+        null,
+        false,
+        null,
+        false,
+        null,
+        numThreads,
+        TEST_CHAT_RETRIES,
+        TEST_HTTP_TIMEOUT,
+        TEST_SHUTDOWN_TIMEOUT,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
     );
 
     
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
@@ -354,31 +355,31 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.replay(ingestionSchema);
 
     SeekableStreamSupervisorSpec testableSupervisorSpec = new 
KafkaSupervisorSpec(
-            ingestionSchema,
-            dataSchema,
-            tuningConfigOri,
-            kafkaSupervisorIOConfig,
-            null,
-            false,
-            taskStorage,
-            taskMaster,
-            indexerMetadataStorageCoordinator,
-            taskClientFactory,
-            OBJECT_MAPPER,
-            new NoopServiceEmitter(),
-            new DruidMonitorSchedulerConfig(),
-            rowIngestionMetersFactory,
-            new SupervisorStateManagerConfig()
+        ingestionSchema,
+        dataSchema,
+        tuningConfigOri,
+        kafkaSupervisorIOConfig,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new NoopServiceEmitter(),
+        new DruidMonitorSchedulerConfig(),
+        rowIngestionMetersFactory,
+        new SupervisorStateManagerConfig()
     );
 
     supervisor = new TestableKafkaSupervisor(
-            taskStorage,
-            taskMaster,
-            indexerMetadataStorageCoordinator,
-            taskClientFactory,
-            OBJECT_MAPPER,
-            (KafkaSupervisorSpec) testableSupervisorSpec,
-            rowIngestionMetersFactory
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        (KafkaSupervisorSpec) testableSupervisorSpec,
+        rowIngestionMetersFactory
     );
 
     SupervisorTaskAutoScaler autoscaler = 
testableSupervisorSpec.createAutoscaler(supervisor);
@@ -393,9 +394,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
-            new KafkaDataSourceMetadata(
-                    null
-            )
+        new KafkaDataSourceMetadata(
+            null
+        )
     ).anyTimes();
     
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).anyTimes();
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
@@ -425,22 +426,43 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertFalse("maximumMessageTime", 
taskConfig.getMaximumMessageTime().isPresent());
 
     Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
-    Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)));
-    Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1)));
-    Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)));
+    Assert.assertEquals(
+        0L,
+        (long) taskConfig.getStartSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 0))
+    );
+    Assert.assertEquals(
+        0L,
+        (long) taskConfig.getStartSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 1))
+    );
+    Assert.assertEquals(
+        0L,
+        (long) taskConfig.getStartSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 2))
+    );
 
     Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
     Assert.assertEquals(
-            Long.MAX_VALUE,
-            (long) 
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0))
+        Long.MAX_VALUE,
+        (long) taskConfig.getEndSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 0))
     );
     Assert.assertEquals(
-            Long.MAX_VALUE,
-            (long) 
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1))
+        Long.MAX_VALUE,
+        (long) taskConfig.getEndSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 1))
     );
     Assert.assertEquals(
-            Long.MAX_VALUE,
-            (long) 
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2))
+        Long.MAX_VALUE,
+        (long) taskConfig.getEndSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 2))
     );
     Assert.assertEquals(
         Collections.singleton(new ResourceAction(
@@ -543,22 +565,43 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertFalse("maximumMessageTime", 
taskConfig.getMaximumMessageTime().isPresent());
 
     Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
-    Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)));
-    Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1)));
-    Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)));
+    Assert.assertEquals(
+        0L,
+        (long) taskConfig.getStartSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 0))
+    );
+    Assert.assertEquals(
+        0L,
+        (long) taskConfig.getStartSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 1))
+    );
+    Assert.assertEquals(
+        0L,
+        (long) taskConfig.getStartSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 2))
+    );
 
     Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
     Assert.assertEquals(
         Long.MAX_VALUE,
-        (long) 
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0))
+        (long) taskConfig.getEndSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 0))
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        (long) 
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1))
+        (long) taskConfig.getEndSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 1))
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        (long) 
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2))
+        (long) taskConfig.getEndSequenceNumbers()
+                         .getPartitionSequenceNumberMap()
+                         .get(new KafkaTopicPartition(false, topic, 2))
     );
   }
 
@@ -613,19 +656,35 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(2, 
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
     Assert.assertEquals(
         0L,
-        
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        task1.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 0))
+             .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        task1.getIOConfig()
+             .getEndSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 0))
+             .longValue()
     );
     Assert.assertEquals(
         0L,
-        
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        task1.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 2))
+             .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        task1.getIOConfig()
+             .getEndSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 2))
+             .longValue()
     );
 
     KafkaIndexTask task2 = captured.getValues().get(1);
@@ -633,11 +692,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(1, 
task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
     Assert.assertEquals(
         0L,
-        
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        task2.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 1))
+             .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        task2.getIOConfig()
+             .getEndSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 1))
+             .longValue()
     );
   }
 
@@ -668,15 +735,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(3, 
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
     Assert.assertEquals(
         0L,
-        
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        task1.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 0))
+             .longValue()
     );
     Assert.assertEquals(
         0L,
-        
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        task1.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 1))
+             .longValue()
     );
     Assert.assertEquals(
         0L,
-        
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        task1.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 2))
+             .longValue()
     );
 
     KafkaIndexTask task2 = captured.getValues().get(1);
@@ -684,15 +763,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(3, 
task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
     Assert.assertEquals(
         0L,
-        
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        task2.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 0))
+             .longValue()
     );
     Assert.assertEquals(
         0L,
-        
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        task2.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 1))
+             .longValue()
     );
     Assert.assertEquals(
         0L,
-        
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        task2.getIOConfig()
+             .getStartSequenceNumbers()
+             .getPartitionSequenceNumberMap()
+             .get(new KafkaTopicPartition(false, topic, 2))
+             .longValue()
     );
   }
 
@@ -802,15 +893,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaIndexTask task = captured.getValue();
     Assert.assertEquals(
         1101L,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 0))
+            .longValue()
     );
     Assert.assertEquals(
         1101L,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 1))
+            .longValue()
     );
     Assert.assertEquals(
         1101L,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 2))
+            .longValue()
     );
   }
 
@@ -867,15 +970,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
     KafkaIndexTask task = captured.getValue();
     Assert.assertEquals(
         10,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 0))
+            .longValue()
     );
     Assert.assertEquals(
         10,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 1))
+            .longValue()
     );
     Assert.assertEquals(
         10,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 2))
+            .longValue()
     );
 
     addMoreEvents(9, 6);
@@ -899,15 +1014,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
     task = newcaptured.getValue();
     Assert.assertEquals(
         0,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 3)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 3))
+            .longValue()
     );
     Assert.assertEquals(
         0,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 4)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 4))
+            .longValue()
     );
     Assert.assertEquals(
         0,
-        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 5)).longValue()
+        task.getIOConfig()
+            .getStartSequenceNumbers()
+            .getPartitionSequenceNumberMap()
+            .get(new KafkaTopicPartition(false, topic, 5))
+            .longValue()
     );
   }
 
@@ -928,7 +1055,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
+            new SeekableStreamStartSequenceNumbers<>(
+                topic,
+                singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+                ImmutableSet.of()
+            )
         )
     ).anyTimes();
     EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
@@ -943,15 +1074,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertEquals(
         10L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         20L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         30L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 2))
+                  .longValue()
     );
   }
 
@@ -991,15 +1131,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertEquals(
         10L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 0)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         20L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 1)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         30L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 2)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 2))
+                  .longValue()
     );
     Assert.assertEquals(3, 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
   }
@@ -1040,15 +1189,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertEquals(
         0L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 0)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         0L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 1)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         0L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 2)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 2))
+                  .longValue()
     );
     Assert.assertEquals(3, 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
   }
@@ -1074,7 +1232,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0, 
10L, 1, 20L, 2, 30L));
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new SeekableStreamStartSequenceNumbers<>(topicPattern, 
partitionSequenceNumberMap.build(), ImmutableSet.of())
+            new SeekableStreamStartSequenceNumbers<>(
+                topicPattern,
+                partitionSequenceNumberMap.build(),
+                ImmutableSet.of()
+            )
         )
     ).anyTimes();
     EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
@@ -1089,15 +1251,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertEquals(
         10L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         20L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         30L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 2))
+                  .longValue()
     );
     Assert.assertEquals(3, 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
   }
@@ -1139,15 +1310,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertEquals(
         10L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 0)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         20L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 1)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         30L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(true, topic, 2)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(true, topic, 2))
+                  .longValue()
     );
     Assert.assertEquals(3, 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
   }
@@ -1239,7 +1419,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         DATASOURCE,
         0,
         new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
         null,
         null,
         tuningConfig
@@ -1258,7 +1441,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -1272,7 +1459,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         DATASOURCE,
         0,
         new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE)),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE)
+        ),
         null,
         null,
         tuningConfig
@@ -1282,7 +1472,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         DATASOURCE,
         0,
         new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 1, 0L, 2, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", 
singlePartitionMap(topic, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
         null,
         null,
         tuningConfig
@@ -1443,7 +1636,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         DATASOURCE,
         0,
         new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
         now,
         maxi,
         supervisor.getTuningConfig()
@@ -1726,8 +1922,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
       Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
 
       Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
-      Assert.assertEquals(10L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)));
-      Assert.assertEquals(35L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)));
+      Assert.assertEquals(
+          10L,
+          (long) taskConfig.getStartSequenceNumbers()
+                           .getPartitionSequenceNumberMap()
+                           .get(new KafkaTopicPartition(false, topic, 0))
+      );
+      Assert.assertEquals(
+          35L,
+          (long) taskConfig.getStartSequenceNumbers()
+                           .getPartitionSequenceNumberMap()
+                           .get(new KafkaTopicPartition(false, topic, 2))
+      );
     }
   }
 
@@ -1744,7 +1950,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -1825,29 +2035,47 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(topic, 
capturedTaskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
         10L,
-        
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        capturedTaskConfig.getStartSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 0))
+                          .longValue()
     );
     Assert.assertEquals(
         20L,
-        
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        capturedTaskConfig.getStartSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 1))
+                          .longValue()
     );
     Assert.assertEquals(
         30L,
-        
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        capturedTaskConfig.getStartSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 2))
+                          .longValue()
     );
 
     Assert.assertEquals(topic, 
capturedTaskConfig.getEndSequenceNumbers().getStream());
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        capturedTaskConfig.getEndSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 0))
+                          .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        capturedTaskConfig.getEndSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 1))
+                          .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        capturedTaskConfig.getEndSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 2))
+                          .longValue()
     );
   }
 
@@ -1865,7 +2093,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         DATASOURCE,
         0,
         new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()),
-        new SeekableStreamEndSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
         null,
         null,
         supervisor.getTuningConfig()
@@ -1936,29 +2167,47 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(topic, 
capturedTaskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
         10L,
-        
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        capturedTaskConfig.getStartSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 0))
+                          .longValue()
     );
     Assert.assertEquals(
         0L,
-        
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        capturedTaskConfig.getStartSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 1))
+                          .longValue()
     );
     Assert.assertEquals(
         30L,
-        
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        capturedTaskConfig.getStartSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 2))
+                          .longValue()
     );
 
     Assert.assertEquals(topic, 
capturedTaskConfig.getEndSequenceNumbers().getStream());
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 0)).longValue()
+        capturedTaskConfig.getEndSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 0))
+                          .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 1)).longValue()
+        capturedTaskConfig.getEndSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 1))
+                          .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
 KafkaTopicPartition(false, topic, 2)).longValue()
+        capturedTaskConfig.getEndSequenceNumbers()
+                          .getPartitionSequenceNumberMap()
+                          .get(new KafkaTopicPartition(false, topic, 2))
+                          .longValue()
     );
   }
 
@@ -1977,7 +2226,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -1991,7 +2244,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 1L, 1, 2L, 2, 3L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 1L, 1, 2L, 2, 3L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -2407,7 +2664,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, 
Long>(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L))
+            new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>(
+                topic,
+                singlePartitionMap(topic,
+                                   0,
+                                   2L,
+                                   1,
+                                   2L,
+                                   2,
+                                   2L
+                )
+            )
         )
     ).anyTimes();
     
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
@@ -2455,7 +2722,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, 
Long>(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L))
+            new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>(
+                topic,
+                singlePartitionMap(topic,
+                                   0,
+                                   2L,
+                                   1,
+                                   2L,
+                                   2,
+                                   2L
+                )
+            )
         )
     ).anyTimes();
 
@@ -2484,7 +2761,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
   public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws 
Exception
   {
     Map<String, String> config = ImmutableMap.of("idleConfig.enabled", "false",
-                                                 
"idleConfig.inactiveAfterMillis", "200");
+                                                 
"idleConfig.inactiveAfterMillis", "200"
+    );
     supervisorConfig = OBJECT_MAPPER.convertValue(config, 
SupervisorStateManagerConfig.class);
     supervisor = getTestableSupervisorForIdleBehaviour(
         1,
@@ -2504,7 +2782,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, 
Long>(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L))
+            new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>(
+                topic,
+                singlePartitionMap(topic,
+                                   0,
+                                   2L,
+                                   1,
+                                   2L,
+                                   2,
+                                   2L
+                )
+            )
         )
     ).anyTimes();
     
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
@@ -2836,8 +3124,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     for (Task task : captured.getValues()) {
       KafkaIndexTaskIOConfig taskConfig = ((KafkaIndexTask) 
task).getIOConfig();
-      Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)));
-      Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)));
+      Assert.assertEquals(
+          0L,
+          (long) taskConfig.getStartSequenceNumbers()
+                           .getPartitionSequenceNumberMap()
+                           .get(new KafkaTopicPartition(false, topic, 0))
+      );
+      Assert.assertEquals(
+          0L,
+          (long) taskConfig.getStartSequenceNumbers()
+                           .getPartitionSequenceNumberMap()
+                           .get(new KafkaTopicPartition(false, topic, 2))
+      );
     }
   }
 
@@ -2930,8 +3228,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     for (Task task : captured.getValues()) {
       KafkaIndexTaskIOConfig taskConfig = ((KafkaIndexTask) 
task).getIOConfig();
-      Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)));
-      Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)));
+      Assert.assertEquals(
+          0L,
+          (long) taskConfig.getStartSequenceNumbers()
+                           .getPartitionSequenceNumberMap()
+                           .get(new KafkaTopicPartition(false, topic, 0))
+      );
+      Assert.assertEquals(
+          0L,
+          (long) taskConfig.getStartSequenceNumbers()
+                           .getPartitionSequenceNumberMap()
+                           .get(new KafkaTopicPartition(false, topic, 2))
+      );
     }
   }
 
@@ -2972,7 +3280,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -2986,7 +3298,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3000,7 +3316,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3132,7 +3452,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
 
     KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 1, 1000L, 2, 1000L), ImmutableSet.of())
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 1, 1000L, 2, 1000L),
+            ImmutableSet.of()
+        )
     );
 
     KafkaDataSourceMetadata expectedMetadata = new KafkaDataSourceMetadata(
@@ -3249,7 +3573,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3263,7 +3591,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3277,7 +3609,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3355,7 +3691,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3369,7 +3709,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3383,7 +3727,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3459,7 +3807,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             topic,
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3473,7 +3825,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             topic,
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3487,7 +3843,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             topic,
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3517,8 +3877,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
     
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
     EasyMock.expect(
-        
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
 KafkaDataSourceMetadata(null)
-    ).anyTimes();
+                
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+            .andReturn(new KafkaDataSourceMetadata(null)
+            )
+            .anyTimes();
     
EasyMock.expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(Status.READING));
     
EasyMock.expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(Status.READING));
     
EasyMock.expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(Status.READING));
@@ -3574,7 +3936,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             topic,
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3588,7 +3954,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             topic,
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3602,7 +3972,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             topic,
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3624,8 +3998,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
     
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
     EasyMock.expect(
-        
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
 KafkaDataSourceMetadata(null)
-    ).anyTimes();
+                
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+            .andReturn(new KafkaDataSourceMetadata(null)
+            )
+            .anyTimes();
 
     replayAll();
 
@@ -3703,7 +4079,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id1",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3717,7 +4097,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id2",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3731,7 +4115,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         "id3",
         DATASOURCE,
         0,
-        new SeekableStreamStartSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+            ImmutableSet.of()
+        ),
         new SeekableStreamEndSequenceNumbers<>(
             "topic",
             singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)
@@ -3909,29 +4297,47 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
     Assert.assertEquals(
         0L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         0L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         0L,
-        
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)).longValue()
+        taskConfig.getStartSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 2))
+                  .longValue()
     );
 
     Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 0)).longValue()
+        taskConfig.getEndSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 0))
+                  .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 1)).longValue()
+        taskConfig.getEndSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 1))
+                  .longValue()
     );
     Assert.assertEquals(
         Long.MAX_VALUE,
-        
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new 
KafkaTopicPartition(false, topic, 2)).longValue()
+        taskConfig.getEndSequenceNumbers()
+                  .getPartitionSequenceNumberMap()
+                  .get(new KafkaTopicPartition(false, topic, 2))
+                  .longValue()
     );
   }
 
@@ -4147,7 +4553,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
             singlePartitionMap(topic, 0, 0L, 2, 0L),
             ImmutableSet.of()
         ),
-        new SeekableStreamEndSequenceNumbers<>("topic", 
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
         null,
         null,
         supervisor.getTuningConfig()
@@ -4485,67 +4894,80 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.reset(taskClient);
     addSomeEvents(100);
 
-    KafkaIndexTask readingTask = createKafkaIndexTask("readingTask",
-                                                      DATASOURCE,
-                                                      0,
-                                                      new 
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L), 
Collections.emptySet()),
-                                                      new 
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 
Long.MAX_VALUE)),
-                                                      null,
-                                                      null,
-                                                      
supervisor.getTuningConfig()
-    );
-
-    KafkaIndexTask publishingTask = createKafkaIndexTask("publishingTask",
-                                                         DATASOURCE,
-                                                         1,
-                                                         new 
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L), 
Collections.emptySet()),
-                                                         new 
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 
Long.MAX_VALUE)),
-                                                         null,
-                                                         null,
-                                                         
supervisor.getTuningConfig()
-    );
-
-    KafkaIndexTask pausedTask = createKafkaIndexTask("pausedTask",
-                                                     DATASOURCE,
-                                                     1,
-                                                     new 
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L), 
Collections.emptySet()),
-                                                     new 
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 
Long.MAX_VALUE)),
-                                                     null,
-                                                     null,
-                                                     
supervisor.getTuningConfig()
-    );
-
-    KafkaIndexTask failsToResumePausedTask = 
createKafkaIndexTask("failsToResumePausedTask",
-                                                                  DATASOURCE,
-                                                                  1,
-                                                                  new 
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L), 
Collections.emptySet()),
-                                                                  new 
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 
Long.MAX_VALUE)),
-                                                                  null,
-                                                                  null,
-                                                                  
supervisor.getTuningConfig()
-    );
-
-    KafkaIndexTask waitingTask = createKafkaIndexTask("waitingTask",
-                                                      DATASOURCE,
-                                                      2,
-                                                      new 
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L), 
Collections.emptySet()),
-                                                      new 
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 
Long.MAX_VALUE)),
-                                                      null,
-                                                      null,
-                                                      
supervisor.getTuningConfig()
-    );
-
-    KafkaIndexTask pendingTask = createKafkaIndexTask("pendingTask",
-                                                      DATASOURCE,
-                                                      2,
-                                                      new 
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L), 
Collections.emptySet()),
-                                                      new 
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 
Long.MAX_VALUE)),
-                                                      null,
-                                                      null,
-                                                      
supervisor.getTuningConfig()
-    );
-
-    List<Task> tasks = ImmutableList.of(readingTask, publishingTask, 
pausedTask, failsToResumePausedTask, waitingTask, pendingTask);
+    KafkaIndexTask readingTask = createKafkaIndexTask(
+        "readingTask",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 0L), Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, Long.MAX_VALUE)),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask publishingTask = createKafkaIndexTask(
+        "publishingTask",
+        DATASOURCE,
+        1,
+        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, 0L), Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 0, Long.MAX_VALUE)),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask pausedTask = createKafkaIndexTask(
+        "pausedTask",
+        DATASOURCE,
+        1,
+        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 1, 0L), Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 1, Long.MAX_VALUE)),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask failsToResumePausedTask = createKafkaIndexTask(
+        "failsToResumePausedTask",
+        DATASOURCE,
+        1,
+        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 1, 0L), Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 1, Long.MAX_VALUE)),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask waitingTask = createKafkaIndexTask(
+        "waitingTask",
+        DATASOURCE,
+        2,
+        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 2, 0L), Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 2, Long.MAX_VALUE)),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask pendingTask = createKafkaIndexTask(
+        "pendingTask",
+        DATASOURCE,
+        2,
+        new SeekableStreamStartSequenceNumbers<>(topic, 
singlePartitionMap(topic, 2, 0L), Collections.emptySet()),
+        new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 2, Long.MAX_VALUE)),
+        null,
+        null,
+        supervisor.getTuningConfig()
+    );
+
+    List<Task> tasks = ImmutableList.of(
+        readingTask,
+        publishingTask,
+        pausedTask,
+        failsToResumePausedTask,
+        waitingTask,
+        pendingTask
+    );
     Collection taskRunnerWorkItems = ImmutableList.of(
         new TestTaskRunnerWorkItem(readingTask, null, 
TaskLocation.create("testHost", 1001, -1)),
         new TestTaskRunnerWorkItem(publishingTask, null, 
TaskLocation.create("testHost", 1002, -1)),
@@ -4641,7 +5063,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskClient.getStartTimeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
     
EasyMock.expect(taskClient.resumeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(true));
 
-    
EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
+    
EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId()))
+            .andReturn(Futures.immediateFuture(startTime));
     
EasyMock.expect(taskClient.resumeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(false));
 
     Capture<String> shutdownTaskId = EasyMock.newCapture();
@@ -4856,11 +5279,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
       public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCount, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -4971,11 +5393,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
       public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCount, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -5089,11 +5510,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
       public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCount, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -5227,30 +5647,63 @@ public class KafkaSupervisorTest extends EasyMockSupport
         OBJECT_MAPPER
     );
   }
-  
+
   private static ImmutableMap<KafkaTopicPartition, Long> 
singlePartitionMap(String topic, int partition, long offset)
   {
     return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition), 
offset);
   }
 
-  private static ImmutableMap<KafkaTopicPartition, Long> 
singlePartitionMap(String topic, int partition1, long offset1, int partition2, 
long offset2)
+  private static ImmutableMap<KafkaTopicPartition, Long> singlePartitionMap(
+      String topic,
+      int partition1,
+      long offset1,
+      int partition2,
+      long offset2
+  )
   {
-    return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1), 
offset1, new KafkaTopicPartition(false, topic, partition2),
-      offset2);
+    return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1),
+                           offset1,
+                           new KafkaTopicPartition(false, topic, partition2),
+                           offset2
+    );
   }
 
-  private static ImmutableMap<KafkaTopicPartition, Long> 
singlePartitionMap(String topic, int partition1, long offset1,
-                                                              int partition2, 
long offset2, int partition3, long offset3)
+  private static ImmutableMap<KafkaTopicPartition, Long> singlePartitionMap(
+      String topic,
+      int partition1,
+      long offset1,
+      int partition2,
+      long offset2,
+      int partition3,
+      long offset3
+  )
   {
-    return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1), 
offset1, new KafkaTopicPartition(false, topic, partition2),
-                           offset2, new KafkaTopicPartition(false, topic, 
partition3), offset3);
+    return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1),
+                           offset1,
+                           new KafkaTopicPartition(false, topic, partition2),
+                           offset2,
+                           new KafkaTopicPartition(false, topic, partition3),
+                           offset3
+    );
   }
 
-  private static ImmutableMap<KafkaTopicPartition, Long> 
multiTopicPartitionMap(String topic, int partition1, long offset1,
-                                                                               
 int partition2, long offset2, int partition3, long offset3)
+  private static ImmutableMap<KafkaTopicPartition, Long> 
multiTopicPartitionMap(
+      String topic,
+      int partition1,
+      long offset1,
+      int partition2,
+      long offset2,
+      int partition3,
+      long offset3
+  )
   {
-    return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1), 
offset1, new KafkaTopicPartition(true, topic, partition2),
-        offset2, new KafkaTopicPartition(true, topic, partition3), offset3);
+    return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1),
+                           offset1,
+                           new KafkaTopicPartition(true, topic, partition2),
+                           offset2,
+                           new KafkaTopicPartition(true, topic, partition3),
+                           offset3
+    );
   }
 
   private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
index 2399008688c..599c32b2158 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
@@ -25,18 +25,18 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Json;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
-import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.java.util.http.client.HttpClient;
 
 @LazySingleton
 public class KinesisIndexTaskClientFactory extends 
SeekableStreamIndexTaskClientFactory<String, String>
 {
   @Inject
   public KinesisIndexTaskClientFactory(
-      @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+      @EscalatedGlobal HttpClient httpClient,
       @Json ObjectMapper mapper
   )
   {
-    super(serviceClientFactory, mapper);
+    super(httpClient, mapper);
   }
 
   @Override
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index fe851a183a2..122a8e1c5ae 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -111,6 +111,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 public class KinesisSupervisorTest extends EasyMockSupport
 {
@@ -5117,11 +5118,10 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
       public SeekableStreamIndexTaskClient<String, String> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCount, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -5259,14 +5259,10 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
       public SeekableStreamIndexTaskClient<String, String> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(
-            replicas * (autoScalerConfig != null ? 
autoScalerConfig.getTaskCountMax() : taskCount),
-            maxNumTasks
-        );
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -5348,11 +5344,10 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
       public SeekableStreamIndexTaskClient<String, String> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCount, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
@@ -5436,11 +5431,10 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
       public SeekableStreamIndexTaskClient<String, String> build(
           String dataSource,
           TaskInfoProvider taskInfoProvider,
-          int maxNumTasks,
-          SeekableStreamSupervisorTuningConfig tuningConfig
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
       )
       {
-        Assert.assertEquals(replicas * taskCount, maxNumTasks);
         Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), 
tuningConfig.getHttpTimeout());
         Assert.assertEquals(TEST_CHAT_RETRIES, (long) 
tuningConfig.getChatRetries());
         return taskClient;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
index 5bdf8aaac39..c3b4e69f832 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
@@ -21,31 +21,46 @@ package org.apache.druid.indexing.seekablestream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.common.TaskInfoProvider;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceClientFactoryImpl;
+
+import java.util.concurrent.ScheduledExecutorService;
 
 public abstract class SeekableStreamIndexTaskClientFactory<PartitionIdType, 
SequenceOffsetType>
 {
   private static final Logger log = new 
Logger(SeekableStreamIndexTaskClientFactory.class);
 
-  private final ServiceClientFactory serviceClientFactory;
+  private final HttpClient httpClient;
   private final ObjectMapper jsonMapper;
 
   protected SeekableStreamIndexTaskClientFactory(
-      final ServiceClientFactory serviceClientFactory,
+      final HttpClient httpClient,
       final ObjectMapper jsonMapper
   )
   {
-    this.serviceClientFactory = serviceClientFactory;
+    this.httpClient = httpClient;
     this.jsonMapper = jsonMapper;
   }
 
+  /**
+   * Creates a task client for a specific supervisor.
+   *
+   * @param dataSource       task datasource
+   * @param taskInfoProvider task locator
+   * @param tuningConfig     from {@link SeekableStreamSupervisor#tuningConfig}
+   * @param connectExec      should generally be {@link 
SeekableStreamSupervisor#workerExec}. This is preferable to
+   *                         the global pool for the default {@link 
ServiceClientFactory}, to prevent callbacks from
+   *                         different supervisors from backlogging each other.
+   */
   public SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> 
build(
       final String dataSource,
       final TaskInfoProvider taskInfoProvider,
-      final int maxNumTasks,
-      final SeekableStreamSupervisorTuningConfig tuningConfig
+      final SeekableStreamSupervisorTuningConfig tuningConfig,
+      final ScheduledExecutorService connectExec
   )
   {
     log.info(
@@ -57,7 +72,7 @@ public abstract class 
SeekableStreamIndexTaskClientFactory<PartitionIdType, Sequ
 
     return new SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, 
SequenceOffsetType>(
         dataSource,
-        serviceClientFactory,
+        new ServiceClientFactoryImpl(httpClient, connectExec),
         taskInfoProvider,
         jsonMapper,
         tuningConfig.getHttpTimeout(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 86c4ba385bd..4cbf0ccfa68 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -65,7 +65,6 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
-import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -86,6 +85,7 @@ import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -837,12 +837,25 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
   private final String supervisorId;
   private final TaskInfoProvider taskInfoProvider;
-  private final long futureTimeoutInSeconds; // how long to wait for async 
operations to complete
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
+  /**
+   * Single-threaded executor for running {@link Notice#handle()} from {@link 
#notices}.
+   */
   private final ExecutorService exec;
+  /**
+   * Single-threaded scheduled executor for adding periodic {@link RunNotice} 
to {@link #notices}.
+   */
   private final ScheduledExecutorService scheduledExec;
+  /**
+   * Single-threaded scheduled executor for reporting metrics on notice queue 
size, lag, etc.
+   * See {@link #scheduleReporting}.
+   */
   private final ScheduledExecutorService reportingExec;
-  private final ListeningExecutorService workerExec;
+  /**
+   * Multi-threaded executor for managing communications with workers, 
including handling callbacks from worker RPCs.
+   * Also serves as the connectExec for {@link #taskClient}.
+   */
+  private final ListeningScheduledExecutorService workerExec;
   private final NoticesQueue<Notice> notices = new NoticesQueue<>();
   private final Object stopLock = new Object();
   private final Object stateChangeLock = new Object();
@@ -903,21 +916,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     );
 
     int workerThreads;
-    int maxNumTasks;
     if (autoScalerConfig != null && 
autoScalerConfig.getEnableTaskAutoScaler()) {
       log.info("Running Task autoscaler for datasource [%s]", dataSource);
 
       workerThreads = (this.tuningConfig.getWorkerThreads() != null
                        ? this.tuningConfig.getWorkerThreads()
                        : Math.min(10, autoScalerConfig.getTaskCountMax()));
-
-      maxNumTasks = autoScalerConfig.getTaskCountMax() * 
this.ioConfig.getReplicas();
     } else {
       workerThreads = (this.tuningConfig.getWorkerThreads() != null
                        ? this.tuningConfig.getWorkerThreads()
                        : Math.min(10, this.ioConfig.getTaskCount()));
-
-      maxNumTasks = this.ioConfig.getTaskCount() * this.ioConfig.getReplicas();
     }
 
     IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
@@ -938,7 +946,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
 
     this.workerExec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(
+        ScheduledExecutors.fixed(
             workerThreads,
             StringUtils.encodeForFormat(supervisorId) + "-Worker-%d"
         )
@@ -973,13 +981,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
     };
 
-    this.futureTimeoutInSeconds = Math.max(
-        MINIMUM_FUTURE_TIMEOUT_IN_SECONDS,
-        tuningConfig.getChatRetries() * 
(tuningConfig.getHttpTimeout().getStandardSeconds()
-                                         + 
SeekableStreamIndexTaskClientAsyncImpl.MAX_RETRY_WAIT_SECONDS)
-    );
-
-    this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
maxNumTasks, this.tuningConfig);
+    this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
   @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 40bbe84b623..af66ce3b8b9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -179,7 +179,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     EasyMock.expect(taskClientFactory.build(
         EasyMock.anyString(),
         EasyMock.anyObject(),
-        EasyMock.anyInt(),
+        EasyMock.anyObject(),
         EasyMock.anyObject()
     )).andReturn(
         indexTaskClient).anyTimes();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to