This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 19132c6370a Add tools for scale and fault-tolerance testing (#17959)
19132c6370a is described below

commit 19132c6370aea508df840a00c875f171fe46c2ef
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri May 9 19:35:50 2025 +0530

    Add tools for scale and fault-tolerance testing (#17959)
    
    Description
    -----------
    This patch adds tools to introduce artificial faults in a Druid test 
cluster to allow for scale testing and
    fault-tolerance testing. These tools can be used to trigger faults such as 
segment allocation delay,
    segment publish delay, etc. and evaluate the capability of a cluster to 
recover from such situations.
    
    The `ClusterTestingModule` can evolve over time to allow inducing more 
types of faults.
    
    Changes
    ---------
    - Add `ClusterTestingModule` to inject faulty dependencies when testing is 
enabled
    - Enable cluster testing mode by loading extension `druid-testing-tools` 
and setting config
    `druid.unsafe.cluster.testing=true`
    - Bind `FaultyCoordinatorClient`, `FaultyRemoteTaskActionClient` on Peons 
if testing is enabled
    - Trigger specific faults on tasks by setting the corresponding task 
context parameter
    - Use `FaultyLagAggregator` in a supervisor if specified in the supervisor 
spec `ioConfig.lagAggregator`
    - Add integration test `ITFaultyClusterTest` to try out these configs
---
 distribution/pom.xml                               |   2 +
 .../supervisor/RabbitStreamSupervisor.java         |  10 +-
 .../supervisor/RabbitStreamSupervisorIOConfig.java |   3 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   4 +-
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |   3 +
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |   6 +
 .../supervisor/KafkaSupervisorIOConfigTest.java    |  24 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |   4 +
 .../kinesis/supervisor/KinesisSupervisor.java      |   4 +-
 .../supervisor/KinesisSupervisorIOConfig.java      |   2 +
 extensions-core/testing-tools/pom.xml              |  24 ++
 .../apache/druid/guice/ClusterTestingModule.java   | 150 +++++++++++
 .../testing/cluster/ClusterTestingTaskConfig.java  | 247 ++++++++++++++++++
 .../cluster/overlord/FaultyLagAggregator.java      |  69 +++++
 .../cluster/overlord/FaultyTaskLockbox.java        |  85 ++++++
 .../cluster/task/FaultyCoordinatorClient.java      | 108 ++++++++
 .../testing/cluster/task/FaultyOverlordClient.java |  59 +++++
 .../cluster/task/FaultyRemoteTaskActionClient.java |  93 +++++++
 .../task/FaultyRemoteTaskActionClientFactory.java  |  62 +++++
 .../org.apache.druid.initialization.DruidModule    |   1 +
 .../druid/guice/ClusterTestingModuleTest.java      | 288 +++++++++++++++++++++
 .../druid/indexing/overlord/TaskLockbox.java       |  51 ++--
 .../seekablestream/supervisor/LagAggregator.java   |  65 +++++
 .../supervisor/SeekableStreamSupervisor.java       |  14 +-
 .../SeekableStreamSupervisorIOConfig.java          |  16 ++
 .../SeekableStreamSamplerSpecTest.java             |   2 +
 .../SeekableStreamSupervisorSpecTest.java          |   5 +
 .../SeekableStreamSupervisorStateTest.java         |  37 +--
 .../docker/environment-configs/common              |   3 +
 .../clients/OverlordResourceTestClient.java        |  12 +-
 .../coordinator/duty/ITFaultyClusterTest.java      | 196 ++++++++++++++
 .../druid/tests/indexer/AbstractIndexerTest.java   |  40 +++
 .../indexer/AbstractKafkaIndexingServiceTest.java  |  13 +
 .../AbstractKinesisIndexingServiceTest.java        |  14 +
 .../tests/indexer/AbstractStreamIndexingTest.java  |  49 +++-
 .../stream/data/supervisor_spec_template.json      |   6 +-
 .../client/coordinator/NoopCoordinatorClient.java  |   0
 37 files changed, 1686 insertions(+), 85 deletions(-)

diff --git a/distribution/pom.xml b/distribution/pom.xml
index bc67631fd82..da9631c4867 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,8 @@
                                         
<argument>org.apache.druid.extensions:druid-kubernetes-overlord-extensions</argument>
                                         <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions:druid-catalog</argument>
+                                        <argument>-c</argument>
+                                        
<argument>org.apache.druid.extensions:druid-testing-tools</argument>
                                         
<argument>${druid.distribution.pulldeps.opts}</argument>
                                     </arguments>
                                 </configuration>
diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 9ce9bd331cf..89a05a77022 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -53,8 +53,6 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.joda.time.DateTime;
 
@@ -89,8 +87,6 @@ public class RabbitStreamSupervisor extends 
SeekableStreamSupervisor<String, Lon
   private static final Long NOT_SET = -1L;
   private static final Long END_OF_PARTITION = Long.MAX_VALUE;
 
-  private final ServiceEmitter emitter;
-  private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
   private volatile Map<String, Long> latestSequenceFromStream;
 
   private final RabbitStreamSupervisorSpec spec;
@@ -116,8 +112,6 @@ public class RabbitStreamSupervisor extends 
SeekableStreamSupervisor<String, Lon
         false);
 
     this.spec = spec;
-    this.emitter = spec.getEmitter();
-    this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
   }
 
   @Override
@@ -168,7 +162,7 @@ public class RabbitStreamSupervisor extends 
SeekableStreamSupervisor<String, Lon
         ioConfig.getTaskDuration().getMillis() / 1000,
         includeOffsets ? latestSequenceFromStream : null,
         includeOffsets ? partitionLag : null,
-        includeOffsets ? partitionLag.values().stream().mapToLong(x -> 
Math.max(x, 0)).sum() : null,
+        includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() : 
null,
         includeOffsets ? sequenceLastUpdated : null,
         spec.isSuspended(),
         stateManager.isHealthy(),
@@ -363,7 +357,7 @@ public class RabbitStreamSupervisor extends 
SeekableStreamSupervisor<String, Lon
       return new LagStats(0, 0, 0);
     }
 
-    return computeLags(partitionRecordLag);
+    return aggregatePartitionLags(partitionRecordLag);
   }
 
   @Override
diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
index bce4164c5d2..0538163df61 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
@@ -24,13 +24,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-
 import java.util.Map;
 
 public class RabbitStreamSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
@@ -81,6 +81,7 @@ public class RabbitStreamSupervisorIOConfig extends 
SeekableStreamSupervisorIOCo
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         autoScalerConfig,
+        LagAggregator.DEFAULT,
         lateMessageRejectionStartDateTime,
         new IdleConfig(null, null),
         stopTaskCount
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b990c8fa3fc..743e4edc32c 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -174,7 +174,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
         includeOffsets ? latestSequenceFromStream : null,
         includeOffsets ? partitionLag : null,
         includeOffsets ? getPartitionTimeLag() : null,
-        includeOffsets ? partitionLag.values().stream().mapToLong(x -> 
Math.max(x, 0)).sum() : null,
+        includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() : 
null,
         includeOffsets ? sequenceLastUpdated : null,
         spec.isSuspended(),
         stateManager.isHealthy(),
@@ -379,7 +379,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
       return new LagStats(0, 0, 0);
     }
 
