This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c849d6a390 Add builders for Kafka supervisor spec, ioConfig, tuningConfig (#18460)
0c849d6a390 is described below

commit 0c849d6a3902ac77e31da7ebe993707c46c52949
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Sep 2 22:58:10 2025 +0530

    Add builders for Kafka supervisor spec, ioConfig, tuningConfig (#18460)
    
    Changes:
    - Add `KafkaSupervisorSpecBuilder`
    - Add `KafkaTuningConfigBuilder`
    - Add `KafkaIOConfigBuilder`
    - Use the above builders in tests
---
 .../embedded/compact/AutoCompactionTest.java       |   6 +-
 .../embedded/compact/CompactionTestBase.java       |   2 +-
 .../embedded/indexing/IngestionSmokeTest.java      |  49 +--
 .../embedded/indexing/KafkaClusterMetricsTest.java |  76 +----
 .../embedded/indexing/KafkaDataFormatsTest.java    |  91 ++----
 .../testing/embedded/indexing/MoreResources.java   |  32 ++
 .../embedded/msq/BaseRealtimeQueryTest.java        |  68 +---
 .../embedded/server/OverlordClientTest.java        |  26 +-
 .../kafka/supervisor/KafkaSupervisorSpec.java      |  12 +-
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |  48 +--
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java | 355 +++++----------------
 .../simulate/EmbeddedKafkaSupervisorTest.java      |  57 ++--
 .../kafka/supervisor/KafkaIOConfigBuilder.java     |  96 ++++++
 .../supervisor/KafkaSupervisorSpecBuilder.java     |  96 ++++++
 .../kafka/supervisor/KafkaSupervisorSpecTest.java  | 121 ++-----
 .../kafka/supervisor/KafkaSupervisorTest.java      | 239 +++-----------
 .../KafkaSupervisorTuningConfigTest.java           |   4 +-
 .../kafka/supervisor/KafkaTuningConfigBuilder.java | 119 +++++++
 indexing-service/pom.xml                           |   6 +
 .../druid/indexing/common/task/TaskBuilder.java    | 116 ++++---
 .../indexing/common/task/TuningConfigBuilder.java  | 147 +++++----
 .../supervisor/SupervisorIOConfigBuilder.java      | 164 ++++++++++
 22 files changed, 930 insertions(+), 1000 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
index bd89df768a2..6ef481d210f 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
@@ -1616,17 +1616,17 @@ public class AutoCompactionTest extends CompactionTestBase
     }
   }
 
-  private <T extends TaskBuilder.IndexCommon<?, ?, ?>> void loadData(Supplier<T> updatePayload)
+  private <T extends TaskBuilder.IndexCommon<?, ?, ?, ?>> void loadData(Supplier<T> updatePayload)
   {
     loadData(updatePayload, null);
   }
 
-  private <T extends TaskBuilder.IndexCommon<?, ?, ?>> void loadData(
+  private <T extends TaskBuilder.IndexCommon<?, ?, ?, ?>> void loadData(
       Supplier<T> taskPayloadSupplier,
       GranularitySpec granularitySpec
   )
   {
-    final TaskBuilder.IndexCommon<?, ?, ?> taskBuilder = taskPayloadSupplier.get();
+    final TaskBuilder.IndexCommon<?, ?, ?, ?> taskBuilder = taskPayloadSupplier.get();
     taskBuilder.dataSource(fullDatasourceName);
     if (granularitySpec != null) {
       taskBuilder.granularitySpec(granularitySpec);
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
index 109c9c12aae..6719c0b5496 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
@@ -67,7 +67,7 @@ public abstract class CompactionTestBase extends EmbeddedClusterTestBase
   /**
    * Creates and runs a task for the current {@link #dataSource}.
    */
-  protected String runTask(TaskBuilder<?, ?, ?> taskBuilder)
+  protected String runTask(TaskBuilder<?, ?, ?, ?> taskBuilder)
   {
     final String taskId = IdUtils.getRandomId();
     cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord);
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index af9c3d21c8e..668079065c4 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.testing.embedded.indexing;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
@@ -32,9 +31,7 @@ import org.apache.druid.indexing.common.task.TaskBuilder;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.DateTimes;
@@ -44,7 +41,6 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -304,7 +300,7 @@ public class IngestionSmokeTest extends EmbeddedClusterTestBase
         (CloseableIterator<TaskStatusPlus>)
             cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, dataSource, 1))
     );
-    Assertions.assertEquals(1, taskStatuses.size());
+    Assertions.assertFalse(taskStatuses.isEmpty());
     Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode());
 
     // Suspend the supervisor and verify the state
@@ -317,39 +313,16 @@ public class IngestionSmokeTest extends EmbeddedClusterTestBase
 
   private KafkaSupervisorSpec createKafkaSupervisor(String topic)
   {
-    return new KafkaSupervisorSpec(
-        null,
-        null,
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", null, null))
-                  .withDimensions(DimensionsSpec.EMPTY)
-                  .build(),
-        createTuningConfig(),
-        new KafkaSupervisorIOConfig(
-            topic,
-            null,
-            new CsvInputFormat(List.of("timestamp", "item"), null, null, false, 0, false),
-            null, null,
-            null,
-            kafkaServer.consumerProperties(),
-            null, null, null, null, null,
-            true,
-            null, null, null, null, null, null, null, null
-        ),
-        null, null, null, null, null, null, null, null, null, null, null
-    );
-  }
-
-  private KafkaSupervisorTuningConfig createTuningConfig()
-  {
-    return new KafkaSupervisorTuningConfig(
-        null,
-        null, null, null,
-        1,
-        null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null, null
-    );
+    return MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", null, null)))
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withInputFormat(new CsvInputFormat(List.of("timestamp", "item"), null, null, false, 0, false))
+        )
+        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
+        .build(dataSource, topic);
   }
 
   private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 2cf03e99ce9..5a977b861e8 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -20,24 +20,17 @@
 package org.apache.druid.testing.embedded.indexing;
 
 import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.emitter.kafka.KafkaEmitter;
 import org.apache.druid.emitter.kafka.KafkaEmitterModule;
-import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
 import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.ClusterCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -143,16 +136,12 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
     final int expectedSegmentsHandedOff = 10;
 
     final int taskCount = 5;
-    final int taskDurationMillis = 1_000;
-    final int taskCompletionTimeoutMillis = 10_000;
 
     // Submit and start a supervisor
     final String supervisorId = dataSource + "_supe";
     final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
         supervisorId,
         taskCount,
-        taskDurationMillis,
-        taskCompletionTimeoutMillis,
         maxRowsPerSegment
     );
 
@@ -194,16 +183,12 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
     final int compactedMaxRowsPerSegment = 5000;
 
     final int taskCount = 2;
-    final int taskDurationMillis = 500;
-    final int taskCompletionTimeoutMillis = 5_000;
 
     // Submit and start a supervisor
     final String supervisorId = dataSource + "_supe";
     final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
         supervisorId,
         taskCount,
-        taskDurationMillis,
-        taskCompletionTimeoutMillis,
         maxRowsPerSegment
     );
     cluster.callApi().postSupervisor(kafkaSupervisorSpec);
@@ -315,16 +300,12 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
     final int compactedMaxRowsPerSegment = 5000;
 
     final int taskCount = 2;
-    final int taskDurationMillis = 500;
-    final int taskCompletionTimeoutMillis = 5_000;
 
     // Submit and start a supervisor
     final String supervisorId = dataSource + "_supe";
     final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
         supervisorId,
         taskCount,
-        taskDurationMillis,
-        taskCompletionTimeoutMillis,
         maxRowsPerSegment
     );
     cluster.callApi().postSupervisor(kafkaSupervisorSpec);
@@ -382,7 +363,7 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
    * SELECTs the total count of the given metric in the {@link #dataSource} and
    * verifies it against the metrics actually emitted by the server.
    */
-  private void verifyIngestedMetricCountMatchesEmittedCount(String metricName, EmbeddedDruidServer server)
+  private void verifyIngestedMetricCountMatchesEmittedCount(String metricName, EmbeddedDruidServer<?> server)
   {
     // Get the value of the metric from the datasource
     final DruidNode selfNode = server.bindings().selfNode();
@@ -404,52 +385,19 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
   private KafkaSupervisorSpec createKafkaSupervisor(
       String supervisorId,
       int taskCount,
-      int taskDurationMillis,
-      int taskCompletionTimeoutMillis,
       int maxRowsPerSegment
   )
   {
-    final Period startDelay = Period.millis(10);
-    final Period supervisorRunPeriod = Period.millis(500);
-    final boolean useEarliestOffset = true;
-
-    return new KafkaSupervisorSpec(
-        supervisorId,
-        null,
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
-                  .withGranularity(new UniformGranularitySpec(Granularities.HOUR, null, null))
-                  .withDimensions(DimensionsSpec.EMPTY)
-                  .build(),
-        createTuningConfig(maxRowsPerSegment),
-        new KafkaSupervisorIOConfig(
-            TOPIC,
-            null,
-            new JsonInputFormat(null, null, null, null, null),
-            null,
-            taskCount,
-            Period.millis(taskDurationMillis),
-            kafkaServer.consumerProperties(),
-            null, null, null,
-            startDelay,
-            supervisorRunPeriod,
-            useEarliestOffset,
-            Period.millis(taskCompletionTimeoutMillis),
-            null, null, null, null, null, null, null
-        ),
-        null, null, null, null, null, null, null, null, null, null, null
-    );
-  }
-
-  private KafkaSupervisorTuningConfig createTuningConfig(int maxRowsPerSegment)
-  {
-    return new KafkaSupervisorTuningConfig(
-        null,
-        null, null, null,
-        maxRowsPerSegment,
-        null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null, null
-    );
+    return MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
+        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(maxRowsPerSegment))
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withTaskCount(taskCount)
+        )
+        .withId(supervisorId)
+        .build(dataSource, TOPIC);
   }
 }
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
index c8fff7347d5..7603700a55d 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
@@ -44,13 +44,10 @@ import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
 import org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -581,70 +578,38 @@ public class KafkaDataFormatsTest extends EmbeddedClusterTestBase
         parser,
         new TypeReference<>() {}
     );
