This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 594545da55 Adds cluster level idleConfig setting for supervisor
(#13311)
594545da55 is described below
commit 594545da55e3502269a0275e2140ad95d0aa56a5
Author: Tejaswini Bandlamudi <[email protected]>
AuthorDate: Tue Nov 8 14:54:14 2022 +0530
Adds cluster level idleConfig setting for supervisor (#13311)
* adds cluster level idleConfig
* updates docs
* refactoring
* spelling nit
* nit
* nit
* refactoring
---
docs/configuration/index.md | 4 ++
.../extensions-core/kafka-supervisor-reference.md | 2 +-
.../supervisor/KafkaSupervisorIOConfigTest.java | 3 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 53 +++++++++++++++++++++-
.../supervisor/KinesisSupervisorIOConfig.java | 3 +-
.../seekablestream/supervisor/IdleConfig.java | 18 ++++----
.../supervisor/SeekableStreamSupervisor.java | 21 ++++++++-
.../SeekableStreamSupervisorSpecTest.java | 1 -
.../supervisor/SupervisorStateManagerConfig.java | 16 +++++++
.../supervisor/SupervisorStateManagerTest.java | 27 ++++++++++-
10 files changed, 132 insertions(+), 16 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 0f99575b13..6fb3201d98 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1152,6 +1152,10 @@ There are additional configs for autoscaling (if it is
enabled):
|`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task
failures before the supervisor is considered unhealthy.|3|
|`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor
exceptions should be stored and returned by the supervisor `/status`
endpoint.|false|
|`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception
events that can be returned through the supervisor `/status`
endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.idleConfig.enabled`|If `true`, the supervisor can become idle
if there is no data on the input stream/topic for some time.|false|
+|`druid.supervisor.idleConfig.inactiveAfterMillis`|Supervisor is marked as
idle if all existing data has been read from input topic and no new data has
been published for `inactiveAfterMillis` milliseconds.|`600_000`|
+
+The `druid.supervisor.idleConfig.*` properties specified in the runtime properties of the
Overlord define the default behavior for the entire cluster. See [Idle
Configuration in Kafka Supervisor
IOConfig](../development/extensions-core/kafka-supervisor-reference.md#kafkasupervisorioconfig)
to override them for an individual supervisor.
#### Overlord Dynamic Configuration
diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md
b/docs/development/extensions-core/kafka-supervisor-reference.md
index f1153c7112..6f94f90ffc 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -87,7 +87,7 @@ This topic contains configuration reference information for
the Apache Kafka sup
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enabled` | If `true`, Kafka supervisor will become idle if there is no data
on input stream/topic for some time. | no (default == false) |
-| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data
has been read from input topic and no new data has been published for
`inactiveAfterMillis` milliseconds. | no (default == 600000) |
+| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data
has been read from input topic and no new data has been published for
`inactiveAfterMillis` milliseconds. | no (default == `600_000`) |
The following example demonstrates supervisor spec with `lagBased` autoScaler
and idle config enabled:
```json
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index b6a95769b8..5a4a15590c 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -326,6 +326,7 @@ public class KafkaSupervisorIOConfigTest
{
HashMap<String, Object> idleConfig = new HashMap<>();
idleConfig.put("enabled", true);
+ idleConfig.put("inactiveAfterMillis", 600000L);
final Map<String, Object> consumerProperties =
KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("bootstrap.servers", "localhost:8082");
@@ -354,6 +355,6 @@ public class KafkaSupervisorIOConfigTest
Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig());
Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled());
- Assert.assertEquals(600000L,
kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
+ Assert.assertEquals(Long.valueOf(600000),
kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
}
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index bcb0c9ed1c..f81b62bdce 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2112,6 +2112,57 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED,
supervisor.getState());
}
+ @Test
+ public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws
Exception
+ {
+ Map<String, String> config = ImmutableMap.of("idleConfig.enabled", "false",
+
"idleConfig.inactiveAfterMillis", "200");
+ supervisorConfig = OBJECT_MAPPER.convertValue(config,
SupervisorStateManagerConfig.class);
+ supervisor = getTestableSupervisorForIdleBehaviour(
+ 1,
+ 2,
+ true,
+ "PT10S",
+ null,
+ null,
+ false,
+ new IdleConfig(true, null)
+ );
+ addSomeEvents(1);
+
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<Integer, Long>(topic,
ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
+ )
+ ).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+ verifyAll();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE,
supervisor.getState());
+ }
+
@Test
public void
testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks()
throws Exception
{
@@ -4314,7 +4365,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
new NoopServiceEmitter(),
new DruidMonitorSchedulerConfig(),
rowIngestionMetersFactory,
- new SupervisorStateManagerConfig()
+ supervisorConfig
),
rowIngestionMetersFactory
);
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 38d92fb880..c31b834bf1 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisRegion;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.joda.time.DateTime;
@@ -91,7 +92,7 @@ public class KinesisSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
- null
+ new IdleConfig(null, null)
);
this.endpoint = endpoint != null
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
index 6b017399dd..40734815aa 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
@@ -25,10 +25,13 @@ import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
+/**
+ * Defines if and when {@link SeekableStreamSupervisor} can become idle.
+ */
public class IdleConfig
{
private final boolean enabled;
- private final long inactiveAfterMillis;
+ private final Long inactiveAfterMillis;
@JsonCreator
public IdleConfig(
@@ -36,13 +39,12 @@ public class IdleConfig
@Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis
)
{
+ Preconditions.checkArgument(
+ inactiveAfterMillis == null || inactiveAfterMillis > 0,
+ "inactiveAfterMillis should be a postive number"
+ );
this.enabled = enabled != null && enabled;
- this.inactiveAfterMillis = inactiveAfterMillis != null
- ? inactiveAfterMillis
- : 600_000L;
-
- Preconditions.checkArgument(this.inactiveAfterMillis > 0,
- "inactiveAfterMillis should be a postive
number");
+ this.inactiveAfterMillis = inactiveAfterMillis;
}
@JsonProperty
@@ -52,7 +54,7 @@ public class IdleConfig
}
@JsonProperty
- public long getInactiveAfterMillis()
+ public Long getInactiveAfterMillis()
{
return this.inactiveAfterMillis;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 9c90cfe983..8a79825a5c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -746,6 +746,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// snapshots latest sequences from stream to be verified in next run cycle
of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType>
previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
+ private final IdleConfig idleConfig;
public SeekableStreamSupervisor(
final String supervisorId,
@@ -804,6 +805,23 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
: Math.min(10, this.ioConfig.getTaskCount() *
this.ioConfig.getReplicas()));
}
+ IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
+ if (specIdleConfig != null) {
+ if (specIdleConfig.getInactiveAfterMillis() != null) {
+ idleConfig = specIdleConfig;
+ } else {
+ idleConfig = new IdleConfig(
+ specIdleConfig.isEnabled(),
+ spec.getSupervisorStateManagerConfig().getInactiveAfterMillis()
+ );
+ }
+ } else {
+ idleConfig = new IdleConfig(
+ spec.getSupervisorStateManagerConfig().isIdleConfigEnabled(),
+ spec.getSupervisorStateManagerConfig().getInactiveAfterMillis()
+ );
+ }
+
this.workerExec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
workerThreads,
@@ -3292,8 +3310,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private void checkIfStreamInactiveAndTurnSupervisorIdle()
{
- IdleConfig idleConfig = spec.getIoConfig().getIdleConfig();
- if ((idleConfig == null || !idleConfig.isEnabled()) || spec.isSuspended())
{
+ if (!idleConfig.isEnabled() || spec.isSuspended()) {
return;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 7f67393fc9..519eff9c09 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -973,7 +973,6 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
};
Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
- Assert.assertEquals(600000L,
spec.getIoConfig().getIdleConfig().getInactiveAfterMillis());
}
private static DataSchema getDataSchema()
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
index 5dde7a399a..e3d1015f7e 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
@@ -46,6 +46,12 @@ public class SupervisorStateManagerConfig
@JsonProperty
private int maxStoredExceptionEvents = Math.max(unhealthinessThreshold,
healthinessThreshold);
+ @JsonProperty("idleConfig.enabled")
+ private boolean idleConfigEnabled = false;
+
+ @JsonProperty("idleConfig.inactiveAfterMillis")
+ private long inactiveAfterMillis = 600_000L;
+
public SupervisorStateManagerConfig()
{
@@ -85,4 +91,14 @@ public class SupervisorStateManagerConfig
{
return maxStoredExceptionEvents;
}
+
+ public boolean isIdleConfigEnabled()
+ {
+ return idleConfigEnabled;
+ }
+
+ public long getInactiveAfterMillis()
+ {
+ return inactiveAfterMillis;
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
index 01bdeb9e7d..367577a291 100644
---
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
@@ -19,19 +19,30 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Map;
+
public class SupervisorStateManagerTest
{
+ SupervisorStateManagerConfig stateManagerConfig;
+
@Test
public void testMarkRunFinishedIfSupervisorIsIdle()
{
+ stateManagerConfig = new SupervisorStateManagerConfig();
SupervisorStateManager supervisorStateManager = new SupervisorStateManager(
- new SupervisorStateManagerConfig(),
+ stateManagerConfig,
false
);
+ Assert.assertFalse(stateManagerConfig.isIdleConfigEnabled());
+ Assert.assertEquals(600000, stateManagerConfig.getInactiveAfterMillis());
+
supervisorStateManager.markRunFinished();
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING,
supervisorStateManager.getSupervisorState());
@@ -41,4 +52,18 @@ public class SupervisorStateManagerTest
Assert.assertEquals(SupervisorStateManager.BasicState.IDLE,
supervisorStateManager.getSupervisorState());
}
+
+ @Test
+ public void testIdleConfigSerde()
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ Map<String, String> config = ImmutableMap.of(
+ "idleConfig.enabled", "true",
+ "idleConfig.inactiveAfterMillis", "60000"
+ );
+ stateManagerConfig = mapper.convertValue(config,
SupervisorStateManagerConfig.class);
+
+ Assert.assertTrue(stateManagerConfig.isIdleConfigEnabled());
+ Assert.assertEquals(60000, stateManagerConfig.getInactiveAfterMillis());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]