This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 594545da55 Adds cluster level idleConfig setting for supervisor 
(#13311)
594545da55 is described below

commit 594545da55e3502269a0275e2140ad95d0aa56a5
Author: Tejaswini Bandlamudi <[email protected]>
AuthorDate: Tue Nov 8 14:54:14 2022 +0530

    Adds cluster level idleConfig setting for supervisor (#13311)
    
    * adds cluster level idleConfig
    
    * updates docs
    
    * refactoring
    
    * spelling nit
    
    * nit
    
    * nit
    
    * refactoring
---
 docs/configuration/index.md                        |  4 ++
 .../extensions-core/kafka-supervisor-reference.md  |  2 +-
 .../supervisor/KafkaSupervisorIOConfigTest.java    |  3 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      | 53 +++++++++++++++++++++-
 .../supervisor/KinesisSupervisorIOConfig.java      |  3 +-
 .../seekablestream/supervisor/IdleConfig.java      | 18 ++++----
 .../supervisor/SeekableStreamSupervisor.java       | 21 ++++++++-
 .../SeekableStreamSupervisorSpecTest.java          |  1 -
 .../supervisor/SupervisorStateManagerConfig.java   | 16 +++++++
 .../supervisor/SupervisorStateManagerTest.java     | 27 ++++++++++-
 10 files changed, 132 insertions(+), 16 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 0f99575b13..6fb3201d98 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1152,6 +1152,10 @@ There are additional configs for autoscaling (if it is 
enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task 
failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor 
exceptions should be stored and returned by the supervisor `/status` 
endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception 
events that can be returned through the supervisor `/status` 
endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.idleConfig.enabled`|If `true`, supervisor can become idle 
if there is no data on input stream/topic for some time.|false|
+|`druid.supervisor.idleConfig.inactiveAfterMillis`|Supervisor is marked as 
idle if all existing data has been read from input topic and no new data has 
been published for `inactiveAfterMillis` milliseconds.|`600_000`|
+
+The `druid.supervisor.idleConfig.*` specified in the runtime properties of the 
overlord defines the default behavior for the entire cluster. See [Idle 
Configuration in Kafka Supervisor 
IOConfig](../development/extensions-core/kafka-supervisor-reference.md#kafkasupervisorioconfig)
 to override it for an individual supervisor.
 
 #### Overlord Dynamic Configuration
 
diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md 
b/docs/development/extensions-core/kafka-supervisor-reference.md
index f1153c7112..6f94f90ffc 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -87,7 +87,7 @@ This topic contains configuration reference information for 
the Apache Kafka sup
 | Property | Description | Required |
 | ------------- | ------------- | ------------- |
 | `enabled` | If `true`, Kafka supervisor will become idle if there is no data 
on input stream/topic for some time. | no (default == false) |
-| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data 
has been read from input topic and no new data has been published for 
`inactiveAfterMillis` milliseconds. | no (default == 600000) |
+| `inactiveAfterMillis` | Supervisor is marked as idle if all existing data 
has been read from input topic and no new data has been published for 
`inactiveAfterMillis` milliseconds. | no (default == `600_000`) |
 
 The following example demonstrates supervisor spec with `lagBased` autoScaler 
and idle config enabled:
 ```json
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index b6a95769b8..5a4a15590c 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -326,6 +326,7 @@ public class KafkaSupervisorIOConfigTest
   {
     HashMap<String, Object> idleConfig = new HashMap<>();
     idleConfig.put("enabled", true);
+    idleConfig.put("inactiveAfterMillis", 600000L);
 
     final Map<String, Object> consumerProperties = 
KafkaConsumerConfigs.getConsumerProperties();
     consumerProperties.put("bootstrap.servers", "localhost:8082");
@@ -354,6 +355,6 @@ public class KafkaSupervisorIOConfigTest
 
     Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig());
     Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled());
-    Assert.assertEquals(600000L, 
kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
+    Assert.assertEquals(Long.valueOf(600000), 
kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index bcb0c9ed1c..f81b62bdce 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2112,6 +2112,57 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, 
supervisor.getState());
   }
 
+  @Test
+  public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws 
Exception
+  {
+    Map<String, String> config = ImmutableMap.of("idleConfig.enabled", "false",
+                                                 
"idleConfig.inactiveAfterMillis", "200");
+    supervisorConfig = OBJECT_MAPPER.convertValue(config, 
SupervisorStateManagerConfig.class);
+    supervisor = getTestableSupervisorForIdleBehaviour(
+        1,
+        2,
+        true,
+        "PT10S",
+        null,
+        null,
+        false,
+        new IdleConfig(true, null)
+    );
+    addSomeEvents(1);
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, 
ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
+        )
+    ).anyTimes();
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+    verifyAll();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Thread.sleep(100);
+    supervisor.updateCurrentAndLatestOffsets();
+    supervisor.runInternal();
+
+    Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisor.getState());
+  }
+
   @Test
   public void 
testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks() 
throws Exception
   {
@@ -4314,7 +4365,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             new NoopServiceEmitter(),
             new DruidMonitorSchedulerConfig(),
             rowIngestionMetersFactory,
-            new SupervisorStateManagerConfig()
+            supervisorConfig
         ),
         rowIngestionMetersFactory
     );
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 38d92fb880..c31b834bf1 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
 import org.apache.druid.indexing.kinesis.KinesisRegion;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.joda.time.DateTime;
@@ -91,7 +92,7 @@ public class KinesisSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
         earlyMessageRejectionPeriod,
         autoScalerConfig,
         lateMessageRejectionStartDateTime,