-    return computeLags(partitionRecordLag);
+    return aggregatePartitionLags(partitionRecordLag);
   }
 
   /**
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index be4f6cfb196..4eac3163fe3 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
@@ -64,6 +65,7 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
       @JsonProperty("taskDuration") Period taskDuration,
       @JsonProperty("consumerProperties") Map<String, Object> 
consumerProperties,
       @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig 
autoScalerConfig,
+      @Nullable @JsonProperty("lagAggregator") LagAggregator lagAggregator,
       @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("startDelay") Period startDelay,
       @JsonProperty("period") Period period,
@@ -91,6 +93,7 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         autoScalerConfig,
+        Configs.valueOrDefault(lagAggregator, LagAggregator.DEFAULT),
         lateMessageRejectionStartDateTime,
         idleConfig,
         stopTaskCount
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index dd7efcdea51..f0d2bacd359 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -158,6 +158,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
@@ -212,6 +213,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
@@ -275,6 +277,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
@@ -380,6 +383,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
@@ -565,6 +569,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
@@ -622,6 +627,7 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 211230bf9d0..2da46aaf0e2 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -85,7 +86,7 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(100, config.getPollTimeout());
     Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
-    Assert.assertEquals(false, config.isUseEarliestOffset());
+    Assert.assertFalse(config.isUseEarliestOffset());
     Assert.assertEquals(Duration.standardMinutes(30), 
config.getCompletionTimeout());
     Assert.assertFalse("lateMessageRejectionPeriod", 
config.getLateMessageRejectionPeriod().isPresent());
     Assert.assertFalse("earlyMessageRejectionPeriod", 
config.getEarlyMessageRejectionPeriod().isPresent());
@@ -151,10 +152,10 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(1000, config.getPollTimeout());
     Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
-    Assert.assertEquals(true, config.isUseEarliestOffset());
+    Assert.assertTrue(config.isUseEarliestOffset());
     Assert.assertEquals(Duration.standardMinutes(45), 
config.getCompletionTimeout());
-    Assert.assertEquals(Duration.standardHours(1), 
config.getLateMessageRejectionPeriod().get());
-    Assert.assertEquals(Duration.standardHours(1), 
config.getEarlyMessageRejectionPeriod().get());
+    Assert.assertEquals(Duration.standardHours(1), 
config.getLateMessageRejectionPeriod().orNull());
+    Assert.assertEquals(Duration.standardHours(1), 
config.getEarlyMessageRejectionPeriod().orNull());
   }
 
   @Test
@@ -194,9 +195,9 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertEquals(1000, config.getPollTimeout());
     Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
-    Assert.assertEquals(true, config.isUseEarliestOffset());
+    Assert.assertTrue(config.isUseEarliestOffset());
     Assert.assertEquals(Duration.standardMinutes(45), 
config.getCompletionTimeout());
-    Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), 
config.getLateMessageRejectionStartDateTime().get());
+    Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), 
config.getLateMessageRejectionStartDateTime().orNull());
   }
 
   @Test
@@ -286,14 +287,14 @@ public class KafkaSupervisorIOConfigTest
         + "  \"lateMessageRejectionStartDateTime\": \"2016-05-31T12:00Z\"\n"
         + "}";
     exception.expect(JsonMappingException.class);
-    KafkaSupervisorIOConfig config = mapper.readValue(
+    mapper.readValue(
         mapper.writeValueAsString(
             mapper.readValue(
                 jsonStr,
                 KafkaSupervisorIOConfig.class
-                )
-            ), KafkaSupervisorIOConfig.class
-        );
+            )
+        ), KafkaSupervisorIOConfig.class
+    );
   }
 
   @Test
@@ -327,6 +328,7 @@ public class KafkaSupervisorIOConfigTest
         new Period("PT1H"),
         consumerProperties,
         mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -361,6 +363,7 @@ public class KafkaSupervisorIOConfigTest
         new Period("PT1H"),
         consumerProperties,
         mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+        LagAggregator.DEFAULT,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -412,6 +415,7 @@ public class KafkaSupervisorIOConfigTest
         new Period("PT1H"),
         consumerProperties,
         null,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 65dde0444a7..408aa07d75c 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -311,6 +311,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period("PT1H"),
         consumerProperties,
         OBJECT_MAPPER.convertValue(autoScalerConfig, 
LagBasedAutoScalerConfig.class),
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -5297,6 +5298,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period(duration),
         consumerProperties,
         null,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -5412,6 +5414,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period(duration),
         consumerProperties,
         null,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -5530,6 +5533,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         new Period(duration),
         consumerProperties,
         null,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index db29d0dae53..e8996391946 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -274,7 +274,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String,
         stateManager.getSupervisorState(),
         stateManager.getExceptionEvents(),
         includeOffsets ? partitionLag : null,
-        includeOffsets ? partitionLag.values().stream().mapToLong(x -> 
Math.max(x, 0)).sum() : null
+        includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() : 
null
     );
   }
 
@@ -387,7 +387,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String,
       return new LagStats(0, 0, 0);
     }
 
-    return computeLags(partitionTimeLags);
+    return aggregatePartitionLags(partitionTimeLags);
   }
 
   @Override
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 9910f22a349..ce6eece5af2 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
 import org.apache.druid.indexing.kinesis.KinesisRegion;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.joda.time.DateTime;
@@ -92,6 +93,7 @@ public class KinesisSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         autoScalerConfig,
+        LagAggregator.DEFAULT,
         lateMessageRejectionStartDateTime,
         new IdleConfig(null, null),
         null
diff --git a/extensions-core/testing-tools/pom.xml 
b/extensions-core/testing-tools/pom.xml
index 51cf92b87d3..2042b8033b5 100644
--- a/extensions-core/testing-tools/pom.xml
+++ b/extensions-core/testing-tools/pom.xml
@@ -57,6 +57,18 @@
       <version>${project.parent.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-services</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
@@ -120,6 +132,11 @@
     </dependency>
 
     <!-- Test Dependencies -->
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -165,6 +182,13 @@
       <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-processing</artifactId>
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
new file mode 100644
index 00000000000..c1fa71a5f31
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
+import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox;
+import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient;
+import org.apache.druid.testing.cluster.task.FaultyOverlordClient;
+import 
org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Module that injects faulty clients into the Peon process to simulate various
+ * fault scenarios.
+ */
+public class ClusterTestingModule implements DruidModule
+{
+  private static final Logger log = new Logger(ClusterTestingModule.class);
+
+  private Set<NodeRole> roles;
+  private boolean isClusterTestingEnabled = false;
+
+  @Inject
+  public void configure(
+      Properties props,
+      @Self Set<NodeRole> roles
+  )
+  {
+    this.isClusterTestingEnabled = Boolean.parseBoolean(
+        props.getProperty("druid.unsafe.cluster.testing", "false")
+    );
+    this.roles = roles;
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    if (isClusterTestingEnabled) {
+      log.warn(
+          "Running service in cluster testing mode. This is an unsafe 
test-only"
+          + " mode and must never be used in a production cluster."
+          + " Set property[druid.unsafe.cluster.testing=false] to disable 
testing mode."
+      );
+      bindDependenciesForClusterTestingMode(binder);
+    } else {
+      log.warn("Cluster testing is disabled. Set 
property[druid.unsafe.cluster.testing=true] to enable it.");
+    }
+  }
+
+  private void bindDependenciesForClusterTestingMode(Binder binder)
+  {
+    if (roles.equals(Set.of(NodeRole.PEON))) {
+      // Bind cluster testing config
+      binder.bind(ClusterTestingTaskConfig.class)
+            .toProvider(TestConfigProvider.class)
+            .in(LazySingleton.class);
+
+      // Bind faulty clients for Coordinator, Overlord and task actions
+      binder.bind(CoordinatorClient.class)
+            .to(FaultyCoordinatorClient.class)
+            .in(LazySingleton.class);
+      binder.bind(OverlordClient.class)
+            .to(FaultyOverlordClient.class)
+            .in(LazySingleton.class);
+      binder.bind(RemoteTaskActionClientFactory.class)
+            .to(FaultyRemoteTaskActionClientFactory.class)
+            .in(LazySingleton.class);
+    } else if (roles.contains(NodeRole.OVERLORD)) {
+      // If this is the Overlord, bind a faulty storage coordinator
+      log.warn("Running Overlord in cluster testing mode.");
+      binder.bind(TaskLockbox.class)
+            .to(FaultyTaskLockbox.class)
+            .in(LazySingleton.class);
+    }
+  }
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return List.of(
+        new SimpleModule(getClass().getSimpleName())
+            .registerSubtypes(new NamedType(FaultyLagAggregator.class, 
"faulty"))
+    );
+  }
+
+  private static class TestConfigProvider implements 
Provider<ClusterTestingTaskConfig>
+  {
+    private final Task task;
+    private final ObjectMapper mapper;
+
+    @Inject
+    public TestConfigProvider(Task task, ObjectMapper mapper)
+    {
+      this.task = task;
+      this.mapper = mapper;
+    }
+
+    @Override
+    public ClusterTestingTaskConfig get()
+    {
+      try {
+        final ClusterTestingTaskConfig testingConfig = 
ClusterTestingTaskConfig.forTask(task, mapper);
+        log.warn("Running task in cluster testing mode with config[%s].", 
testingConfig);
+
+        return testingConfig;
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
new file mode 100644
index 00000000000..c350c951dab
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.indexing.common.task.Task;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Config passed in the context parameter {@code "clusterTesting"} of a task,
+ * used for testing scalability of a Druid cluster by introducing faults at
+ * various interface points.
+ */
+public class ClusterTestingTaskConfig
+{
+  /**
+   * Default config which does not trigger any faults for this task.
+   */
+  public static final ClusterTestingTaskConfig DEFAULT = new ClusterTestingTaskConfig(null, null, null, null);
+
+  private final MetadataConfig metadataConfig;
+  private final OverlordClientConfig overlordClientConfig;
+  private final CoordinatorClientConfig coordinatorClientConfig;
+  private final TaskActionClientConfig taskActionClientConfig;
+
+  @JsonCreator
+  public ClusterTestingTaskConfig(
+      @JsonProperty("metadataConfig") @Nullable MetadataConfig metadataConfig,
+      @JsonProperty("overlordClientConfig") @Nullable OverlordClientConfig overlordClientConfig,
+      @JsonProperty("coordinatorClientConfig") @Nullable CoordinatorClientConfig coordinatorClientConfig,
+      @JsonProperty("taskActionClientConfig") @Nullable TaskActionClientConfig taskActionClientConfig
+  )
+  {
+    // Any section omitted from the task context falls back to a fault-free default
+    this.metadataConfig = Configs.valueOrDefault(metadataConfig, MetadataConfig.DEFAULT);
+    this.overlordClientConfig = Configs.valueOrDefault(overlordClientConfig, OverlordClientConfig.DEFAULT);
+    this.coordinatorClientConfig = Configs.valueOrDefault(coordinatorClientConfig, CoordinatorClientConfig.DEFAULT);
+    this.taskActionClientConfig = Configs.valueOrDefault(taskActionClientConfig, TaskActionClientConfig.DEFAULT);
+  }
+
+  /**
+   * Creates a {@link ClusterTestingTaskConfig} for the given Task by reading
+   * the task context parameter {@code "clusterTesting"}.
+   *
+   * @return Default config {@link ClusterTestingTaskConfig#DEFAULT} if not
+   * specified in the task context parameters.
+   */
+  public static ClusterTestingTaskConfig forTask(Task task, ObjectMapper mapper) throws JsonProcessingException
+  {
+    // Round-trip through JSON to convert the untyped context map into a typed
+    // config. A missing context entry serializes to "null" and deserializes
+    // back to null, which then resolves to the DEFAULT config below.
+    final Map<String, Object> configAsMap = task.getContextValue("clusterTesting");
+    final String json = mapper.writeValueAsString(configAsMap);
+
+    final ClusterTestingTaskConfig config = mapper.readValue(json, ClusterTestingTaskConfig.class);
+    return Configs.valueOrDefault(config, ClusterTestingTaskConfig.DEFAULT);
+  }
+
+  public MetadataConfig getMetadataConfig()
+  {
+    return metadataConfig;
+  }
+
+  public OverlordClientConfig getOverlordClientConfig()
+  {
+    return overlordClientConfig;
+  }
+
+  public CoordinatorClientConfig getCoordinatorClientConfig()
+  {
+    return coordinatorClientConfig;
+  }
+
+  public TaskActionClientConfig getTaskActionClientConfig()
+  {
+    return taskActionClientConfig;
+  }
+
+  @Override
+  public String toString()
+  {
+    // Include metadataConfig too, so that logs (e.g. from FaultyTaskLockbox)
+    // show the complete testing config and not just the client sections
+    return '{' +
+           "metadataConfig=" + metadataConfig +
+           ", overlordClientConfig=" + overlordClientConfig +
+           ", coordinatorClientConfig=" + coordinatorClientConfig +
+           ", taskActionClientConfig=" + taskActionClientConfig +
+           '}';
+  }
+
+  /**
+   * Config for manipulating communication of the task with the Overlord.
+   */
+  public static class OverlordClientConfig
+  {
+    private static final OverlordClientConfig DEFAULT = new OverlordClientConfig();
+
+    @Override
+    public String toString()
+    {
+      return "";
+    }
+  }
+
+  /**
+   * Config for manipulating communication of the task with the Coordinator.
+   */
+  public static class CoordinatorClientConfig
+  {
+    private static final CoordinatorClientConfig DEFAULT = new CoordinatorClientConfig(null);
+
+    private final Duration minSegmentHandoffDelay;
+
+    @JsonCreator
+    public CoordinatorClientConfig(
+        @JsonProperty("minSegmentHandoffDelay") @Nullable Duration minSegmentHandoffDelay
+    )
+    {
+      this.minSegmentHandoffDelay = minSegmentHandoffDelay;
+    }
+
+    /**
+     * Minimum duration for which segment handoff is assumed to not have completed.
+     * The actual handoff status is fetched from the Coordinator only after this
+     * duration has elapsed. This delay applies to each segment separately.
+     * <p>
+     * For any segment,
+     * <pre>
+     * observed handoff time = actual handoff time + minSegmentHandoffDelay
+     * </pre>
+     */
+    @Nullable
+    public Duration getMinSegmentHandoffDelay()
+    {
+      return minSegmentHandoffDelay;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" +
+             "minSegmentHandoffDelay=" + minSegmentHandoffDelay +
+             '}';
+    }
+  }
+
+  /**
+   * Config for manipulating task actions submitted by the task to the Overlord.
+   */
+  public static class TaskActionClientConfig
+  {
+    private static final TaskActionClientConfig DEFAULT = new TaskActionClientConfig(null, null);
+
+    private final Duration segmentPublishDelay;
+    private final Duration segmentAllocateDelay;
+
+    @JsonCreator
+    public TaskActionClientConfig(
+        @JsonProperty("segmentAllocateDelay") @Nullable Duration segmentAllocateDelay,
+        @JsonProperty("segmentPublishDelay") @Nullable Duration segmentPublishDelay
+    )
+    {
+      this.segmentAllocateDelay = segmentAllocateDelay;
+      this.segmentPublishDelay = segmentPublishDelay;
+    }
+
+    /**
+     * Duration to wait before sending a segment publish request to the Overlord.
+     * <p>
+     * For each publish request (containing one or more segments),
+     * <pre>
+     * observed publish time = actual publish time + segmentPublishDelay
+     * </pre>
+     */
+    @Nullable
+    public Duration getSegmentPublishDelay()
+    {
+      return segmentPublishDelay;
+    }
+
+    /**
+     * Duration to wait before sending a segment allocate request to the Overlord.
+     * <p>
+     * For each segment,
+     * <pre>
+     * observed segment allocation time = actual allocation time + segmentAllocateDelay
+     * </pre>
+     */
+    @Nullable
+    public Duration getSegmentAllocateDelay()
+    {
+      return segmentAllocateDelay;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" +
+             "segmentPublishDelay=" + segmentPublishDelay +
+             ", segmentAllocateDelay=" + segmentAllocateDelay +
+             '}';
+    }
+  }
+
+  /**
+   * Config for manipulating metadata operations performed by the Overlord for
+   * the task.
+   */
+  public static class MetadataConfig
+  {
+    private static final MetadataConfig DEFAULT = new MetadataConfig(null);
+
+    private final boolean cleanupPendingSegments;
+
+    @JsonCreator
+    public MetadataConfig(
+        @JsonProperty("cleanupPendingSegments") @Nullable Boolean cleanupPendingSegments
+    )
+    {
+      // Cleanup is enabled by default; set to false to simulate leaked pending segments
+      this.cleanupPendingSegments = Configs.valueOrDefault(cleanupPendingSegments, true);
+    }
+
+    public boolean isCleanupPendingSegments()
+    {
+      return cleanupPendingSegments;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" +
+             "cleanupPendingSegments=" + cleanupPendingSegments +
+             '}';
+    }
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyLagAggregator.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyLagAggregator.java
new file mode 100644
index 00000000000..90d9f3ac420
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyLagAggregator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Map;
+
+/**
+ * Implementation of {@link LagAggregator} that supports the following:
+ * <ul>
+ * <li>Specify a {@code multiplier} to amplify the lag observed by the Overlord
+ * for a given supervisor.</li>
+ * </ul>
+ */
+public class FaultyLagAggregator implements LagAggregator
+{
+  private static final Logger log = new Logger(FaultyLagAggregator.class);
+
+  private final int lagMultiplier;
+  private final LagAggregator delegate = LagAggregator.DEFAULT;
+
+  @JsonCreator
+  public FaultyLagAggregator(
+      @JsonProperty("lagMultiplier") int lagMultiplier
+  )
+  {
+    this.lagMultiplier = lagMultiplier;
+    log.info("Multiplying lags by factor[%d].", lagMultiplier);
+  }
+
+  @JsonProperty
+  public int getLagMultiplier()
+  {
+    return lagMultiplier;
+  }
+
+  @Override
+  public <PartitionIdType> LagStats aggregate(Map<PartitionIdType, Long> partitionLags)
+  {
+    // Compute the true aggregate first, then inflate every statistic by the multiplier
+    final LagStats actualStats = delegate.aggregate(partitionLags);
+    final int factor = getLagMultiplier();
+    return new LagStats(
+        actualStats.getMaxLag() * factor,
+        actualStats.getTotalLag() * factor,
+        actualStats.getAvgLag() * factor
+    );
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
new file mode 100644
index 00000000000..d2639dcfbba
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+
+/**
+ * Implementation of {@link TaskLockbox} that supports the following:
+ * <ul>
+ * <li>Skip cleanup of pending segments</li>
+ * </ul>
+ */
+public class FaultyTaskLockbox extends TaskLockbox
+{
+  private static final Logger log = new Logger(FaultyTaskLockbox.class);
+
+  private final ObjectMapper mapper;
+
+  @Inject
+  public FaultyTaskLockbox(
+      TaskStorage taskStorage,
+      IndexerMetadataStorageCoordinator metadataStorageCoordinator,
+      ObjectMapper mapper
+  )
+  {
+    super(taskStorage, metadataStorageCoordinator);
+    this.mapper = mapper;
+    log.info("Initializing FaultyTaskLockbox.");
+  }
+
+  @Override
+  protected void cleanupPendingSegments(Task task)
+  {
+    // Honor the fault only if the task's testing config explicitly disables cleanup
+    final ClusterTestingTaskConfig testingConfig = getTestingConfig(task);
+    if (!testingConfig.getMetadataConfig().isCleanupPendingSegments()) {
+      log.info(
+          "Skipping cleanup of pending segments for task[%s] since it has testing config[%s].",
+          task.getId(), testingConfig
+      );
+      return;
+    }
+
+    super.cleanupPendingSegments(task);
+  }
+
+  /**
+   * Reads the testing config from the task context, falling back to the
+   * fault-free default config if the context cannot be parsed.
+   */
+  private ClusterTestingTaskConfig getTestingConfig(Task task)
+  {
+    try {
+      return ClusterTestingTaskConfig.forTask(task, mapper);
+    }
+    catch (Exception e) {
+      // Just log the exception and proceed with the actual cleanup
+      log.error(
+          e,
+          "Error while reading testing config for task[%s] with context[%s]."
+          + " Using default config with no faults.",
+          task.getId(), task.getContext()
+      );
+      return ClusterTestingTaskConfig.DEFAULT;
+    }
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java
new file mode 100644
index 00000000000..c03478f33fa
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.coordinator.CoordinatorClientImpl;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.joda.time.Duration;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@code CoordinatorClient} that supports the following:
+ * <ul>
+ * <li>Add a {@code minSegmentHandoffDelay} before making a call to Coordinator
+ * to get the handoff status of a single segment.</li>
+ * </ul>
+ */
+public class FaultyCoordinatorClient extends CoordinatorClientImpl
+{
+  private static final Logger log = new Logger(FaultyCoordinatorClient.class);
+
+  private final Provider<ClusterTestingTaskConfig> testConfigProvider;
+
+  // One stopwatch per segment, started on the first handoff check for that segment
+  private final ConcurrentHashMap<SegmentDescriptor, Stopwatch> segmentHandoffTimers = new ConcurrentHashMap<>();
+
+  @Inject
+  public FaultyCoordinatorClient(
+      Provider<ClusterTestingTaskConfig> testingConfigProvider,
+      @Json final ObjectMapper jsonMapper,
+      @EscalatedGlobal final ServiceClientFactory clientFactory,
+      @Coordinator final ServiceLocator serviceLocator
+  )
+  {
+    super(
+        clientFactory.makeClient(
+            NodeRole.COORDINATOR.getJsonName(),
+            serviceLocator,
+            StandardRetryPolicy.builder().maxAttempts(6).build()
+        ),
+        jsonMapper
+    );
+    this.testConfigProvider = testingConfigProvider;
+  }
+
+  @Override
+  public ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor)
+  {
+    final Duration minHandoffDelay = getHandoffDelay();
+    if (minHandoffDelay == null) {
+      // No artificial delay configured, ask the Coordinator directly
+      return super.isHandoffComplete(dataSource, descriptor);
+    }
+
+    final Stopwatch sinceHandoffCheckStarted = segmentHandoffTimers.computeIfAbsent(
+        descriptor,
+        d -> Stopwatch.createStarted()
+    );
+    if (!sinceHandoffCheckStarted.hasElapsed(minHandoffDelay)) {
+      // Until the min handoff delay has elapsed, keep returning false
+      return Futures.immediateFuture(false);
+    }
+
+    // Wait period is over, check with Coordinator now, but do not remove
+    // the stopwatch from the map. This ensures that we do not create a new
+    // Stopwatch causing further delays.
+    log.info(
+        "Min handoff delay[%s] has elapsed for segment[%s]. Checking with Coordinator for actual handoff status.",
+        minHandoffDelay, descriptor
+    );
+    return super.isHandoffComplete(dataSource, descriptor);
+  }
+
+  private Duration getHandoffDelay()
+  {
+    return testConfigProvider.get().getCoordinatorClientConfig().getMinSegmentHandoffDelay();
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
new file mode 100644
index 00000000000..ba96840b833
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.indexing.OverlordClientImpl;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+
+/**
+ * Implementation of {@code OverlordClient} bound on Peons in cluster testing
+ * mode. Currently delegates everything to {@link OverlordClientImpl}; faults
+ * driven by the injected {@link ClusterTestingTaskConfig} may be added later.
+ */
+public class FaultyOverlordClient extends OverlordClientImpl
+{
+  private static final Logger log = new Logger(FaultyOverlordClient.class);
+
+  private final ClusterTestingTaskConfig testingConfig;
+
+  @Inject
+  public FaultyOverlordClient(
+      ClusterTestingTaskConfig testingConfig,
+      @Json final ObjectMapper jsonMapper,
+      @EscalatedGlobal final ServiceClientFactory clientFactory,
+      @IndexingService final ServiceLocator serviceLocator
+  )
+  {
+    super(
+        clientFactory.makeClient(
+            // Locate the Overlord service, not the Coordinator: the original
+            // code was a copy-paste from FaultyCoordinatorClient and would have
+            // routed Overlord API calls to the Coordinator
+            NodeRole.OVERLORD.getJsonName(),
+            serviceLocator,
+            StandardRetryPolicy.builder().maxAttempts(6).build()
+        ),
+        jsonMapper
+    );
+    this.testingConfig = testingConfig;
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClient.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClient.java
new file mode 100644
index 00000000000..8031d7f47db
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClient.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link TaskActionClient} that supports the following:
+ * <ul>
+ * <li>Add a {@code segmentAllocateDelay} before sending a segment allocate
+ * request to the Overlord.</li>
+ * <li>Add a {@code segmentPublishDelay} before sending a segment publish
+ * request to the Overlord.</li>
+ * </ul>
+ */
+public class FaultyRemoteTaskActionClient implements TaskActionClient
+{
+  private static final Logger log = new Logger(FaultyRemoteTaskActionClient.class);
+
+  private final TaskActionClient delegate;
+  private final ClusterTestingTaskConfig.TaskActionClientConfig config;
+
+  FaultyRemoteTaskActionClient(
+      ClusterTestingTaskConfig.TaskActionClientConfig config,
+      TaskActionClient delegate
+  )
+  {
+    this.config = config;
+    this.delegate = delegate;
+    log.info("Initializing FaultyRemoteTaskActionClient with config[%s].", config);
+  }
+
+  @Override
+  public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+  {
+    // Inject the configured delay before allocate and publish actions, then
+    // forward the action to the real client
+    if (taskAction instanceof SegmentAllocateAction
+        && config.getSegmentAllocateDelay() != null) {
+      log.warn("Sleeping for duration[%s] before allocating segments.", config.getSegmentAllocateDelay());
+      sleep(config.getSegmentAllocateDelay());
+    }
+
+    if (isPublishAction(taskAction) && config.getSegmentPublishDelay() != null) {
+      log.warn("Sleeping for duration[%s] before publishing segments.", config.getSegmentPublishDelay());
+      sleep(config.getSegmentPublishDelay());
+    }
+
+    return delegate.submit(taskAction);
+  }
+
+  private static <R> boolean isPublishAction(TaskAction<R> taskAction)
+  {
+    return taskAction instanceof SegmentTransactionalInsertAction
+           || taskAction instanceof SegmentTransactionalAppendAction
+           || taskAction instanceof SegmentTransactionalReplaceAction;
+  }
+
+  private static void sleep(Duration duration)
+  {
+    try {
+      Thread.sleep(duration.getMillis());
+    }
+    catch (InterruptedException e) {
+      // Restore the interrupt status so that the task (which may be shutting
+      // down) can still observe the interruption after this best-effort delay
+      Thread.currentThread().interrupt();
+      log.info("Interrupted while sleeping before task action.");
+    }
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
new file mode 100644
index 00000000000..043a49f6d14
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+
+/**
+ * Factory for {@link FaultyRemoteTaskActionClient} which wraps the standard
+ * remote task action client so that configured delays apply to task actions.
+ */
+public class FaultyRemoteTaskActionClientFactory extends RemoteTaskActionClientFactory
+{
+  private final ClusterTestingTaskConfig config;
+
+  @Inject
+  public FaultyRemoteTaskActionClientFactory(
+      @Json final ObjectMapper jsonMapper,
+      @EscalatedGlobal final ServiceClientFactory clientFactory,
+      @IndexingService final ServiceLocator serviceLocator,
+      RetryPolicyConfig retryPolicyConfig,
+      ClusterTestingTaskConfig config
+  )
+  {
+    super(jsonMapper, clientFactory, serviceLocator, retryPolicyConfig);
+    this.config = config;
+  }
+
+  @Override
+  public TaskActionClient create(Task task)
+  {
+    // Wrap the real client so every action submitted by the task passes
+    // through the fault-injecting client
+    final TaskActionClient realClient = super.create(task);
+    return new FaultyRemoteTaskActionClient(config.getTaskActionClientConfig(), realClient);
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
 
b/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index f2ddf3cf538..2af3ce8a65e 100644
--- 
a/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ 
b/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.druid.guice.SleepModule
+org.apache.druid.guice.ClusterTestingModule
diff --git 
a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
 
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
new file mode 100644
index 00000000000..39ff06b6035
--- /dev/null
+++ 
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.cli.CliOverlord;
+import org.apache.druid.cli.CliPeon;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.CoordinatorClientImpl;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.filter.TrueDimFilter;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.OverlordClientImpl;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox;
+import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient;
+import org.apache.druid.testing.cluster.task.FaultyOverlordClient;
+import 
org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ClusterTestingModuleTest
+{
+  private static final ObjectMapper MAPPER = TestHelper
+      .makeJsonMapper()
+      .registerModules(new 
IndexingServiceTuningConfigModule().getJacksonModules())
+      .registerModules(new 
IndexingServiceInputSourceModule().getJacksonModules());
+
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void test_peonRunnable_isNotModified_ifTestingIsDisabled() throws 
IOException
+  {
+    try {
+      final CliPeon peon = new CliPeon();
+      System.setProperty("druid.unsafe.cluster.testing", "false");
+
+      // Write out the task payload in a temporary json file
+      File file = temporaryFolder.newFile("task.json");
+      FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
+      peon.taskAndStatusFile = List.of(file.getParent(), "1");
+
+      final Injector baseInjector = new 
StartupInjectorBuilder().forServer().build();
+      baseInjector.injectMembers(peon);
+
+      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));
+
+      CoordinatorClient coordinatorClient = 
peonInjector.getInstance(CoordinatorClient.class);
+      Assert.assertTrue(coordinatorClient instanceof CoordinatorClientImpl);
+
+      OverlordClient overlordClient = 
peonInjector.getInstance(OverlordClient.class);
+      Assert.assertTrue(overlordClient instanceof OverlordClientImpl);
+
+      TaskActionClientFactory taskActionClientFactory = 
peonInjector.getInstance(TaskActionClientFactory.class);
+      Assert.assertTrue(taskActionClientFactory instanceof 
RemoteTaskActionClientFactory);
+    }
+    finally {
+      System.clearProperty("druid.unsafe.cluster.testing");
+    }
+  }
+
+  @Test
+  public void test_peonRunnable_hasFaultyClients_ifTestingIsEnabled() throws 
IOException
+  {
+    try {
+      final CliPeon peon = new CliPeon();
+      System.setProperty("druid.unsafe.cluster.testing", "true");
+
+      // Write out the task payload in a temporary json file
+      File file = temporaryFolder.newFile("task.json");
+      FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
+      peon.taskAndStatusFile = List.of(file.getParent(), "1");
+
+      final Injector baseInjector = new 
StartupInjectorBuilder().forServer().build();
+      baseInjector.injectMembers(peon);
+
+      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));
+
+      CoordinatorClient coordinatorClient = 
peonInjector.getInstance(CoordinatorClient.class);
+      Assert.assertTrue(coordinatorClient instanceof FaultyCoordinatorClient);
+
+      OverlordClient overlordClient = 
peonInjector.getInstance(OverlordClient.class);
+      Assert.assertTrue(overlordClient instanceof FaultyOverlordClient);
+
+      TaskActionClientFactory taskActionClientFactory = 
peonInjector.getInstance(TaskActionClientFactory.class);
+      Assert.assertTrue(taskActionClientFactory instanceof 
FaultyRemoteTaskActionClientFactory);
+    }
+    finally {
+      System.clearProperty("druid.unsafe.cluster.testing");
+    }
+  }
+
+  @Test
+  public void test_peonRunnable_getsConfigParams_ifProvidedInTaskContext() 
throws IOException
+  {
+    try {
+      final CliPeon peon = new CliPeon();
+      System.setProperty("druid.unsafe.cluster.testing", "true");
+
+      final Task task = new NoopTask(
+          null,
+          null,
+          null,
+          0L,
+          0L,
+          Map.of("clusterTesting", createClusterTestingConfigMap())
+      );
+
+      // Write out the task payload in a temporary json file
+      final String taskJson = MAPPER.writeValueAsString(task);
+      File file = temporaryFolder.newFile("task.json");
+      FileUtils.write(file, taskJson, StandardCharsets.UTF_8);
+      peon.taskAndStatusFile = List.of(file.getParent(), "1");
+
+      final Injector baseInjector = new 
StartupInjectorBuilder().forServer().build();
+      baseInjector.injectMembers(peon);
+
+      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));
+
+      final ClusterTestingTaskConfig taskConfig = 
peonInjector.getInstance(ClusterTestingTaskConfig.class);
+      verifyTestingConfig(taskConfig);
+    }
+    finally {
+      System.clearProperty("druid.unsafe.cluster.testing");
+    }
+  }
+
+  @Test
+  public void 
test_parallelIndexSupervisorTask_withDruidInputSource_hasNoCircularDeps() 
throws IOException
+  {
+    try {
+      final CliPeon peon = new CliPeon();
+      System.setProperty("druid.unsafe.cluster.testing", "true");
+
+      // Create a ParallelIndexSupervisorTask
+      final IndexIO indexIO = new IndexIO(MAPPER, ColumnConfig.DEFAULT);
+      final DruidInputSource inputSource = new DruidInputSource(
+          "test",
+          Intervals.ETERNITY,
+          null,
+          TrueDimFilter.instance(),
+          null,
+          null,
+          indexIO,
+          new NoopCoordinatorClient(),
+          new SegmentCacheManagerFactory(indexIO, MAPPER),
+          new TaskConfigBuilder().build()
+      );
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          inputSource,
+          new JsonInputFormat(null, null, null, null, null),
+          false,
+          null
+      );
+      final Task task = new ParallelIndexSupervisorTask(
+          "test-task",
+          null,
+          null,
+          new ParallelIndexIngestionSpec(
+              DataSchema.builder().withDataSource("test").build(),
+              ioConfig,
+              ParallelIndexTuningConfig.defaultConfig()
+          ),
+          Map.of("clusterTesting", createClusterTestingConfigMap())
+      );
+
+      // Write out the task payload in a temporary json file
+      final String taskJson = MAPPER.writeValueAsString(task);
+      File file = temporaryFolder.newFile("task.json");
+      FileUtils.write(file, taskJson, StandardCharsets.UTF_8);
+      peon.taskAndStatusFile = List.of(file.getParent(), "1");
+
+      final Injector baseInjector = new 
StartupInjectorBuilder().forServer().build();
+      baseInjector.injectMembers(peon);
+
+      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));
+
+      final ClusterTestingTaskConfig taskConfig = 
peonInjector.getInstance(ClusterTestingTaskConfig.class);
+      verifyTestingConfig(taskConfig);
+    }
+    finally {
+      System.clearProperty("druid.unsafe.cluster.testing");
+    }
+  }
+
+  @Test
+  public void 
test_overlordService_hasFaultyStorageCoordinator_ifTestingIsEnabled()
+  {
+    try {
+      final CliOverlord overlord = new CliOverlord();
+      System.setProperty("druid.unsafe.cluster.testing", "true");
+
+      final Injector baseInjector = new 
StartupInjectorBuilder().forServer().build();
+      baseInjector.injectMembers(overlord);
+
+      final Injector overlordInjector = 
overlord.makeInjector(Set.of(NodeRole.OVERLORD));
+
+      TaskLockbox taskLockbox = 
overlordInjector.getInstance(TaskLockbox.class);
+      Assert.assertTrue(taskLockbox instanceof FaultyTaskLockbox);
+    }
+    finally {
+      System.clearProperty("druid.unsafe.cluster.testing");
+    }
+  }
+
+  private static void verifyTestingConfig(ClusterTestingTaskConfig taskConfig)
+  {
+    Assert.assertNotNull(taskConfig);
+    Assert.assertNotNull(taskConfig.getCoordinatorClientConfig());
+    Assert.assertNotNull(taskConfig.getOverlordClientConfig());
+    Assert.assertNotNull(taskConfig.getTaskActionClientConfig());
+    Assert.assertNotNull(taskConfig.getMetadataConfig());
+
+    Assert.assertEquals(
+        Duration.standardSeconds(10),
+        taskConfig.getTaskActionClientConfig().getSegmentPublishDelay()
+    );
+    Assert.assertEquals(
+        Duration.standardSeconds(5),
+        taskConfig.getTaskActionClientConfig().getSegmentAllocateDelay()
+    );
+    Assert.assertEquals(
+        Duration.standardSeconds(30),
+        taskConfig.getCoordinatorClientConfig().getMinSegmentHandoffDelay()
+    );
+    Assert.assertFalse(
+        taskConfig.getMetadataConfig().isCleanupPendingSegments()
+    );
+  }
+
+  private Map<String, Object> createClusterTestingConfigMap()
+  {
+    return Map.of(
+        "coordinatorClientConfig", Map.of("minSegmentHandoffDelay", "PT30S"),
+        "taskActionClientConfig", Map.of("segmentPublishDelay", "PT10S", 
"segmentAllocateDelay", "PT5S"),
+        "metadataConfig", Map.of("cleanupPendingSegments", false)
+    );
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index d46cc6071d0..9815bb14a71 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1259,30 +1259,45 @@ public class TaskLockbox
         );
       }
 
-      // Clean up pending segments associated with an APPEND task
-      if (task instanceof PendingSegmentAllocatingTask) {
-        final String taskAllocatorId = ((PendingSegmentAllocatingTask) 
task).getTaskAllocatorId();
-        if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
-          final Set<String> taskIdsForSameAllocator = 
activeAllocatorIdToTaskIds.get(taskAllocatorId);
-          taskIdsForSameAllocator.remove(task.getId());
-
-          if (taskIdsForSameAllocator.isEmpty()) {
-            final int pendingSegmentsDeleted = metadataStorageCoordinator
-                .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), 
taskAllocatorId);
-            log.info(
-                "Deleted [%d] entries from pendingSegments table for 
taskAllocatorId[%s].",
-                pendingSegmentsDeleted, taskAllocatorId
-            );
-          }
-          activeAllocatorIdToTaskIds.remove(taskAllocatorId);
-        }
-      }
+      cleanupPendingSegments(task);
     }
     catch (Exception e) {
       log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments 
tables.");
     }
   }
 
+  /**
+   * Cleans up pending segments associated with an APPEND task.
+   */
+  protected void cleanupPendingSegments(Task task)
+  {
+    if (!(task instanceof PendingSegmentAllocatingTask)) {
+      return;
+    }
+
+    giant.lock();
+    try {
+      final String taskAllocatorId = ((PendingSegmentAllocatingTask) 
task).getTaskAllocatorId();
+      if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+        final Set<String> taskIdsForSameAllocator = 
activeAllocatorIdToTaskIds.get(taskAllocatorId);
+        taskIdsForSameAllocator.remove(task.getId());
+
+        if (taskIdsForSameAllocator.isEmpty()) {
+          final int pendingSegmentsDeleted = metadataStorageCoordinator
+              .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), 
taskAllocatorId);
+          log.info(
+              "Deleted [%d] entries from pendingSegments table for 
taskAllocatorId[%s].",
+              pendingSegmentsDeleted, taskAllocatorId
+          );
+        }
+        activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
   /**
    * Finds all the lock posses for the given task.
    */
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java
new file mode 100644
index 00000000000..d84ad70aec9
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+
+import java.util.Map;
+
+/**
+ * Calculates the maximum, average and total values of lag from the values of
+ * lag for each stream partition for a given supervisor.
+ * <p>
+ * This interface is currently needed only to augment the capability of the
+ * default implementation {@link DefaultLagAggregator} for testing purposes.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "default", value = 
LagAggregator.DefaultLagAggregator.class)
+})
+public interface LagAggregator
+{
+  LagAggregator DEFAULT = new DefaultLagAggregator();
+
+  <PartitionIdType> LagStats aggregate(Map<PartitionIdType, Long> 
partitionLags);
+
+  /**
+   * Default implementation of LagAggregator which should be used for all
+   * production use cases.
+   */
+  class DefaultLagAggregator implements LagAggregator
+  {
+    @Override
+    public <PartitionIdType> LagStats aggregate(Map<PartitionIdType, Long> 
partitionLags)
+    {
+      long maxLag = 0, totalLag = 0;
+      for (long lag : partitionLags.values()) {
+        if (lag > maxLag) {
+          maxLag = lag;
+        }
+        totalLag += Math.max(lag, 0);
+      }
+      final long avgLag = partitionLags.isEmpty() ? 0 : totalLag / 
partitionLags.size();
+      return new LagStats(maxLag, totalLag, avgLag);
+    }
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index de231458ef1..9c6fb46e82e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4598,7 +4598,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           return;
         }
 
-        LagStats lagStats = computeLags(partitionLags);
+        final LagStats lagStats = aggregatePartitionLags(partitionLags);
         Map<String, Object> metricTags = 
spec.getContextValue(DruidMetrics.TAGS);
         for (Map.Entry<PartitionIdType, Long> entry : 
partitionLags.entrySet()) {
           emitter.emit(
@@ -4651,17 +4651,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param partitionLags lags per partition
    */
-  protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
+  protected LagStats aggregatePartitionLags(Map<PartitionIdType, Long> 
partitionLags)
   {
-    long maxLag = 0, totalLag = 0, avgLag;
-    for (long lag : partitionLags.values()) {
-      if (lag > maxLag) {
-        maxLag = lag;
-      }
-      totalLag += lag;
-    }
-    avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
-    return new LagStats(maxLag, totalLag, avgLag);
+    return spec.getIoConfig().getLagAggregator().aggregate(partitionLags);
   }
 
   /**
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 4b3e9650cb6..a28f06d337e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.error.InvalidInput;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.joda.time.DateTime;
@@ -51,6 +52,8 @@ public abstract class SeekableStreamSupervisorIOConfig
   @Nullable private final IdleConfig idleConfig;
   @Nullable private final Integer stopTaskCount;
 
+  private final LagAggregator lagAggregator;
+
   public SeekableStreamSupervisorIOConfig(
       String stream,
       @Nullable InputFormat inputFormat,
@@ -64,6 +67,7 @@ public abstract class SeekableStreamSupervisorIOConfig
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
       @Nullable AutoScalerConfig autoScalerConfig,
+      LagAggregator lagAggregator,
       DateTime lateMessageRejectionStartDateTime,
       @Nullable IdleConfig idleConfig,
       @Nullable Integer stopTaskCount
@@ -72,6 +76,12 @@ public abstract class SeekableStreamSupervisorIOConfig
     this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
     this.inputFormat = inputFormat;
     this.replicas = replicas != null ? replicas : 1;
+
+    InvalidInput.conditionalException(
+        lagAggregator != null,
+        "'lagAggregator' must be specified in supervisor 'spec.ioConfig'"
+    );
+    this.lagAggregator = lagAggregator;
     // Could be null
     this.autoScalerConfig = autoScalerConfig;
     // if autoscaler is enable then taskcount will be ignored here. and init 
taskcount will be equal to taskCountMin
@@ -140,6 +150,12 @@ public abstract class SeekableStreamSupervisorIOConfig
     return autoScalerConfig;
   }
 
+  @JsonProperty
+  public LagAggregator getLagAggregator()
+  {
+    return lagAggregator;
+  }
+
   @JsonProperty
   public Integer getTaskCount()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
index 740953dc076..169bbc4a788 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
@@ -41,6 +41,7 @@ import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
@@ -341,6 +342,7 @@ public class SeekableStreamSamplerSpecTest extends 
EasyMockSupport
           lateMessageRejectionPeriod,
           earlyMessageRejectionPeriod,
           autoScalerConfig,
+          LagAggregator.DEFAULT,
           lateMessageRejectionStartDateTime,
           idleConfig,
           null
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index e8773001407..d38371275d2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -44,6 +44,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
@@ -1134,6 +1135,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         null,
         null
@@ -1189,6 +1191,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, null),
         null
@@ -1512,6 +1515,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
           null,
           null,
           mapper.convertValue(getScaleOutProperties(2), 
AutoScalerConfig.class),
+          LagAggregator.DEFAULT,
           null,
           null,
           null
@@ -1532,6 +1536,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
           null,
           null,
           mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
+          LagAggregator.DEFAULT,
           null,
           null,
           null
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 259869eee7e..612e22a9d96 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -688,6 +688,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
         null
@@ -791,6 +792,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
         null
@@ -1077,21 +1079,22 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     DateTime startTime = DateTimes.nowUtc();
     SeekableStreamSupervisorIOConfig ioConfig = new 
SeekableStreamSupervisorIOConfig(
-            STREAM,
-            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
-            1,
-            1,
-            new Period("PT1H"),
-            new Period("PT1S"),
-            new Period("PT30S"),
-            false,
-            new Period("PT30M"),
-            null,
-            null,
-            null,
-            null,
-            new IdleConfig(true, 200L),
-            null
+        STREAM,
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        1,
+        new Period("PT1H"),
+        new Period("PT1S"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        LagAggregator.DEFAULT,
+        null,
+        new IdleConfig(true, 200L),
+        null
     ) {};
 
     EasyMock.reset(spec);
@@ -1296,6 +1299,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
         stopTaskCount
@@ -1520,6 +1524,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
         null
@@ -2562,6 +2567,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null,
         null,
         null,
+        LagAggregator.DEFAULT,
         null,
         null,
         null
@@ -2624,6 +2630,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         null,
         null,
         OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
+        LagAggregator.DEFAULT,
         null,
         null,
         null
diff --git a/integration-tests/docker/environment-configs/common 
b/integration-tests/docker/environment-configs/common
index f71f6f5f563..e0de0b393e1 100644
--- a/integration-tests/docker/environment-configs/common
+++ b/integration-tests/docker/environment-configs/common
@@ -83,5 +83,8 @@ druid_audit_manager_type=log
 # Can remove this when the flag is no longer needed
 druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true
 
+# Cluster testing
+druid_unsafe_cluster_testing=true
+
 # Dart
 druid_msq_dart_enabled = true
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index b4c97814bbb..f4e3448d331 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -513,6 +513,13 @@ public class OverlordResourceTestClient
   }
 
   public SupervisorStateManager.BasicState getSupervisorStatus(String id)
+  {
+    final String state = (String) getFullSupervisorStatus(id).get("state");
+    LOG.debug("Supervisor id[%s] has state [%s]", id, state);
+    return SupervisorStateManager.BasicState.valueOf(state);
+  }
+
+  public Map<String, Object> getFullSupervisorStatus(String id)
   {
     try {
       StatusResponseHolder response = httpClient.go(
@@ -537,13 +544,10 @@ public class OverlordResourceTestClient
           response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
       );
 
-      Map<String, Object> payload = jsonMapper.convertValue(
+      return jsonMapper.convertValue(
           responseData.get("payload"),
           JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
       );
-      String state = (String) payload.get("state");
-      LOG.debug("Supervisor id[%s] has state [%s]", id, state);
-      return SupervisorStateManager.BasicState.valueOf(state);
     }
     catch (ISE e) {
       throw e;
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
new file mode 100644
index 00000000000..a3a24e3f2ed
--- /dev/null
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.EventSerializer;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KafkaUtil;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Integration test to verify induction of various faults in the cluster
+ * using {@link ClusterTestingTaskConfig}.
+ * <p>
+ * Future tests can try to leverage the cluster testing config to write tests
+ * for cluster scalability and stability.
+ */
+@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITFaultyClusterTest extends AbstractKafkaIndexingServiceTest
+{
+  private static final Logger log = new Logger(ITFaultyClusterTest.class);
+
+  private GeneratedTestConfig generatedTestConfig;
+  private StreamGenerator streamGenerator;
+
+  private String fullDatasourceName;
+
+  @DataProvider
+  public static Object[] getParameters()
+  {
+    return new Object[]{false, true};
+  }
+
+  @BeforeClass
+  public void setupClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @BeforeMethod
+  public void setup() throws Exception
+  {
+    generatedTestConfig = new GeneratedTestConfig(
+        Specs.PARSER_TYPE,
+        getResourceAsString(Specs.INPUT_FORMAT_PATH)
+    );
+    fullDatasourceName = generatedTestConfig.getFullDatasourceName();
+    final EventSerializer serializer = jsonMapper.readValue(
+        getResourceAsStream(Specs.SERIALIZER_PATH),
+        EventSerializer.class
+    );
+    streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100);
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "faulty_cluster";
+  }
+
+  @Test(dataProvider = "getParameters")
+  public void test_streamingIngestion_worksWithFaultyCluster(boolean transactionEnabled) throws Exception
+  {
+    if (shouldSkipTest(transactionEnabled)) {
+      return;
+    }
+
+    try (
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+    ) {
+      // Set context parameters
+      final Map<String, Object> taskContext = Map.of(
+          "clusterTesting",
+          Map.of(
+              "metadataConfig", Map.of("cleanupPendingSegments", false),
+              "coordinatorClientConfig", Map.of("minSegmentHandoffDelay", "PT10S"),
+              "taskActionClientConfig", Map.of("segmentAllocateDelay", "PT10S", "segmentPublishDelay", "PT10S")
+          )
+      );
+
+      // Start supervisor
+      final String supervisorSpec = generatedTestConfig
+          .withContext(taskContext)
+          .withLagAggregator(new FaultyLagAggregator(1_000_000))
+          .getStreamIngestionPropsTransform()
+          .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+
+      final String supervisorId = indexer.submitSupervisor(supervisorSpec);
+      generatedTestConfig.setSupervisorId(supervisorId);
+      log.info("Submitted supervisor[%s] with spec[%s]", supervisorId, supervisorSpec);
+
+      // Generate data for minutes 1, 2 and 3
+      final DateTime firstRecordTime = DateTimes.of("2000-01-01T01:01:00Z");
+      final long rowsForMinute1 = generateDataForMinute(firstRecordTime, streamEventWriter);
+      final long rowsForMinute2 = generateDataForMinute(firstRecordTime.plus(Period.minutes(1)), streamEventWriter);
+      final long rowsForMinute3 = generateDataForMinute(firstRecordTime.plus(Period.minutes(2)), streamEventWriter);
+
+      ITRetryUtil.retryUntilTrue(
+          () -> {
+            final Integer aggregateLag = (Integer) indexer.getFullSupervisorStatus(supervisorId).get("aggregateLag");
+            log.info("Aggregate lag is [%d].", aggregateLag);
+            return aggregateLag != null && aggregateLag > 1_000_000;
+          },
+          "Aggregate lag exceeds 1M"
+      );
+
+      // Wait for data to be ingested for all the minutes
+      waitUntilDatasourceRowCountEquals(fullDatasourceName, rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
+      waitForSegmentsToLoad(fullDatasourceName);
+
+      // 2 segments for each minute, total 6
+      waitUntilDatasourceSegmentCountEquals(fullDatasourceName, 6);
+    }
+  }
+
+  /**
+   * Generates data points for a minute with the specified start time.
+   *
+   * @return Number of rows generated.
+   */
+  private long generateDataForMinute(DateTime firstRecordTime, StreamEventWriter streamEventWriter)
+  {
+    final long rowCount = streamGenerator.run(
+        generatedTestConfig.getStreamName(),
+        streamEventWriter,
+        10,
+        firstRecordTime
+    );
+    log.info("Generated [%d] rows for 1 minute interval with start[%s]", rowCount, firstRecordTime);
+
+    return rowCount;
+  }
+
+  /**
+   * Checks if a test should be skipped based on whether transaction is enabled or not.
+   */
+  private boolean shouldSkipTest(boolean testEnableTransaction)
+  {
+    Map<String, String> kafkaTestProps = KafkaUtil
+        .getAdditionalKafkaTestConfigFromProperties(config);
+    boolean configEnableTransaction = Boolean.parseBoolean(
+        kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
+    );
+
+    return configEnableTransaction != testEnableTransaction;
+  }
+
+  /**
+   * Constants for test specs.
+   */
+  private static class Specs
+  {
+    static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT + "/csv/serializer/serializer.json";
+    static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT + "/csv/input_format/input_format.json";
+    static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
+  }
+
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index efe78412fc4..c3549033684 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.clients.OverlordResourceTestClient;
@@ -159,6 +160,45 @@ public abstract class AbstractIndexerTest
     );
   }
 
+  /**
+   * Retries until segments have been loaded.
+   */
+  protected void waitForSegmentsToLoad(String dataSource)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(dataSource),
+        "Segments are loaded"
+    );
+  }
+
+  /**
+   * Retries until the segment count is as expected.
+   */
+  protected void waitUntilDatasourceSegmentCountEquals(String dataSource, int numExpectedSegments)
+  {
+    ITRetryUtil.retryUntilEquals(
+        () -> coordinator.getFullSegmentsMetadata(dataSource).size(),
+        numExpectedSegments,
+        "Segment count"
+    );
+  }
+
+  /**
+   * Retries until the total row count is as expected.
+   */
+  protected void waitUntilDatasourceRowCountEquals(String dataSource, long totalRows)
+  {
+    ITRetryUtil.retryUntilEquals(
+        () -> queryHelper.countRows(
+            dataSource,
+            Intervals.ETERNITY,
+            name -> new LongSumAggregatorFactory(name, "count")
+        ),
+        totalRows,
+        "Total row count in datasource"
+    );
+  }
+
   public static String getResourceAsString(String file) throws IOException
   {
     try (final InputStream inputStream = getResourceAsStream(file)) {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 06f2841c4da..af86af7245e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.tests.indexer;
 
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.utils.KafkaAdminClient;
@@ -55,6 +56,8 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
       String parserType,
       String parserOrInputFormat,
       List<String> dimensions,
+      Map<String, Object> context,
+      LagAggregator lagAggregator,
       IntegrationTestingConfig config
   )
   {
@@ -129,6 +132,16 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
             "%%DIMENSIONS%%",
             jsonMapper.writeValueAsString(dimensions)
         );
+        spec = StringUtils.replace(
+            spec,
+            "%%CONTEXT%%",
+            jsonMapper.writeValueAsString(context)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%LAG_AGGREGATOR%%",
+            jsonMapper.writeValueAsString(lagAggregator)
+        );
         return StringUtils.replace(
             spec,
             "%%STREAM_PROPERTIES_VALUE%%",
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index b83ea95ec17..009775e19af 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.tests.indexer;
 
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
@@ -29,6 +30,7 @@ import org.apache.druid.testing.utils.StreamEventWriter;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
@@ -61,6 +63,8 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
       String parserType,
       String parserOrInputFormat,
       List<String> dimensions,
+      Map<String, Object> context,
+      LagAggregator lagAggregator,
       IntegrationTestingConfig config
   )
   {
@@ -129,6 +133,16 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
             "%%DIMENSIONS%%",
             jsonMapper.writeValueAsString(dimensions)
         );
+        spec = StringUtils.replace(
+            spec,
+            "%%CONTEXT%%",
+            jsonMapper.writeValueAsString(context)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%LAG_AGGREGATOR%%",
+            jsonMapper.writeValueAsString(lagAggregator)
+        );
         return StringUtils.replace(
             spec,
             "%%STREAM_PROPERTIES_VALUE%%",
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 8cc8388ba47..49a521b7148 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -146,6 +147,8 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       String parserType,
       String parserOrInputFormat,
       List<String> dimensions,
+      Map<String, Object> context,
+      LagAggregator lagAggregator,
       IntegrationTestingConfig config
   );
 
@@ -948,9 +951,15 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
   {
     private final String streamName;
     private final String fullDatasourceName;
+    private final String parserType;
+    private final String parserOrInputFormat;
+    private final List<String> dimensions;
+
+    private final Function<String, String> streamQueryPropsTransform;
+
     private String supervisorId;
-    private Function<String, String> streamIngestionPropsTransform;
-    private Function<String, String> streamQueryPropsTransform;
+    private LagAggregator lagAggregator;
+    private Map<String, Object> context = Map.of();
 
     public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
     {
@@ -959,7 +968,10 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 
     public GeneratedTestConfig(String parserType, String parserOrInputFormat, List<String> dimensions) throws Exception
     {
-      streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
+      this.parserType = parserType;
+      this.parserOrInputFormat = parserOrInputFormat;
+      this.dimensions = dimensions;
+      this.streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
       String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
       Map<String, String> tags = ImmutableMap.of(
           STREAM_EXPIRE_TAG,
@@ -974,17 +986,21 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
           "Wait for stream active"
       );
       fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
-      streamIngestionPropsTransform = generateStreamIngestionPropsTransform(
-          streamName,
-          fullDatasourceName,
-          parserType,
-          parserOrInputFormat,
-          dimensions,
-          config
-      );
       streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);
     }
 
+    public GeneratedTestConfig withContext(Map<String, Object> context)
+    {
+      this.context = context;
+      return this;
+    }
+
+    public GeneratedTestConfig withLagAggregator(LagAggregator lagAggregator)
+    {
+      this.lagAggregator = lagAggregator;
+      return this;
+    }
+
     public String getSupervisorId()
     {
       return supervisorId;
@@ -1007,7 +1023,16 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 
     public Function<String, String> getStreamIngestionPropsTransform()
     {
-      return streamIngestionPropsTransform;
+      return generateStreamIngestionPropsTransform(
+          streamName,
+          fullDatasourceName,
+          parserType,
+          parserOrInputFormat,
+          dimensions,
+          context,
+          lagAggregator,
+          config
+      );
     }
 
     public Function<String, String> getStreamQueryPropsTransform()
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index b55028dc16f..bb22cdc6c02 100644
--- a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
+++ b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -52,6 +52,8 @@
     "replicas": 1,
     "taskDuration": "PT120S",
     "%%USE_EARLIEST_KEY%%": true,
-    "inputFormat" : %%INPUT_FORMAT%%
-  }
+    "inputFormat" : %%INPUT_FORMAT%%,
+    "lagAggregator": %%LAG_AGGREGATOR%%
+  },
+  "context": %%CONTEXT%%
 }
diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
similarity index 100%
rename from server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
rename to server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to