-    return new KafkaSupervisorSpec(
-        supervisorId,
-        null,
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", null, null))
-                  .withDimensions(DimensionsSpec.EMPTY)
-                  .withParserMap(parserMap)
-                  .build(),
-        createTuningConfig(),
-        new KafkaSupervisorIOConfig(
-            topic,
-            null,
-            null,
-            null, null,
-            Period.millis(100),
-            kafkaServer.consumerProperties(),
-            null, null, null,
-            Period.millis(10),
-            Period.millis(10),
-            true,
-            null, null, null, null, null, null, null, null
-        ),
-        null, null, null, null, null, null, null, null, null, null, null
-    );
+    return MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(new TimestampSpec("timestamp", null, null))
+                .withParserMap(parserMap)
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withInputFormat(null)
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withSupervisorRunPeriod(Period.millis(10))
+        )
+        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
+        .withId(supervisorId)
+        .build(dataSource, topic);
   }
 
   private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId, String topic, InputFormat inputFormat)
   {
-    return new KafkaSupervisorSpec(
-        supervisorId,
-        null,
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", null, null))
-                  .withDimensions(DimensionsSpec.EMPTY)
-                  .build(),
-        createTuningConfig(),
-        new KafkaSupervisorIOConfig(
-            topic,
-            null,
-            inputFormat,
-            null, null,
-            Period.millis(100),
-            kafkaServer.consumerProperties(),
-            null, null, null,
-            Period.millis(10),
-            Period.millis(10),
-            true,
-            null, null, null, null, null, null, null, null
-        ),
-        null, null, null, null, null, null, null, null, null, null, null
-    );
-  }
-
-  private KafkaSupervisorTuningConfig createTuningConfig()
-  {
-    return new KafkaSupervisorTuningConfig(
-        null,
-        null, null, null,
-        1,
-        null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null, null
-    );
+    return MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", null, null)))
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withInputFormat(inputFormat)
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withSupervisorRunPeriod(Period.millis(10))
+        )
+        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
+        .withId(supervisorId)
+        .build(dataSource, topic);
   }
 
   private Map<String, Object> createWikipediaAvroSchemaMap()
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
index 1228da0ab83..b50d4f2e601 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
@@ -19,12 +19,17 @@
 
 package org.apache.druid.testing.embedded.indexing;
 
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
+import org.joda.time.Period;
 
 import java.util.function.Supplier;
 
@@ -81,6 +86,33 @@ public class MoreResources
             .appendToExisting(false);
   }
 
+  /**
+   * Supervisor spec builder.
+   */
+  public static class Supervisor
+  {
+    /**
+     * A minimal Kafka supervisor spec that auto-discovers dimensions, has a task
+     * duration of 500ms and creates HOUR-ly segments.
+     */
+    public static Supplier<KafkaSupervisorSpecBuilder> KAFKA_JSON = () ->
+        new KafkaSupervisorSpecBuilder()
+            .withDataSchema(
+                schema -> schema
+                    .withDimensions(DimensionsSpec.EMPTY)
+                    .withGranularity(new UniformGranularitySpec(Granularities.HOUR, null, null))
+            )
+            .withIoConfig(
+                ioConfig -> ioConfig
+                    .withJsonInputFormat()
+                    .withTaskDuration(Period.millis(500))
+                    .withStartDelay(Period.millis(10))
+                    .withSupervisorRunPeriod(Period.millis(500))
+                    .withUseEarliestSequenceNumber(true)
+                    .withCompletionTimeout(Period.seconds(5))
+            );
+  }
+
   public static class ProbufData
   {
     public static final String WIKI_PROTOBUF_BYTES_DECODER_RESOURCE = "data/protobuf/wikipedia.desc";
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
index 0d82c6f7850..1934786afda 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
@@ -22,14 +22,12 @@ package org.apache.druid.testing.embedded.msq;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import it.unimi.dsi.fastutil.bytes.ByteArrays;
 import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -37,9 +35,9 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexCursorFactory;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.joda.time.Period;
@@ -145,54 +143,20 @@ public class BaseRealtimeQueryTest extends EmbeddedClusterTestBase
 
   private KafkaSupervisorSpec createKafkaSupervisor()
   {
-    final Period startDelay = Period.millis(10);
-    final Period supervisorRunPeriod = Period.millis(500);
-    final boolean useEarliestOffset = true;
-
-    return new KafkaSupervisorSpec(
-        dataSource,
-        null,
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("__time", "auto", null))
-                  .withGranularity(new UniformGranularitySpec(Granularities.DAY, null, null))
-                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
-                  .build(),
-        null,
-        new KafkaSupervisorIOConfig(
-            topic,
-            null,
-            new JsonInputFormat(null, null, null, null, null),
-            null,
-            TASK_COUNT,
-            TASK_DURATION,
-            kafka.consumerProperties(),
-            null,
-            null,
-            null,
-            startDelay,
-            supervisorRunPeriod,
-            useEarliestOffset,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    return MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(new TimestampSpec("__time", "auto", null))
+                .withGranularity(new UniformGranularitySpec(Granularities.DAY, null, null))
+                .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withTaskCount(TASK_COUNT)
+                .withTaskDuration(TASK_DURATION)
+                .withConsumerProperties(kafka.consumerProperties())
+        )
+        .build(dataSource, topic);
   }
 }
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
index 36de4c072a7..5cb0398caed 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
@@ -25,12 +25,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.client.indexing.IndexingWorkerInfo;
 import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.task.NoopTask;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.Intervals;
@@ -40,11 +39,11 @@ import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.rpc.indexing.SegmentUpdateResponse;
 import org.apache.druid.segment.TestDataSource;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.http.SegmentsToUpdateFilter;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.EmbeddedIndexer;
 import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.jupiter.api.Assertions;
