This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 19132c6370a Add tools for scale and fault-tolerance testing (#17959)
19132c6370a is described below
commit 19132c6370aea508df840a00c875f171fe46c2ef
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri May 9 19:35:50 2025 +0530
Add tools for scale and fault-tolerance testing (#17959)
Description
-----------
This patch adds tools to introduce artificial faults in a Druid test
cluster to allow for scale testing and
fault-tolerance testing. These tools can be used to trigger faults such as
segment allocation delay,
segment publish delay, etc., and evaluate the capability of a cluster to
recover from such situations.
The `ClusterTestingModule` can evolve over time to allow inducing more
types of faults.
Changes
---------
- Add `ClusterTestingModule` to inject faulty dependencies when testing is
enabled
- Enable cluster testing mode by loading extension `druid-testing-tools`
and setting config
`druid.unsafe.cluster.testing=true`
- Bind `FaultyCoordinatorClient` and `FaultyRemoteTaskActionClient` on Peons
if testing is enabled
- Trigger specific faults on tasks by setting the corresponding task
context parameter
- Use `FaultyLagAggregator` in a supervisor if specified in the supervisor
spec `ioConfig.lagAggregator`
- Add integration test `ITFaultyClusterTest` to try out these configs
---
distribution/pom.xml | 2 +
.../supervisor/RabbitStreamSupervisor.java | 10 +-
.../supervisor/RabbitStreamSupervisorIOConfig.java | 3 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 4 +-
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 3 +
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 6 +
.../supervisor/KafkaSupervisorIOConfigTest.java | 24 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 4 +
.../kinesis/supervisor/KinesisSupervisor.java | 4 +-
.../supervisor/KinesisSupervisorIOConfig.java | 2 +
extensions-core/testing-tools/pom.xml | 24 ++
.../apache/druid/guice/ClusterTestingModule.java | 150 +++++++++++
.../testing/cluster/ClusterTestingTaskConfig.java | 247 ++++++++++++++++++
.../cluster/overlord/FaultyLagAggregator.java | 69 +++++
.../cluster/overlord/FaultyTaskLockbox.java | 85 ++++++
.../cluster/task/FaultyCoordinatorClient.java | 108 ++++++++
.../testing/cluster/task/FaultyOverlordClient.java | 59 +++++
.../cluster/task/FaultyRemoteTaskActionClient.java | 93 +++++++
.../task/FaultyRemoteTaskActionClientFactory.java | 62 +++++
.../org.apache.druid.initialization.DruidModule | 1 +
.../druid/guice/ClusterTestingModuleTest.java | 288 +++++++++++++++++++++
.../druid/indexing/overlord/TaskLockbox.java | 51 ++--
.../seekablestream/supervisor/LagAggregator.java | 65 +++++
.../supervisor/SeekableStreamSupervisor.java | 14 +-
.../SeekableStreamSupervisorIOConfig.java | 16 ++
.../SeekableStreamSamplerSpecTest.java | 2 +
.../SeekableStreamSupervisorSpecTest.java | 5 +
.../SeekableStreamSupervisorStateTest.java | 37 +--
.../docker/environment-configs/common | 3 +
.../clients/OverlordResourceTestClient.java | 12 +-
.../coordinator/duty/ITFaultyClusterTest.java | 196 ++++++++++++++
.../druid/tests/indexer/AbstractIndexerTest.java | 40 +++
.../indexer/AbstractKafkaIndexingServiceTest.java | 13 +
.../AbstractKinesisIndexingServiceTest.java | 14 +
.../tests/indexer/AbstractStreamIndexingTest.java | 49 +++-
.../stream/data/supervisor_spec_template.json | 6 +-
.../client/coordinator/NoopCoordinatorClient.java | 0
37 files changed, 1686 insertions(+), 85 deletions(-)
diff --git a/distribution/pom.xml b/distribution/pom.xml
index bc67631fd82..da9631c4867 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,8 @@
<argument>org.apache.druid.extensions:druid-kubernetes-overlord-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-catalog</argument>
+ <argument>-c</argument>
+
<argument>org.apache.druid.extensions:druid-testing-tools</argument>
<argument>${druid.distribution.pulldeps.opts}</argument>
</arguments>
</configuration>
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 9ce9bd331cf..89a05a77022 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -53,8 +53,6 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;
@@ -89,8 +87,6 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
private static final Long NOT_SET = -1L;
private static final Long END_OF_PARTITION = Long.MAX_VALUE;
- private final ServiceEmitter emitter;
- private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private volatile Map<String, Long> latestSequenceFromStream;
private final RabbitStreamSupervisorSpec spec;
@@ -116,8 +112,6 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
false);
this.spec = spec;
- this.emitter = spec.getEmitter();
- this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
}
@Override
@@ -168,7 +162,7 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? partitionLag : null,
- includeOffsets ? partitionLag.values().stream().mapToLong(x ->
Math.max(x, 0)).sum() : null,
+ includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() :
null,
includeOffsets ? sequenceLastUpdated : null,
spec.isSuspended(),
stateManager.isHealthy(),
@@ -363,7 +357,7 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
return new LagStats(0, 0, 0);
}
- return computeLags(partitionRecordLag);
+ return aggregatePartitionLags(partitionRecordLag);
}
@Override
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
index bce4164c5d2..0538163df61 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
@@ -24,13 +24,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
import javax.annotation.Nullable;
-
import java.util.Map;
public class RabbitStreamSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
@@ -81,6 +81,7 @@ public class RabbitStreamSupervisorIOConfig extends
SeekableStreamSupervisorIOCo
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
+ LagAggregator.DEFAULT,
lateMessageRejectionStartDateTime,
new IdleConfig(null, null),
stopTaskCount
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b990c8fa3fc..743e4edc32c 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -174,7 +174,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? partitionLag : null,
includeOffsets ? getPartitionTimeLag() : null,
- includeOffsets ? partitionLag.values().stream().mapToLong(x ->
Math.max(x, 0)).sum() : null,
+ includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() :
null,
includeOffsets ? sequenceLastUpdated : null,
spec.isSuspended(),
stateManager.isHealthy(),
@@ -379,7 +379,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
return new LagStats(0, 0, 0);
}
- return computeLags(partitionRecordLag);
+ return aggregatePartitionLags(partitionRecordLag);
}
/**
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index be4f6cfb196..4eac3163fe3 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
@@ -64,6 +65,7 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object>
consumerProperties,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig
autoScalerConfig,
+ @Nullable @JsonProperty("lagAggregator") LagAggregator lagAggregator,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@@ -91,6 +93,7 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
+ Configs.valueOrDefault(lagAggregator, LagAggregator.DEFAULT),
lateMessageRejectionStartDateTime,
idleConfig,
stopTaskCount
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index dd7efcdea51..f0d2bacd359 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -158,6 +158,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
@@ -212,6 +213,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
@@ -275,6 +277,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
@@ -380,6 +383,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
@@ -565,6 +569,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
@@ -622,6 +627,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 211230bf9d0..2da46aaf0e2 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -85,7 +86,7 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(100, config.getPollTimeout());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
- Assert.assertEquals(false, config.isUseEarliestOffset());
+ Assert.assertFalse(config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(30),
config.getCompletionTimeout());
Assert.assertFalse("lateMessageRejectionPeriod",
config.getLateMessageRejectionPeriod().isPresent());
Assert.assertFalse("earlyMessageRejectionPeriod",
config.getEarlyMessageRejectionPeriod().isPresent());
@@ -151,10 +152,10 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(1000, config.getPollTimeout());
Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
- Assert.assertEquals(true, config.isUseEarliestOffset());
+ Assert.assertTrue(config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(45),
config.getCompletionTimeout());
- Assert.assertEquals(Duration.standardHours(1),
config.getLateMessageRejectionPeriod().get());
- Assert.assertEquals(Duration.standardHours(1),
config.getEarlyMessageRejectionPeriod().get());
+ Assert.assertEquals(Duration.standardHours(1),
config.getLateMessageRejectionPeriod().orNull());
+ Assert.assertEquals(Duration.standardHours(1),
config.getEarlyMessageRejectionPeriod().orNull());
}
@Test
@@ -194,9 +195,9 @@ public class KafkaSupervisorIOConfigTest
Assert.assertEquals(1000, config.getPollTimeout());
Assert.assertEquals(Duration.standardMinutes(1), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod());
- Assert.assertEquals(true, config.isUseEarliestOffset());
+ Assert.assertTrue(config.isUseEarliestOffset());
Assert.assertEquals(Duration.standardMinutes(45),
config.getCompletionTimeout());
- Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"),
config.getLateMessageRejectionStartDateTime().get());
+ Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"),
config.getLateMessageRejectionStartDateTime().orNull());
}
@Test
@@ -286,14 +287,14 @@ public class KafkaSupervisorIOConfigTest
+ " \"lateMessageRejectionStartDateTime\": \"2016-05-31T12:00Z\"\n"
+ "}";
exception.expect(JsonMappingException.class);
- KafkaSupervisorIOConfig config = mapper.readValue(
+ mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
KafkaSupervisorIOConfig.class
- )
- ), KafkaSupervisorIOConfig.class
- );
+ )
+ ), KafkaSupervisorIOConfig.class
+ );
}
@Test
@@ -327,6 +328,7 @@ public class KafkaSupervisorIOConfigTest
new Period("PT1H"),
consumerProperties,
mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -361,6 +363,7 @@ public class KafkaSupervisorIOConfigTest
new Period("PT1H"),
consumerProperties,
mapper.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+ LagAggregator.DEFAULT,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -412,6 +415,7 @@ public class KafkaSupervisorIOConfigTest
new Period("PT1H"),
consumerProperties,
null,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 65dde0444a7..408aa07d75c 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -311,6 +311,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new Period("PT1H"),
consumerProperties,
OBJECT_MAPPER.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class),
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -5297,6 +5298,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new Period(duration),
consumerProperties,
null,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -5412,6 +5414,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new Period(duration),
consumerProperties,
null,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -5530,6 +5533,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new Period(duration),
consumerProperties,
null,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index db29d0dae53..e8996391946 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -274,7 +274,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
stateManager.getSupervisorState(),
stateManager.getExceptionEvents(),
includeOffsets ? partitionLag : null,
- includeOffsets ? partitionLag.values().stream().mapToLong(x ->
Math.max(x, 0)).sum() : null
+ includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() :
null
);
}
@@ -387,7 +387,7 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
return new LagStats(0, 0, 0);
}
- return computeLags(partitionTimeLags);
+ return aggregatePartitionLags(partitionTimeLags);
}
@Override
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 9910f22a349..ce6eece5af2 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisRegion;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.joda.time.DateTime;
@@ -92,6 +93,7 @@ public class KinesisSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
+ LagAggregator.DEFAULT,
lateMessageRejectionStartDateTime,
new IdleConfig(null, null),
null
diff --git a/extensions-core/testing-tools/pom.xml
b/extensions-core/testing-tools/pom.xml
index 51cf92b87d3..2042b8033b5 100644
--- a/extensions-core/testing-tools/pom.xml
+++ b/extensions-core/testing-tools/pom.xml
@@ -57,6 +57,18 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-indexing-service</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-services</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -120,6 +132,11 @@
</dependency>
<!-- Test Dependencies -->
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -165,6 +182,13 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-indexing-service</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
new file mode 100644
index 00000000000..c1fa71a5f31
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
+import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox;
+import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient;
+import org.apache.druid.testing.cluster.task.FaultyOverlordClient;
+import
org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Module that injects faulty clients into the Peon process to simulate various
+ * fault scenarios.
+ */
+public class ClusterTestingModule implements DruidModule
+{
+ private static final Logger log = new Logger(ClusterTestingModule.class);
+
+ private Set<NodeRole> roles;
+ private boolean isClusterTestingEnabled = false;
+
+ @Inject
+ public void configure(
+ Properties props,
+ @Self Set<NodeRole> roles
+ )
+ {
+ this.isClusterTestingEnabled = Boolean.parseBoolean(
+ props.getProperty("druid.unsafe.cluster.testing", "false")
+ );
+ this.roles = roles;
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ if (isClusterTestingEnabled) {
+ log.warn(
+ "Running service in cluster testing mode. This is an unsafe
test-only"
+ + " mode and must never be used in a production cluster."
+ + " Set property[druid.unsafe.cluster.testing=false] to disable
testing mode."
+ );
+ bindDependenciesForClusterTestingMode(binder);
+ } else {
+ log.warn("Cluster testing is disabled. Set
property[druid.unsafe.cluster.testing=true] to enable it.");
+ }
+ }
+
+ private void bindDependenciesForClusterTestingMode(Binder binder)
+ {
+ if (roles.equals(Set.of(NodeRole.PEON))) {
+ // Bind cluster testing config
+ binder.bind(ClusterTestingTaskConfig.class)
+ .toProvider(TestConfigProvider.class)
+ .in(LazySingleton.class);
+
+ // Bind faulty clients for Coordinator, Overlord and task actions
+ binder.bind(CoordinatorClient.class)
+ .to(FaultyCoordinatorClient.class)
+ .in(LazySingleton.class);
+ binder.bind(OverlordClient.class)
+ .to(FaultyOverlordClient.class)
+ .in(LazySingleton.class);
+ binder.bind(RemoteTaskActionClientFactory.class)
+ .to(FaultyRemoteTaskActionClientFactory.class)
+ .in(LazySingleton.class);
+ } else if (roles.contains(NodeRole.OVERLORD)) {
+ // If this is the Overlord, bind a faulty storage coordinator
+ log.warn("Running Overlord in cluster testing mode.");
+ binder.bind(TaskLockbox.class)
+ .to(FaultyTaskLockbox.class)
+ .in(LazySingleton.class);
+ }
+ }
+
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return List.of(
+ new SimpleModule(getClass().getSimpleName())
+ .registerSubtypes(new NamedType(FaultyLagAggregator.class,
"faulty"))
+ );
+ }
+
+ private static class TestConfigProvider implements
Provider<ClusterTestingTaskConfig>
+ {
+ private final Task task;
+ private final ObjectMapper mapper;
+
+ @Inject
+ public TestConfigProvider(Task task, ObjectMapper mapper)
+ {
+ this.task = task;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public ClusterTestingTaskConfig get()
+ {
+ try {
+ final ClusterTestingTaskConfig testingConfig =
ClusterTestingTaskConfig.forTask(task, mapper);
+ log.warn("Running task in cluster testing mode with config[%s].",
testingConfig);
+
+ return testingConfig;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
new file mode 100644
index 00000000000..c350c951dab
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.indexing.common.task.Task;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Config passed in the context parameter {@code "clusterTesting"} of a task,
+ * used for testing scalability of a Druid cluster by introducing faults at
+ * various interface points.
+ */
+public class ClusterTestingTaskConfig
+{
+ /**
+ * Default config which does not trigger any faults for this task.
+ */
+ public static final ClusterTestingTaskConfig DEFAULT = new
ClusterTestingTaskConfig(null, null, null, null);
+
+ private final MetadataConfig metadataConfig;
+ private final OverlordClientConfig overlordClientConfig;
+ private final CoordinatorClientConfig coordinatorClientConfig;
+ private final TaskActionClientConfig taskActionClientConfig;
+
+ @JsonCreator
+ public ClusterTestingTaskConfig(
+ @JsonProperty("metadataConfig") @Nullable MetadataConfig metadataConfig,
+ @JsonProperty("overlordClientConfig") @Nullable OverlordClientConfig
overlordClientConfig,
+ @JsonProperty("coordinatorClientConfig") @Nullable
CoordinatorClientConfig coordinatorClientConfig,
+ @JsonProperty("taskActionClientConfig") @Nullable TaskActionClientConfig
taskActionClientConfig
+ )
+ {
+ this.metadataConfig = Configs.valueOrDefault(metadataConfig,
MetadataConfig.DEFAULT);
+ this.overlordClientConfig = Configs.valueOrDefault(overlordClientConfig,
OverlordClientConfig.DEFAULT);
+ this.coordinatorClientConfig =
Configs.valueOrDefault(coordinatorClientConfig,
CoordinatorClientConfig.DEFAULT);
+ this.taskActionClientConfig =
Configs.valueOrDefault(taskActionClientConfig, TaskActionClientConfig.DEFAULT);
+ }
+
+ /**
+ * Creates a {@link ClusterTestingTaskConfig} for the given Task by reading
+ * the task context parameter {@code "clusterTesting"}.
+ *
+ * @return Default config {@link ClusterTestingTaskConfig#DEFAULT} if not
+ * specified in the task context parameters.
+ */
+ public static ClusterTestingTaskConfig forTask(Task task, ObjectMapper
mapper) throws JsonProcessingException
+ {
+ final Map<String, Object> configAsMap =
task.getContextValue("clusterTesting");
+ final String json = mapper.writeValueAsString(configAsMap);
+
+ final ClusterTestingTaskConfig config = mapper.readValue(json,
ClusterTestingTaskConfig.class);
+ return Configs.valueOrDefault(config, ClusterTestingTaskConfig.DEFAULT);
+ }
+
+ public MetadataConfig getMetadataConfig()
+ {
+ return metadataConfig;
+ }
+
+ public OverlordClientConfig getOverlordClientConfig()
+ {
+ return overlordClientConfig;
+ }
+
+ public CoordinatorClientConfig getCoordinatorClientConfig()
+ {
+ return coordinatorClientConfig;
+ }
+
+ public TaskActionClientConfig getTaskActionClientConfig()
+ {
+ return taskActionClientConfig;
+ }
+
+ @Override
+ public String toString()
+ {
+ return '{' +
+ "overlordClientConfig=" + overlordClientConfig +
+ ", coordinatorClientConfig=" + coordinatorClientConfig +
+ ", taskActionClientConfig=" + taskActionClientConfig +
+ '}';
+ }
+
+ /**
+ * Config for manipulating communication of the task with the Overlord.
+ */
+ public static class OverlordClientConfig
+ {
+ private static final OverlordClientConfig DEFAULT = new
OverlordClientConfig();
+
+ @Override
+ public String toString()
+ {
+ return "";
+ }
+ }
+
+ /**
+ * Config for manipulating communication of the task with the Coordinator.
+ */
+ public static class CoordinatorClientConfig
+ {
+ private static final CoordinatorClientConfig DEFAULT = new
CoordinatorClientConfig(null);
+
+ private final Duration minSegmentHandoffDelay;
+
+ @JsonCreator
+ public CoordinatorClientConfig(
+ @JsonProperty("minSegmentHandoffDelay") @Nullable Duration
minSegmentHandoffDelay
+ )
+ {
+ this.minSegmentHandoffDelay = minSegmentHandoffDelay;
+ }
+
+ /**
+ * Minimum duration for which segment handoff is assumed to not have
completed.
+ * The actual handoff status is fetched from the Coordinator only after
this
+ * duration has elapsed. This delay applies to each segment separately.
+ * <p>
+ * For any segment,
+ * <pre>
+ * observed handoff time = actual handoff time + minSegmentHandoffDelay
+ * </pre>
+ */
+ @Nullable
+ public Duration getMinSegmentHandoffDelay()
+ {
+ return minSegmentHandoffDelay;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "minSegmentHandoffDelay=" + minSegmentHandoffDelay +
+ '}';
+ }
+ }
+
+ /**
+ * Config for manipulating task actions submitted by the task to the
Overlord.
+ */
+ public static class TaskActionClientConfig
+ {
+ private static final TaskActionClientConfig DEFAULT = new
TaskActionClientConfig(null, null);
+
+ private final Duration segmentPublishDelay;
+ private final Duration segmentAllocateDelay;
+
+ @JsonCreator
+ public TaskActionClientConfig(
+ @JsonProperty("segmentAllocateDelay") @Nullable Duration
segmentAllocateDelay,
+ @JsonProperty("segmentPublishDelay") @Nullable Duration
segmentPublishDelay
+ )
+ {
+ this.segmentAllocateDelay = segmentAllocateDelay;
+ this.segmentPublishDelay = segmentPublishDelay;
+ }
+
+ /**
+ * Duration to wait before sending a segment publish request to the
Overlord.
+ * <p>
+ * For each publish request (containing one or more segments),
+ * <pre>
+ * observed publish time = actual publish time + segmentPublishDelay
+ * </pre>
+ */
+ @Nullable
+ public Duration getSegmentPublishDelay()
+ {
+ return segmentPublishDelay;
+ }
+
+ /**
+ * Duration to wait before sending a segment allocate request to the
Overlord.
+ * <p>
+ * For each segment,
+ * <pre>
+ * observed segment allocation time = actual allocation time +
segmentAllocateDelay
+ * </pre>
+ */
+ @Nullable
+ public Duration getSegmentAllocateDelay()
+ {
+ return segmentAllocateDelay;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "segmentPublishDelay=" + segmentPublishDelay +
+ ", segmentAllocateDelay=" + segmentAllocateDelay +
+ '}';
+ }
+ }
+
+ /**
+ * Config for manipulating metadata operations performed by the Overlord for
+ * the task.
+ */
+ public static class MetadataConfig
+ {
+ private static final MetadataConfig DEFAULT = new MetadataConfig(null);
+
+ private final boolean cleanupPendingSegments;
+
+ @JsonCreator
+ public MetadataConfig(
+ @JsonProperty("cleanupPendingSegments") @Nullable Boolean
cleanupPendingSegments
+ )
+ {
+ this.cleanupPendingSegments =
Configs.valueOrDefault(cleanupPendingSegments, true);
+ }
+
+ public boolean isCleanupPendingSegments()
+ {
+ return cleanupPendingSegments;
+ }
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyLagAggregator.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyLagAggregator.java
new file mode 100644
index 00000000000..90d9f3ac420
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyLagAggregator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Map;
+
+/**
+ * Implementation of {@link LagAggregator} that supports the following:
+ * <ul>
+ * <li>Specify a {@code multiplier} to amplify the lag observed by the Overlord
+ * for a given supervisor.</li>
+ * </ul>
+ */
+public class FaultyLagAggregator implements LagAggregator
+{
+ private static final Logger log = new Logger(FaultyLagAggregator.class);
+
+ private final int lagMultiplier;
+ private final LagAggregator delegate = LagAggregator.DEFAULT;
+
+ @JsonCreator
+ public FaultyLagAggregator(
+ @JsonProperty("lagMultiplier") int lagMultiplier
+ )
+ {
+ this.lagMultiplier = lagMultiplier;
+ log.info("Multiplying lags by factor[%d].", lagMultiplier);
+ }
+
+ @JsonProperty
+ public int getLagMultiplier()
+ {
+ return lagMultiplier;
+ }
+
+ @Override
+ public <PartitionIdType> LagStats aggregate(Map<PartitionIdType, Long>
partitionLags)
+ {
+ LagStats originalAggregate = delegate.aggregate(partitionLags);
+ return new LagStats(
+ originalAggregate.getMaxLag() * getLagMultiplier(),
+ originalAggregate.getTotalLag() * getLagMultiplier(),
+ originalAggregate.getAvgLag() * getLagMultiplier()
+ );
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
new file mode 100644
index 00000000000..d2639dcfbba
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+
+/**
+ * Implementation of {@link TaskLockbox} that supports the following:
+ * <ul>
+ * <li>Skip cleanup of pending segments</li>
+ * </ul>
+ */
+public class FaultyTaskLockbox extends TaskLockbox
+{
+ private static final Logger log = new Logger(FaultyTaskLockbox.class);
+
+ private final ObjectMapper mapper;
+
+ @Inject
+ public FaultyTaskLockbox(
+ TaskStorage taskStorage,
+ IndexerMetadataStorageCoordinator metadataStorageCoordinator,
+ ObjectMapper mapper
+ )
+ {
+ super(taskStorage, metadataStorageCoordinator);
+ this.mapper = mapper;
+ log.info("Initializing FaultyTaskLockbox.");
+ }
+
+ @Override
+ protected void cleanupPendingSegments(Task task)
+ {
+ final ClusterTestingTaskConfig testingConfig = getTestingConfig(task);
+ if (testingConfig.getMetadataConfig().isCleanupPendingSegments()) {
+ super.cleanupPendingSegments(task);
+ } else {
+ log.info(
+ "Skipping cleanup of pending segments for task[%s] since it has
testing config[%s].",
+ task.getId(), testingConfig
+ );
+ }
+ }
+
+ private ClusterTestingTaskConfig getTestingConfig(Task task)
+ {
+ try {
+ return ClusterTestingTaskConfig.forTask(task, mapper);
+ }
+ catch (Exception e) {
+ // Just log the exception and proceed with the actual cleanup
+ log.error(
+ e,
+ "Error while reading testing config for task[%s] with context[%s]."
+ + " Using default config with no faults.",
+ task.getId(), task.getContext()
+ );
+ return ClusterTestingTaskConfig.DEFAULT;
+ }
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java
new file mode 100644
index 00000000000..c03478f33fa
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.coordinator.CoordinatorClientImpl;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.joda.time.Duration;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@code CoordinatorClient} that supports the following:
+ * <ul>
+ * <li>Add a {@code minSegmentHandoffDelay} before making a call to Coordinator
+ * to get the handoff status of a single segment.</li>
+ * </ul>
+ */
+public class FaultyCoordinatorClient extends CoordinatorClientImpl
+{
+ private static final Logger log = new Logger(FaultyCoordinatorClient.class);
+
+ private final Provider<ClusterTestingTaskConfig> testConfigProvider;
+ private final ConcurrentHashMap<SegmentDescriptor, Stopwatch>
segmentHandoffTimers = new ConcurrentHashMap<>();
+
+ @Inject
+ public FaultyCoordinatorClient(
+ Provider<ClusterTestingTaskConfig> testingConfigProvider,
+ @Json final ObjectMapper jsonMapper,
+ @EscalatedGlobal final ServiceClientFactory clientFactory,
+ @Coordinator final ServiceLocator serviceLocator
+ )
+ {
+ super(
+ clientFactory.makeClient(
+ NodeRole.COORDINATOR.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(6).build()
+ ),
+ jsonMapper
+ );
+ this.testConfigProvider = testingConfigProvider;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> isHandoffComplete(String dataSource,
SegmentDescriptor descriptor)
+ {
+ final Duration minHandoffDelay = getHandoffDelay();
+ if (minHandoffDelay != null) {
+ final Stopwatch sinceHandoffCheckStarted =
segmentHandoffTimers.computeIfAbsent(
+ descriptor,
+ d -> Stopwatch.createStarted()
+ );
+
+ if (sinceHandoffCheckStarted.hasElapsed(minHandoffDelay)) {
+ // Wait period is over, check with Coordinator now, but do not remove
+ // the stopwatch from the map. This ensures that we do not create a new
+ // Stopwatch causing further delays.
+ log.info(
+ "Min handoff delay[%s] has elapsed for segment[%s]. Checking with
Coordinator for actual handoff status.",
+ minHandoffDelay, descriptor
+ );
+ } else {
+ // Until the min handoff delay has elapsed, keep returning false
+ return Futures.immediateFuture(false);
+ }
+ }
+
+ // Call Coordinator for the actual handoff status
+ return super.isHandoffComplete(dataSource, descriptor);
+ }
+
+ private Duration getHandoffDelay()
+ {
+ return
testConfigProvider.get().getCoordinatorClientConfig().getMinSegmentHandoffDelay();
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
new file mode 100644
index 00000000000..ba96840b833
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClientImpl;
import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+
+public class FaultyOverlordClient extends OverlordClientImpl
+{
+ private static final Logger log = new Logger(FaultyOverlordClient.class);
+
+ private final ClusterTestingTaskConfig testingConfig;
+
+ @Inject
+ public FaultyOverlordClient(
+ ClusterTestingTaskConfig testingConfig,
+ @Json final ObjectMapper jsonMapper,
+ @EscalatedGlobal final ServiceClientFactory clientFactory,
+ @Coordinator final ServiceLocator serviceLocator
+ )
+ {
+ super(
+ clientFactory.makeClient(
+ NodeRole.COORDINATOR.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(6).build()
+ ),
+ jsonMapper
+ );
+ this.testingConfig = testingConfig;
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClient.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClient.java
new file mode 100644
index 00000000000..8031d7f47db
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClient.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.joda.time.Duration;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link TaskActionClient} that supports the following:
+ * <ul>
+ * <li>Add a {@code segmentAllocateDelay} before sending a segment allocate
+ * request to the Overlord.</li>
+ * <li>Add a {@code segmentPublishDelay} before sending a segment publish
+ * request to the Overlord.</li>
+ * </ul>
+ */
+public class FaultyRemoteTaskActionClient implements TaskActionClient
+{
+ private static final Logger log = new
Logger(FaultyRemoteTaskActionClient.class);
+
+ private final TaskActionClient delegate;
+ private final ClusterTestingTaskConfig.TaskActionClientConfig config;
+
+ FaultyRemoteTaskActionClient(
+ ClusterTestingTaskConfig.TaskActionClientConfig config,
+ TaskActionClient delegate
+ )
+ {
+ this.config = config;
+ this.delegate = delegate;
+ log.info("Initializing FaultyRemoteTaskActionClient with config[%s].",
config);
+ }
+
+ @Override
+ public <RetType> RetType submit(TaskAction<RetType> taskAction) throws
IOException
+ {
+ if (taskAction instanceof SegmentAllocateAction
+ && config.getSegmentAllocateDelay() != null) {
+ log.warn("Sleeping for duration[%s] before allocating segments.",
config.getSegmentAllocateDelay());
+ sleep(config.getSegmentAllocateDelay());
+ }
+
+ if (isPublishAction(taskAction) && config.getSegmentPublishDelay() !=
null) {
+ log.warn("Sleeping for duration[%s] before publishing segments.",
config.getSegmentPublishDelay());
+ sleep(config.getSegmentPublishDelay());
+ }
+
+ return delegate.submit(taskAction);
+ }
+
+ private static <R> boolean isPublishAction(TaskAction<R> taskAction)
+ {
+ return taskAction instanceof SegmentTransactionalInsertAction
+ || taskAction instanceof SegmentTransactionalAppendAction
+ || taskAction instanceof SegmentTransactionalReplaceAction;
+ }
+
+ private static void sleep(Duration duration)
+ {
+ try {
+ Thread.sleep(duration.getMillis());
+ }
+ catch (InterruptedException e) {
+ log.info("Interrupted while sleeping before task action.");
+ }
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
new file mode 100644
index 00000000000..043a49f6d14
--- /dev/null
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.cluster.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+
+public class FaultyRemoteTaskActionClientFactory extends
RemoteTaskActionClientFactory
+{
+ private final ClusterTestingTaskConfig config;
+
+ @Inject
+ public FaultyRemoteTaskActionClientFactory(
+ @Json final ObjectMapper jsonMapper,
+ @EscalatedGlobal final ServiceClientFactory clientFactory,
+ @IndexingService final ServiceLocator serviceLocator,
+ RetryPolicyConfig retryPolicyConfig,
+ ClusterTestingTaskConfig config
+ )
+ {
+ super(
+ jsonMapper,
+ clientFactory,
+ serviceLocator,
+ retryPolicyConfig
+ );
+ this.config = config;
+ }
+
+ @Override
+ public TaskActionClient create(Task task)
+ {
+ return new
FaultyRemoteTaskActionClient(config.getTaskActionClientConfig(),
super.create(task));
+ }
+}
diff --git
a/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
b/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index f2ddf3cf538..2af3ce8a65e 100644
---
a/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++
b/extensions-core/testing-tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.druid.guice.SleepModule
+org.apache.druid.guice.ClusterTestingModule
diff --git
a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
new file mode 100644
index 00000000000..39ff06b6035
--- /dev/null
+++
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.cli.CliOverlord;
+import org.apache.druid.cli.CliPeon;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.CoordinatorClientImpl;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.filter.TrueDimFilter;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.OverlordClientImpl;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.apache.druid.testing.cluster.overlord.FaultyTaskLockbox;
+import org.apache.druid.testing.cluster.task.FaultyCoordinatorClient;
+import org.apache.druid.testing.cluster.task.FaultyOverlordClient;
+import
org.apache.druid.testing.cluster.task.FaultyRemoteTaskActionClientFactory;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
 * Verifies that {@code ClusterTestingModule} swaps in faulty dependencies on
 * Peon and Overlord services only when the system property
 * {@code druid.unsafe.cluster.testing} is set to {@code true}.
 */
public class ClusterTestingModuleTest
{
  // Mapper with the indexing-service Jackson modules registered so that task
  // payloads written out for the Peon can be round-tripped.
  private static final ObjectMapper MAPPER = TestHelper
      .makeJsonMapper()
      .registerModules(new IndexingServiceTuningConfigModule().getJacksonModules())
      .registerModules(new IndexingServiceInputSourceModule().getJacksonModules());

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  /**
   * When testing mode is disabled, the Peon injector must bind the standard
   * (non-faulty) client implementations.
   */
  @Test
  public void test_peonRunnable_isNotModified_ifTestingIsDisabled() throws IOException
  {
    try {
      final CliPeon peon = new CliPeon();
      System.setProperty("druid.unsafe.cluster.testing", "false");

      // Write out the task payload in a temporary json file
      File file = temporaryFolder.newFile("task.json");
      FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
      peon.taskAndStatusFile = List.of(file.getParent(), "1");

      // The base injector supplies the startup bindings that CliPeon needs
      // before it can build its own injector.
      final Injector baseInjector = new StartupInjectorBuilder().forServer().build();
      baseInjector.injectMembers(peon);

      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));

      CoordinatorClient coordinatorClient = peonInjector.getInstance(CoordinatorClient.class);
      Assert.assertTrue(coordinatorClient instanceof CoordinatorClientImpl);

      OverlordClient overlordClient = peonInjector.getInstance(OverlordClient.class);
      Assert.assertTrue(overlordClient instanceof OverlordClientImpl);

      TaskActionClientFactory taskActionClientFactory = peonInjector.getInstance(TaskActionClientFactory.class);
      Assert.assertTrue(taskActionClientFactory instanceof RemoteTaskActionClientFactory);
    }
    finally {
      // Always clear the property so other tests in this JVM are unaffected
      System.clearProperty("druid.unsafe.cluster.testing");
    }
  }

  /**
   * When testing mode is enabled, the Peon injector must bind the faulty
   * variants of the Coordinator/Overlord clients and the task action client
   * factory.
   */
  @Test
  public void test_peonRunnable_hasFaultyClients_ifTestingIsEnabled() throws IOException
  {
    try {
      final CliPeon peon = new CliPeon();
      System.setProperty("druid.unsafe.cluster.testing", "true");

      // Write out the task payload in a temporary json file
      File file = temporaryFolder.newFile("task.json");
      FileUtils.write(file, "{\"type\":\"noop\"}", StandardCharsets.UTF_8);
      peon.taskAndStatusFile = List.of(file.getParent(), "1");

      final Injector baseInjector = new StartupInjectorBuilder().forServer().build();
      baseInjector.injectMembers(peon);

      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));

      CoordinatorClient coordinatorClient = peonInjector.getInstance(CoordinatorClient.class);
      Assert.assertTrue(coordinatorClient instanceof FaultyCoordinatorClient);

      OverlordClient overlordClient = peonInjector.getInstance(OverlordClient.class);
      Assert.assertTrue(overlordClient instanceof FaultyOverlordClient);

      TaskActionClientFactory taskActionClientFactory = peonInjector.getInstance(TaskActionClientFactory.class);
      Assert.assertTrue(taskActionClientFactory instanceof FaultyRemoteTaskActionClientFactory);
    }
    finally {
      System.clearProperty("druid.unsafe.cluster.testing");
    }
  }

  /**
   * The "clusterTesting" task context must be deserialized into the
   * {@link ClusterTestingTaskConfig} bound in the Peon injector.
   */
  @Test
  public void test_peonRunnable_getsConfigParams_ifProvidedInTaskContext() throws IOException
  {
    try {
      final CliPeon peon = new CliPeon();
      System.setProperty("druid.unsafe.cluster.testing", "true");

      // Noop task carrying the testing config in its context
      final Task task = new NoopTask(
          null,
          null,
          null,
          0L,
          0L,
          Map.of("clusterTesting", createClusterTestingConfigMap())
      );

      // Write out the task payload in a temporary json file
      final String taskJson = MAPPER.writeValueAsString(task);
      File file = temporaryFolder.newFile("task.json");
      FileUtils.write(file, taskJson, StandardCharsets.UTF_8);
      peon.taskAndStatusFile = List.of(file.getParent(), "1");

      final Injector baseInjector = new StartupInjectorBuilder().forServer().build();
      baseInjector.injectMembers(peon);

      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));

      final ClusterTestingTaskConfig taskConfig = peonInjector.getInstance(ClusterTestingTaskConfig.class);
      verifyTestingConfig(taskConfig);
    }
    finally {
      System.clearProperty("druid.unsafe.cluster.testing");
    }
  }

  /**
   * Regression check: building the injector for a ParallelIndexSupervisorTask
   * whose DruidInputSource itself depends on a CoordinatorClient must not
   * create a circular Guice dependency when the faulty bindings are active.
   */
  @Test
  public void test_parallelIndexSupervisorTask_withDruidInputSource_hasNoCircularDeps() throws IOException
  {
    try {
      final CliPeon peon = new CliPeon();
      System.setProperty("druid.unsafe.cluster.testing", "true");

      // Create a ParallelIndexSupervisorTask
      final IndexIO indexIO = new IndexIO(MAPPER, ColumnConfig.DEFAULT);
      final DruidInputSource inputSource = new DruidInputSource(
          "test",
          Intervals.ETERNITY,
          null,
          TrueDimFilter.instance(),
          null,
          null,
          indexIO,
          new NoopCoordinatorClient(),
          new SegmentCacheManagerFactory(indexIO, MAPPER),
          new TaskConfigBuilder().build()
      );
      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
          inputSource,
          new JsonInputFormat(null, null, null, null, null),
          false,
          null
      );
      final Task task = new ParallelIndexSupervisorTask(
          "test-task",
          null,
          null,
          new ParallelIndexIngestionSpec(
              DataSchema.builder().withDataSource("test").build(),
              ioConfig,
              ParallelIndexTuningConfig.defaultConfig()
          ),
          Map.of("clusterTesting", createClusterTestingConfigMap())
      );

      // Write out the task payload in a temporary json file
      final String taskJson = MAPPER.writeValueAsString(task);
      File file = temporaryFolder.newFile("task.json");
      FileUtils.write(file, taskJson, StandardCharsets.UTF_8);
      peon.taskAndStatusFile = List.of(file.getParent(), "1");

      final Injector baseInjector = new StartupInjectorBuilder().forServer().build();
      baseInjector.injectMembers(peon);

      final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));

      final ClusterTestingTaskConfig taskConfig = peonInjector.getInstance(ClusterTestingTaskConfig.class);
      verifyTestingConfig(taskConfig);
    }
    finally {
      System.clearProperty("druid.unsafe.cluster.testing");
    }
  }

  /**
   * When testing mode is enabled, the Overlord injector must bind the faulty
   * TaskLockbox implementation.
   */
  @Test
  public void test_overlordService_hasFaultyStorageCoordinator_ifTestingIsEnabled()
  {
    try {
      final CliOverlord overlord = new CliOverlord();
      System.setProperty("druid.unsafe.cluster.testing", "true");

      final Injector baseInjector = new StartupInjectorBuilder().forServer().build();
      baseInjector.injectMembers(overlord);

      final Injector overlordInjector = overlord.makeInjector(Set.of(NodeRole.OVERLORD));

      TaskLockbox taskLockbox = overlordInjector.getInstance(TaskLockbox.class);
      Assert.assertTrue(taskLockbox instanceof FaultyTaskLockbox);
    }
    finally {
      System.clearProperty("druid.unsafe.cluster.testing");
    }
  }

  /**
   * Asserts that the injected config matches the values built by
   * {@link #createClusterTestingConfigMap()}.
   */
  private static void verifyTestingConfig(ClusterTestingTaskConfig taskConfig)
  {
    Assert.assertNotNull(taskConfig);
    Assert.assertNotNull(taskConfig.getCoordinatorClientConfig());
    Assert.assertNotNull(taskConfig.getOverlordClientConfig());
    Assert.assertNotNull(taskConfig.getTaskActionClientConfig());
    Assert.assertNotNull(taskConfig.getMetadataConfig());

    Assert.assertEquals(
        Duration.standardSeconds(10),
        taskConfig.getTaskActionClientConfig().getSegmentPublishDelay()
    );
    Assert.assertEquals(
        Duration.standardSeconds(5),
        taskConfig.getTaskActionClientConfig().getSegmentAllocateDelay()
    );
    Assert.assertEquals(
        Duration.standardSeconds(30),
        taskConfig.getCoordinatorClientConfig().getMinSegmentHandoffDelay()
    );
    Assert.assertFalse(
        taskConfig.getMetadataConfig().isCleanupPendingSegments()
    );
  }

  // Builds the raw "clusterTesting" context map; durations are ISO-8601 periods.
  private Map<String, Object> createClusterTestingConfigMap()
  {
    return Map.of(
        "coordinatorClientConfig", Map.of("minSegmentHandoffDelay", "PT30S"),
        "taskActionClientConfig", Map.of("segmentPublishDelay", "PT10S", "segmentAllocateDelay", "PT5S"),
        "metadataConfig", Map.of("cleanupPendingSegments", false)
    );
  }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index d46cc6071d0..9815bb14a71 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1259,30 +1259,45 @@ public class TaskLockbox
);
}
- // Clean up pending segments associated with an APPEND task
- if (task instanceof PendingSegmentAllocatingTask) {
- final String taskAllocatorId = ((PendingSegmentAllocatingTask)
task).getTaskAllocatorId();
- if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
- final Set<String> taskIdsForSameAllocator =
activeAllocatorIdToTaskIds.get(taskAllocatorId);
- taskIdsForSameAllocator.remove(task.getId());
-
- if (taskIdsForSameAllocator.isEmpty()) {
- final int pendingSegmentsDeleted = metadataStorageCoordinator
- .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(),
taskAllocatorId);
- log.info(
- "Deleted [%d] entries from pendingSegments table for
taskAllocatorId[%s].",
- pendingSegmentsDeleted, taskAllocatorId
- );
- }
- activeAllocatorIdToTaskIds.remove(taskAllocatorId);
- }
- }
+ cleanupPendingSegments(task);
}
catch (Exception e) {
log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments
tables.");
}
}
+ /**
+ * Cleans up pending segments associated with an APPEND task.
+ */
+ protected void cleanupPendingSegments(Task task)
+ {
+ if (!(task instanceof PendingSegmentAllocatingTask)) {
+ return;
+ }
+
+ giant.lock();
+ try {
+ final String taskAllocatorId = ((PendingSegmentAllocatingTask)
task).getTaskAllocatorId();
+ if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+ final Set<String> taskIdsForSameAllocator =
activeAllocatorIdToTaskIds.get(taskAllocatorId);
+ taskIdsForSameAllocator.remove(task.getId());
+
+ if (taskIdsForSameAllocator.isEmpty()) {
+ final int pendingSegmentsDeleted = metadataStorageCoordinator
+ .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(),
taskAllocatorId);
+ log.info(
+ "Deleted [%d] entries from pendingSegments table for
taskAllocatorId[%s].",
+ pendingSegmentsDeleted, taskAllocatorId
+ );
+ }
+ activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+ }
+ }
+ finally {
+ giant.unlock();
+ }
+ }
+
/**
* Finds all the lock posses for the given task.
*/
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java
new file mode 100644
index 00000000000..d84ad70aec9
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/LagAggregator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+
+import java.util.Map;
+
+/**
+ * Calculates the maximum, average and total values of lag from the values of
+ * lag for each stream partition for a given supervisor.
+ * <p>
+ * This interface is currently needed only to augment the capability of the
+ * default implementation {@link DefaultLagAggregator} for testing purposes.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "default", value =
LagAggregator.DefaultLagAggregator.class)
+})
+public interface LagAggregator
+{
+ LagAggregator DEFAULT = new DefaultLagAggregator();
+
+ <PartitionIdType> LagStats aggregate(Map<PartitionIdType, Long>
partitionLags);
+
+ /**
+ * Default implementation of LagAggregator which should be used for all
+ * production use cases.
+ */
+ class DefaultLagAggregator implements LagAggregator
+ {
+ @Override
+ public <PartitionIdType> LagStats aggregate(Map<PartitionIdType, Long>
partitionLags)
+ {
+ long maxLag = 0, totalLag = 0;
+ for (long lag : partitionLags.values()) {
+ if (lag > maxLag) {
+ maxLag = lag;
+ }
+ totalLag += Math.max(lag, 0);
+ }
+ final long avgLag = partitionLags.isEmpty() ? 0 : totalLag /
partitionLags.size();
+ return new LagStats(maxLag, totalLag, avgLag);
+ }
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index de231458ef1..9c6fb46e82e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4598,7 +4598,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
- LagStats lagStats = computeLags(partitionLags);
+ final LagStats lagStats = aggregatePartitionLags(partitionLags);
Map<String, Object> metricTags =
spec.getContextValue(DruidMetrics.TAGS);
for (Map.Entry<PartitionIdType, Long> entry :
partitionLags.entrySet()) {
emitter.emit(
@@ -4651,17 +4651,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param partitionLags lags per partition
*/
- protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
+ protected LagStats aggregatePartitionLags(Map<PartitionIdType, Long>
partitionLags)
{
- long maxLag = 0, totalLag = 0, avgLag;
- for (long lag : partitionLags.values()) {
- if (lag > maxLag) {
- maxLag = lag;
- }
- totalLag += lag;
- }
- avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
- return new LagStats(maxLag, totalLag, avgLag);
+ return spec.getIoConfig().getLagAggregator().aggregate(partitionLags);
}
/**
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 4b3e9650cb6..a28f06d337e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.error.InvalidInput;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.IAE;
import org.joda.time.DateTime;
@@ -51,6 +52,8 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable private final IdleConfig idleConfig;
@Nullable private final Integer stopTaskCount;
+ private final LagAggregator lagAggregator;
+
public SeekableStreamSupervisorIOConfig(
String stream,
@Nullable InputFormat inputFormat,
@@ -64,6 +67,7 @@ public abstract class SeekableStreamSupervisorIOConfig
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
@Nullable AutoScalerConfig autoScalerConfig,
+ LagAggregator lagAggregator,
DateTime lateMessageRejectionStartDateTime,
@Nullable IdleConfig idleConfig,
@Nullable Integer stopTaskCount
@@ -72,6 +76,12 @@ public abstract class SeekableStreamSupervisorIOConfig
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
this.inputFormat = inputFormat;
this.replicas = replicas != null ? replicas : 1;
+
+ InvalidInput.conditionalException(
+ lagAggregator != null,
+ "'lagAggregator' must be specified in supervisor 'spec.ioConfig'"
+ );
+ this.lagAggregator = lagAggregator;
// Could be null
this.autoScalerConfig = autoScalerConfig;
// if autoscaler is enable then taskcount will be ignored here. and init
taskcount will be equal to taskCountMin
@@ -140,6 +150,12 @@ public abstract class SeekableStreamSupervisorIOConfig
return autoScalerConfig;
}
+ @JsonProperty
+ public LagAggregator getLagAggregator()
+ {
+ return lagAggregator;
+ }
+
@JsonProperty
public Integer getTaskCount()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
index 740953dc076..169bbc4a788 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
@@ -41,6 +41,7 @@ import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
@@ -341,6 +342,7 @@ public class SeekableStreamSamplerSpecTest extends
EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
+ LagAggregator.DEFAULT,
lateMessageRejectionStartDateTime,
idleConfig,
null
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index e8773001407..d38371275d2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -44,6 +44,7 @@ import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
@@ -1134,6 +1135,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
null,
null
@@ -1189,6 +1191,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
new IdleConfig(true, null),
null
@@ -1512,6 +1515,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
null,
null,
mapper.convertValue(getScaleOutProperties(2),
AutoScalerConfig.class),
+ LagAggregator.DEFAULT,
null,
null,
null
@@ -1532,6 +1536,7 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
null,
null,
mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
+ LagAggregator.DEFAULT,
null,
null,
null
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 259869eee7e..612e22a9d96 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -688,6 +688,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
null
@@ -791,6 +792,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
null
@@ -1077,21 +1079,22 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
{
DateTime startTime = DateTimes.nowUtc();
SeekableStreamSupervisorIOConfig ioConfig = new
SeekableStreamSupervisorIOConfig(
- STREAM,
- new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
- 1,
- 1,
- new Period("PT1H"),
- new Period("PT1S"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null,
- null,
- null,
- new IdleConfig(true, 200L),
- null
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("PT1S"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ LagAggregator.DEFAULT,
+ null,
+ new IdleConfig(true, 200L),
+ null
) {};
EasyMock.reset(spec);
@@ -1296,6 +1299,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
stopTaskCount
@@ -1520,6 +1524,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
null
@@ -2562,6 +2567,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
null,
+ LagAggregator.DEFAULT,
null,
null,
null
@@ -2624,6 +2630,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
+ LagAggregator.DEFAULT,
null,
null,
null
diff --git a/integration-tests/docker/environment-configs/common
b/integration-tests/docker/environment-configs/common
index f71f6f5f563..e0de0b393e1 100644
--- a/integration-tests/docker/environment-configs/common
+++ b/integration-tests/docker/environment-configs/common
@@ -83,5 +83,8 @@ druid_audit_manager_type=log
# Can remove this when the flag is no longer needed
druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true
+# Cluster testing
+druid_unsafe_cluster_testing=true
+
# Dart
druid_msq_dart_enabled = true
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index b4c97814bbb..f4e3448d331 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -513,6 +513,13 @@ public class OverlordResourceTestClient
}
  /**
   * Returns the basic state of the supervisor with the given id, parsed from
   * the "state" field of the full supervisor status payload.
   *
   * @throws IllegalArgumentException if the reported state string does not
   *                                  match any {@code BasicState} constant
   */
  public SupervisorStateManager.BasicState getSupervisorStatus(String id)
  {
    final String state = (String) getFullSupervisorStatus(id).get("state");
    LOG.debug("Supervisor id[%s] has state [%s]", id, state);
    return SupervisorStateManager.BasicState.valueOf(state);
  }
+
+ public Map<String, Object> getFullSupervisorStatus(String id)
{
try {
StatusResponseHolder response = httpClient.go(
@@ -537,13 +544,10 @@ public class OverlordResourceTestClient
response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
- Map<String, Object> payload = jsonMapper.convertValue(
+ return jsonMapper.convertValue(
responseData.get("payload"),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
- String state = (String) payload.get("state");
- LOG.debug("Supervisor id[%s] has state [%s]", id, state);
- return SupervisorStateManager.BasicState.valueOf(state);
}
catch (ISE e) {
throw e;
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
new file mode 100644
index 00000000000..a3a24e3f2ed
--- /dev/null
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.EventSerializer;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KafkaUtil;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+
/**
 * Integration test to verify induction of various faults in the cluster
 * using {@link ClusterTestingTaskConfig}.
 * <p>
 * Future tests can try to leverage the cluster testing config to write tests
 * for cluster scalability and stability.
 */
@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITFaultyClusterTest extends AbstractKafkaIndexingServiceTest
{
  private static final Logger log = new Logger(ITFaultyClusterTest.class);

  // Re-initialized before every test method in setup()
  private GeneratedTestConfig generatedTestConfig;
  private StreamGenerator streamGenerator;

  private String fullDatasourceName;

  /**
   * Values for the {@code transactionEnabled} test parameter: each test runs
   * once with transactions disabled and once enabled.
   */
  @DataProvider
  public static Object[] getParameters()
  {
    return new Object[]{false, true};
  }

  @BeforeClass
  public void setupClass() throws Exception
  {
    doBeforeClass();
  }

  /**
   * Builds a fresh test config, datasource name and stream generator for each
   * test method.
   */
  @BeforeMethod
  public void setup() throws Exception
  {
    generatedTestConfig = new GeneratedTestConfig(
        Specs.PARSER_TYPE,
        getResourceAsString(Specs.INPUT_FORMAT_PATH)
    );
    fullDatasourceName = generatedTestConfig.getFullDatasourceName();
    final EventSerializer serializer = jsonMapper.readValue(
        getResourceAsStream(Specs.SERIALIZER_PATH),
        EventSerializer.class
    );
    // NOTE(review): the meaning of the (6, 100) arguments is defined by
    // WikipediaStreamEventStreamGenerator — presumably events-per-second and
    // event count; confirm against that class before tuning.
    streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100);
  }

  @Override
  public String getTestNamePrefix()
  {
    return "faulty_cluster";
  }

  /**
   * Runs streaming ingestion on a cluster with induced segment allocation,
   * publish and handoff delays and an inflated lag report, and verifies that
   * ingestion still completes with the expected row and segment counts.
   */
  @Test(dataProvider = "getParameters")
  public void test_streamingIngestion_worksWithFaultyCluster(boolean transactionEnabled) throws Exception
  {
    if (shouldSkipTest(transactionEnabled)) {
      return;
    }

    try (
        final Closeable closer = createResourceCloser(generatedTestConfig);
        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
    ) {
      // Set context parameters
      final Map<String, Object> taskContext = Map.of(
          "clusterTesting",
          Map.of(
              "metadataConfig", Map.of("cleanupPendingSegments", false),
              "coordinatorClientConfig", Map.of("minSegmentHandoffDelay", "PT10S"),
              "taskActionClientConfig", Map.of("segmentAllocateDelay", "PT10S", "segmentPublishDelay", "PT10S")
          )
      );

      // Start supervisor
      final String supervisorSpec = generatedTestConfig
          .withContext(taskContext)
          .withLagAggregator(new FaultyLagAggregator(1_000_000))
          .getStreamIngestionPropsTransform()
          .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));

      final String supervisorId = indexer.submitSupervisor(supervisorSpec);
      generatedTestConfig.setSupervisorId(supervisorId);
      log.info("Submitted supervisor[%s] with spec[%s]", supervisorId, supervisorSpec);

      // Generate data for minutes 1, 2 and 3
      final DateTime firstRecordTime = DateTimes.of("2000-01-01T01:01:00Z");
      final long rowsForMinute1 = generateDataForMinute(firstRecordTime, streamEventWriter);
      final long rowsForMinute2 = generateDataForMinute(firstRecordTime.plus(Period.minutes(1)), streamEventWriter);
      final long rowsForMinute3 = generateDataForMinute(firstRecordTime.plus(Period.minutes(2)), streamEventWriter);

      ITRetryUtil.retryUntilTrue(
          () -> {
            // NOTE(review): assumes the JSON-deserialized lag fits in an
            // Integer; a lag above Integer.MAX_VALUE would deserialize as a
            // Long and make this cast throw — TODO confirm
            final Integer aggregateLag = (Integer) indexer.getFullSupervisorStatus(supervisorId).get("aggregateLag");
            log.info("Aggregate lag is [%d].", aggregateLag);
            return aggregateLag != null && aggregateLag > 1_000_000;
          },
          "Aggregate lag exceeds 1M"
      );

      // Wait for data to be ingested for all the minutes
      waitUntilDatasourceRowCountEquals(fullDatasourceName, rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
      waitForSegmentsToLoad(fullDatasourceName);

      // 2 segments for each minute, total 6
      waitUntilDatasourceSegmentCountEquals(fullDatasourceName, 6);
    }
  }

  /**
   * Generates data points for a minute with the specified start time.
   *
   * @return Number of rows generated.
   */
  private long generateDataForMinute(DateTime firstRecordTime, StreamEventWriter streamEventWriter)
  {
    final long rowCount = streamGenerator.run(
        generatedTestConfig.getStreamName(),
        streamEventWriter,
        10,
        firstRecordTime
    );
    log.info("Generated [%d] rows for 1 minute interval with start[%s]", rowCount, firstRecordTime);

    return rowCount;
  }

  /**
   * Checks if a test should be skipped based on whether transaction is enabled or not.
   */
  private boolean shouldSkipTest(boolean testEnableTransaction)
  {
    Map<String, String> kafkaTestProps = KafkaUtil
        .getAdditionalKafkaTestConfigFromProperties(config);
    boolean configEnableTransaction = Boolean.parseBoolean(
        kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
    );

    // Run the test only when its parameter matches the environment's setting
    return configEnableTransaction != testEnableTransaction;
  }

  /**
   * Constants for test specs.
   */
  private static class Specs
  {
    static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT + "/csv/serializer/serializer.json";
    static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT + "/csv/input_format/input_format.json";
    static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
  }

}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index efe78412fc4..c3549033684 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
@@ -159,6 +160,45 @@ public abstract class AbstractIndexerTest
);
}
+ /**
+ * Retries until segments have been loaded.
+ */
+ protected void waitForSegmentsToLoad(String dataSource)
+ {
+ ITRetryUtil.retryUntilTrue(
+ () -> coordinator.areSegmentsLoaded(dataSource),
+ "Segments are loaded"
+ );
+ }
+
+ /**
+ * Retries until the segment count is as expected.
+ */
+ protected void waitUntilDatasourceSegmentCountEquals(String dataSource, int
numExpectedSegments)
+ {
+ ITRetryUtil.retryUntilEquals(
+ () -> coordinator.getFullSegmentsMetadata(dataSource).size(),
+ numExpectedSegments,
+ "Segment count"
+ );
+ }
+
+ /**
+ * Retries until the total row count is as expected.
+ */
+ protected void waitUntilDatasourceRowCountEquals(String dataSource, long
totalRows)
+ {
+ ITRetryUtil.retryUntilEquals(
+ () -> queryHelper.countRows(
+ dataSource,
+ Intervals.ETERNITY,
+ name -> new LongSumAggregatorFactory(name, "count")
+ ),
+ totalRows,
+ "Total row count in datasource"
+ );
+ }
+
public static String getResourceAsString(String file) throws IOException
{
try (final InputStream inputStream = getResourceAsStream(file)) {
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 06f2841c4da..af86af7245e 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.tests.indexer;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.KafkaAdminClient;
@@ -55,6 +56,8 @@ public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamInd
String parserType,
String parserOrInputFormat,
List<String> dimensions,
+ Map<String, Object> context,
+ LagAggregator lagAggregator,
IntegrationTestingConfig config
)
{
@@ -129,6 +132,16 @@ public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamInd
"%%DIMENSIONS%%",
jsonMapper.writeValueAsString(dimensions)
);
+ spec = StringUtils.replace(
+ spec,
+ "%%CONTEXT%%",
+ jsonMapper.writeValueAsString(context)
+ );
+ spec = StringUtils.replace(
+ spec,
+ "%%LAG_AGGREGATOR%%",
+ jsonMapper.writeValueAsString(lagAggregator)
+ );
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index b83ea95ec17..009775e19af 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.tests.indexer;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
@@ -29,6 +30,7 @@ import org.apache.druid.testing.utils.StreamEventWriter;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
public abstract class AbstractKinesisIndexingServiceTest extends
AbstractStreamIndexingTest
@@ -61,6 +63,8 @@ public abstract class AbstractKinesisIndexingServiceTest
extends AbstractStreamI
String parserType,
String parserOrInputFormat,
List<String> dimensions,
+ Map<String, Object> context,
+ LagAggregator lagAggregator,
IntegrationTestingConfig config
)
{
@@ -129,6 +133,16 @@ public abstract class AbstractKinesisIndexingServiceTest
extends AbstractStreamI
"%%DIMENSIONS%%",
jsonMapper.writeValueAsString(dimensions)
);
+ spec = StringUtils.replace(
+ spec,
+ "%%CONTEXT%%",
+ jsonMapper.writeValueAsString(context)
+ );
+ spec = StringUtils.replace(
+ spec,
+ "%%LAG_AGGREGATOR%%",
+ jsonMapper.writeValueAsString(lagAggregator)
+ );
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 8cc8388ba47..49a521b7148 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -146,6 +147,8 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
String parserType,
String parserOrInputFormat,
List<String> dimensions,
+ Map<String, Object> context,
+ LagAggregator lagAggregator,
IntegrationTestingConfig config
);
@@ -948,9 +951,15 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
{
private final String streamName;
private final String fullDatasourceName;
+ private final String parserType;
+ private final String parserOrInputFormat;
+ private final List<String> dimensions;
+
+ private final Function<String, String> streamQueryPropsTransform;
+
private String supervisorId;
- private Function<String, String> streamIngestionPropsTransform;
- private Function<String, String> streamQueryPropsTransform;
+ private LagAggregator lagAggregator;
+ private Map<String, Object> context = Map.of();
public GeneratedTestConfig(String parserType, String parserOrInputFormat)
throws Exception
{
@@ -959,7 +968,10 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
public GeneratedTestConfig(String parserType, String parserOrInputFormat,
List<String> dimensions) throws Exception
{
- streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
+ this.parserType = parserType;
+ this.parserOrInputFormat = parserOrInputFormat;
+ this.dimensions = dimensions;
+ this.streamName = getTestNamePrefix() + "_index_test_" +
UUID.randomUUID();
String datasource = getTestNamePrefix() + "_indexing_service_test_" +
UUID.randomUUID();
Map<String, String> tags = ImmutableMap.of(
STREAM_EXPIRE_TAG,
@@ -974,17 +986,21 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
"Wait for stream active"
);
fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
- streamIngestionPropsTransform = generateStreamIngestionPropsTransform(
- streamName,
- fullDatasourceName,
- parserType,
- parserOrInputFormat,
- dimensions,
- config
- );
streamQueryPropsTransform =
generateStreamQueryPropsTransform(streamName, fullDatasourceName);
}
+ public GeneratedTestConfig withContext(Map<String, Object> context)
+ {
+ this.context = context;
+ return this;
+ }
+
+ public GeneratedTestConfig withLagAggregator(LagAggregator lagAggregator)
+ {
+ this.lagAggregator = lagAggregator;
+ return this;
+ }
+
public String getSupervisorId()
{
return supervisorId;
@@ -1007,7 +1023,16 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
public Function<String, String> getStreamIngestionPropsTransform()
{
- return streamIngestionPropsTransform;
+ return generateStreamIngestionPropsTransform(
+ streamName,
+ fullDatasourceName,
+ parserType,
+ parserOrInputFormat,
+ dimensions,
+ context,
+ lagAggregator,
+ config
+ );
}
public Function<String, String> getStreamQueryPropsTransform()
diff --git
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index b55028dc16f..bb22cdc6c02 100644
---
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
+++
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -52,6 +52,8 @@
"replicas": 1,
"taskDuration": "PT120S",
"%%USE_EARLIEST_KEY%%": true,
- "inputFormat" : %%INPUT_FORMAT%%
- }
+ "inputFormat" : %%INPUT_FORMAT%%,
+ "lagAggregator": %%LAG_AGGREGATOR%%
+ },
+ "context": %%CONTEXT%%
}
diff --git
a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
similarity index 100%
rename from
server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
rename to
server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]