This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb8f22c054 Migrate Kafka ITs to embedded tests (#18870)
bdb8f22c054 is described below

commit bdb8f22c054cfd5fbb0d11c942cd41f4ba84a48e
Author: Kashif Faraz <[email protected]>
AuthorDate: Sun Jan 4 14:35:21 2026 +0530

    Migrate Kafka ITs to embedded tests (#18870)
    
    Changes:
    - Add `KafkaResource.produceRecordsWithoutTransaction()`
    - Add `KafkaFaultToleranceTest`
    - Add `FaultyClusterTest` to replace `ITFaultyClusterTest`
    - Fix up `FaultyTaskLockbox`, `FaultyTaskActionClientFactory` on Indexers
    - Allow deserialization of `KafkaSupervisorReportPayload`
    - Add `KafkaMultiSupervisorTest`
    - Add new methods to `EmbeddedKafkaSupervisorTest`
    - Remove the following IT groups from GHA workflow
       - `kafka-index`
       - `kafka-index-slow`
       - `kafka-transactional-index`
       - `kafka-transactional-index-slow`
---
 .github/workflows/cron-job-its.yml                 |  28 ---
 .github/workflows/standard-its.yml                 |  31 ---
 docs/api-reference/supervisor-api.md               |  30 ++-
 .../embedded/indexing/KafkaFaultToleranceTest.java | 179 +++++++++++++++++
 .../testing/embedded/indexing/KafkaTestBase.java   | 148 ++++++++++++++
 .../testing/embedded/server/FaultyClusterTest.java | 159 +++++++++++++++
 .../embedded/server/KafkaMultiSupervisorTest.java  |  95 +++++++++
 .../supervisor/KafkaSupervisorReportPayload.java   |  37 ++--
 .../simulate/EmbeddedKafkaSupervisorTest.java      | 100 ++++++++-
 .../indexing/kafka/simulate/KafkaResource.java     |  42 ++++
 .../supervisor/KafkaSupervisorSpecBuilder.java     |  10 +-
 .../apache/druid/guice/ClusterTestingModule.java   |   3 +
 .../cluster/overlord/FaultyTaskLockbox.java        |  31 ++-
 .../task/FaultyRemoteTaskActionClientFactory.java  |  15 +-
 .../druid/indexing/overlord/GlobalTaskLockbox.java |  13 +-
 .../overlord/supervisor/SupervisorResource.java    |   4 +-
 .../tools/WikipediaStreamEventStreamGenerator.java |  11 +-
 integration-tests/pom.xml                          |  16 --
 .../druid/testing/utils/KafkaAdminClient.java      | 111 ----------
 .../java/org/apache/druid/tests/TestNGGroup.java   |   8 -
 .../coordinator/duty/ITFaultyClusterTest.java      | 196 ------------------
 .../indexer/AbstractKafkaIndexingServiceTest.java  | 219 --------------------
 ...exingServiceNonTransactionalSerializedTest.java |  70 -------
 .../ITKafkaIndexingServiceStopTasksEarlyTest.java  |  53 -----
 ...IndexingServiceTransactionalSerializedTest.java |  70 -------
 .../druid/tests/indexer/ITNilColumnTest.java       | 223 ---------------------
 ...ingServiceNonTransactionalParallelizedTest.java | 103 ----------
 ...dexingServiceTransactionalParallelizedTest.java |  77 -------
 .../overlord/supervisor/SupervisorReport.java      |   8 +-
 .../testing/embedded/EmbeddedClusterApis.java      |  14 +-
 .../testing/embedded/EmbeddedServiceClient.java    |   3 +-
 31 files changed, 844 insertions(+), 1263 deletions(-)

diff --git a/.github/workflows/cron-job-its.yml 
b/.github/workflows/cron-job-its.yml
index d26ece294c9..1fb0c7a4fe7 100644
--- a/.github/workflows/cron-job-its.yml
+++ b/.github/workflows/cron-job-its.yml
@@ -56,34 +56,6 @@ jobs:
         run: |
           ./it.sh ci
 
-  integration-index-tests-middleManager:
-    strategy:
-      fail-fast: false
-      matrix:
-        testing_group: [kafka-index, kafka-index-slow, 
kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, 
realtime-index]
-    uses: ./.github/workflows/reusable-standard-its.yml
-    needs: build
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: -Dgroups=${{ matrix.testing_group }}
-      use_indexer: middleManager
-      group: ${{ matrix.testing_group }}
-
-  integration-index-tests-indexer:
-    strategy:
-      fail-fast: false
-      matrix:
-        testing_group: [ kafka-index, kafka-transactional-index, 
kafka-index-slow, kafka-transactional-index-slow, kafka-data-format ]
-    uses: ./.github/workflows/reusable-standard-its.yml
-    needs: build
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: -Dgroups=${{ matrix.testing_group }}
-      use_indexer: indexer
-      group: ${{ matrix.testing_group }}
-
   integration-query-tests-middleManager:
     strategy:
       fail-fast: false
diff --git a/.github/workflows/standard-its.yml 
b/.github/workflows/standard-its.yml
index e195572b6fb..fba85ac0055 100644
--- a/.github/workflows/standard-its.yml
+++ b/.github/workflows/standard-its.yml
@@ -42,37 +42,6 @@ jobs:
             core:
               - '!extension*/**'
 
-  integration-index-tests-middleManager:
-    needs: changes
-    strategy:
-      fail-fast: false
-      matrix:
-        testing_group: [kafka-index, kafka-index-slow, 
kafka-transactional-index, kafka-transactional-index-slow, realtime-index]
-    uses: ./.github/workflows/reusable-standard-its.yml
-    if: ${{ needs.changes.outputs.core == 'true' || 
needs.changes.outputs.common-extensions == 'true' }}
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: -Dgroups=${{ matrix.testing_group }}
-      override_config_path: ./environment-configs/test-groups/prepopulated-data
-      use_indexer: middleManager
-      group: ${{ matrix.testing_group }}
-
-  integration-index-tests-indexer:
-    needs: changes
-    strategy:
-      fail-fast: false
-      matrix:
-        testing_group: [kafka-index]
-    uses: ./.github/workflows/reusable-standard-its.yml
-    if: ${{ needs.changes.outputs.core == 'true' || 
needs.changes.outputs.common-extensions == 'true' }}
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: -Dgroups=${{ matrix.testing_group }}
-      use_indexer: indexer
-      group: ${{ matrix.testing_group }}
-
   integration-query-tests-middleManager:
     needs: changes
     strategy:
diff --git a/docs/api-reference/supervisor-api.md 
b/docs/api-reference/supervisor-api.md
index 38e68d4e137..87d7ea65e4e 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3598,12 +3598,33 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 ### Handoff task groups for a supervisor early
 
-Trigger handoff for specified task groups of a supervisor early. This is a 
best effort API and makes no guarantees of handoff execution
+Trigger handoff for specified task groups of a supervisor early. This is a 
best effort API and makes no guarantees of handoff execution.
 
 #### URL
 
 `POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`
 
+#### Responses
+
+<Tabs>
+
+<TabItem value="1" label="202 ACCEPTED">
+
+*Request has been accepted and handoff will be initiated in the background.*
+
+</TabItem>
+<TabItem value="2" label="404 NOT FOUND">
+
+*Invalid supervisor ID or the supervisor is not running.*
+
+</TabItem>
+<TabItem value="3" label="400 BAD REQUEST">
+
+*Supervisor does not support early handoff.*
+
+</TabItem>
+</Tabs>
+
 #### Sample request
 
 The following example shows how to handoff task groups for a supervisor with 
the name `social_media` and has the task groups: `1,2,3`.
@@ -3639,8 +3660,11 @@ Content-Type: application/json
 #### Sample response
 
 <details>
-  <summary>View the response</summary>
-(empty response)
+  <summary>202 Accepted</summary>
+
+```json
+{}
+```
 </details>
 
 ### Shut down a supervisor
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaFaultToleranceTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaFaultToleranceTest.java
new file mode 100644
index 00000000000..abb2993098c
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaFaultToleranceTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.rpc.RequestBuilder;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class KafkaFaultToleranceTest extends KafkaTestBase
+{
+  private SupervisorSpec supervisorSpec = null;
+  private String topic = null;
+  private int totalRecords = 0;
+
+  @BeforeEach
+  public void setupTopicAndSupervisor()
+  {
+    totalRecords = 0;
+    topic = "topic_" + dataSource;
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    supervisorSpec = createSupervisor().withId("supe_" + 
dataSource).build(dataSource, topic);
+    cluster.callApi().postSupervisor(supervisorSpec);
+  }
+  
+  @AfterEach
+  public void verifyAndTearDown()
+  {
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+    verifySupervisorIsRunningHealthy(supervisorSpec.getId());
+    cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+    kafkaServer.deleteTopic(topic);
+    verifyRowCount(totalRecords);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void test_supervisorRecovers_afterOverlordRestart(boolean 
useTransactions) throws Exception
+  {
+    totalRecords = publish1kRecords(topic, useTransactions);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    overlord.stop();
+    totalRecords += publish1kRecords(topic, useTransactions);
+
+    overlord.start();
+    totalRecords += publish1kRecords(topic, useTransactions);
+  }
+
+  @Test
+  public void test_supervisorRecovers_afterCoordinatorRestart() throws 
Exception
+  {
+    final boolean useTransactions = true;
+    totalRecords = publish1kRecords(topic, useTransactions);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    coordinator.stop();
+    totalRecords += publish1kRecords(topic, useTransactions);
+
+    coordinator.start();
+    totalRecords += publish1kRecords(topic, useTransactions);
+  }
+
+  @Test
+  public void test_supervisorRecovers_afterHistoricalRestart() throws Exception
+  {
+    final boolean useTransactions = false;
+    totalRecords = publish1kRecords(topic, useTransactions);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    historical.stop();
+    totalRecords += publish1kRecords(topic, useTransactions);
+
+    historical.start();
+    totalRecords += publish1kRecords(topic, useTransactions);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void test_supervisorRecovers_afterSuspendResume(boolean 
useTransactions)
+  {
+    totalRecords = publish1kRecords(topic, useTransactions);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+    totalRecords += publish1kRecords(topic, useTransactions);
+
+    cluster.callApi().postSupervisor(supervisorSpec.createRunningSpec());
+    totalRecords += publish1kRecords(topic, useTransactions);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void test_supervisorRecovers_afterChangeInTopicPartitions(boolean 
useTransactions)
+  {
+    totalRecords = publish1kRecords(topic, useTransactions);
+
+    kafkaServer.increasePartitionsInTopic(topic, 4);
+    totalRecords += publish1kRecords(topic, useTransactions);
+  }
+
+  @Test
+  public void test_supervisorLaunchesNewTask_ifEarlyHandoff()
+  {
+    final boolean useTransactions = true;
+    totalRecords = publish1kRecords(topic, useTransactions);
+
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    final Set<String> taskIdsBeforeHandoff = getRunningTaskIds(dataSource);
+    Assertions.assertFalse(taskIdsBeforeHandoff.isEmpty());
+
+    final String path = StringUtils.format(
+        "/druid/indexer/v1/supervisor/%s/taskGroups/handoff",
+        supervisorSpec.getId()
+    );
+    cluster.callApi().serviceClient().onLeaderOverlord(
+        mapper -> new RequestBuilder(HttpMethod.POST, path)
+            .jsonContent(mapper, Map.of("taskGroupIds", List.of(0, 1))),
+        new TypeReference<>() {}
+    );
+
+    // Wait for the handoff notice to be processed
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("ingest/notices/time")
+                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorSpec.getId())
+                      .hasDimension("noticeType", "handoff_task_group_notice")
+    );
+
+    totalRecords += publish1kRecords(topic, useTransactions);
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Verify that the running task IDs have changed
+    final Set<String> taskIdsAfterHandoff = getRunningTaskIds(dataSource);
+    Assertions.assertFalse(taskIdsAfterHandoff.isEmpty());
+    
Assertions.assertFalse(taskIdsBeforeHandoff.stream().anyMatch(taskIdsAfterHandoff::contains));
+  }
+
+  private Set<String> getRunningTaskIds(String dataSource)
+  {
+    return cluster.callApi()
+                  .getTasks(dataSource, "running")
+                  .stream()
+                  .map(TaskStatusPlus::getId)
+                  .collect(Collectors.toSet());
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTestBase.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTestBase.java
new file mode 100644
index 00000000000..3dd9efb4a9e
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTestBase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.tools.EventSerializer;
+import org.apache.druid.testing.tools.JsonEventSerializer;
+import org.apache.druid.testing.tools.StreamGenerator;
+import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class KafkaTestBase extends EmbeddedClusterTestBase
+{
+  protected final KafkaResource kafkaServer = new KafkaResource();
+  protected final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  protected final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  protected final EmbeddedBroker broker = new EmbeddedBroker();
+  protected final EmbeddedHistorical historical = new EmbeddedHistorical();
+  protected final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useContainerFriendlyHostname()
+        .addExtension(KafkaIndexTaskModule.class)
+        .useLatchableEmitter()
+        .useDefaultTimeoutForLatchableEmitter(60)
+        .addResource(kafkaServer)
+        .addServer(indexer)
+        .addServer(coordinator)
+        .addServer(overlord)
+        .addServer(broker)
+        .addServer(historical)
+        .addServer(new EmbeddedRouter());
+  }
+
+  protected KafkaSupervisorSpecBuilder createSupervisor()
+  {
+    return MoreResources.Supervisor.KAFKA_JSON
+        .get()
+        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null)))
+        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
+        .withIoConfig(
+            ioConfig -> ioConfig
+                .withConsumerProperties(kafkaServer.consumerProperties())
+                .withTaskCount(2)
+        );
+  }
+
+  /**
+   * Waits until at least {@code expectedRowCount} events have been processed, then asserts that the processed total equals it exactly.
+   */
+  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
+  {
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/events/processed")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(expectedRowCount)
+    );
+
+    final int totalEventsProcessed = indexer
+        .latchableEmitter()
+        .getMetricValues("ingest/events/processed", 
Map.of(DruidMetrics.DATASOURCE, dataSource))
+        .stream()
+        .mapToInt(Number::intValue)
+        .sum();
+    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+  }
+
+  protected void verifySupervisorIsRunningHealthy(String supervisorId)
+  {
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisorId);
+    Assertions.assertTrue(status.isHealthy());
+    Assertions.assertFalse(status.isSuspended());
+    Assertions.assertEquals("RUNNING", status.getState());
+  }
+
+  /**
+   * Verifies that the row count in {@link #dataSource} matches the {@code 
expectedRowCount}.
+   */
+  protected void verifyRowCount(int expectedRowCount)
+  {
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+    cluster.callApi().verifySqlQuery(
+        "SELECT COUNT(*) FROM %s",
+        dataSource,
+        String.valueOf(expectedRowCount)
+    );
+  }
+
+  protected int publish1kRecords(String topic, boolean useTransactions)
+  {
+    final EventSerializer serializer = new 
JsonEventSerializer(overlord.bindings().jsonMapper());
+    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
+    List<byte[]> records = streamGenerator.generateEvents(10);
+
+    ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new 
ArrayList<>();
+    for (byte[] record : records) {
+      producerRecords.add(new ProducerRecord<>(topic, record));
+    }
+
+    if (useTransactions) {
+      kafkaServer.produceRecordsToTopic(producerRecords);
+    } else {
+      kafkaServer.produceRecordsWithoutTransaction(producerRecords);
+    }
+    return producerRecords.size();
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/FaultyClusterTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/FaultyClusterTest.java
new file mode 100644
index 00000000000..a1a1622f535
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/FaultyClusterTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.guice.ClusterTestingModule;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorReportPayload;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexing.KafkaTestBase;
+import org.hamcrest.Matchers;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test to verify induction of various faults in the cluster
+ * using {@code ClusterTestingTaskConfig}.
+ * <p>
+ * Future tests can try to leverage the cluster testing config to verify 
cluster
+ * scalability and stability.
+ */
+public class FaultyClusterTest extends KafkaTestBase
+{
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return super
+        .createCluster()
+        .addExtension(ClusterTestingModule.class)
+        .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
+        .addCommonProperty("druid.unsafe.cluster.testing", "true");
+  }
+
+  @Test
+  public void test_overlord_skipsCleanupOfPendingSegments()
+  {
+    final Map<String, Object> taskContext = Map.of(
+        "clusterTesting",
+        Map.of("metadataConfig", Map.of("cleanupPendingSegments", false))
+    );
+
+    // Set up the topic and supervisor
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 1);
+    final KafkaSupervisorSpec supervisorSpec = createSupervisor()
+        .withIoConfig(io -> io.withTaskCount(1))
+        .withContext(taskContext)
+        .withId("supe_" + dataSource)
+        .build(dataSource, topic);
+    cluster.callApi().postSupervisor(supervisorSpec);
+
+    final int recordCount = publish1kRecords(topic, true);
+    waitUntilPublishedRecordsAreIngested(recordCount);
+
+    cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+    kafkaServer.deleteTopic(topic);
+
+    verifyRowCount(recordCount);
+
+    // Verify that pending segments are not cleaned up
+    final List<PendingSegmentRecord> pendingSegments = overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .getPendingSegments(dataSource, Intervals.ETERNITY);
+    Assertions.assertFalse(pendingSegments.isEmpty());
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_supervisor_reportsMagnifiedLag()
+  {
+    // Set up the topic and supervisor
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    final int lagMultiplier = 1_000_000;
+    final KafkaSupervisorSpec supervisorSpec = createSupervisor()
+        .withIoConfig(
+            io -> io
+                .withTaskCount(2)
+                .withLagAggregator(new FaultyLagAggregator(lagMultiplier))
+        )
+        .withContext(
+            // Delay segment allocation so that tasks are not able to ingest 
anything
+            Map.of(
+                "clusterTesting",
+                Map.of("taskActionClientConfig", 
Map.of("segmentAllocateDelay", "P1D"))
+            )
+        )
+        .withTuningConfig(t -> t.withOffsetFetchPeriod(Period.millis(10)))
+        .withId("supe_" + dataSource)
+        .build(dataSource, topic);
+
+    cluster.callApi().postSupervisor(supervisorSpec);
+
+    // Publish records to build up some lag
+    final int totalPublishedRecords = publish1kRecords(topic, true);
+
+    // Wait for supervisor to report the expected lag
+    final long expectedLag = (long) totalPublishedRecords * lagMultiplier;
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("ingest/kafka/lag")
+                      .hasDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorSpec.getId())
+                      
.hasValueMatching(Matchers.greaterThanOrEqualTo(expectedLag))
+    );
+
+    final String path = 
StringUtils.format("/druid/indexer/v1/supervisor/%s/status", 
supervisorSpec.getId());
+    final SupervisorReport<KafkaSupervisorReportPayload> report = 
cluster.callApi().serviceClient().onLeaderOverlord(
+        mapper -> new RequestBuilder(HttpMethod.GET, path),
+        new TypeReference<>() {}
+    );
+
+    Assertions.assertNotNull(report);
+    final KafkaSupervisorReportPayload payload = report.getPayload();
+    Assertions.assertNotNull(payload);
+
+    Assertions.assertFalse(payload.isSuspended());
+    Assertions.assertTrue(payload.getAggregateLag() >= expectedLag);
+
+    // Kill ongoing tasks and suspend the supervisor
+    for (TaskStatusPlus task : cluster.callApi().getTasks(dataSource, 
"running")) {
+      cluster.callApi().onLeaderOverlord(o -> o.cancelTask(task.getId()));
+    }
+    cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+    kafkaServer.deleteTopic(topic);
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KafkaMultiSupervisorTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KafkaMultiSupervisorTest.java
new file mode 100644
index 00000000000..8d2123a52b3
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KafkaMultiSupervisorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.testing.embedded.indexing.KafkaTestBase;
+import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+
+/**
+ * Contains miscellaneous tests for Kafka supervisors.
+ */
+public class KafkaMultiSupervisorTest extends KafkaTestBase
+{
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void test_ingestIntoSingleDatasource_fromDifferentTopics(boolean 
useTransactions)
+  {
+    final List<DimensionSchema> dimensionsSpec = 
DimensionsSpec.getDefaultSchemas(
+        List.of("kafka.topic", 
WikipediaStreamEventStreamGenerator.COL_UNIQUE_NAMESPACE)
+    );
+
+    // Set up first topic and supervisor
+    final String topic1 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic1, 1);
+
+    final KafkaSupervisorSpec supervisor1 = createSupervisor()
+        .withIoConfig(io -> io.withKafkaInputFormat(new JsonInputFormat(null, 
null, null, null, null)))
+        .withDataSchema(d -> d.withDimensions(dimensionsSpec))
+        .withId(topic1)
+        .build(dataSource, topic1);
+    cluster.callApi().postSupervisor(supervisor1);
+
+    // Set up another topic and supervisor
+    final String topic2 = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic2, 1);
+
+    final KafkaSupervisorSpec supervisor2 = createSupervisor()
+        .withIoConfig(io -> io.withKafkaInputFormat(new JsonInputFormat(null, 
null, null, null, null)))
+        .withDataSchema(d -> d.withDimensions(dimensionsSpec))
+        .withId(topic2)
+        .build(dataSource, topic2);
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // Produce records to both topics
+    final int rowCountTopic1 = publish1kRecords(topic1, useTransactions)
+                               + publish1kRecords(topic1, useTransactions);
+    final int rowCountTopic2 = publish1kRecords(topic2, useTransactions);
+
+    final int totalRowCount = rowCountTopic1 + rowCountTopic2;
+    waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+    // Tear down both topics and supervisors
+    kafkaServer.deleteTopic(topic1);
+    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+    kafkaServer.deleteTopic(topic2);
+    cluster.callApi().postSupervisor(supervisor2.createSuspendedSpec());
+
+    // Verify total row count and row count for each topic
+    verifyRowCount(totalRowCount);
+    Assertions.assertEquals(
+        "2000",
+        cluster.runSql("SELECT COUNT(*) FROM %s WHERE \"kafka.topic\" = '%s'", 
dataSource, topic1)
+    );
+    Assertions.assertEquals(
+        "1000",
+        cluster.runSql("SELECT COUNT(*) FROM %s WHERE \"kafka.topic\" = '%s'", 
dataSource, topic2)
+    );
+  }
+}
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index 897d1655ab8..589e42e6175 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.indexing.kafka.supervisor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@@ -30,29 +32,30 @@ import java.util.Map;
 
 public class KafkaSupervisorReportPayload extends 
SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long>
 {
+  @JsonCreator
   public KafkaSupervisorReportPayload(
-      String id,
-      String dataSource,
-      String topic,
-      int partitions,
-      int replicas,
-      long durationSeconds,
-      @Nullable Map<KafkaTopicPartition, Long> latestOffsets,
-      @Nullable Map<KafkaTopicPartition, Long> minimumLag,
-      @Nullable Map<KafkaTopicPartition, Long> minimumLagMillis,
-      @Nullable Long aggregateLag,
-      @Nullable DateTime offsetsLastUpdated,
-      boolean suspended,
-      boolean healthy,
-      SupervisorStateManager.State state,
-      SupervisorStateManager.State detailedState,
-      List<SupervisorStateManager.ExceptionEvent> recentErrors
+      @JsonProperty("id") String id,
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("stream") String stream,
+      @JsonProperty("partitions") int partitions,
+      @JsonProperty("replicas") int replicas,
+      @JsonProperty("durationSeconds") long durationSeconds,
+      @JsonProperty("latestOffsets") @Nullable Map<KafkaTopicPartition, Long> 
latestOffsets,
+      @JsonProperty("minimumLag") @Nullable Map<KafkaTopicPartition, Long> 
minimumLag,
+      @JsonProperty("minimumLagMillis") @Nullable Map<KafkaTopicPartition, 
Long> minimumLagMillis,
+      @JsonProperty("aggregateLag") @Nullable Long aggregateLag,
+      @JsonProperty("offsetsLastUpdated") @Nullable DateTime 
offsetsLastUpdated,
+      @JsonProperty("suspended") boolean suspended,
+      @JsonProperty("healthy") boolean healthy,
+      @JsonProperty("ignored1") SupervisorStateManager.State state, // No 
deserializer for State class
+      @JsonProperty("ignored2") SupervisorStateManager.State detailedState,
+      @JsonProperty("recentErrors") 
List<SupervisorStateManager.ExceptionEvent> recentErrors
   )
   {
     super(
         id,
         dataSource,
-        topic,
+        stream,
         partitions,
         replicas,
         durationSeconds,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index c2517331035..6f0fa52317d 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -45,6 +46,7 @@ import 
org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -55,10 +57,14 @@ import java.util.concurrent.ThreadLocalRandom;
 
 public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
 {
+  private static final String COL_ITEM = "item";
+  private static final String COL_TIMESTAMP = "timestamp";
+
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer();
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
   private final EmbeddedHistorical historical = new EmbeddedHistorical();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
   private KafkaResource kafkaServer;
 
   @Override
@@ -72,7 +78,7 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     cluster.addExtension(KafkaIndexTaskModule.class)
            .addResource(kafkaServer)
            .useLatchableEmitter()
-           .addServer(new EmbeddedCoordinator())
+           .addServer(coordinator)
            .addServer(overlord)
            .addServer(indexer)
            .addServer(historical)
@@ -94,7 +100,8 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
 
     // Submit and start a supervisor
     final String supervisorId = dataSource + "_supe";
-    final KafkaSupervisorSpec kafkaSupervisorSpec = 
createKafkaSupervisor(supervisorId, topic);
+    final KafkaSupervisorSpec kafkaSupervisorSpec
+        = newKafkaSupervisor().withId(supervisorId).build(dataSource, topic);
 
     Assertions.assertEquals(supervisorId, 
cluster.callApi().postSupervisor(kafkaSupervisorSpec));
 
@@ -142,12 +149,87 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     Assertions.assertEquals(0, lockedIntervals.size());
   }
 
-  private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId, 
String topic)
+  @Test
+  public void test_supervisorBecomesIdle_ifTopicHasNoData()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    final long idleAfterMillis = 100L;
+    final KafkaSupervisorSpec supervisorSpec = newKafkaSupervisor()
+        .withIoConfig(ioConfig -> ioConfig.withIdleConfig(new IdleConfig(true, 
idleAfterMillis)).withTaskCount(1))
+        .build(dataSource, topic);
+    cluster.callApi().postSupervisor(supervisorSpec);
+
+    // Wait for the first set of tasks to finish
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/run/time")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+    );
+
+    // Verify that the supervisor is now idle
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisorSpec.getId());
+    Assertions.assertFalse(status.isSuspended());
+    Assertions.assertTrue(status.isHealthy());
+    Assertions.assertEquals("IDLE", status.getState());
+
+    cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+    kafkaServer.deleteTopic(topic);
+  }
+
+  @Test
+  public void test_runSupervisor_withEmptyDimension()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    final String emptyColumn = "unknownColumn";
+    final KafkaSupervisorSpec supervisorSpec = newKafkaSupervisor()
+        .withDataSchema(
+            s -> s.withDimensions(
+                DimensionsSpec.getDefaultSchemas(List.of(emptyColumn, 
COL_ITEM))
+            )
+        )
+        .build(dataSource, topic);
+    cluster.callApi().postSupervisor(supervisorSpec);
+
+    final int numRows = 100;
+    kafkaServer.produceRecordsToTopic(generateRecordsForTopic(topic, numRows, 
DateTimes.nowUtc()));
+
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/events/processed")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+        agg -> agg.hasSumAtLeast(numRows)
+    );
+
+    cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+
+    Assertions.assertEquals(
+        "100",
+        cluster.runSql("SELECT COUNT(*) FROM %s WHERE %s IS NULL", dataSource, 
emptyColumn)
+    );
+    Assertions.assertEquals(
+        "0",
+        cluster.runSql("SELECT COUNT(*) FROM %s WHERE %s IS NOT NULL", 
dataSource, emptyColumn)
+    );
+    Assertions.assertEquals(
+        StringUtils.format("%s,YES,VARCHAR", emptyColumn),
+        cluster.runSql(
+            "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
+            + " FROM INFORMATION_SCHEMA.COLUMNS"
+            + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME = '%s'",
+            dataSource, emptyColumn
+        )
+    );
+  }
+
+  private KafkaSupervisorSpecBuilder newKafkaSupervisor()
   {
     return new KafkaSupervisorSpecBuilder()
         .withDataSchema(
             schema -> schema
-                .withTimestamp(new TimestampSpec("timestamp", null, null))
+                .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
                 .withDimensions(DimensionsSpec.EMPTY)
         )
         .withTuningConfig(
@@ -157,12 +239,14 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
         )
         .withIoConfig(
             ioConfig -> ioConfig
-                .withInputFormat(new CsvInputFormat(List.of("timestamp", 
"item"), null, null, false, 0, false))
+                .withInputFormat(new CsvInputFormat(List.of(COL_TIMESTAMP, 
COL_ITEM), null, null, false, 0, false))
                 .withConsumerProperties(kafkaServer.consumerProperties())
+                .withTaskDuration(Period.millis(500))
+                .withStartDelay(Period.millis(10))
+                .withSupervisorRunPeriod(Period.millis(500))
+                .withCompletionTimeout(Period.seconds(5))
                 .withUseEarliestSequenceNumber(true)
-        )
-        .withId(supervisorId)
-        .build(dataSource, topic);
+        );
   }
 
   private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