@@ -213,22 +212,11 @@ public class OverlordClientTest extends EmbeddedClusterTestBase
   @Test
   public void test_postSupervisor_fails_ifRequiredExtensionIsNotLoaded()
   {
-    final KafkaSupervisorSpec kafkaSupervisor = new KafkaSupervisorSpec(
-        null,
-        null,
-        DataSchema.builder().withDataSource(dataSource).build(),
-        null,
-        new KafkaSupervisorIOConfig(
-            "topic",
-            null,
-            new JsonInputFormat(null, null, null, null, null),
-            null, null, null,
-            Map.of("bootstrap.servers", "localhost:9092"),
-            null, null, null, null, null, null, null, null, null, null, null, null, null, null
-        ),
-        Map.of(),
-        null, null, null, null, null, null, null, null, null, null
-    );
+    final KafkaSupervisorSpec kafkaSupervisor = MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec(null, null, null)))
+        .withIoConfig(ioConfig -> ioConfig.withConsumerProperties(Map.of("bootstrap.servers", "localhost")))
+        .build(dataSource, "topic");
 
     final Exception exception = Assertions.assertThrows(
         Exception.class,
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index f89a6a1f965..b607ade1acf 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.guice.annotations.Json;
@@ -74,14 +75,9 @@ public class KafkaSupervisorSpec extends 
SeekableStreamSupervisorSpec
   {
     super(
         id,
-        ingestionSchema != null
-        ? ingestionSchema
-        : new KafkaSupervisorIngestionSpec(
-            dataSchema,
-            ioConfig,
-            tuningConfig != null
-            ? tuningConfig
-            : KafkaSupervisorTuningConfig.defaultConfig()
+        Configs.valueOrDefault(
+            ingestionSchema,
+            new KafkaSupervisorIngestionSpec(dataSchema, ioConfig, 
tuningConfig)
         ),
         context,
         suspended,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 132b65a15cb..54ae10080a7 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaTuningConfigBuilder;
 import 
org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
@@ -70,7 +71,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertEquals(IndexSpec.DEFAULT, 
config.getIndexSpecForIntermediatePersists());
-    Assert.assertEquals(false, config.isReportParseExceptions());
+    Assert.assertFalse(config.isReportParseExceptions());
     Assert.assertEquals(Duration.ofMinutes(15).toMillis(), 
config.getHandoffConditionTimeout());
     Assert.assertEquals(1, config.getNumPersistThreads());
     Assert.assertEquals(-1, config.getMaxColumnsToMerge());
@@ -130,34 +131,19 @@ public class KafkaIndexTaskTuningConfigTest
   @Test
   public void testConvert()
   {
-    KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
-        null,
-        1,
-        null,
-        null,
-        2,
-        10L,
-        new Period("PT3S"),
-        4,
-        IndexSpec.DEFAULT,
-        IndexSpec.DEFAULT,
-        true,
-        5L,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        2,
-        5,
-        false
-    );
+    KafkaSupervisorTuningConfig original = new KafkaTuningConfigBuilder()
+        .withIntermediatePersistPeriod(new Period("PT3S"))
+        .withHandoffConditionTimeout(5L)
+        .withNumPersistThreads(2)
+        .withMaxRowsInMemory(1)
+        .withMaxRowsPerSegment(2)
+        .withMaxTotalRows(10L)
+        .withMaxPendingPersists(4)
+        .withIndexSpec(IndexSpec.DEFAULT)
+        .withIndexSpecForIntermediatePersists(IndexSpec.DEFAULT)
+        .withReportParseExceptions(true)
+        .withMaxColumnsToMerge(5)
+        .build();
     KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
 
     Assert.assertEquals(original.getAppendableIndexSpec(), 
copy.getAppendableIndexSpec());
@@ -169,7 +155,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertNull(copy.getBasePersistDirectory());
     Assert.assertEquals(4, copy.getMaxPendingPersists());
     Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec());
-    Assert.assertEquals(true, copy.isReportParseExceptions());
+    Assert.assertTrue(copy.isReportParseExceptions());
     Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
     Assert.assertEquals(2, copy.getNumPersistThreads());
     Assert.assertEquals(5, copy.getMaxColumnsToMerge());
@@ -207,7 +193,7 @@ public class KafkaIndexTaskTuningConfigTest
     TestModifiedKafkaIndexTaskTuningConfig deserialized =
         mapper.readValue(serialized, 
TestModifiedKafkaIndexTaskTuningConfig.class);
 
-    Assert.assertEquals(null, deserialized.getExtra());
+    Assert.assertNull(deserialized.getExtra());
     Assert.assertEquals(base.getAppendableIndexSpec(), 
deserialized.getAppendableIndexSpec());
     Assert.assertEquals(base.getMaxRowsInMemory(), 
deserialized.getMaxRowsInMemory());
     Assert.assertEquals(base.getMaxBytesInMemory(), 
deserialized.getMaxBytesInMemory());
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 5e21fd0aa7f..872752b7927 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -34,10 +34,9 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
@@ -65,12 +64,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.regex.Pattern;
 
 public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
@@ -80,9 +79,10 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
 
   private static final ObjectMapper OBJECT_MAPPER = 
TestHelper.makeJsonMapper();
   private static final String TOPIC = "sampling";
-  private static final DataSchema DATA_SCHEMA =
-      DataSchema.builder()
-                .withDataSource("test_ds")
+  private static final String DATASOURCE = "test_ds";
+  private static final Consumer<DataSchema.Builder> DATA_SCHEMA =
+      schema ->
+          schema.withDataSource(DATASOURCE)
                 .withTimestamp(new TimestampSpec("timestamp", "iso", null))
                 .withDimensions(
                     new StringDimensionSchema("dim1"),
@@ -97,13 +97,13 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
                 )
                 .withGranularity(
                     new UniformGranularitySpec(Granularities.DAY, 
Granularities.NONE, null)
-                )
-                .build();
+                );
 
-  private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP =
-      DataSchema.builder(DATA_SCHEMA)
-                .withTimestamp(new TimestampSpec("kafka.timestamp", "iso", 
null))
-                .build();
+  private static final Consumer<DataSchema.Builder> 
DATA_SCHEMA_KAFKA_TIMESTAMP =
+      schema -> {
+        DATA_SCHEMA.accept(schema);
+        schema.withTimestamp(new TimestampSpec("kafka.timestamp", "iso", 
null));
+      };
 
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
@@ -142,46 +142,15 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   {
     insertData(generateRecords(TOPIC));
 
-    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        DATA_SCHEMA,
-        null,
-        new KafkaSupervisorIOConfig(
-            TOPIC,
-            null,
-            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
-            null,
-            null,
-            null,
-            kafkaServer.consumerProperties(),
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(DATA_SCHEMA)
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withUseEarliestSequenceNumber(true)
+        )
+        .build(DATASOURCE, TOPIC);
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
@@ -198,46 +167,15 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   {
     insertData(generateRecords(TOPIC));
 
-    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        DATA_SCHEMA,
-        null,
-        new KafkaSupervisorIOConfig(
-            null,
-            Pattern.quote(TOPIC),
-            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
-            null,
-            null,
-            null,
-            kafkaServer.consumerProperties(),
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(DATA_SCHEMA)
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withUseEarliestSequenceNumber(true)
+        )
+        .buildWithTopicPattern(DATASOURCE, Pattern.quote(TOPIC));
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
@@ -254,55 +192,15 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   {
     insertData(generateRecords(TOPIC));
 
-    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        DATA_SCHEMA_KAFKA_TIMESTAMP,
-        null,
-        new KafkaSupervisorIOConfig(
-            TOPIC,
-            null,
-            new KafkaInputFormat(
-                null,
-                null,
-                new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, 
null),
-                null,
-                null,
-                null,
-                null
-            ),
-
-            null,
-            null,
-            null,
-            kafkaServer.consumerProperties(),
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(DATA_SCHEMA_KAFKA_TIMESTAMP)
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withKafkaInputFormat(new JsonInputFormat(null, null, null, 
null, null))
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withUseEarliestSequenceNumber(true)
+        )
+        .build(DATASOURCE, TOPIC);
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
@@ -340,7 +238,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   }
 
   @Test
-  public void testWithInputRowParser() throws IOException
+  public void testWithInputRowParser()
   {
     insertData(generateRecords(TOPIC));
 
@@ -357,59 +255,24 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
     );
     InputRowParser parser = new StringInputRowParser(new 
JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), 
"UTF8");
 
-    DataSchema dataSchema = DataSchema.builder()
-                                      .withDataSource("test_ds")
-                                      .withParserMap(
-                                          
objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class)
-                                      )
-                                      .withAggregators(
-                                          new 
DoubleSumAggregatorFactory("met1sum", "met1"),
-                                          new CountAggregatorFactory("rows")
-                                      )
-                                      .withGranularity(new 
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null))
-                                      .withObjectMapper(objectMapper)
-                                      .build();
-
-    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        dataSchema,
-        null,
-        new KafkaSupervisorIOConfig(
-            TOPIC,
-            null,
-            null,
-            null,
-            null,
-            null,
-            kafkaServer.consumerProperties(),
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    Consumer<DataSchema.Builder> dataSchema =
+        builder -> builder.withDataSource("test_ds")
+                          .withParserMap(objectMapper.convertValue(parser, 
Map.class))
+                          .withAggregators(
+                              new DoubleSumAggregatorFactory("met1sum", 
"met1"),
+                              new CountAggregatorFactory("rows")
+                          )
+                          .withGranularity(new 
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null))
+                          .withObjectMapper(objectMapper);
+
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(dataSchema)
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withUseEarliestSequenceNumber(true)
+        )
+        .build(DATASOURCE, TOPIC);
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
@@ -554,49 +417,14 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   @Test
   public void testInvalidKafkaConfig()
   {
-    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        DATA_SCHEMA,
-        null,
-        new KafkaSupervisorIOConfig(
-            TOPIC,
-            null,
-            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
-            null,
-            null,
-            null,
-
-            // invalid bootstrap server
-            ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
-
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(Map.of("bootstrap.servers", "invalid"))
+                .withUseEarliestSequenceNumber(true)
+        )
+        .build(DATASOURCE, TOPIC);
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
@@ -613,49 +441,14 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
   @Test
   public void testGetInputSourceResources()
   {
-    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        DATA_SCHEMA,
-        null,
-        new KafkaSupervisorIOConfig(
-            TOPIC,
-            null,
-            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
-            null,
-            null,
-            null,
-
-            // invalid bootstrap server
-            ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
-
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(Map.of("bootstrap.servers", "invalid"))
+                .withUseEarliestSequenceNumber(true)
+        )
+        .build(DATASOURCE, TOPIC);
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index 058f3ffa736..a5918f33821 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -27,16 +27,14 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -47,7 +45,6 @@ import 
org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
-import org.joda.time.Period;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -147,39 +144,25 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
 
   private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId, 
String topic)
   {
-    return new KafkaSupervisorSpec(
-        supervisorId,
-        null,
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", null, null))
-                  .withDimensions(DimensionsSpec.EMPTY)
-                  .build(),
-        createTuningConfig(),
-        new KafkaSupervisorIOConfig(
-            topic,
-            null,
-            new CsvInputFormat(List.of("timestamp", "item"), null, null, 
false, 0, false),
-            null, null,
-            null,
-            kafkaServer.consumerProperties(),
-            null, null, null, null, null,
-            true,
-            null, null, null, null, null, null, null, null
-        ),
-        null, null, null, null, null, null, null, null, null, null, null
-    );
-  }
-
-  private KafkaSupervisorTuningConfig createTuningConfig()
-  {
-    return new KafkaSupervisorTuningConfig(
-        null,
-        null, null, null,
-        1,
-        null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, new Period("PT2S"), null, null, null, null, 
null, true
-    );
+    return new KafkaSupervisorSpecBuilder()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(new TimestampSpec("timestamp", null, null))
+                .withDimensions(DimensionsSpec.EMPTY)
+        )
+        .withTuningConfig(
+            tuningConfig -> tuningConfig
+                .withMaxRowsPerSegment(1)
+                .withReleaseLocksOnHandoff(true)
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withInputFormat(new CsvInputFormat(List.of("timestamp", 
"item"), null, null, false, 0, false))
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withUseEarliestSequenceNumber(true)
+        )
+        .withId(supervisorId)
+        .build(dataSource, topic);
   }
 
   private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