-        null
+        new IdleConfig(null, null)
     );
 
     this.endpoint = endpoint != null
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
index 6b017399dd..40734815aa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
@@ -25,10 +25,13 @@ import com.google.common.base.Preconditions;
 
 import javax.annotation.Nullable;
 
+/**
+ * Defines if and when {@link SeekableStreamSupervisor} can become idle.
+ */
 public class IdleConfig
 {
   private final boolean enabled;
-  private final long inactiveAfterMillis;
+  private final Long inactiveAfterMillis;
 
   @JsonCreator
   public IdleConfig(
@@ -36,13 +39,12 @@ public class IdleConfig
       @Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis
   )
   {
+    Preconditions.checkArgument(
+        inactiveAfterMillis == null || inactiveAfterMillis > 0,
+        "inactiveAfterMillis should be a postive number"
+    );
     this.enabled = enabled != null && enabled;
-    this.inactiveAfterMillis = inactiveAfterMillis != null
-                               ? inactiveAfterMillis
-                               : 600_000L;
-
-    Preconditions.checkArgument(this.inactiveAfterMillis > 0,
-                                "inactiveAfterMillis should be a postive 
number");
+    this.inactiveAfterMillis = inactiveAfterMillis;
   }
 
   @JsonProperty
@@ -52,7 +54,7 @@ public class IdleConfig
   }
 
   @JsonProperty
-  public long getInactiveAfterMillis()
+  public Long getInactiveAfterMillis()
   {
     return this.inactiveAfterMillis;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 9c90cfe983..8a79825a5c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -746,6 +746,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // snapshots latest sequences from stream to be verified in next run cycle 
of inactive stream check
   private final Map<PartitionIdType, SequenceOffsetType> 
previousSequencesFromStream = new HashMap<>();
   private long lastActiveTimeMillis;
+  private final IdleConfig idleConfig;
 
   public SeekableStreamSupervisor(
       final String supervisorId,
@@ -804,6 +805,23 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               : Math.min(10, this.ioConfig.getTaskCount() * 
this.ioConfig.getReplicas()));
     }
 
+    IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
+    if (specIdleConfig != null) {
+      if (specIdleConfig.getInactiveAfterMillis() != null) {
+        idleConfig = specIdleConfig;
+      } else {
+        idleConfig = new IdleConfig(
+            specIdleConfig.isEnabled(),
+            spec.getSupervisorStateManagerConfig().getInactiveAfterMillis()
+        );
+      }
+    } else {
+      idleConfig = new IdleConfig(
+          spec.getSupervisorStateManagerConfig().isIdleConfigEnabled(),
+          spec.getSupervisorStateManagerConfig().getInactiveAfterMillis()
+      );
+    }
+
     this.workerExec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(
             workerThreads,
@@ -3292,8 +3310,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   private void checkIfStreamInactiveAndTurnSupervisorIdle()
   {
-    IdleConfig idleConfig = spec.getIoConfig().getIdleConfig();
-    if ((idleConfig == null || !idleConfig.isEnabled()) || spec.isSuspended()) 
{
+    if (!idleConfig.isEnabled() || spec.isSuspended()) {
       return;
     }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 7f67393fc9..519eff9c09 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -973,7 +973,6 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
     };
 
     
Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
-    Assert.assertEquals(600000L, 
spec.getIoConfig().getIdleConfig().getInactiveAfterMillis());
   }
 
   private static DataSchema getDataSchema()
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
index 5dde7a399a..e3d1015f7e 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java
@@ -46,6 +46,12 @@ public class SupervisorStateManagerConfig
   @JsonProperty
   private int maxStoredExceptionEvents = Math.max(unhealthinessThreshold, 
healthinessThreshold);
 
+  @JsonProperty("idleConfig.enabled")
+  private boolean idleConfigEnabled = false;
+
+  @JsonProperty("idleConfig.inactiveAfterMillis")
+  private long inactiveAfterMillis = 600_000L;
+
   public SupervisorStateManagerConfig()
   {
 
@@ -85,4 +91,14 @@ public class SupervisorStateManagerConfig
   {
     return maxStoredExceptionEvents;
   }
+
+  public boolean isIdleConfigEnabled()
+  {
+    return idleConfigEnabled;
+  }
+
+  public long getInactiveAfterMillis()
+  {
+    return inactiveAfterMillis;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
index 01bdeb9e7d..367577a291 100644
--- 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
@@ -19,19 +19,30 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class SupervisorStateManagerTest
 {
+  SupervisorStateManagerConfig stateManagerConfig;
+
   @Test
   public void testMarkRunFinishedIfSupervisorIsIdle()
   {
+    stateManagerConfig = new SupervisorStateManagerConfig();
     SupervisorStateManager supervisorStateManager = new SupervisorStateManager(
-        new SupervisorStateManagerConfig(),
+        stateManagerConfig,
         false
     );
 
+    Assert.assertFalse(stateManagerConfig.isIdleConfigEnabled());
+    Assert.assertEquals(600000, stateManagerConfig.getInactiveAfterMillis());
+
     supervisorStateManager.markRunFinished();
 
     Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, 
supervisorStateManager.getSupervisorState());
@@ -41,4 +52,18 @@ public class SupervisorStateManagerTest
 
     Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, 
supervisorStateManager.getSupervisorState());
   }
+
+  @Test
+  public void testIdleConfigSerde()
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    Map<String, String> config = ImmutableMap.of(
+        "idleConfig.enabled", "true",
+        "idleConfig.inactiveAfterMillis", "60000"
+    );
+    stateManagerConfig = mapper.convertValue(config, 
SupervisorStateManagerConfig.class);
+
+    Assert.assertTrue(stateManagerConfig.isIdleConfigEnabled());
+    Assert.assertEquals(60000, stateManagerConfig.getInactiveAfterMillis());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to