This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 348f5098dda Add supervisor/count metric for health and state monitoring (#19114)
348f5098dda is described below
commit 348f5098dda8992249e0418fb8f028a9311da4b9
Author: Aru Raghuwanshi <[email protected]>
AuthorDate: Wed Mar 11 19:50:36 2026 -0700
Add supervisor/count metric for health and state monitoring (#19114)
Adds a new supervisor/count metric when SupervisorStatsMonitor is enabled
in druid.monitoring.monitors. The metric reports each supervisor’s state
(RUNNING, SUSPENDED, UNHEALTHY_SUPERVISOR, etc.) for Prometheus, StatsD, and
other metric systems.
Available dimensions are `supervisorId`, `type`, `state`, `dataSource`,
`stream` (optional), `detailedState` (optional).
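Every supervisor contributes a value of 1, so per-state supervisor counts can be recovered downstream by summing the metric grouped by the `state` dimension. The sketch below shows that aggregation in plain Java over an in-memory list. `SupervisorCountEvent` and the sample supervisor IDs are hypothetical stand-ins for data points stored by a metrics backend, not Druid classes.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SupervisorCountByState
{
  // Hypothetical stand-in for one supervisor/count data point (not a Druid class).
  record SupervisorCountEvent(String supervisorId, String state, long value) {}

  // Sums supervisor/count values per `state` dimension; TreeMap keeps states sorted.
  static Map<String, Long> countByState(List<SupervisorCountEvent> events)
  {
    Map<String, Long> counts = new TreeMap<>();
    for (SupervisorCountEvent event : events) {
      counts.merge(event.state(), event.value(), Long::sum);
    }
    return counts;
  }

  public static void main(String[] args)
  {
    List<SupervisorCountEvent> events = List.of(
        new SupervisorCountEvent("clicks-kafka", "RUNNING", 1),
        new SupervisorCountEvent("views-kafka", "RUNNING", 1),
        new SupervisorCountEvent("logs-kafka", "SUSPENDED", 1),
        new SupervisorCountEvent("alerts-kafka", "UNHEALTHY_SUPERVISOR", 1)
    );
    // Prints {RUNNING=2, SUSPENDED=1, UNHEALTHY_SUPERVISOR=1}
    System.out.println(countByState(events));
  }
}
```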
---
docs/configuration/index.md | 5 +-
docs/operations/metrics.md | 1 +
.../src/main/resources/defaultMetrics.json | 1 +
.../overlord/supervisor/SupervisorManager.java | 32 +++-
.../overlord/supervisor/SupervisorManagerTest.java | 126 ++++++++++++++-
processing/src/main/resources/defaultMetrics.json | 1 +
.../server/metrics/SupervisorStatsMonitor.java | 68 +++++++++
.../server/metrics/SupervisorStatsProvider.java | 90 +++++++++++
.../server/metrics/SupervisorStatsMonitorTest.java | 169 +++++++++++++++++++++
.../java/org/apache/druid/cli/CliOverlord.java | 2 +
10 files changed, 490 insertions(+), 5 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 04e5dc2af58..713ba2f6b1d 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1983,6 +1983,7 @@ The following table lists available monitors and the respective services where t
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|Any|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|Overlord|
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|Overlord|
+|`org.apache.druid.server.metrics.SupervisorStatsMonitor`|Reports supervisor count and state, per supervisor per emission period.|Overlord|
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. |MiddleManager, Indexer|
|`org.apache.druid.server.metrics.ServiceStatusMonitor`|Reports a heartbeat for the service.|Any|
|`org.apache.druid.server.metrics.GroupByStatsMonitor`|Report metrics for groupBy queries like disk and merge buffer utilization. |Broker, Historical, Indexer, Peon|
@@ -1997,10 +1998,10 @@ All the services in your Druid deployment would have these two monitors.
If you want any service specific monitors though, you need to add all the monitors you want to run for that service to the service's `runtime.properties` file even if they are listed in the common file. The service specific properties take precedence.
-The following example adds the `TaskCountStatsMonitor` and `TaskSlotCountStatsMonitor` as well as the `OshiSysMonitor` and `JvmMonitor` from the previous example to the Overlord service (`coordinator-overlord/runtime.properties`):
+The following example adds the `TaskCountStatsMonitor`, `TaskSlotCountStatsMonitor`, and `SupervisorStatsMonitor` as well as the `OshiSysMonitor` and `JvmMonitor` from the previous example to the Overlord service (`coordinator-overlord/runtime.properties`):
```properties
-druid.monitoring.monitors=["org.apache.druid.server.metrics.TaskCountStatsMonitor", "org.apache.druid.server.metrics.TaskSlotCountStatsMonitor", "org.apache.druid.java.util.metrics.OshiSysMonitor","org.apache.druid.java.util.metrics.JvmMonitor"]
+druid.monitoring.monitors=["org.apache.druid.server.metrics.TaskCountStatsMonitor", "org.apache.druid.server.metrics.TaskSlotCountStatsMonitor", "org.apache.druid.server.metrics.SupervisorStatsMonitor", "org.apache.druid.java.util.metrics.OshiSysMonitor","org.apache.druid.java.util.metrics.JvmMonitor"]
```
If you don't include `OshiSysMonitor` and `JvmMonitor` in the Overlord's `runtime.properties` file, the monitors don't get loaded onto the Overlord despite being specified in the common file.
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 34f38e7c344..582a8d60877 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -342,6 +342,7 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
|`task/running/count`|Number of current running tasks. This metric is available only if the `TaskCountStatsMonitor` module is included.|`dataSource`,`taskType`, `supervisorId`|Varies|
|`task/pending/count`|Number of current pending tasks. This metric is available only if the `TaskCountStatsMonitor` module is included.|`dataSource`,`taskType`, `supervisorId`|Varies|
|`task/waiting/count`|Number of current waiting tasks. This metric is available only if the `TaskCountStatsMonitor` module is included.|`dataSource`,`taskType`, `supervisorId`|Varies|
+|`supervisor/count`|Count of active supervisors. Each supervisor emits 1, tagged with its state, datasource, stream (when applicable), and detailed state. For possible `state` and `detailedState` values, see the [supervisor status report](../ingestion/supervisor.md#status-report). This metric is available only if the `SupervisorStatsMonitor` module is included.|`supervisorId`, `type`, `state`, `dataSource`, `stream` (optional), `detailedState` (optional)|1 per supervisor; aggregate by st [...]
|`taskSlot/total/count`|Number of total task slots per emission period. This metric is available only if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is available only if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is available only if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
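`state` carries the basic state while `detailedState` can be finer-grained (for example `RUNNING` with `CONNECTING_TO_STREAM`, as in this commit's monitor tests). An alert can therefore key on `state` and report `detailedState` for context. The following is a hypothetical Java sketch of such a check. Treating exactly `UNHEALTHY_SUPERVISOR` and `UNHEALTHY_TASKS` as alert-worthy is an assumed policy, and `SupervisorCountPoint` is not a Druid class.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class UnhealthySupervisorCheck
{
  // Hypothetical stand-in for one supervisor/count data point and its dimensions.
  record SupervisorCountPoint(String supervisorId, String state, String detailedState) {}

  // Assumed alerting policy: these basic states (both appear in this commit's tests) trigger an alert.
  static final Set<String> UNHEALTHY_STATES = Set.of("UNHEALTHY_SUPERVISOR", "UNHEALTHY_TASKS");

  // Returns "id (detailedState)" for each unhealthy supervisor, sorted alphabetically.
  // Falls back to `state` when the optional detailedState dimension is absent.
  static List<String> unhealthySupervisors(List<SupervisorCountPoint> points)
  {
    return points.stream()
                 .filter(p -> p.state() != null && UNHEALTHY_STATES.contains(p.state()))
                 .map(p -> p.supervisorId() + " (" + (p.detailedState() == null ? p.state() : p.detailedState()) + ")")
                 .sorted()
                 .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<SupervisorCountPoint> points = List.of(
        new SupervisorCountPoint("connecting-supervisor", "RUNNING", "CONNECTING_TO_STREAM"),
        new SupervisorCountPoint("alerts-kafka", "UNHEALTHY_SUPERVISOR", "UNHEALTHY_SUPERVISOR"),
        new SupervisorCountPoint("metrics-kinesis", "UNHEALTHY_TASKS", null)
    );
    // Prints [alerts-kafka (UNHEALTHY_SUPERVISOR), metrics-kinesis (UNHEALTHY_TASKS)]
    System.out.println(unhealthySupervisors(points));
  }
}
```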
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 0fcc7a87c4c..79be4527447 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -151,6 +151,7 @@
"task/running/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current running tasks."},
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current pending tasks."},
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current waiting tasks."},
+ "supervisor/count" : { "dimensions" : ["supervisorId", "type", "state", "detailedState"], "type" : "gauge", "help": "Count of active supervisors. Each supervisor emits 1, tagged with its state. Available only if the SupervisorStatsMonitor module is included."},
"segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments assigned to be loaded in the cluster."},
"segment/moved/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments moved in the cluster." },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index d598fdd0910..51abb814a6d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -43,12 +43,16 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.server.metrics.SupervisorStatsProvider;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,7 +62,7 @@ import java.util.concurrent.Future;
/**
* Manages the creation and lifetime of {@link Supervisor}.
*/
-public class SupervisorManager
+public class SupervisorManager implements SupervisorStatsProvider
{
private static final EmittingLogger log = new EmittingLogger(SupervisorManager.class);
@@ -88,6 +92,32 @@ public class SupervisorManager
return supervisors.keySet();
}
+ @Override
+ public Collection<SupervisorStatsProvider.SupervisorStats> getSupervisorStats()
+ {
+ List<SupervisorStatsProvider.SupervisorStats> stats = new ArrayList<>();
+ for (Map.Entry<String, Pair<Supervisor, SupervisorSpec>> entry : supervisors.entrySet()) {
+
+ final Pair<Supervisor, SupervisorSpec> pair = entry.getValue();
+ if (pair == null || pair.lhs == null) {
+ continue;
+ }
+ final Supervisor supervisor = pair.lhs;
+ final SupervisorSpec supervisorSpec = pair.rhs;
+ final SupervisorStateManager.State state = supervisor.getState();
+
+ stats.add(new SupervisorStatsProvider.SupervisorStats(
+ supervisorSpec.getId(),
+ supervisorSpec.getType(),
+ state == null ? "UNKNOWN" : state.getBasicState().toString(),
+ DefaultQueryMetrics.getTableNamesAsString(new HashSet<>(supervisorSpec.getDataSources())),
+ supervisorSpec.getSource() == null ? "" : supervisorSpec.getSource(),
+ state == null ? "UNKNOWN" : state.toString()
+ ));
+ }
+ return stats;
+ }
+
/**
* @param datasource Datasource to find active supervisor id with append lock for.
* @return An optional with the active appending supervisor id if it exists.
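The loop in `getSupervisorStats()` turns each `(Supervisor, SupervisorSpec)` pair into one stats record. It skips entries without a live supervisor, reports a `null` state as `UNKNOWN`, and turns a missing source into an empty stream. A minimal standalone Java sketch of that mapping follows. All of its types are simplified hypothetical stand-ins rather than Druid classes, and `formatDataSources` is an assumed formatting used in place of `DefaultQueryMetrics.getTableNamesAsString`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class SupervisorStatsSnapshotSketch
{
  // Hypothetical, simplified stand-ins for Druid's supervisor state, Supervisor and SupervisorSpec.
  record StateInfo(String detailed, String basic) {}
  record Supervisor(StateInfo state) {}
  record SpecInfo(String id, String type, List<String> dataSources, String source) {}
  record Entry(Supervisor supervisor, SpecInfo spec) {}
  record Stats(String supervisorId, String type, String state, String dataSource, String stream, String detailedState) {}

  // Builds one Stats per live supervisor, mirroring the null handling in getSupervisorStats().
  static List<Stats> snapshot(Map<String, Entry> supervisors)
  {
    List<Stats> stats = new ArrayList<>();
    for (Entry entry : supervisors.values()) {
      if (entry == null || entry.supervisor() == null) {
        continue; // no running supervisor instance: nothing to report
      }
      StateInfo state = entry.supervisor().state();
      SpecInfo spec = entry.spec();
      stats.add(new Stats(
          spec.id(),
          spec.type(),
          state == null ? "UNKNOWN" : state.basic(),
          formatDataSources(spec.dataSources()),
          spec.source() == null ? "" : spec.source(),
          state == null ? "UNKNOWN" : state.detailed()
      ));
    }
    return stats;
  }

  // Assumed formatting for illustration only; the real code delegates to
  // DefaultQueryMetrics.getTableNamesAsString, whose exact output may differ.
  static String formatDataSources(List<String> dataSources)
  {
    TreeSet<String> unique = new TreeSet<>(dataSources);
    return unique.size() == 1 ? unique.first() : unique.toString();
  }

  public static void main(String[] args)
  {
    Map<String, Entry> supervisors = new LinkedHashMap<>();
    supervisors.put("clicks", new Entry(
        new Supervisor(new StateInfo("CONNECTING_TO_STREAM", "RUNNING")),
        new SpecInfo("clicks", "kafka", List.of("clicks"), "clicks-topic")
    ));
    supervisors.put("orphan", new Entry(null, new SpecInfo("orphan", "kafka", List.of("orphan"), "orphan-topic")));
    supervisors.put("compaction", new Entry(
        new Supervisor(null),
        new SpecInfo("compaction", "compaction", List.of("ds2", "ds1"), null)
    ));
    // "orphan" is skipped; "compaction" reports UNKNOWN states and an empty stream.
    snapshot(supervisors).forEach(System.out::println);
  }
}
```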
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index d34a2936001..525444e23de 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -40,9 +40,11 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.metrics.SupervisorStatsProvider;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -57,10 +59,12 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
-import java.util.ArrayList;
+import java.lang.reflect.Field;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
@RunWith(EasyMockRunner.class)
public class SupervisorManagerTest extends EasyMockSupport
@@ -352,6 +356,124 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testGetSupervisorStatsWithNoSupervisors()
+ {
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+ replayAll();
+
+ manager.start();
+
+ Collection<SupervisorStatsProvider.SupervisorStats> stats = manager.getSupervisorStats();
+ Assert.assertTrue(stats.isEmpty());
+
+ verifyAll();
+ }
+
+ @Test
+ public void testGetSupervisorStatsWithActiveSupervisors()
+ {
+ Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+ "id1", new TestSupervisorSpec("id1", supervisor1)
+ );
+
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+ supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+ EasyMock.expect(supervisor1.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING);
+ replayAll();
+
+ manager.start();
+
+ Collection<SupervisorStatsProvider.SupervisorStats> stats = manager.getSupervisorStats();
+ Assert.assertEquals(1, stats.size());
+
+ SupervisorStatsProvider.SupervisorStats stat = stats.iterator().next();
+ Assert.assertEquals("id1", stat.getSupervisorId());
+ Assert.assertEquals("TestSupervisorSpec", stat.getType());
+ Assert.assertEquals("RUNNING", stat.getState());
+ Assert.assertEquals("id1", stat.getDataSource());
+ Assert.assertEquals("", stat.getStream());
+ Assert.assertEquals("RUNNING", stat.getDetailedState());
+
+ verifyAll();
+ }
+
+ @Test
+ public void testGetSupervisorStatsWithNullState()
+ {
+ Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+ "id1", new TestSupervisorSpec("id1", supervisor1)
+ );
+
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+ supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+ EasyMock.expect(supervisor1.getState()).andReturn(null);
+ replayAll();
+
+ manager.start();
+
+ Collection<SupervisorStatsProvider.SupervisorStats> stats = manager.getSupervisorStats();
+ Assert.assertEquals(1, stats.size());
+
+ SupervisorStatsProvider.SupervisorStats stat = stats.iterator().next();
+ Assert.assertEquals("id1", stat.getSupervisorId());
+ Assert.assertEquals("TestSupervisorSpec", stat.getType());
+ Assert.assertEquals("UNKNOWN", stat.getState());
+ Assert.assertEquals("UNKNOWN", stat.getDetailedState());
+
+ verifyAll();
+ }
+
+ @Test
+ public void testGetSupervisorStatsSkipsEntryWithNullSupervisor() throws Exception
+ {
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+ replayAll();
+
+ manager.start();
+
+ final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> supervisorsMap = getSupervisorsMap();
+ supervisorsMap.put("id1", new Pair<>(null, new TestSupervisorSpec("id1", supervisor1)));
+
+ final Collection<SupervisorStatsProvider.SupervisorStats> stats = manager.getSupervisorStats();
+ Assert.assertTrue(stats.isEmpty());
+
+ verifyAll();
+ }
+
+ @Test
+ public void testGetSupervisorStatsSkipsNullEntryButRetainsValid() throws Exception
+ {
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+ EasyMock.expect(supervisor2.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING);
+ replayAll();
+
+ manager.start();
+
+ final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> supervisorsMap = getSupervisorsMap();
+ supervisorsMap.put("id1", new Pair<>(null, new TestSupervisorSpec("id1", supervisor1)));
+ supervisorsMap.put("id2", new Pair<>(supervisor2, new TestSupervisorSpec("id2", supervisor2)));
+
+ final Collection<SupervisorStatsProvider.SupervisorStats> stats = manager.getSupervisorStats();
+ Assert.assertEquals(1, stats.size());
+
+ final SupervisorStatsProvider.SupervisorStats stat = stats.iterator().next();
+ Assert.assertEquals("id2", stat.getSupervisorId());
+ Assert.assertEquals("RUNNING", stat.getState());
+
+ verifyAll();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> getSupervisorsMap() throws Exception
+ {
+ final Field field = SupervisorManager.class.getDeclaredField("supervisors");
+ field.setAccessible(true);
+ return (ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>>) field.get(manager);
+ }
+
@Test
public void testHandoffTaskGroupsEarly()
{
@@ -1012,7 +1134,7 @@ public class SupervisorManagerTest extends EasyMockSupport
@Override
public List<String> getDataSources()
{
- return new ArrayList<>();
+ return Collections.singletonList(id);
}
}
}
diff --git a/processing/src/main/resources/defaultMetrics.json b/processing/src/main/resources/defaultMetrics.json
index ea6c2fe6b5f..cad4dcf1bfd 100644
--- a/processing/src/main/resources/defaultMetrics.json
+++ b/processing/src/main/resources/defaultMetrics.json
@@ -180,6 +180,7 @@
"segment/usedPercent": [],
"segment/waitCompact/bytes": [],
"segment/waitCompact/count": [],
+ "supervisor/count": [],
"serverview/init/time": [],
"serverview/sync/healthy": [],
"serverview/sync/unstableTime": [],
diff --git a/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsMonitor.java
new file mode 100644
index 00000000000..e9f59a564eb
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsMonitor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.Collection;
+
+@LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
+public class SupervisorStatsMonitor extends AbstractMonitor
+{
+ private final SupervisorStatsProvider statsProvider;
+
+ @Inject
+ public SupervisorStatsMonitor(
+ SupervisorStatsProvider statsProvider
+ )
+ {
+ this.statsProvider = statsProvider;
+ }
+
+ @Override
+ public boolean doMonitor(ServiceEmitter emitter)
+ {
+ Collection<SupervisorStatsProvider.SupervisorStats> stats = statsProvider.getSupervisorStats();
+ if (stats == null) {
+ return true;
+ }
+
+ for (SupervisorStatsProvider.SupervisorStats stat : stats) {
+ emitter.emit(
+ ServiceMetricEvent.builder()
+ .setDimension(DruidMetrics.SUPERVISOR_ID, stat.getSupervisorId())
+ .setDimension(DruidMetrics.TYPE, stat.getType())
+ .setDimension("state", stat.getState())
+ .setDimension(DruidMetrics.DATASOURCE, stat.getDataSource())
+ .setDimensionIfNotNull(DruidMetrics.STREAM, stat.getStream())
+ .setDimensionIfNotNull("detailedState", stat.getDetailedState())
+ .setMetric("supervisor/count", 1)
+ );
+ }
+
+ return true;
+ }
+}
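`doMonitor` emits one `supervisor/count` event with value 1 per supervisor. It always sets `supervisorId`, `type`, `state`, and `dataSource`, and adds `stream` and `detailedState` only when they are non-null. The following Druid-free Java sketch mirrors that shape so the resulting dimension sets are easy to inspect. `MetricEvent` and `Stats` are hypothetical stand-ins for `ServiceMetricEvent` and `SupervisorStatsProvider.SupervisorStats`, not the real classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SupervisorCountEmissionSketch
{
  // Hypothetical stand-ins for ServiceMetricEvent and SupervisorStatsProvider.SupervisorStats.
  record MetricEvent(String metric, Map<String, String> dimensions, long value) {}
  record Stats(String supervisorId, String type, String state, String dataSource, String stream, String detailedState) {}

  // Builds one supervisor/count event per supervisor; a null snapshot yields no events.
  static List<MetricEvent> toEvents(List<Stats> stats)
  {
    List<MetricEvent> events = new ArrayList<>();
    if (stats == null) {
      return events;
    }
    for (Stats stat : stats) {
      Map<String, String> dimensions = new LinkedHashMap<>();
      dimensions.put("supervisorId", stat.supervisorId());
      dimensions.put("type", stat.type());
      dimensions.put("state", stat.state());
      dimensions.put("dataSource", stat.dataSource());
      // Optional dimensions are added only when non-null, like setDimensionIfNotNull.
      if (stat.stream() != null) {
        dimensions.put("stream", stat.stream());
      }
      if (stat.detailedState() != null) {
        dimensions.put("detailedState", stat.detailedState());
      }
      events.add(new MetricEvent("supervisor/count", dimensions, 1));
    }
    return events;
  }

  public static void main(String[] args)
  {
    List<MetricEvent> events = toEvents(List.of(
        new Stats("clicks-kafka", "kafka", "RUNNING", "clicks", "clicks-topic", "CONNECTING_TO_STREAM"),
        new Stats("compaction-supervisor", "compaction", "RUNNING", "compaction-ds", null, null)
    ));
    events.forEach(System.out::println);
  }
}
```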
diff --git a/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsProvider.java
new file mode 100644
index 00000000000..6e5fdf733ac
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/SupervisorStatsProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import java.util.Collection;
+
+/**
+ * Provides supervisor stats for metrics emission. Used by {@link SupervisorStatsMonitor}
+ * to report supervisor count and state metrics.
+ */
+public interface SupervisorStatsProvider
+{
+ /**
+ * Returns a snapshot of per-supervisor stats for the current emission period.
+ * Each entry represents one active supervisor with its id, type, and state.
+ *
+ * @return collection of supervisor stats; empty if no supervisors are active
+ */
+ Collection<SupervisorStats> getSupervisorStats();
+
+ /**
+ * Immutable snapshot of a single supervisor's stats for metrics emission.
+ */
+ class SupervisorStats
+ {
+ private final String supervisorId;
+ private final String type;
+ private final String state;
+ private final String dataSource;
+ private final String stream;
+ private final String detailedState;
+
+ public SupervisorStats(String supervisorId, String type, String state, String dataSource, String stream, String detailedState)
+ {
+ this.supervisorId = supervisorId;
+ this.type = type;
+ this.state = state;
+ this.dataSource = dataSource;
+ this.stream = stream;
+ this.detailedState = detailedState;
+ }
+
+ public String getSupervisorId()
+ {
+ return supervisorId;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+
+ public String getState()
+ {
+ return state;
+ }
+
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ public String getStream()
+ {
+ return stream;
+ }
+
+ public String getDetailedState()
+ {
+ return detailedState;
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/SupervisorStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/SupervisorStatsMonitorTest.java
new file mode 100644
index 00000000000..64e2e166656
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/SupervisorStatsMonitorTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class SupervisorStatsMonitorTest
+{
+ private SupervisorStatsProvider statsProvider;
+
+ @Before
+ public void setUp()
+ {
+ statsProvider = () -> ImmutableList.of(
+ new SupervisorStatsProvider.SupervisorStats("clicks-kafka", "kafka", "RUNNING", "clicks", "clicks-topic", "RUNNING"),
+ new SupervisorStatsProvider.SupervisorStats("logs-kafka", "kafka", "SUSPENDED", "logs", "logs-topic", "SUSPENDED"),
+ new SupervisorStatsProvider.SupervisorStats("alerts-kafka", "kafka", "UNHEALTHY_SUPERVISOR", "alerts", "alerts-topic", "UNHEALTHY_SUPERVISOR"),
+ new SupervisorStatsProvider.SupervisorStats("metrics-kinesis", "kinesis", "UNHEALTHY_TASKS", "metrics", "metrics-stream", "UNHEALTHY_TASKS")
+ );
+ }
+
+ @Test
+ public void testMonitorEmitsCorrectMetrics()
+ {
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(statsProvider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(4, emitter.getNumEmittedEvents());
+
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "clicks-kafka", "type", "kafka", "state", "RUNNING", "dataSource", "clicks", "stream", "clicks-topic", "detailedState", "RUNNING"),
+ 1
+ );
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "logs-kafka", "type", "kafka", "state", "SUSPENDED", "dataSource", "logs", "stream", "logs-topic", "detailedState", "SUSPENDED"),
+ 1
+ );
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "alerts-kafka", "type", "kafka", "state", "UNHEALTHY_SUPERVISOR", "dataSource", "alerts", "stream", "alerts-topic", "detailedState", "UNHEALTHY_SUPERVISOR"),
+ 1
+ );
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "metrics-kinesis", "type", "kinesis", "state", "UNHEALTHY_TASKS", "dataSource", "metrics", "stream", "metrics-stream", "detailedState", "UNHEALTHY_TASKS"),
+ 1
+ );
+ }
+
+ @Test
+ public void testMonitorWithEmptyStats()
+ {
+ final SupervisorStatsProvider emptyProvider = () -> ImmutableList.of();
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(emptyProvider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
+ }
+
+ @Test
+ public void testMonitorWithNullStats()
+ {
+ final SupervisorStatsProvider nullProvider = () -> null;
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(nullProvider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(0, emitter.getNumEmittedEvents());
+ }
+
+ @Test
+ public void testMonitorWithUnknownState()
+ {
+ final SupervisorStatsProvider provider = () -> ImmutableList.of(
+ new SupervisorStatsProvider.SupervisorStats("unknown-supervisor", "scheduled_batch", "UNKNOWN", "compaction-ds", null, "UNKNOWN")
+ );
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(provider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "unknown-supervisor", "type", "scheduled_batch", "state", "UNKNOWN", "dataSource", "compaction-ds", "detailedState", "UNKNOWN"),
+ 1
+ );
+ }
+
+ @Test
+ public void testMonitorWithNullStreamAndDetailedState()
+ {
+ final SupervisorStatsProvider provider = () -> ImmutableList.of(
+ new SupervisorStatsProvider.SupervisorStats("compaction-supervisor", "compaction", "RUNNING", "compaction-ds", null, null)
+ );
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(provider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "compaction-supervisor", "type", "compaction", "state", "RUNNING", "dataSource", "compaction-ds"),
+ 1
+ );
+ }
+
+ @Test
+ public void testMonitorWithMultipleDatasources()
+ {
+ final SupervisorStatsProvider provider = () -> ImmutableList.of(
+ new SupervisorStatsProvider.SupervisorStats("multi-ds-supervisor", "kafka", "RUNNING", "[ds1, ds2]", "multi-topic", "RUNNING")
+ );
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(provider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "multi-ds-supervisor", "type", "kafka", "state", "RUNNING", "dataSource", "[ds1, ds2]", "stream", "multi-topic", "detailedState", "RUNNING"),
+ 1
+ );
+ }
+
+ @Test
+ public void testMonitorWithDetailedStateDifferentFromState()
+ {
+ final SupervisorStatsProvider provider = () -> ImmutableList.of(
+ new SupervisorStatsProvider.SupervisorStats("connecting-supervisor", "kafka", "RUNNING", "clicks", "clicks-topic", "CONNECTING_TO_STREAM")
+ );
+ final SupervisorStatsMonitor monitor = new SupervisorStatsMonitor(provider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+
+ Assert.assertEquals(1, emitter.getNumEmittedEvents());
+ emitter.verifyValue(
+ "supervisor/count",
+ Map.of("supervisorId", "connecting-supervisor", "type", "kafka", "state", "RUNNING", "dataSource", "clicks", "stream", "clicks-topic", "detailedState", "CONNECTING_TO_STREAM"),
+ 1
+ );
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 6518766817b..6b82029055e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -137,6 +137,7 @@ import org.apache.druid.server.initialization.jetty.JettyBindings;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
+import org.apache.druid.server.metrics.SupervisorStatsProvider;
import org.apache.druid.server.metrics.TaskCountStatsProvider;
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
import org.apache.druid.server.security.AuthConfig;
@@ -234,6 +235,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class);
+ binder.bind(SupervisorStatsProvider.class).to(SupervisorManager.class);
binder.bind(TaskLogStreamer.class)
.to(SwitchingTaskLogStreamer.class)