new file mode 100644
index 00000000000..1e0b12eb2a0
--- /dev/null
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SupervisorIOConfigBuilder;
+
+import java.util.Map;
+
+/**
+ * Builder for {@link KafkaSupervisorIOConfig}.
+ */
+public class KafkaIOConfigBuilder extends 
SupervisorIOConfigBuilder<KafkaIOConfigBuilder, KafkaSupervisorIOConfig>
+{
+  private String topic;
+  private String topicPattern;
+  private Map<String, Object> consumerProperties;
+
+  public KafkaIOConfigBuilder withTopic(String topic)
+  {
+    this.topic = topic;
+    return this;
+  }
+
+  public KafkaIOConfigBuilder withTopicPattern(String topicPattern)
+  {
+    this.topicPattern = topicPattern;
+    return this;
+  }
+
+  public KafkaIOConfigBuilder withConsumerProperties(Map<String, Object> 
consumerProperties)
+  {
+    this.consumerProperties = consumerProperties;
+    return this;
+  }
+
+  public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat)
+  {
+    this.inputFormat = new KafkaInputFormat(
+        null,
+        null,
+        valueFormat,
+        null,
+        null,
+        null,
+        null
+    );
+    return this;
+  }
+
+  @Override
+  public KafkaSupervisorIOConfig build()
+  {
+    return new KafkaSupervisorIOConfig(
+        topic,
+        topicPattern,
+        inputFormat,
+        replicas,
+        taskCount,
+        taskDuration,
+        consumerProperties,
+        autoScalerConfig,
+        lagAggregator,
+        null,
+        startDelay,
+        period,
+        useEarliestSequenceNumber,
+        completionTimeout,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        lateMessageRejectionStartDateTime,
+        null,
+        idleConfig,
+        stopTaskCount,
+        null
+    );
+  }
+}
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
new file mode 100644
index 00000000000..ce36c0b7dc0
--- /dev/null
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import org.apache.druid.segment.indexing.DataSchema;
+
+import java.util.function.Consumer;
+
+/**
+ * Builder for a {@link KafkaSupervisorSpec}, which can be sent over a POST 
call
+ * to the Overlord.
+ */
+public class KafkaSupervisorSpecBuilder
+{
+  private String id;
+  private final DataSchema.Builder dataSchema = new DataSchema.Builder();
+  private final KafkaIOConfigBuilder ioConfig = new KafkaIOConfigBuilder();
+  private final KafkaTuningConfigBuilder tuningConfig = new 
KafkaTuningConfigBuilder();
+
+  public KafkaSupervisorSpecBuilder 
withDataSchema(Consumer<DataSchema.Builder> updateDataSchema)
+  {
+    updateDataSchema.accept(this.dataSchema);
+    return this;
+  }
+
+  public KafkaSupervisorSpecBuilder 
withTuningConfig(Consumer<KafkaTuningConfigBuilder> updateTuningConfig)
+  {
+    updateTuningConfig.accept(this.tuningConfig);
+    return this;
+  }
+
+  public KafkaSupervisorSpecBuilder 
withIoConfig(Consumer<KafkaIOConfigBuilder> updateIOConfig)
+  {
+    updateIOConfig.accept(this.ioConfig);
+    return this;
+  }
+
+  public KafkaSupervisorSpecBuilder withId(String id)
+  {
+    this.id = id;
+    return this;
+  }
+
+  /**
+   * Builds a new {@link KafkaSupervisorSpec} with the specified parameters.
+   */
+  public KafkaSupervisorSpec build(String dataSource, String topic)
+  {
+    dataSchema.withDataSource(dataSource);
+    ioConfig.withTopic(topic).withTopicPattern(null);
+    return build();
+  }
+
+  /**
+   * Builds a new {@link KafkaSupervisorSpec} which reads from multiple topics
+   * that satisfy the given {@code topicPattern}.
+   */
+  public KafkaSupervisorSpec buildWithTopicPattern(String dataSource, String 
topicPattern)
+  {
+    dataSchema.withDataSource(dataSource);
+    ioConfig.withTopic(null).withTopicPattern(topicPattern);
+    return build();
+  }
+
+  private KafkaSupervisorSpec build()
+  {
+    return new KafkaSupervisorSpec(
+        id,
+        null,
+        dataSchema.build(),
+        tuningConfig.build(),
+        ioConfig.build(),
+        null,
+        false,
+        // Jackson injected params, not needed while posting a supervisor to 
the Overlord
+        null, null, null, null, null, null, null, null, null
+    );
+  }
+}
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index d7bb9acfb7f..0e47869018f 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -20,9 +20,8 @@
 package org.apache.druid.indexing.kafka.supervisor;
 
 import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
@@ -35,7 +34,6 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfi
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -43,7 +41,6 @@ import org.apache.druid.metadata.TestSupervisorSpec;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
@@ -74,7 +71,7 @@ public class KafkaSupervisorSpecTest
             .addValue(SupervisorStateManagerConfig.class, null)
             .addValue(ExprMacroTable.class.getName(), 
LookupEnabledTestExprMacroTable.INSTANCE)
     );
-    mapper.registerModules((Iterable<Module>) new 
KafkaIndexTaskModule().getJacksonModules());
+    mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
   }
 
   @Test
@@ -676,95 +673,39 @@ public class KafkaSupervisorSpecTest
     );
 
     // Test valid spec update. This spec changes context vs the sourceSpec
-    KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpec(
-        null,
-        null,
-        DataSchema.builder().withDataSource("testDs").withAggregators(new 
CountAggregatorFactory("rows")).withGranularity(new 
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
-        null,
-        new KafkaSupervisorIOConfig(
-            "metrics",
-            null,
-            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
-            null,
-            null,
-            null,
-            Map.of("bootstrap.servers", "localhost:9092"),
-            null,
-            null,
-            null,
-            null,
-            null,
-            true,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        ),
-        Map.of(
-            "key1",
-            "value1",
-            "key2",
-            "value2"
-        ),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(new TimestampSpec(null, null, null))
+                .withAggregators(new CountAggregatorFactory("rows"))
+                .withGranularity(new UniformGranularitySpec(Granularities.DAY, 
Granularities.NONE, null))
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(Map.of("bootstrap.servers", 
"localhost:9092"))
+        )
+        .build("testDs", "metrics");
     sourceSpec.validateSpecUpdateTo(validDestSpec);
   }
 
   private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
   {
-    return new KafkaSupervisorSpec(
-      null,
-      null,
-      DataSchema.builder().withDataSource("testDs").withAggregators(new 
CountAggregatorFactory("rows")).withGranularity(new 
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
-      null,
-      new KafkaSupervisorIOConfig(
-          topic,
-          topicPattern,
-          new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
-          null,
-          null,
-          null,
-          Map.of("bootstrap.servers", "localhost:9092"),
-          null,
-          null,
-          null,
-          null,
-          null,
-          true,
-          null,
-          null,
-          null,
-          null,
-          null,
-          null,
-          null,
-          false
-      ),
-      null,
-      null,
-      null,
-      null,
-      null,
-      null,
-      null,
-      null,
-      null,
-      null,
-      null
-  );
+    KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder()
+        .withDataSchema(
+            schema -> schema
+                .withTimestamp(new TimestampSpec(null, null, null))
+                .withAggregators(new CountAggregatorFactory("rows"))
+                .withGranularity(new UniformGranularitySpec(Granularities.DAY, 
Granularities.NONE, null))
+        )
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withJsonInputFormat()
+                .withConsumerProperties(Map.of("bootstrap.servers", 
"localhost:9092"))
+        );
+
+    return topic == null
+           ? builder.buildWithTopicPattern("testDs", topicPattern)
+           : builder.build("testDs", topic);
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9b80d2db41a..3c30cc0e25c 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -152,7 +152,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private static final String TOPIC_PREFIX = "testTopic";
   private static final String DATASOURCE = "testDS";
   private static final int NUM_PARTITIONS = 3;
-  private static final long TEST_CHAT_RETRIES = 9L;
+  private static final int TEST_CHAT_RETRIES = 9;
   private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
   private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
 
@@ -308,59 +308,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
     consumerProperties.put("myCustomKey", "myCustomValue");
     consumerProperties.put("bootstrap.servers", kafkaHost);
 
-    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaSupervisorIOConfig(
-        topic,
-        null,
-        INPUT_FORMAT,
-        replicas,
-        1,
-        new Period("PT1H"),
-        consumerProperties,
-        OBJECT_MAPPER.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class),
-        null,
-        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
-        new Period("P1D"),
-        new Period("PT30S"),
-        true,
-        new Period("PT30M"),
-        null,
-        null,
-        null,
-        null,
-        new IdleConfig(true, 1000L),
-        1,
-        false
-    );
-
-    final KafkaSupervisorTuningConfig tuningConfigOri = new 
KafkaSupervisorTuningConfig(
-        null,
-        1000,
-        null,
-        null,
-        50000,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        false,
-        null,
-        false,
-        null,
-        numThreads,
-        TEST_CHAT_RETRIES,
-        TEST_HTTP_TIMEOUT,
-        TEST_SHUTDOWN_TIMEOUT,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
-
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new 
KafkaIOConfigBuilder()
+        .withTopic(topic)
+        .withInputFormat(INPUT_FORMAT)
+        .withReplicas(replicas)
+        .withTaskCount(1)
+        .withConsumerProperties(consumerProperties)
+        .withAutoScalerConfig(OBJECT_MAPPER.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class))
+        .withUseEarliestSequenceNumber(true)
+        .build();
+
+    final KafkaSupervisorTuningConfig tuningConfigOri = new 
KafkaTuningConfigBuilder()
+        .withIntermediatePersistPeriod(Period.years(1))
+        .withResetOffsetAutomatically(false)
+        .withWorkerThreads(numThreads)
+        .withShutdownTimeout(TEST_SHUTDOWN_TIMEOUT)
+        .withMaxRowsInMemory(1000)
+        .withMaxRowsPerSegment(50000)
+        .withReportParseExceptions(false)
+        .withChatHandlerNumRetries(TEST_CHAT_RETRIES)
+        .withChatHandlerTimeout(TEST_HTTP_TIMEOUT.toStandardDuration())
+        .build();
     
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
     
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
     
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
@@ -4649,34 +4617,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         false,
         kafkaHost,
         dataSchema,
