This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 348f5098dda Add supervisor/count metric for health and state 
monitoring (#19114)
348f5098dda is described below

commit 348f5098dda8992249e0418fb8f028a9311da4b9
Author: Aru Raghuwanshi <[email protected]>
AuthorDate: Wed Mar 11 19:50:36 2026 -0700

    Add supervisor/count metric for health and state monitoring (#19114)
    
    Adds a new `supervisor/count` metric, emitted when `SupervisorStatsMonitor` 
is enabled in `druid.monitoring.monitors`. The metric reports each supervisor’s 
state (RUNNING, SUSPENDED, UNHEALTHY_SUPERVISOR, etc.) for Prometheus, StatsD, 
and other metric systems.
    
    
    Available dimensions are `supervisorId`, `type`, `state`, `dataSource`, 
`stream` (optional), `detailedState` (optional).
---
 docs/configuration/index.md                        |   5 +-
 docs/operations/metrics.md                         |   1 +
 .../src/main/resources/defaultMetrics.json         |   1 +
 .../overlord/supervisor/SupervisorManager.java     |  32 +++-
 .../overlord/supervisor/SupervisorManagerTest.java | 126 ++++++++++++++-
 processing/src/main/resources/defaultMetrics.json  |   1 +
 .../server/metrics/SupervisorStatsMonitor.java     |  68 +++++++++
 .../server/metrics/SupervisorStatsProvider.java    |  90 +++++++++++
 .../server/metrics/SupervisorStatsMonitorTest.java | 169 +++++++++++++++++++++
 .../java/org/apache/druid/cli/CliOverlord.java     |   2 +
 10 files changed, 490 insertions(+), 5 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 04e5dc2af58..713ba2f6b1d 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1983,6 +1983,7 @@ The following table lists available monitors and the 
respective services where t
 |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal 
metrics of `http` or `parametrized` emitter (see below). Must not be used with 
another emitter type. See the description of the metrics here: 
https://github.com/apache/druid/pull/4973.|Any|
 |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many 
ingestion tasks are currently running/pending/waiting and also the number of 
successful/failed tasks per emission period.|Overlord|
 |`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics 
about task slot usage per emission period.|Overlord|
+|`org.apache.druid.server.metrics.SupervisorStatsMonitor`|Reports supervisor 
count and state, per supervisor per emission period.|Overlord|
 |`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how 
many ingestion tasks are currently running/pending/waiting, the number of 
successful/failed tasks, and metrics about task slot usage for the reporting 
worker, per emission period. |MiddleManager, Indexer|
 |`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat 
for the service.|Any|
 |`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report metrics for 
groupBy queries like disk and merge buffer utilization. |Broker, Historical, 
Indexer, Peon|
@@ -1997,10 +1998,10 @@ All the services in your Druid deployment would have 
these two monitors.
 
 If you want any service specific monitors though, you need to add all the 
monitors you want to run for that service to the service's `runtime.properties` 
file even if they are listed in the common file. The service specific 
properties take precedence.
 
-The following example adds the `TaskCountStatsMonitor` and 
`TaskSlotCountStatsMonitor` as well as the `OshiSysMonitor` and `JvmMonitor` 
from the previous example to the Overlord service 
(`coordinator-overlord/runtime.properties`):
+The following example adds the `TaskCountStatsMonitor`, 
`TaskSlotCountStatsMonitor`, and `SupervisorStatsMonitor` as well as the 
`OshiSysMonitor` and `JvmMonitor` from the previous example to the Overlord 
service (`coordinator-overlord/runtime.properties`):
 
 ```properties
-druid.monitoring.monitors=["org.apache.druid.server.metrics.TaskCountStatsMonitor",
 "org.apache.druid.server.metrics.TaskSlotCountStatsMonitor", 
"org.apache.druid.java.util.metrics.OshiSysMonitor","org.apache.druid.java.util.metrics.JvmMonitor"]
+druid.monitoring.monitors=["org.apache.druid.server.metrics.TaskCountStatsMonitor",
 "org.apache.druid.server.metrics.TaskSlotCountStatsMonitor", 
"org.apache.druid.server.metrics.SupervisorStatsMonitor", 
"org.apache.druid.java.util.metrics.OshiSysMonitor","org.apache.druid.java.util.metrics.JvmMonitor"]
 ```
 
 If you don't include `OshiSysMonitor` and `JvmMonitor` in the Overlord's 
`runtime.properties` file, the monitors don't get loaded onto the Overlord 
despite being specified in the common file.
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 34f38e7c344..582a8d60877 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -342,6 +342,7 @@ If the JVM does not support CPU time measurement for the 
current thread, `ingest
 |`task/running/count`|Number of current running tasks. This metric is 
available only if the `TaskCountStatsMonitor` module is 
included.|`dataSource`,`taskType`, `supervisorId`|Varies|
 |`task/pending/count`|Number of current pending tasks. This metric is 
available only if the `TaskCountStatsMonitor` module is 
included.|`dataSource`,`taskType`, `supervisorId`|Varies|
 |`task/waiting/count`|Number of current waiting tasks. This metric is 
available only if the `TaskCountStatsMonitor` module is 
included.|`dataSource`,`taskType`, `supervisorId`|Varies|
+|`supervisor/count`|Count of active supervisors. Each supervisor emits 1, 
tagged with its state, datasource, stream (when applicable), and detailed 
state. For possible `state` and `detailedState` values, see the [supervisor 
status report](../ingestion/supervisor.md#status-report). This metric is 
available only if the `SupervisorStatsMonitor` module is 
included.|`supervisorId`, `type`, `state`, `dataSource`, `stream` (optional), 
`detailedState` (optional)|1 per supervisor; aggregate by st [...]
 |`taskSlot/total/count`|Number of total task slots per emission period. This 
metric is available only if the `TaskSlotCountStatsMonitor` module is 
included.| `category`|Varies|
 |`taskSlot/idle/count`|Number of idle task slots per emission period. This 
metric is available only if the `TaskSlotCountStatsMonitor` module is 
included.| `category`|Varies|
 |`taskSlot/used/count`|Number of busy task slots per emission period. This 
metric is available only if the `TaskSlotCountStatsMonitor` module is 
included.| `category`|Varies|
diff --git 
a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json 
b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 0fcc7a87c4c..79be4527447 100644
--- 
a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ 
b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -151,6 +151,7 @@
   "task/running/count" : { "dimensions" : ["dataSource"], "type" : "count", 
"help": "Number of current running tasks."},
   "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count", 
"help": "Number of current pending tasks."},
   "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count", 
"help":  "Number of current waiting tasks."},
+  "supervisor/count" : { "dimensions" : ["supervisorId", "type", "state", 
"detailedState"], "type" : "gauge", "help": "Count of active supervisors. Each 
supervisor emits 1, tagged with its state. Available only if the 
SupervisorStatsMonitor module is included."},
 
   "segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count", 
"help": "Number of segments assigned to be loaded in the cluster."},
   "segment/moved/count" : { "dimensions" : ["tier"], "type" : "count", "help": 
"Number of segments moved in the cluster." },
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index d598fdd0910..51abb814a6d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -43,12 +43,16 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.query.DefaultQueryMetrics;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.server.metrics.SupervisorStatsProvider;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,7 +62,7 @@ import java.util.concurrent.Future;
 /**
  * Manages the creation and lifetime of {@link Supervisor}.
  */
-public class SupervisorManager
+public class SupervisorManager implements SupervisorStatsProvider
 {
   private static final EmittingLogger log = new 
EmittingLogger(SupervisorManager.class);
 
@@ -88,6 +92,32 @@ public class SupervisorManager
     return supervisors.keySet();
   }
 
+  @Override
+  public Collection<SupervisorStatsProvider.SupervisorStats> 
getSupervisorStats()
+  {
+    List<SupervisorStatsProvider.SupervisorStats> stats = new ArrayList<>();
+    for (Map.Entry<String, Pair<Supervisor, SupervisorSpec>> entry : 
supervisors.entrySet()) {
+
+      final Pair<Supervisor, SupervisorSpec> pair = entry.getValue();
+      if (pair == null || pair.lhs == null) {
+        continue;
+      }
+      final Supervisor supervisor = pair.lhs;
+      final SupervisorSpec supervisorSpec = pair.rhs;
+      final SupervisorStateManager.State state = supervisor.getState();
+
+      stats.add(new SupervisorStatsProvider.SupervisorStats(
+          supervisorSpec.getId(),
+          supervisorSpec.getType(),
+          state == null ? "UNKNOWN" : state.getBasicState().toString(),
+          DefaultQueryMetrics.getTableNamesAsString(new 
HashSet<>(supervisorSpec.getDataSources())),
+          supervisorSpec.getSource() == null ? "" : supervisorSpec.getSource(),
+          state == null ? "UNKNOWN" : state.toString()
+      ));
+    }
+    return stats;
+  }
+
   /**
    * @param datasource Datasource to find active supervisor id with append 
lock for.
    * @return An optional with the active appending supervisor id if it exists.
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index d34a2936001..525444e23de 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -40,9 +40,11 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.metrics.SupervisorStatsProvider;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -57,10 +59,12 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 
-import java.util.ArrayList;
+import java.lang.reflect.Field;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @RunWith(EasyMockRunner.class)
 public class SupervisorManagerTest extends EasyMockSupport
@@ -352,6 +356,124 @@ public class SupervisorManagerTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testGetSupervisorStatsWithNoSupervisors()
+  {
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+    replayAll();
+
+    manager.start();
+
+    Collection<SupervisorStatsProvider.SupervisorStats> stats = 
manager.getSupervisorStats();
+    Assert.assertTrue(stats.isEmpty());
+
+    verifyAll();
+  }
+
+  @Test
+  public void testGetSupervisorStatsWithActiveSupervisors()
+  {
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id1", new TestSupervisorSpec("id1", supervisor1)
+    );
+
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    
EasyMock.expect(supervisor1.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING);
+    replayAll();
+
+    manager.start();
+
+    Collection<SupervisorStatsProvider.SupervisorStats> stats = 
manager.getSupervisorStats();
+    Assert.assertEquals(1, stats.size());
+
+    SupervisorStatsProvider.SupervisorStats stat = stats.iterator().next();
+    Assert.assertEquals("id1", stat.getSupervisorId());
+    Assert.assertEquals("TestSupervisorSpec", stat.getType());
+    Assert.assertEquals("RUNNING", stat.getState());
+    Assert.assertEquals("id1", stat.getDataSource());
+    Assert.assertEquals("", stat.getStream());
+    Assert.assertEquals("RUNNING", stat.getDetailedState());
+
+    verifyAll();
+  }
+
+  @Test
+  public void testGetSupervisorStatsWithNullState()
+  {
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id1", new TestSupervisorSpec("id1", supervisor1)
+    );
+
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.expect(supervisor1.getState()).andReturn(null);
+    replayAll();
+
+    manager.start();
+
+    Collection<SupervisorStatsProvider.SupervisorStats> stats = 
manager.getSupervisorStats();
+    Assert.assertEquals(1, stats.size());
+
+    SupervisorStatsProvider.SupervisorStats stat = stats.iterator().next();
+    Assert.assertEquals("id1", stat.getSupervisorId());
+    Assert.assertEquals("TestSupervisorSpec", stat.getType());
+    Assert.assertEquals("UNKNOWN", stat.getState());
+    Assert.assertEquals("UNKNOWN", stat.getDetailedState());
+
+    verifyAll();
+  }
+
+  @Test
+  public void testGetSupervisorStatsSkipsEntryWithNullSupervisor() throws 
Exception
+  {
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+    replayAll();
+
+    manager.start();
+
+    final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> 
supervisorsMap = getSupervisorsMap();
+    supervisorsMap.put("id1", new Pair<>(null, new TestSupervisorSpec("id1", 
supervisor1)));
+
+    final Collection<SupervisorStatsProvider.SupervisorStats> stats = 
manager.getSupervisorStats();
+    Assert.assertTrue(stats.isEmpty());
+
+    verifyAll();
+  }
+
+  @Test
+  public void testGetSupervisorStatsSkipsNullEntryButRetainsValid() throws 
Exception
+  {
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+    
EasyMock.expect(supervisor2.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING);
+    replayAll();
+
+    manager.start();
+
+    final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> 
supervisorsMap = getSupervisorsMap();
+    supervisorsMap.put("id1", new Pair<>(null, new TestSupervisorSpec("id1", 
supervisor1)));
+    supervisorsMap.put("id2", new Pair<>(supervisor2, new 
TestSupervisorSpec("id2", supervisor2)));
+
+    final Collection<SupervisorStatsProvider.SupervisorStats> stats = 
manager.getSupervisorStats();
+    Assert.assertEquals(1, stats.size());
+
+    final SupervisorStatsProvider.SupervisorStats stat = 
stats.iterator().next();
+    Assert.assertEquals("id2", stat.getSupervisorId());
+    Assert.assertEquals("RUNNING", stat.getState());
+
+    verifyAll();
+  }
+
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> 
getSupervisorsMap() throws Exception
+  {
+    final Field field = 
SupervisorManager.class.getDeclaredField("supervisors");
+    field.setAccessible(true);
+    return (ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>>) 
field.get(manager);
+  }
+
   @Test
   public void testHandoffTaskGroupsEarly()
   {
@@ -1012,7 +1134,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     @Override
     public List<String> getDataSources()
     {
-      return new ArrayList<>();
+      return Collections.singletonList(id);
     }
   }
 }
diff --git a/processing/src/main/resources/defaultMetrics.json 
b/processing/src/main/resources/defaultMetrics.json
index ea6c2fe6b5f..cad4dcf1bfd 100644
--- a/processing/src/main/resources/defaultMetrics.json
+++ b/processing/src/main/resources/defaultMetrics.json
@@ -180,6 +180,7 @@
     "segment/usedPercent": [],
     "segment/waitCompact/bytes": [],
     "segment/waitCompact/count": [],
+    "supervisor/count": [],
     "serverview/init/time": [],
     "serverview/sync/healthy": [],
     "serverview/sync/unstableTime": [],
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsMonitor.java
 
b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsMonitor.java
new file mode 100644
index 00000000000..e9f59a564eb
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsMonitor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.Collection;
+
+@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
+public class SupervisorStatsMonitor extends AbstractMonitor
+{
+  private final SupervisorStatsProvider statsProvider;
+
+  @Inject
+  public SupervisorStatsMonitor(
+      SupervisorStatsProvider statsProvider
+  )
+  {
+    this.statsProvider = statsProvider;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    Collection<SupervisorStatsProvider.SupervisorStats> stats = 
statsProvider.getSupervisorStats();
+    if (stats == null) {
+      return true;
+    }
+
+    for (SupervisorStatsProvider.SupervisorStats stat : stats) {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+                            .setDimension(DruidMetrics.SUPERVISOR_ID, 
stat.getSupervisorId())
+                            .setDimension(DruidMetrics.TYPE, stat.getType())
+                            .setDimension("state", stat.getState())
+                            .setDimension(DruidMetrics.DATASOURCE, 
stat.getDataSource())
+                            .setDimensionIfNotNull(DruidMetrics.STREAM, 
stat.getStream())
+                            .setDimensionIfNotNull("detailedState", 
stat.getDetailedState())
+                            .setMetric("supervisor/count", 1)
+      );
+    }
+
+    return true;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsProvider.java
 
b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsProvider.java
new file mode 100644
index 00000000000..6e5fdf733ac
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import java.util.Collection;
+
+/**
+ * Provides supervisor stats for metrics emission. Used by {@link 
SupervisorStatsMonitor}
+ * to report supervisor count and state metrics.
+ */
+public interface SupervisorStatsProvider
+{
+  /**
+   * Returns a snapshot of per-supervisor stats for the current emission 
period.
+   * Each entry represents one active supervisor with its id, type, and state.
+   *
+   * @return collection of supervisor stats; empty if no supervisors are active
+   */
+  Collection<SupervisorStats> getSupervisorStats();
+
+  /**
+   * Immutable snapshot of a single supervisor's stats for metrics emission.
+   */
+  class SupervisorStats
+  {
+    private final String supervisorId;
+    private final String type;
+    private final String state;
+    private final String dataSource;
+    private final String stream;
+    private final String detailedState;
+
+    public SupervisorStats(String supervisorId, String type, String state, 
String dataSource, String stream, String detailedState)
+    {
+      this.supervisorId = supervisorId;
+      this.type = type;
+      this.state = state;
+      this.dataSource = dataSource;
+      this.stream = stream;
+      this.detailedState = detailedState;
+    }
+
+    public String getSupervisorId()
+    {
+      return supervisorId;
+    }
+
+    public String getType()
+    {
+      return type;
+    }
+
+    public String getState()
+    {
+      return state;
+    }
+
+    public String getDataSource()
+    {
+      return dataSource;
+    }
+
+    public String getStream()
+    {
+      return stream;
+    }
+
+    public String getDetailedState()
+    {
+      return detailedState;
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/SupervisorStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/SupervisorStatsMonitorTest.java
new file mode 100644
index 00000000000..64e2e166656
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/SupervisorStatsMonitorTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SupervisorStatsMonitorTest
+{
+  private SupervisorStatsProvider statsProvider;
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = () -> ImmutableList.of(
+        new SupervisorStatsProvider.SupervisorStats("clicks-kafka", "kafka", 
"RUNNING", "clicks", "clicks-topic", "RUNNING"),
+        new SupervisorStatsProvider.SupervisorStats("logs-kafka", "kafka", 
"SUSPENDED", "logs", "logs-topic", "SUSPENDED"),
+        new SupervisorStatsProvider.SupervisorStats("alerts-kafka", "kafka", 
"UNHEALTHY_SUPERVISOR", "alerts", "alerts-topic", "UNHEALTHY_SUPERVISOR"),
+        new SupervisorStatsProvider.SupervisorStats("metrics-kinesis", 
"kinesis", "UNHEALTHY_TASKS", "metrics", "metrics-stream", "UNHEALTHY_TASKS")
+    );
+  }
+
+  @Test
+  public void testMonitorEmitsCorrectMetrics()
+  {
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(4, emitter.getNumEmittedEvents());
+
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "clicks-kafka", "type", "kafka", "state", 
"RUNNING", "dataSource", "clicks", "stream", "clicks-topic", "detailedState", 
"RUNNING"),
+        1
+    );
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "logs-kafka", "type", "kafka", "state", 
"SUSPENDED", "dataSource", "logs", "stream", "logs-topic", "detailedState", 
"SUSPENDED"),
+        1
+    );
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "alerts-kafka", "type", "kafka", "state", 
"UNHEALTHY_SUPERVISOR", "dataSource", "alerts", "stream", "alerts-topic", 
"detailedState", "UNHEALTHY_SUPERVISOR"),
+        1
+    );
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "metrics-kinesis", "type", "kinesis", "state", 
"UNHEALTHY_TASKS", "dataSource", "metrics", "stream", "metrics-stream", 
"detailedState", "UNHEALTHY_TASKS"),
+        1
+    );
+  }
+
+  @Test
+  public void testMonitorWithEmptyStats()
+  {
+    final SupervisorStatsProvider emptyProvider = () -> ImmutableList.of();
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(emptyProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
+  }
+
+  @Test
+  public void testMonitorWithNullStats()
+  {
+    final SupervisorStatsProvider nullProvider = () -> null;
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(nullProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(0, emitter.getNumEmittedEvents());
+  }
+
+  @Test
+  public void testMonitorWithUnknownState()
+  {
+    final SupervisorStatsProvider provider = () -> ImmutableList.of(
+        new SupervisorStatsProvider.SupervisorStats("unknown-supervisor", 
"scheduled_batch", "UNKNOWN", "compaction-ds", null, "UNKNOWN")
+    );
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(provider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "unknown-supervisor", "type", 
"scheduled_batch", "state", "UNKNOWN", "dataSource", "compaction-ds", 
"detailedState", "UNKNOWN"),
+        1
+    );
+  }
+
+  @Test
+  public void testMonitorWithNullStreamAndDetailedState()
+  {
+    final SupervisorStatsProvider provider = () -> ImmutableList.of(
+        new SupervisorStatsProvider.SupervisorStats("compaction-supervisor", 
"compaction", "RUNNING", "compaction-ds", null, null)
+    );
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(provider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "compaction-supervisor", "type", "compaction", 
"state", "RUNNING", "dataSource", "compaction-ds"),
+        1
+    );
+  }
+
+  @Test
+  public void testMonitorWithMultipleDatasources()
+  {
+    final SupervisorStatsProvider provider = () -> ImmutableList.of(
+        new SupervisorStatsProvider.SupervisorStats("multi-ds-supervisor", 
"kafka", "RUNNING", "[ds1, ds2]", "multi-topic", "RUNNING")
+    );
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(provider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "multi-ds-supervisor", "type", "kafka", 
"state", "RUNNING", "dataSource", "[ds1, ds2]", "stream", "multi-topic", 
"detailedState", "RUNNING"),
+        1
+    );
+  }
+
+  @Test
+  public void testMonitorWithDetailedStateDifferentFromState()
+  {
+    final SupervisorStatsProvider provider = () -> ImmutableList.of(
+        new SupervisorStatsProvider.SupervisorStats("connecting-supervisor", 
"kafka", "RUNNING", "clicks", "clicks-topic", "CONNECTING_TO_STREAM")
+    );
+    final SupervisorStatsMonitor monitor = new 
SupervisorStatsMonitor(provider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
+    monitor.doMonitor(emitter);
+
+    Assert.assertEquals(1, emitter.getNumEmittedEvents());
+    emitter.verifyValue(
+        "supervisor/count",
+        Map.of("supervisorId", "connecting-supervisor", "type", "kafka", 
"state", "RUNNING", "dataSource", "clicks", "stream", "clicks-topic", 
"detailedState", "CONNECTING_TO_STREAM"),
+        1
+    );
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java 
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 6518766817b..6b82029055e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -137,6 +137,7 @@ import 
org.apache.druid.server.initialization.jetty.JettyBindings;
 import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.ServiceStatusMonitor;
+import org.apache.druid.server.metrics.SupervisorStatsProvider;
 import org.apache.druid.server.metrics.TaskCountStatsProvider;
 import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
 import org.apache.druid.server.security.AuthConfig;
@@ -234,6 +235,7 @@ public class CliOverlord extends ServerRunnable
             binder.bind(TaskMaster.class).in(ManageLifecycle.class);
             binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
             binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class);
+            
binder.bind(SupervisorStatsProvider.class).to(SupervisorManager.class);
 
             binder.bind(TaskLogStreamer.class)
                   .to(SwitchingTaskLogStreamer.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to