index 18e3c79503d..0b4bbfb978b 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
@@ -23,8 +23,11 @@ import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.TestcontainerResource;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreatePartitionsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.testcontainers.kafka.KafkaContainer;
@@ -119,6 +122,26 @@ public class KafkaResource extends 
TestcontainerResource<KafkaContainer>
     }
   }
 
+  /**
+   * Increases the number of partitions in the given Kafka topic. The topic 
must
+   * already exist. This method waits until the increase in the partition count
+   * has started (but not necessarily finished).
+   */
+  public void increasePartitionsInTopic(String topic, int newPartitionCount)
+  {
+    try (Admin admin = newAdminClient()) {
+      final CreatePartitionsResult result = admin.createPartitions(
+          Map.of(topic, NewPartitions.increaseTo(newPartitionCount))
+      );
+
+      // Wait for the partitioning to start
+      result.values().get(topic).get();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public void produceRecordsToTopic(
       List<ProducerRecord<byte[], byte[]>> records,
       Map<String, Object> extraProducerProperties
@@ -145,6 +168,25 @@ public class KafkaResource extends 
TestcontainerResource<KafkaContainer>
     produceRecordsToTopic(records, null);
   }
 
+  /**
+   * Produces records to a topic of this embedded Kafka server without using
+   * Kafka transactions.
+   */
+  public void produceRecordsWithoutTransaction(List<ProducerRecord<byte[], 
byte[]>> records)
+  {
+    final Map<String, Object> props = producerProperties();
+    props.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = new 
KafkaProducer<>(props)) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record);
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public Map<String, Object> producerProperties()
   {
     final Map<String, Object> props = new HashMap<>(commonClientProperties());
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
index ce36c0b7dc0..4cd8c523185 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka.supervisor;
 
 import org.apache.druid.segment.indexing.DataSchema;
 
+import java.util.Map;
 import java.util.function.Consumer;
 
 /**
@@ -33,6 +34,7 @@ public class KafkaSupervisorSpecBuilder
   private final DataSchema.Builder dataSchema = new DataSchema.Builder();
   private final KafkaIOConfigBuilder ioConfig = new KafkaIOConfigBuilder();
   private final KafkaTuningConfigBuilder tuningConfig = new 
KafkaTuningConfigBuilder();
+  private Map<String, Object> context;
 
   public KafkaSupervisorSpecBuilder 
withDataSchema(Consumer<DataSchema.Builder> updateDataSchema)
   {
@@ -52,6 +54,12 @@ public class KafkaSupervisorSpecBuilder
     return this;
   }
 
+  public KafkaSupervisorSpecBuilder withContext(Map<String, Object> context)
+  {
+    this.context = context;
+    return this;
+  }
+
   public KafkaSupervisorSpecBuilder withId(String id)
   {
     this.id = id;
@@ -87,7 +95,7 @@ public class KafkaSupervisorSpecBuilder
         dataSchema.build(),
         tuningConfig.build(),
         ioConfig.build(),
-        null,
+        context,
         false,
         // Jackson injected params, not needed while posting a supervisor to 
the Overlord
         null, null, null, null, null, null, null, null, null
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
index 4c48d1ae760..22f20772f8f 100644
--- 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
@@ -116,6 +116,9 @@ public class ClusterTestingModule implements DruidModule
       binder.bind(OverlordClient.class)
             .to(FaultyOverlordClient.class)
             .in(LazySingleton.class);
+      binder.bind(RemoteTaskActionClientFactory.class)
+            .to(FaultyRemoteTaskActionClientFactory.class)
+            .in(LazySingleton.class);
     }
   }
 
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
index 67ce9e829ae..d10cf6f9880 100644
--- 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
@@ -40,6 +40,8 @@ public class FaultyTaskLockbox extends GlobalTaskLockbox
   private static final Logger log = new Logger(FaultyTaskLockbox.class);
 
   private final ObjectMapper mapper;
+  private final TaskStorage taskStorage;
+  private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
 
   @Inject
   public FaultyTaskLockbox(
@@ -50,21 +52,30 @@ public class FaultyTaskLockbox extends GlobalTaskLockbox
   {
     super(taskStorage, metadataStorageCoordinator);
     this.mapper = mapper;
+    this.taskStorage = taskStorage;
+    this.metadataStorageCoordinator = metadataStorageCoordinator;
     log.info("Initializing FaultyTaskLockbox.");
   }
 
   @Override
-  protected void cleanupPendingSegments(Task task)
+  protected TaskLockbox createLockbox(String dataSource)
   {
-    final ClusterTestingTaskConfig testingConfig = getTestingConfig(task);
-    if (testingConfig.getMetadataConfig().isCleanupPendingSegments()) {
-      super.cleanupPendingSegments(task);
-    } else {
-      log.info(
-          "Skipping cleanup of pending segments for task[%s] since it has 
testing config[%s].",
-          task.getId(), testingConfig
-      );
-    }
+    return new TaskLockbox(dataSource, taskStorage, metadataStorageCoordinator)
+    {
+      @Override
+      protected void cleanupPendingSegments(Task task)
+      {
+        final ClusterTestingTaskConfig testingConfig = getTestingConfig(task);
+        if (testingConfig.getMetadataConfig().isCleanupPendingSegments()) {
+          super.cleanupPendingSegments(task);
+        } else {
+          log.info(
+              "Skipping cleanup of pending segments for task[%s] since it has 
testing config[%s].",
+              task.getId(), testingConfig
+          );
+        }
+      }
+    };
   }
 
   private ClusterTestingTaskConfig getTestingConfig(Task task)
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
index 043a49f6d14..f6115c19928 100644
--- 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
@@ -34,15 +34,14 @@ import 
org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
 
 public class FaultyRemoteTaskActionClientFactory extends 
RemoteTaskActionClientFactory
 {
-  private final ClusterTestingTaskConfig config;
+  private final ObjectMapper jsonMapper;
 
   @Inject
   public FaultyRemoteTaskActionClientFactory(
       @Json final ObjectMapper jsonMapper,
       @EscalatedGlobal final ServiceClientFactory clientFactory,
       @IndexingService final ServiceLocator serviceLocator,
-      RetryPolicyConfig retryPolicyConfig,
-      ClusterTestingTaskConfig config
+      RetryPolicyConfig retryPolicyConfig
   )
   {
     super(
@@ -51,12 +50,18 @@ public class FaultyRemoteTaskActionClientFactory extends 
RemoteTaskActionClientF
         serviceLocator,
         retryPolicyConfig
     );
-    this.config = config;
+    this.jsonMapper = jsonMapper;
   }
 
   @Override
   public TaskActionClient create(Task task)
   {
-    return new 
FaultyRemoteTaskActionClient(config.getTaskActionClientConfig(), 
super.create(task));
+    try {
+      final ClusterTestingTaskConfig config = 
ClusterTestingTaskConfig.forTask(task, jsonMapper);
+      return new 
FaultyRemoteTaskActionClient(config.getTaskActionClientConfig(), 
super.create(task));
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
index a464af86868..8b00739167a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
@@ -277,14 +277,11 @@ public class GlobalTaskLockbox
   }
 
   /**
-   * Cleans up pending segments associated with the given task, if any.
+   * Creates a new {@link TaskLockbox} for the given datasource.
    */
-  protected void cleanupPendingSegments(Task task)
+  protected TaskLockbox createLockbox(String dataSource)
   {
-    executeForTask(
-        task,
-        lockbox -> lockbox.cleanupPendingSegments(task)
-    );
+    return new TaskLockbox(dataSource, taskStorage, 
metadataStorageCoordinator);
   }
 
   /**
@@ -484,9 +481,7 @@ public class GlobalTaskLockbox
         (ds, existingResource) -> {
           final DatasourceLockboxResource resource = 
Objects.requireNonNullElseGet(
               existingResource,
-              () -> new DatasourceLockboxResource(
-                  new TaskLockbox(ds, taskStorage, metadataStorageCoordinator)
-              )
+              () -> new DatasourceLockboxResource(createLockbox(datasource))
           );
 
           // Verify sync is complete before acquiring the resource
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index fdf23850580..b0ed0423818 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -432,7 +432,9 @@ public class SupervisorResource
         manager -> {
           try {
             if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
-              return Response.ok().build();
+              return Response.status(Response.Status.ACCEPTED)
+                             .entity(Map.of()) // empty json object to allow 
deserialization by the client
+                             .build();
             } else {
               return Response.status(Response.Status.NOT_FOUND)
                              .entity(ImmutableMap.of("error", 
StringUtils.format("Supervisor was not found [%s]", id)))
diff --git 
a/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
 
b/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
index c357afc0e3d..b26c999ed83 100644
--- 
a/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
+++ 
b/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.testing.tools;
 
+import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.java.util.common.Pair;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
@@ -30,6 +31,14 @@ import java.util.List;
 
 public class WikipediaStreamEventStreamGenerator extends 
SyntheticStreamGenerator
 {
+  /**
+   * Column which may serve as a unique identifier for each record. This column
+   * is used instead of a dedicated "id" column since some tests like
+   * {@code KafkaDataFormatsTest} use hardcoded schemas that rely on these
+   * specific column names.
+   */
+  public static final String COL_UNIQUE_NAMESPACE = "namespace";
+
   private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
 
   public WikipediaStreamEventStreamGenerator(EventSerializer serializer, int 
eventsPerSeconds, long cyclePaddingMs)
@@ -49,7 +58,7 @@ public class WikipediaStreamEventStreamGenerator extends 
SyntheticStreamGenerato
     event.add(Pair.of("newPage", "true"));
     event.add(Pair.of("robot", "false"));
     event.add(Pair.of("anonymous", "false"));
-    event.add(Pair.of("namespace", "article"));
+    event.add(Pair.of(COL_UNIQUE_NAMESPACE, "article " + 
IdUtils.getRandomId()));
     event.add(Pair.of("continent", "North America"));
     event.add(Pair.of("country", "United States"));
     event.add(Pair.of("region", "Bay Area"));
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 018b387a393..51903553ae9 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -179,17 +179,6 @@
             <version>${project.parent.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.druid.extensions</groupId>
-            <artifactId>druid-kafka-indexing-service</artifactId>
-            <version>${project.parent.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.kafka</groupId>
-                    <artifactId>kafka-clients</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-lookups-cached-global</artifactId>
@@ -238,11 +227,6 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${apache.kafka.version}</version>
-        </dependency>
         <dependency>
             <groupId>javax.ws.rs</groupId>
             <artifactId>jsr311-api</artifactId>
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
deleted file mode 100644
index dde9bb5a035..00000000000
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testing.utils;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreatePartitionsResult;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaAdminClient implements StreamAdminClient
-{
-  private AdminClient adminClient;
-
-  public KafkaAdminClient(IntegrationTestingConfig config)
-  {
-    Properties properties = new Properties();
-    properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.getKafkaHost());
-    KafkaUtil.addPropertiesFromTestConfig(config, properties);
-    adminClient = AdminClient.create(properties);
-  }
-
-  @Override
-  public void createStream(String streamName, int partitionCount, Map<String, 
String> tags) throws Exception
-  {
-    final short replicationFactor = 1;
-    final NewTopic newTopic = new NewTopic(streamName, partitionCount, 
replicationFactor);
-    final CreateTopicsResult createTopicsResult = 
adminClient.createTopics(Collections.singleton(newTopic));
-    // Wait for create topic to compelte
-    createTopicsResult.values().get(streamName).get();
-  }
-
-  @Override
-  public void deleteStream(String streamName) throws Exception
-  {
-    DeleteTopicsResult deleteTopicsResult = 
adminClient.deleteTopics(ImmutableList.of(streamName));
-    deleteTopicsResult.values().get(streamName).get();
-  }
-
-  /**
-   * This method can only increase the partition count of {@param streamName} 
to have a final partition
-   * count of {@param newPartitionCount}
-   * If {@param blocksUntilStarted} is set to true, then this method will 
blocks until the partitioning
-   * started (but not nessesary finished), otherwise, the method will returns 
right after issue the
-   * repartitioning command
-   */
-  @Override
-  public void updatePartitionCount(String streamName, int newPartitionCount, 
boolean blocksUntilStarted) throws Exception
-  {
-    Map<String, NewPartitions> counts = new HashMap<>();
-    counts.put(streamName, NewPartitions.increaseTo(newPartitionCount));
-    CreatePartitionsResult createPartitionsResult = 
adminClient.createPartitions(counts);
-    if (blocksUntilStarted) {
-      createPartitionsResult.values().get(streamName).get();
-
-    }
-  }
-
-  /**
-   * Stream state such as active/non-active does not applies to Kafka.
-   * Returning true since Kafka stream is always active and can always be 
writen and read to.
-   */
-  @Override
-  public boolean isStreamActive(String streamName)
-  {
-    return true;
-  }
-
-  @Override
-  public int getStreamPartitionCount(String streamName) throws Exception
-  {
-    DescribeTopicsResult result = 
adminClient.describeTopics(ImmutableList.of(streamName));
-    TopicDescription topicDescription = result.values().get(streamName).get();
-    return topicDescription.partitions().size();
-  }
-
-  @Override
-  public boolean verfiyPartitionCountUpdated(String streamName, int 
oldPartitionCount, int newPartitionCount) throws Exception
-  {
-    return getStreamPartitionCount(streamName) == newPartitionCount;
-  }
-}
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java 
b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 60600d633d2..6db9e3999f6 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -25,14 +25,6 @@ package org.apache.druid.tests;
  */
 public class TestNGGroup
 {
-  public static final String KAFKA_INDEX = "kafka-index";
-
-  public static final String KAFKA_INDEX_SLOW = "kafka-index-slow";
-
-  public static final String TRANSACTIONAL_KAFKA_INDEX = 
"kafka-transactional-index";
-
-  public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = 
"kafka-transactional-index-slow";
-
   /**
    * This group can only be run individually using -Dgroups=query since it 
requires specific test data setup.
    */
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
deleted file mode 100644
index c36a23ad4ec..00000000000
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.coordinator.duty;
-
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
-import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.EventSerializer;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.tools.StreamGenerator;
-import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
-import org.joda.time.DateTime;
-import org.joda.time.Period;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * Integration test to verify induction of various faults in the cluster
- * using {@link ClusterTestingTaskConfig}.
- * <p>
- * Future tests can try to leverage the cluster testing config to write tests
- * for cluster scalability and stability.
- */
-@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITFaultyClusterTest extends AbstractKafkaIndexingServiceTest
-{
-  private static final Logger log = new Logger(ITFaultyClusterTest.class);
-
-  private GeneratedTestConfig generatedTestConfig;
-  private StreamGenerator streamGenerator;
-
-  private String fullDatasourceName;
-
-  @DataProvider
-  public static Object[] getParameters()
-  {
-    return new Object[]{false, true};
-  }
-
-  @BeforeClass
-  public void setupClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-  @BeforeMethod
-  public void setup() throws Exception
-  {
-    generatedTestConfig = new GeneratedTestConfig(
-        Specs.PARSER_TYPE,
-        getResourceAsString(Specs.INPUT_FORMAT_PATH)
-    );
-    fullDatasourceName = generatedTestConfig.getFullDatasourceName();
-    final EventSerializer serializer = jsonMapper.readValue(
-        getResourceAsStream(Specs.SERIALIZER_PATH),
-        EventSerializer.class
-    );
-    streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 
100);
-  }
-
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "faulty_cluster";
-  }
-
-  @Test(dataProvider = "getParameters")
-  public void test_streamingIngestion_worksWithFaultyCluster(boolean 
transactionEnabled) throws Exception
-  {
-    if (shouldSkipTest(transactionEnabled)) {
-      return;
-    }
-
-    try (
-        final Closeable closer = createResourceCloser(generatedTestConfig);
-        final StreamEventWriter streamEventWriter = 
createStreamEventWriter(config, transactionEnabled)
-    ) {
-      // Set context parameters
-      final Map<String, Object> taskContext = Map.of(
-          "clusterTesting",
-          Map.of(
-              "metadataConfig", Map.of("cleanupPendingSegments", false),
-              "coordinatorClientConfig", Map.of("minSegmentHandoffDelay", 
"PT10S"),
-              "taskActionClientConfig", Map.of("segmentAllocateDelay", 
"PT10S", "segmentPublishDelay", "PT10S")
-          )
-      );
-
-      // Start supervisor
-      final String supervisorSpec = generatedTestConfig
-          .withContext(taskContext)
-          .withLagAggregator(new FaultyLagAggregator(1_000_000))
-          .getStreamIngestionPropsTransform()
-          .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
-
-      final String supervisorId = indexer.submitSupervisor(supervisorSpec);
-      generatedTestConfig.setSupervisorId(supervisorId);
-      log.info("Submitted supervisor[%s] with spec[%s]", supervisorId, 
supervisorSpec);
-
-      // Generate data for minutes 1, 2 and 3
-      final DateTime firstRecordTime = DateTimes.of("2000-01-01T01:01:00Z");
-      final long rowsForMinute1 = generateDataForMinute(firstRecordTime, 
streamEventWriter);
-      final long rowsForMinute2 = 
generateDataForMinute(firstRecordTime.plus(Period.minutes(1)), 
streamEventWriter);
-      final long rowsForMinute3 = 
generateDataForMinute(firstRecordTime.plus(Period.minutes(2)), 
streamEventWriter);
-
-      ITRetryUtil.retryUntilTrue(
-          () -> {
-            final Integer aggregateLag = (Integer) 
indexer.getFullSupervisorStatus(supervisorId).get("aggregateLag");
-            log.info("Aggregate lag is [%d].", aggregateLag);
-            return aggregateLag != null && aggregateLag > 1_000_000;
-          },
-          "Aggregate lag exceeds 1M"
-      );
-
-      // Wait for data to be ingested for all the minutes
-      waitUntilDatasourceRowCountEquals(fullDatasourceName, rowsForMinute1 + 
rowsForMinute2 + rowsForMinute3);
-      waitForSegmentsToLoad(fullDatasourceName);
-
-      // 2 segments for each minute, total 6
-      waitUntilDatasourceSegmentCountEquals(fullDatasourceName, 6);
-    }
-  }
-
-  /**
-   * Generates data points for a minute with the specified start time.
-   *
-   * @return Number of rows generated.
-   */
-  private long generateDataForMinute(DateTime firstRecordTime, 
StreamEventWriter streamEventWriter)
-  {
-    final long rowCount = streamGenerator.run(
-        generatedTestConfig.getStreamName(),
-        streamEventWriter,
-        10,
-        firstRecordTime
-    );
-    log.info("Generated [%d] rows for 1 minute interval with start[%s]", 
rowCount, firstRecordTime);
-
-    return rowCount;
-  }
-
-  /**
-   * Checks if a test should be skipped based on whether transaction is 
enabled or not.
-   */
-  private boolean shouldSkipTest(boolean testEnableTransaction)
-  {
-    Map<String, String> kafkaTestProps = KafkaUtil
-        .getAdditionalKafkaTestConfigFromProperties(config);
-    boolean configEnableTransaction = Boolean.parseBoolean(
-        kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, 
"false")
-    );
-
-    return configEnableTransaction != testEnableTransaction;
-  }
-
-  /**
-   * Constants for test specs.
-   */
-  private static class Specs
-  {
-    static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT + 
"/csv/serializer/serializer.json";
-    static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT + 
"/csv/input_format/input_format.json";
-    static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
-  }
-
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
deleted file mode 100644
index 2115ed59f7e..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.google.common.base.Preconditions;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.KafkaEventWriter;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.utils.KafkaAdminClient;
-import org.apache.druid.testing.utils.StreamAdminClient;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Function;
-
-public abstract class AbstractKafkaIndexingServiceTest extends 
AbstractStreamIndexingTest
-{
-  @Override
-  StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
-  {
-    return new KafkaAdminClient(config);
-  }
-
-  @Override
-  public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig 
config, Boolean transactionEnabled)
-  {
-    return new KafkaEventWriter(config, 
Preconditions.checkNotNull(transactionEnabled, "transactionEnabled"));
-  }
-
-  @Override
-  Function<String, String> generateStreamIngestionPropsTransform(
-      String supervisorId,
-      String streamName,
-      String fullDatasourceName,
-      String parserType,
-      String parserOrInputFormat,
-      List<String> dimensions,
-      Map<String, Object> context,
-      LagAggregator lagAggregator,
-      IntegrationTestingConfig config
-  )
-  {
-    final Map<String, Object> consumerConfigs = 
KafkaConsumerConfigs.getConsumerProperties();
-    final Properties consumerProperties = new Properties();
-    consumerProperties.putAll(consumerConfigs);
-    consumerProperties.setProperty("bootstrap.servers", 
config.getKafkaInternalHost());
-    KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
-    return spec -> {
-      try {
-        spec = StringUtils.replace(
-            spec,
-            "%%SUPERVISOR_ID%%",
-            supervisorId
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%DATASOURCE%%",
-            fullDatasourceName
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%STREAM_TYPE%%",
-            "kafka"
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TOPIC_KEY%%",
-            "topic"
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TOPIC_VALUE%%",
-            streamName
-        );
-
-        if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
-          spec = StringUtils.replace(
-              spec,
-              "%%INPUT_FORMAT%%",
-              parserOrInputFormat
-          );
-          spec = StringUtils.replace(
-              spec,
-              "%%PARSER%%",
-              "null"
-          );
-        } else if 
(AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) {
-          spec = StringUtils.replace(
-              spec,
-              "%%PARSER%%",
-              parserOrInputFormat
-          );
-          spec = StringUtils.replace(
-              spec,
-              "%%INPUT_FORMAT%%",
-              "null"
-          );
-        }
-        spec = StringUtils.replace(
-            spec,
-            "%%USE_EARLIEST_KEY%%",
-            "useEarliestOffset"
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%STREAM_PROPERTIES_KEY%%",
-            "consumerProperties"
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%SCHEMA_REGISTRY_HOST%%",
-            StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%DIMENSIONS%%",
-            jsonMapper.writeValueAsString(dimensions)
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%CONTEXT%%",
-            jsonMapper.writeValueAsString(context)
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%LAG_AGGREGATOR%%",
-            jsonMapper.writeValueAsString(lagAggregator)
-        );
-        return StringUtils.replace(
-            spec,
-            "%%STREAM_PROPERTIES_VALUE%%",
-            jsonMapper.writeValueAsString(consumerProperties)
-        );
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    };
-  }
-
-  @Override
-  Function<String, String> generateStreamQueryPropsTransform(String 
streamName, String fullDatasourceName)
-  {
-    return spec -> {
-      try {
-        spec = StringUtils.replace(
-            spec,
-            "%%DATASOURCE%%",
-            fullDatasourceName
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
-            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
-            
TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
-            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMESERIES_QUERY_START%%",
-            INTERVAL_FMT.print(FIRST_EVENT_TIME)
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMESERIES_QUERY_END%%",
-            
INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 
1).plusMinutes(2))
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
-            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TIMESERIES_ADDED%%",
-            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * 
TOTAL_NUMBER_OF_SECOND)
-        );
-        return StringUtils.replace(
-            spec,
-            "%%TIMESERIES_NUMEVENTS%%",
-            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
-        );
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    };
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
deleted file mode 100644
index 88510139d91..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends 
AbstractKafkaIndexingServiceTest
-{
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "kafka_nontransactional_serialized";
-  }
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-  /**
-   * This test must be run individually since the test affect and modify the 
state of the Druid cluster
-   */
-  @Test
-  public void testKafkaIndexDataWithLosingCoordinator() throws Exception
-  {
-    doTestIndexDataWithLosingCoordinator(false);
-  }
-
-  /**
-   * This test must be run individually since the test affect and modify the 
state of the Druid cluster
-   */
-  @Test
-  public void testKafkaIndexDataWithLosingOverlord() throws Exception
-  {
-    doTestIndexDataWithLosingOverlord(false);
-  }
-
-  /**
-   * This test must be run individually since the test affect and modify the 
state of the Druid cluster
-   */
-  @Test
-  public void testKafkaIndexDataWithLosingHistorical() throws Exception
-  {
-    doTestIndexDataWithLosingHistorical(false);
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
deleted file mode 100644
index e729680159d..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceStopTasksEarlyTest extends 
AbstractKafkaIndexingServiceTest
-{
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "kafka_stop_tasks_early";
-  }
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-
-  // This test does not actually check whether the tasks stopped early since 
the API does
-  // not make any guarantees about handoff. Instead it makes sure the handoff 
API can be called
-  // and that the tasks will eventually catch up with new data.
-  @Test
-  public void testStopTasksEarly() throws Exception
-  {
-    doTestIndexDataHandoffEarly(false);
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
deleted file mode 100644
index fdd06ff4f88..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTransactionalSerializedTest extends 
AbstractKafkaIndexingServiceTest
-{
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "kafka_transactional_serialized";
-  }
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-  /**
-   * This test must be run individually since the test affect and modify the 
state of the Druid cluster
-   */
-  @Test
-  public void testKafkaIndexDataWithLosingCoordinator() throws Exception
-  {
-    doTestIndexDataWithLosingCoordinator(true);
-  }
-
-  /**
-   * This test must be run individually since the test affect and modify the 
state of the Druid cluster
-   */
-  @Test
-  public void testKafkaIndexDataWithLosingOverlord() throws Exception
-  {
-    doTestIndexDataWithLosingOverlord(true);
-  }
-
-  /**
-   * This test must be run individually since the test affect and modify the 
state of the Druid cluster
-   */
-  @Test
-  public void testKafkaIndexDataWithLosingHistorical() throws Exception
-  {
-    doTestIndexDataWithLosingHistorical(true);
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
deleted file mode 100644
index 7a7bd7169cf..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.sql.http.SqlQuery;
-import org.apache.druid.testing.clients.TaskResponseObject;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.JsonEventSerializer;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.tools.StreamGenerator;
-import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
-import org.apache.druid.testing.utils.SqlQueryWithResults;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITNilColumnTest extends AbstractKafkaIndexingServiceTest
-{
-  private static final Logger LOG = new Logger(ITNilColumnTest.class);
-  private static final String NIL_DIM1 = "nilDim1";
-  private static final String NIL_DIM2 = "nilDim2";
-
-  private final List<String> dimensions;
-
-  public ITNilColumnTest()
-  {
-    this.dimensions = new ArrayList<>(DEFAULT_DIMENSIONS.size() + 2);
-    dimensions.add(NIL_DIM1);
-    dimensions.addAll(DEFAULT_DIMENSIONS);
-    dimensions.add(NIL_DIM2);
-  }
-
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "nil-column-test";
-  }
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-  @Test
-  public void testQueryNilColumnBeforeAndAfterPublishingSegments() throws 
Exception
-  {
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
-        INPUT_FORMAT,
-        getResourceAsString(JSON_INPUT_FORMAT_PATH),
-        dimensions
-    );
-    try (
-        final Closeable closer = createResourceCloser(generatedTestConfig);
-        final StreamEventWriter streamEventWriter = 
createStreamEventWriter(config, true)
-    ) {
-      final String taskSpec = 
generatedTestConfig.getStreamIngestionPropsTransform()
-                                                 
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start generating half of the data
-      final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(
-          new JsonEventSerializer(jsonMapper),
-          EVENTS_PER_SECOND,
-          CYCLE_PADDING_MS
-      );
-      long numWritten = streamGenerator.run(
-          generatedTestConfig.getStreamName(),
-          streamEventWriter,
-          TOTAL_NUMBER_OF_SECOND,
-          FIRST_EVENT_TIME
-      );
-      // Verify supervisor is healthy before suspension
-      ITRetryUtil.retryUntil(
-          () -> 
BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
-          true,
-          10000,
-          30,
-          "Waiting for supervisor to be healthy"
-      );
-      // 60 events should have been ingested as per EVENTS_PER_SECOND and 
TOTAL_NUMBER_OF_SECOND.
-      // Since maxRowsInMemory is set to 500,000, every row should be in 
incrementalIndex.
-      // So, let's test if SQL finds nil dimensions from incrementalIndexes.
-      
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
-      final List<SqlQueryWithResults> queryWithResults = 
getQueryWithResults(generatedTestConfig);
-      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
-      final List<SqlQueryWithResults> metadataQueryWithResults = 
getMetadataQueryWithResults(generatedTestConfig);
-      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
-
-      // Suspend the supervisor
-      indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
-      ITRetryUtil.retryUntilTrue(
-          () -> {
-            List<TaskResponseObject> tasks = indexer
-                .getRunningTasks()
-                .stream()
-                .filter(task -> 
task.getId().contains(generatedTestConfig.getFullDatasourceName()))
-                .filter(task -> "index_kafka".equals(task.getType()))
-                .collect(Collectors.toList());
-            LOG.info("[%s] tasks are running", tasks.stream().map(task -> {
-              try {
-                return jsonMapper.writeValueAsString(task);
-              }
-              catch (JsonProcessingException e) {
-                throw new RuntimeException(e);
-              }
-            }).collect(Collectors.toList()));
-            return tasks.isEmpty();
-          },
-          "Waiting for all tasks to stop"
-      );
-
-      // Now, we should have published all segments.
-      // Let's test if SQL finds nil dimensions from queryableIndexes.
-      
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
-      verifyIngestedData(generatedTestConfig, numWritten);
-
-      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
-      
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
-    }
-  }
-
-  private static List<SqlQueryWithResults> 
getQueryWithResults(GeneratedTestConfig generatedTestConfig)
-  {
-    return ImmutableList.of(
-        new SqlQueryWithResults(
-            new SqlQuery(
-                StringUtils.format(
-                    "SELECT count(*) FROM \"%s\" WHERE %s IS NOT NULL OR %s IS NOT NULL",
-                    generatedTestConfig.getFullDatasourceName(),
-                    NIL_DIM1,
-                    NIL_DIM2
-                ),
-                null,
-                false,
-                false,
-                false,
-                null,
-                null
-            ),
-            "Get row count of datasource",
-            ImmutableList.of(ImmutableMap.of("EXPR$0", 0))
-        )
-    );
-  }
-
-  private List<SqlQueryWithResults> 
getMetadataQueryWithResults(GeneratedTestConfig generatedTestConfig)
-  {
-    return ImmutableList.of(
-        new SqlQueryWithResults(
-            new SqlQuery(
-                StringUtils.format(
-                    "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
-                    + " FROM INFORMATION_SCHEMA.COLUMNS"
-                    + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME IN ('%s', '%s')",
-                    generatedTestConfig.getFullDatasourceName(),
-                    NIL_DIM1,
-                    NIL_DIM2
-                ),
-                null,
-                false,
-                false,
-                false,
-                null,
-                null
-            ),
-            "Get column info of datasource",
-            ImmutableList.of(
-                ImmutableMap.of(
-                    "COLUMN_NAME",
-                    NIL_DIM1,
-                    "IS_NULLABLE",
-                    "YES",
-                    "DATA_TYPE",
-                    "VARCHAR"
-                ),
-                ImmutableMap.of(
-                    "COLUMN_NAME",
-                    NIL_DIM2,
-                    "IS_NULLABLE",
-                    "YES",
-                    "DATA_TYPE",
-                    "VARCHAR"
-                )
-            )
-        )
-    );
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
deleted file mode 100644
index 7f3f41316e3..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.parallelized;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
-{
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "kafka_non_transactional_parallelized";
-  }
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
-  {
-    doTestIndexDataWithStartStopSupervisor(false);
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithWithAutoscaler() throws Exception
-  {
-    doTestIndexDataWithAutoscaler(false);
-  }
-
-  @Test
-  public void testIndexDataWithIdleConfigEnabled() throws Exception
-  {
-    doTestIndexDataWithIdleConfigEnabled(false);
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
-  {
-    doTestIndexDataWithStreamReshardSplit(false);
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaTerminatedSupervisorAutoCleanup() throws Exception
-  {
-    doTestTerminatedSupervisorAutoCleanup(false);
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * with supervisor(s) maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexMultiSupervisorWithNoTransaction() throws Exception
-  {
-    doTestMultiSupervisorIndexDataStableState(
-        false,
-        2
-    );
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
deleted file mode 100644
index 49ca3e9c226..00000000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.parallelized;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
-{
-  @Override
-  public String getTestNamePrefix()
-  {
-    return "kafka_transactional_parallelized";
-  }
-
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    doBeforeClass();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
-  {
-    doTestIndexDataWithStartStopSupervisor(true);
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
-  {
-    doTestIndexDataWithStreamReshardSplit(true);
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * with supervisor(s) maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexMultiSupervisorWithTransaction() throws Exception
-  {
-    doTestMultiSupervisorIndexDataStableState(
-        true,
-        2
-    );
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
index ebe1018f4d5..04d22888d03 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.joda.time.DateTime;
 
@@ -30,7 +31,12 @@ public class SupervisorReport<T>
   private final DateTime generationTime;
   private final T payload;
 
-  public SupervisorReport(String id, DateTime generationTime, T payload)
+  @JsonCreator
+  public SupervisorReport(
+      @JsonProperty("id") String id,
+      @JsonProperty("generationTime") DateTime generationTime,
+      @JsonProperty("payload") T payload
+  )
   {
     this.id = id;
     this.generationTime = generationTime;
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index 0a1a1051f2e..d796d240987 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -235,7 +235,19 @@ public class EmbeddedClusterApis implements EmbeddedResource
    */
   public int getTaskCount(String status, String dataSource)
   {
-    return ImmutableList.copyOf((Iterator<?>) onLeaderOverlord(o -> o.taskStatuses(status, dataSource, 100))).size();
+    return getTasks(dataSource, status).size();
+  }
+
+  /**
+   * Gets the details of tasks with the given state for the specified datasource.
+   * Valid task states are "pending", "waiting", "running", "complete".
+   */
+  public List<TaskStatusPlus> getTasks(String dataSource, String taskState)
+  {
+    return ImmutableList.copyOf(
+        (Iterator<? extends TaskStatusPlus>)
+            onLeaderOverlord(o -> o.taskStatuses(taskState, dataSource, 100))
+    );
   }
 
   /**
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
index 04b60c03118..beacab52920 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -185,7 +185,8 @@ public class EmbeddedServiceClient
 
     try {
       StatusResponseHolder response = serviceClient.request(requestBuilder, responseHandler);
-      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      if (!response.getStatus().equals(HttpResponseStatus.OK)
+          && !response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
         throw new ISE(
             "Request[%s] failed with status[%s] content[%s].",
             requestBuilder.toString(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to