-        new KafkaSupervisorTuningConfig(
-            null,
-            1000,
-            null,
-            null,
-            50000,
-            null,
-            new Period("P1Y"),
-            null,
-            null,
-            null,
-            false,
-            null,
-            false,
-            null,
-            numThreads,
-            TEST_CHAT_RETRIES,
-            TEST_HTTP_TIMEOUT,
-            TEST_SHUTDOWN_TIMEOUT,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        )
+        tuningConfigBuilder().build()
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -4690,34 +4631,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     DataSchema modifiedDataSchema = getDataSchema("some other datasource");
 
-    KafkaSupervisorTuningConfig modifiedTuningConfig = new 
KafkaSupervisorTuningConfig(
-        null,
-        42, // This is different
-        null,
-        null,
-        50000,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        false,
-        null,
-        null,
-        null,
-        numThreads,
-        TEST_CHAT_RETRIES,
-        TEST_HTTP_TIMEOUT,
-        TEST_SHUTDOWN_TIMEOUT,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    KafkaSupervisorTuningConfig modifiedTuningConfig = tuningConfigBuilder()
+        .withMaxRowsInMemory(42)
+        .build();
 
     KafkaIndexTask completedTaskFromStorage = createKafkaIndexTask(
         "id0",
@@ -4845,34 +4761,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         false,
         kafkaHost,
         dataSchema,
-        new KafkaSupervisorTuningConfig(
-            null,
-            1000,
-            null,
-            null,
-            50000,
-            null,
-            new Period("P1Y"),
-            null,
-            null,
-            null,
-            false,
-            null,
-            false,
-            null,
-            numThreads,
-            TEST_CHAT_RETRIES,
-            TEST_HTTP_TIMEOUT,
-            TEST_SHUTDOWN_TIMEOUT,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        )
+        tuningConfigBuilder().build()
     );
 
     // Create task1 with some start and end offsets
@@ -5208,6 +5097,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
   }
 
+  /**
+   * Creates a tuning config builder pre-populated with the values shared by
+   * the supervisors created in this test: an intermediate persist period of
+   * one year, {@code resetOffsetAutomatically = false}, {@code numThreads}
+   * worker threads, 1000 max rows in memory, 50000 max rows per segment,
+   * {@code reportParseExceptions = false}, and the {@code TEST_CHAT_RETRIES},
+   * {@code TEST_HTTP_TIMEOUT} and {@code TEST_SHUTDOWN_TIMEOUT} constants.
+   * Callers may override individual values on the returned builder before
+   * calling {@code build()}.
+   */
+  private KafkaTuningConfigBuilder tuningConfigBuilder()
+  {
+    return new KafkaTuningConfigBuilder()
+        .withIntermediatePersistPeriod(Period.years(1))
+        .withResetOffsetAutomatically(false)
+        .withWorkerThreads(numThreads)
+        .withShutdownTimeout(TEST_SHUTDOWN_TIMEOUT)
+        .withMaxRowsInMemory(1000)
+        .withMaxRowsPerSegment(50000)
+        .withReportParseExceptions(false)
+        .withChatHandlerNumRetries(TEST_CHAT_RETRIES)
+        // TEST_HTTP_TIMEOUT is a Period; it is converted to a duration for this setter
+        .withChatHandlerTimeout(TEST_HTTP_TIMEOUT.toStandardDuration());
+  }
 
   private TestableKafkaSupervisor getTestableSupervisor(
       int replicas,
@@ -5366,34 +5268,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
       }
     };
 
-    final KafkaSupervisorTuningConfig tuningConfig = new 
KafkaSupervisorTuningConfig(
-        null,
-        1000,
-        null,
-        null,
-        50000,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        false,
-        null,
-        resetOffsetAutomatically,
-        null,
-        numThreads,
-        TEST_CHAT_RETRIES,
-        TEST_HTTP_TIMEOUT,
-        TEST_SHUTDOWN_TIMEOUT,
-        null,
-        null,
-        null,
-        null,
-        10,
-        null,
-        null,
-        null
-    );
+    final KafkaSupervisorTuningConfig tuningConfig = tuningConfigBuilder()
+        .withMaxSavedParseExceptions(10)
+        .withResetOffsetAutomatically(resetOffsetAutomatically)
+        .build();
 
     return new TestableKafkaSupervisor(
         taskStorage,
@@ -5484,34 +5362,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       }
     };
 
-    final KafkaSupervisorTuningConfig tuningConfig = new 
KafkaSupervisorTuningConfig(
-        null,
-        1000,
-        null,
-        null,
-        50000,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        false,
-        null,
-        false,
-        null,
-        numThreads,
-        TEST_CHAT_RETRIES,
-        TEST_HTTP_TIMEOUT,
-        TEST_SHUTDOWN_TIMEOUT,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
+    final KafkaSupervisorTuningConfig tuningConfig = 
tuningConfigBuilder().build();
 
     return new TestableKafkaSupervisorWithCustomIsTaskCurrent(
         taskStorage,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index cda5904edda..376ca256303 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -64,7 +64,7 @@ public class KafkaSupervisorTuningConfigTest
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertEquals(IndexSpec.DEFAULT, 
config.getIndexSpecForIntermediatePersists());
-    Assert.assertEquals(false, config.isReportParseExceptions());
+    Assert.assertFalse(config.isReportParseExceptions());
     Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), 
config.getHandoffConditionTimeout());
     Assert.assertNull(config.getWorkerThreads());
     Assert.assertEquals(8L, (long) config.getChatRetries());
@@ -111,7 +111,7 @@ public class KafkaSupervisorTuningConfigTest
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), 
config.getIntermediatePersistPeriod());
     Assert.assertEquals(100, config.getMaxPendingPersists());
-    Assert.assertEquals(true, config.isReportParseExceptions());
+    Assert.assertTrue(config.isReportParseExceptions());
     Assert.assertEquals(100, config.getHandoffConditionTimeout());
     Assert.assertEquals(12, (int) config.getWorkerThreads());
     Assert.assertEquals(14L, (long) config.getChatRetries());
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
new file mode 100644
index 00000000000..44113519c9c
--- /dev/null
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import org.joda.time.Period;
+
+/**
+ * Builder for {@link KafkaSupervisorTuningConfig} used in tests.
+ * <p>
+ * Adds the fields that this class passes to the
+ * {@link KafkaSupervisorTuningConfig} constructor on top of those inherited
+ * from {@link TuningConfigBuilder}. Fields declared here have no initializer,
+ * so each one is passed to the constructor as {@code null} unless its
+ * {@code with*} method is called.
+ */
+public class KafkaTuningConfigBuilder extends 
TuningConfigBuilder<KafkaTuningConfigBuilder, KafkaSupervisorTuningConfig>
+{
+  private Period intermediatePersistPeriod;
+  private Long handoffConditionTimeout;
+  private Boolean resetOffsetAutomatically;
+  private Integer workerThreads;
+  private Period shutdownTimeout;
+  private Period offsetFetchPeriod;
+  private Period intermediateHandoffPeriod;
+  private Boolean releaseLocksOnHandoff;
+
+  /**
+   * Sets the {@code intermediatePersistPeriod} passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withIntermediatePersistPeriod(Period 
intermediatePersistPeriod)
+  {
+    this.intermediatePersistPeriod = intermediatePersistPeriod;
+    return this;
+  }
+
+  /**
+   * Sets the {@code handoffConditionTimeout} passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withHandoffConditionTimeout(Long 
handoffConditionTimeout)
+  {
+    this.handoffConditionTimeout = handoffConditionTimeout;
+    return this;
+  }
+
+  /**
+   * Sets the {@code resetOffsetAutomatically} flag passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withResetOffsetAutomatically(Boolean 
resetOffsetAutomatically)
+  {
+    this.resetOffsetAutomatically = resetOffsetAutomatically;
+    return this;
+  }
+
+  /**
+   * Sets the {@code workerThreads} value passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withWorkerThreads(Integer workerThreads)
+  {
+    this.workerThreads = workerThreads;
+    return this;
+  }
+
+  /**
+   * Sets the {@code shutdownTimeout} passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withShutdownTimeout(Period shutdownTimeout)
+  {
+    this.shutdownTimeout = shutdownTimeout;
+    return this;
+  }
+
+  /**
+   * Sets the {@code offsetFetchPeriod} passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withOffsetFetchPeriod(Period 
offsetFetchPeriod)
+  {
+    this.offsetFetchPeriod = offsetFetchPeriod;
+    return this;
+  }
+
+  /**
+   * Sets the {@code intermediateHandoffPeriod} passed to the tuning config.
+   */
+  public KafkaTuningConfigBuilder withIntermediateHandoffPeriod(Period 
intermediateHandoffPeriod)
+  {
+    this.intermediateHandoffPeriod = intermediateHandoffPeriod;
+    return this;
+  }
+
+  /**
+   * Sets the {@code releaseLocksOnHandoff} flag passed to the tuning config.
+   * Takes a nullable {@link Boolean}, matching the backing field and the
+   * other setters of this builder, so that {@code null} can be passed through.
+   */
+  public KafkaTuningConfigBuilder withReleaseLocksOnHandoff(Boolean 
releaseLocksOnHandoff)
+  {
+    this.releaseLocksOnHandoff = releaseLocksOnHandoff;
+    return this;
+  }
+
+  /**
+   * Creates a {@link KafkaSupervisorTuningConfig} by passing the inherited
+   * and Kafka-specific builder fields to its constructor positionally.
+   * <p>
+   * Two inherited values are converted before being passed, and both
+   * conversions keep {@code null} as {@code null}: {@code chatHandlerNumRetries}
+   * is cast to {@code long}, and {@code chatHandlerTimeout} is turned into a
+   * {@link Period} built from its millisecond value.
+   */
+  @Override
+  public KafkaSupervisorTuningConfig build()
+  {
+    return new KafkaSupervisorTuningConfig(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        segmentWriteOutMediumFactory,
+        workerThreads,
+        // null-safe conversion of the inherited retry count to a long
+        chatHandlerNumRetries == null ? null : (long) chatHandlerNumRetries,
+        // null-safe conversion of the inherited timeout to a Period of equal millis
+        chatHandlerTimeout == null ? null : new 
Period(chatHandlerTimeout.getMillis()),
+        shutdownTimeout,
+        offsetFetchPeriod,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions,
+        numPersistThreads,
+        maxColumnsToMerge,
+        releaseLocksOnHandoff
+    );
+  }
+}
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index f0da679d4a4..39a2396a0fb 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -267,6 +267,12 @@
             <artifactId>jdbi</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <!-- NOTE(review): version is pinned in this module; confirm whether it should be managed centrally instead -->
+            <groupId>com.google.api.grpc</groupId>
+            <artifactId>proto-google-common-protos</artifactId>
+            <version>2.48.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
index f6e0e29f7d9..15f0d62ac45 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
@@ -61,13 +61,19 @@ import java.util.stream.Stream;
  * The builder does not use any defaults and all required fields must be set
  * explicitly.
  *
- * @param <T> Type of task created by this builder.
+ * @param <Self> Type of this builder itself
  * @param <C> Type of tuning config used by this builder.
+ * @param <T> Type of task created by this builder.
+ * @param <CB> Type of tuning config builder
  * @see #ofTypeIndex()
  * @see #tuningConfig(Consumer) to specify the {@code tuningConfig}.
  */
 @SuppressWarnings("unchecked")
-public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends 
Task>
+public abstract class TaskBuilder<
+    T extends Task,
+    C,
+    CB extends TuningConfigBuilder<CB, C>,
+    Self extends TaskBuilder<T, C, CB, Self>>
 {
   // Fields are package-protected to allow access by subclasses like Index, 
Compact
   InputSource inputSource = null;
@@ -77,7 +83,7 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, 
T>, C, T extends T
 
   Boolean appendToExisting = null;
 
-  final TuningConfigBuilder<C> tuningConfig;
+  final TuningConfigBuilder<CB, C> tuningConfig;
 
   private TaskBuilder()
   {
@@ -90,9 +96,9 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, 
T>, C, T extends T
    */
   public abstract T withId(String taskId);
 
-  public abstract B dataSource(String dataSource);
+  public abstract Self dataSource(String dataSource);
 
-  protected abstract TuningConfigBuilder<C> tuningConfigBuilder();
+  protected abstract TuningConfigBuilder<CB, C> tuningConfigBuilder();
 
   /**
    * Initializes builder for a new {@link IndexTask}.
@@ -115,18 +121,18 @@ public abstract class TaskBuilder<B extends 
TaskBuilder<B, C, T>, C, T extends T
     return new Compact();
   }
 
-  public B inputSource(InputSource inputSource)
+  public Self inputSource(InputSource inputSource)
   {
     this.inputSource = inputSource;
-    return (B) this;
+    return (Self) this;
   }
 
-  public B inlineInputSourceWithData(String data)
+  public Self inlineInputSourceWithData(String data)
   {
     return inputSource(new InlineInputSource(data));
   }
 
-  public B druidInputSource(String dataSource, Interval interval)
+  public Self druidInputSource(String dataSource, Interval interval)
   {
     return inputSource(
         new DruidInputSource(
@@ -153,94 +159,98 @@ public abstract class TaskBuilder<B extends 
TaskBuilder<B, C, T>, C, T extends T
    * }
    * </pre>
    */
-  public B localInputSourceWithFiles(File... files)
+  public Self localInputSourceWithFiles(File... files)
   {
     return inputSource(
         new LocalInputSource(null, null, List.of(files), null)
     );
   }
 
-  public B inputFormat(InputFormat inputFormat)
+  public Self inputFormat(InputFormat inputFormat)
   {
     this.inputFormat = inputFormat;
-    return (B) this;
+    return (Self) this;
   }
 
-  public B jsonInputFormat()
+  public Self jsonInputFormat()
   {
     return inputFormat(
         new JsonInputFormat(null, null, null, null, null)
     );
   }
 
-  public B csvInputFormatWithColumns(String... columns)
+  public Self csvInputFormatWithColumns(String... columns)
   {
     return inputFormat(
         new CsvInputFormat(List.of(columns), null, null, false, 0, null)
     );
   }
 
-  public B appendToExisting(boolean append)
+  public Self appendToExisting(boolean append)
   {
     this.appendToExisting = append;
-    return (B) this;
+    return (Self) this;
   }
 
-  public B dynamicPartitionWithMaxRows(int maxRowsPerSegment)
+  public Self dynamicPartitionWithMaxRows(int maxRowsPerSegment)
   {
     tuningConfig.withPartitionsSpec(new 
DynamicPartitionsSpec(maxRowsPerSegment, null));
-    return (B) this;
+    return (Self) this;
   }
 
-  public B tuningConfig(Consumer<TuningConfigBuilder<C>> updateTuningConfig)
+  public Self tuningConfig(Consumer<TuningConfigBuilder<CB, C>> 
updateTuningConfig)
   {
     updateTuningConfig.accept(tuningConfig);
-    return (B) this;
+    return (Self) this;
   }
 
-  public B context(String key, Object value)
+  public Self context(String key, Object value)
   {
     this.context.put(key, value);
-    return (B) this;
+    return (Self) this;
   }
 
-  public abstract static class IndexCommon<B extends TaskBuilder<B, C, T>, C, 
T extends Task>
-      extends TaskBuilder<B, C, T>
+  public abstract static class IndexCommon<
+      T extends Task,
+      C,
+      CB extends TuningConfigBuilder<CB, C>,
+      Self extends TaskBuilder<T, C, CB, Self>>
+      extends TaskBuilder<T, C, CB, Self>
   {
     final DataSchema.Builder dataSchema = DataSchema.builder();
 
     @Override
-    public B dataSource(String dataSource)
+    public Self dataSource(String dataSource)
     {
       dataSchema.withDataSource(dataSource);
-      return (B) this;
+      return (Self) this;
     }
 
-    public B dataSchema(Consumer<DataSchema.Builder> updateDataSchema)
+    public Self dataSchema(Consumer<DataSchema.Builder> updateDataSchema)
     {
       updateDataSchema.accept(dataSchema);
-      return (B) this;
+      return (Self) this;
     }
 
-    public B isoTimestampColumn(String timestampColumn)
+    public Self isoTimestampColumn(String timestampColumn)
     {
       dataSchema.withTimestamp(new TimestampSpec(timestampColumn, "iso", 
null));
-      return (B) this;
+      return (Self) this;
     }
 
-    public B timestampColumn(String timestampColumn)
+    public Self timestampColumn(String timestampColumn)
     {
       dataSchema.withTimestamp(new TimestampSpec(timestampColumn, null, null));
-      return (B) this;
+      return (Self) this;
     }
 
-    public B granularitySpec(GranularitySpec granularitySpec)
+    public Self granularitySpec(GranularitySpec granularitySpec)
     {
       dataSchema.withGranularity(granularitySpec);
-      return (B) this;
+      return (Self) this;
     }
 
-    public B granularitySpec(String segmentGranularity, String 
queryGranularity, Boolean rollup)
+    public Self granularitySpec(String segmentGranularity, String 
queryGranularity, Boolean rollup)
     {
       dataSchema.withGranularity(
           new UniformGranularitySpec(
@@ -250,13 +260,13 @@ public abstract class TaskBuilder<B extends 
TaskBuilder<B, C, T>, C, T extends T
               null
           )
       );
-      return (B) this;
+      return (Self) this;
     }
 
     /**
      * Sets {@code "granularitySpec": {"segmentGranularity": <arg>}}.
      */
-    public B segmentGranularity(String granularity)
+    public Self segmentGranularity(String granularity)
     {
       return granularitySpec(granularity, null, null);
     }
@@ -266,30 +276,34 @@ public abstract class TaskBuilder<B extends 
TaskBuilder<B, C, T>, C, T extends T
      *
      * @see #dataSchema(Consumer) for more options
      */
-    public B dimensions(String... dimensions)
+    public Self dimensions(String... dimensions)
     {
       dataSchema.withDimensions(
           Stream.of(dimensions)
                 .map(StringDimensionSchema::new)
                 .collect(Collectors.toList())
       );
-      return (B) this;
+      return (Self) this;
     }
 
-    public B metricAggregates(AggregatorFactory... aggregators)
+    public Self metricAggregates(AggregatorFactory... aggregators)
     {
       dataSchema.withAggregators(aggregators);
-      return (B) this;
+      return (Self) this;
     }
   }
 
   /**
    * Builder for {@link IndexTask} that uses a {@link 
IndexTask.IndexTuningConfig}.
    */
-  public static class Index extends IndexCommon<Index, 
IndexTask.IndexTuningConfig, IndexTask>
+  public static class Index extends IndexCommon<
+      IndexTask,
+      IndexTask.IndexTuningConfig,
+      TuningConfigBuilder.Index,
+      Index>
   {
     @Override
-    public TuningConfigBuilder<IndexTask.IndexTuningConfig> 
tuningConfigBuilder()
+    public TuningConfigBuilder.Index tuningConfigBuilder()
     {
       return TuningConfigBuilder.forIndexTask();
     }
@@ -320,7 +334,11 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, 
C, T>, C, T extends T
   /**
    * Builder for {@link ParallelIndexSupervisorTask} which uses a {@link 
ParallelIndexTuningConfig}.
    */
-  public static class IndexParallel extends IndexCommon<IndexParallel, 
ParallelIndexTuningConfig, ParallelIndexSupervisorTask>
+  public static class IndexParallel extends IndexCommon<
+      ParallelIndexSupervisorTask,
+      ParallelIndexTuningConfig,
+      TuningConfigBuilder.ParallelIndex,
+      IndexParallel>
   {
     @Override
     public ParallelIndexSupervisorTask withId(String taskId)
@@ -345,7 +363,7 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, 
C, T>, C, T extends T
     }
 
     @Override
-    public TuningConfigBuilder<ParallelIndexTuningConfig> tuningConfigBuilder()
+    public TuningConfigBuilder.ParallelIndex tuningConfigBuilder()
     {
       return TuningConfigBuilder.forParallelIndexTask();
     }
@@ -354,7 +372,11 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, 
C, T>, C, T extends T
   /**
    * Builder for a {@link CompactionTask} which uses a {@link 
CompactionTask.CompactionTuningConfig}.
    */
-  public static class Compact extends TaskBuilder<Compact, 
CompactionTask.CompactionTuningConfig, CompactionTask>
+  public static class Compact extends TaskBuilder<
+      CompactionTask,
+      CompactionTask.CompactionTuningConfig,
+      TuningConfigBuilder.Compact,
+      Compact>
   {
     private String dataSource;
     private Interval interval;
@@ -429,7 +451,7 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, 
C, T>, C, T extends T
     }
 
     @Override
-    public TuningConfigBuilder<CompactionTask.CompactionTuningConfig> 
tuningConfigBuilder()
+    public TuningConfigBuilder.Compact tuningConfigBuilder()
     {
       return TuningConfigBuilder.forCompactionTask();
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
index c46c0becc76..abac64de8be 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
@@ -31,11 +31,12 @@ import org.joda.time.Duration;
  * Builder utility for various task tuning configs.
  *
  * @param <C> Type of config that is being built
+ * @param <Self> Type of this builder itself
  * @see TuningConfigBuilder#forIndexTask()
  * @see TuningConfigBuilder#forParallelIndexTask()
  * @see TuningConfigBuilder#forCompactionTask()
  */
-public abstract class TuningConfigBuilder<C>
+public abstract class TuningConfigBuilder<Self extends 
TuningConfigBuilder<Self, C>, C>
 {
   protected Integer targetPartitionSize;
   protected Integer maxRowsPerSegment;
@@ -71,202 +72,208 @@ public abstract class TuningConfigBuilder<C>
   protected Integer maxAllowedLockCount;
   protected Integer numPersistThreads;
 
-  public TuningConfigBuilder<C> withTargetPartitionSize(Integer targetPartitionSize)
+  public Self withTargetPartitionSize(Integer targetPartitionSize)
   {
     this.targetPartitionSize = targetPartitionSize;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxRowsPerSegment(Integer maxRowsPerSegment)
+  public Self withMaxRowsPerSegment(Integer maxRowsPerSegment)
   {
     this.maxRowsPerSegment = maxRowsPerSegment;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withAppendableIndexSpec(AppendableIndexSpec appendableIndexSpec)
+  public Self withAppendableIndexSpec(AppendableIndexSpec appendableIndexSpec)
   {
     this.appendableIndexSpec = appendableIndexSpec;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxRowsInMemory(Integer maxRowsInMemory)
+  public Self withMaxRowsInMemory(Integer maxRowsInMemory)
   {
     this.maxRowsInMemory = maxRowsInMemory;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxBytesInMemory(Long maxBytesInMemory)
+  public Self withMaxBytesInMemory(Long maxBytesInMemory)
   {
     this.maxBytesInMemory = maxBytesInMemory;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withSkipBytesInMemoryOverheadCheck(Boolean skipBytesInMemoryOverheadCheck)
+  public Self withSkipBytesInMemoryOverheadCheck(Boolean skipBytesInMemoryOverheadCheck)
   {
     this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxTotalRows(Long maxTotalRows)
+  public Self withMaxTotalRows(Long maxTotalRows)
   {
     this.maxTotalRows = maxTotalRows;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withNumShards(Integer numShards)
+  public Self withNumShards(Integer numShards)
   {
     this.numShards = numShards;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withSplitHintSpec(SplitHintSpec splitHintSpec)
+  public Self withSplitHintSpec(SplitHintSpec splitHintSpec)
   {
     this.splitHintSpec = splitHintSpec;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withPartitionsSpec(PartitionsSpec partitionsSpec)
+  public Self withPartitionsSpec(PartitionsSpec partitionsSpec)
   {
     this.partitionsSpec = partitionsSpec;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withIndexSpec(IndexSpec indexSpec)
+  public Self withIndexSpec(IndexSpec indexSpec)
   {
     this.indexSpec = indexSpec;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withIndexSpecForIntermediatePersists(IndexSpec indexSpecForIntermediatePersists)
+  public Self withIndexSpecForIntermediatePersists(IndexSpec indexSpecForIntermediatePersists)
   {
     this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxPendingPersists(Integer maxPendingPersists)
+  public Self withMaxPendingPersists(Integer maxPendingPersists)
   {
     this.maxPendingPersists = maxPendingPersists;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withForceGuaranteedRollup(Boolean forceGuaranteedRollup)
+  public Self withForceGuaranteedRollup(Boolean forceGuaranteedRollup)
   {
     this.forceGuaranteedRollup = forceGuaranteedRollup;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withReportParseExceptions(Boolean reportParseExceptions)
+  public Self withReportParseExceptions(Boolean reportParseExceptions)
   {
     this.reportParseExceptions = reportParseExceptions;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withPushTimeout(Long pushTimeout)
+  public Self withPushTimeout(Long pushTimeout)
   {
     this.pushTimeout = pushTimeout;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withPublishTimeout(Long publishTimeout)
+  public Self withPublishTimeout(Long publishTimeout)
   {
     this.publishTimeout = publishTimeout;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withSegmentWriteOutMediumFactory(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+  public Self withSegmentWriteOutMediumFactory(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
   {
     this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxNumSubTasks(Integer maxNumSubTasks)
+  public Self withMaxNumSubTasks(Integer maxNumSubTasks)
   {
     this.maxNumSubTasks = maxNumSubTasks;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxNumConcurrentSubTasks(Integer maxNumConcurrentSubTasks)
+  public Self withMaxNumConcurrentSubTasks(Integer maxNumConcurrentSubTasks)
   {
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxRetry(Integer maxRetry)
+  public Self withMaxRetry(Integer maxRetry)
   {
     this.maxRetry = maxRetry;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withTaskStatusCheckPeriodMs(Long taskStatusCheckPeriodMs)
+  public Self withTaskStatusCheckPeriodMs(Long taskStatusCheckPeriodMs)
   {
     this.taskStatusCheckPeriodMs = taskStatusCheckPeriodMs;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withChatHandlerTimeout(Duration chatHandlerTimeout)
+  public Self withChatHandlerTimeout(Duration chatHandlerTimeout)
   {
     this.chatHandlerTimeout = chatHandlerTimeout;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withChatHandlerNumRetries(Integer chatHandlerNumRetries)
+  public Self withChatHandlerNumRetries(Integer chatHandlerNumRetries)
   {
     this.chatHandlerNumRetries = chatHandlerNumRetries;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxNumSegmentsToMerge(Integer maxNumSegmentsToMerge)
+  public Self withMaxNumSegmentsToMerge(Integer maxNumSegmentsToMerge)
   {
     this.maxNumSegmentsToMerge = maxNumSegmentsToMerge;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withTotalNumMergeTasks(Integer totalNumMergeTasks)
+  public Self withTotalNumMergeTasks(Integer totalNumMergeTasks)
   {
     this.totalNumMergeTasks = totalNumMergeTasks;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withLogParseExceptions(Boolean logParseExceptions)
+  public Self withLogParseExceptions(Boolean logParseExceptions)
   {
     this.logParseExceptions = logParseExceptions;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxParseExceptions(Integer maxParseExceptions)
+  public Self withMaxParseExceptions(Integer maxParseExceptions)
   {
     this.maxParseExceptions = maxParseExceptions;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxSavedParseExceptions(Integer maxSavedParseExceptions)
+  public Self withMaxSavedParseExceptions(Integer maxSavedParseExceptions)
   {
     this.maxSavedParseExceptions = maxSavedParseExceptions;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxColumnsToMerge(Integer maxColumnsToMerge)
+  public Self withMaxColumnsToMerge(Integer maxColumnsToMerge)
   {
     this.maxColumnsToMerge = maxColumnsToMerge;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withAwaitSegmentAvailabilityTimeoutMillis(Long awaitSegmentAvailabilityTimeoutMillis)
+  public Self withAwaitSegmentAvailabilityTimeoutMillis(Long awaitSegmentAvailabilityTimeoutMillis)
   {
     this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withNumPersistThreads(Integer numPersistThreads)
+  public Self withNumPersistThreads(Integer numPersistThreads)
   {
     this.numPersistThreads = numPersistThreads;
-    return this;
+    return self();
   }
 
-  public TuningConfigBuilder<C> withMaxAllowedLockCount(Integer maxAllowedLockCount)
+  public Self withMaxAllowedLockCount(Integer maxAllowedLockCount)
   {
     this.maxAllowedLockCount = maxAllowedLockCount;
-    return this;
+    return self();
+  }
+  
+  @SuppressWarnings("unchecked")
+  private Self self()
+  {
+    return (Self) this;
   }
 
   public abstract C build();
@@ -295,7 +302,7 @@ public abstract class TuningConfigBuilder<C>
     return new Index();
   }
 
-  public static class Index extends TuningConfigBuilder<IndexTask.IndexTuningConfig>
+  public static class Index extends TuningConfigBuilder<Index, IndexTask.IndexTuningConfig>
   {
     @Override
     public IndexTask.IndexTuningConfig build()
@@ -330,7 +337,7 @@ public abstract class TuningConfigBuilder<C>
     }
   }
 
-  public static class ParallelIndex extends TuningConfigBuilder<ParallelIndexTuningConfig>
+  public static class ParallelIndex extends TuningConfigBuilder<ParallelIndex, ParallelIndexTuningConfig>
   {
     @Override
     public ParallelIndexTuningConfig build()
@@ -372,7 +379,7 @@ public abstract class TuningConfigBuilder<C>
     }
   }
 
-  public static class Compact extends TuningConfigBuilder<CompactionTask.CompactionTuningConfig>
+  public static class Compact extends TuningConfigBuilder<Compact, CompactionTask.CompactionTuningConfig>
   {
     @Override
     public CompactionTask.CompactionTuningConfig build()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java
new file mode 100644
index 00000000000..bcfb33e9d12
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * Builder for a {@link SeekableStreamSupervisorIOConfig}.
+ *
+ * @param <Self> Type of this builder itself
+ * @param <C>    Type of the IOConfig created by this builder
+ */
+public abstract class SupervisorIOConfigBuilder<
+    Self extends SupervisorIOConfigBuilder<Self, C>,
+    C extends SeekableStreamSupervisorIOConfig>
+{
+  protected String stream;
+  protected InputFormat inputFormat;
+  protected Integer replicas;
+  protected Integer taskCount;
+  protected Period taskDuration;
+  protected Period startDelay;
+  protected Period period;
+  protected Boolean useEarliestSequenceNumber;
+  protected Period completionTimeout;
+  protected Period lateMessageRejectionPeriod;
+  protected Period earlyMessageRejectionPeriod;
+  protected AutoScalerConfig autoScalerConfig;
+  protected LagAggregator lagAggregator;
+  protected DateTime lateMessageRejectionStartDateTime;
+  protected IdleConfig idleConfig;
+  protected Integer stopTaskCount;
+
+  public Self withStream(String stream)
+  {
+    this.stream = stream;
+    return self();
+  }
+
+  public Self withInputFormat(InputFormat inputFormat)
+  {
+    this.inputFormat = inputFormat;
+    return self();
+  }
+
+  public Self withJsonInputFormat()
+  {
+    this.inputFormat = new JsonInputFormat(null, null, null, null, null);
+    return self();
+  }
+
+  public Self withReplicas(Integer replicas)
+  {
+    this.replicas = replicas;
+    return self();
+  }
+
+  public Self withTaskCount(Integer taskCount)
+  {
+    this.taskCount = taskCount;
+    return self();
+  }
+
+  public Self withTaskDuration(Period taskDuration)
+  {
+    this.taskDuration = taskDuration;
+    return self();
+  }
+
+  public Self withStartDelay(Period startDelay)
+  {
+    this.startDelay = startDelay;
+    return self();
+  }
+
+  public Self withSupervisorRunPeriod(Period period)
+  {
+    this.period = period;
+    return self();
+  }
+
+  public Self withUseEarliestSequenceNumber(Boolean useEarliestSequenceNumber)
+  {
+    this.useEarliestSequenceNumber = useEarliestSequenceNumber;
+    return self();
+  }
+
+  public Self withCompletionTimeout(Period completionTimeout)
+  {
+    this.completionTimeout = completionTimeout;
+    return self();
+  }
+
+  public Self withLateMessageRejectionPeriod(Period lateMessageRejectionPeriod)
+  {
+    this.lateMessageRejectionPeriod = lateMessageRejectionPeriod;
+    return self();
+  }
+
+  public Self withEarlyMessageRejectionPeriod(Period earlyMessageRejectionPeriod)
+  {
+    this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod;
+    return self();
+  }
+
+  public Self withAutoScalerConfig(AutoScalerConfig autoScalerConfig)
+  {
+    this.autoScalerConfig = autoScalerConfig;
+    return self();
+  }
+
+  public Self withLagAggregator(LagAggregator lagAggregator)
+  {
+    this.lagAggregator = lagAggregator;
+    return self();
+  }
+
+  public Self withLateMessageRejectionStartDateTime(DateTime lateMessageRejectionStartDateTime)
+  {
+    this.lateMessageRejectionStartDateTime = lateMessageRejectionStartDateTime;
+    return self();
+  }
+
+  public Self withIdleConfig(IdleConfig idleConfig)
+  {
+    this.idleConfig = idleConfig;
+    return self();
+  }
+
+  public Self withStopTaskCount(Integer stopTaskCount)
+  {
+    this.stopTaskCount = stopTaskCount;
+    return self();
+  }
+
+  public abstract C build();
+
+  @SuppressWarnings("unchecked")
+  private Self self()
+  {
+    return (Self) this;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to