This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5f70c8300d6 Prometheus per label ttl (#18689)
5f70c8300d6 is described below
commit 5f70c8300d6d36f49436138450081e79bb97b654
Author: aho135 <[email protected]>
AuthorDate: Tue Nov 4 14:59:42 2025 -0800
Prometheus per label ttl (#18689)
Fixes #14638
This is an extension of #18598 to enhance the TTL to track metrics at the
label level instead of just the metric level. This should allow operators to
configure medium cardinality dimensions on the prometheus-emitter without
putting pressure on the Prometheus collector/sub-system.
---
docs/development/extensions-contrib/prometheus.md | 2 +-
.../emitter/prometheus/DimensionsAndCollector.java | 44 ++++++--
.../emitter/prometheus/PrometheusEmitter.java | 36 ++++---
.../emitter/prometheus/PrometheusEmitterTest.java | 119 +++++++++++++++------
4 files changed, 147 insertions(+), 54 deletions(-)
diff --git a/docs/development/extensions-contrib/prometheus.md
b/docs/development/extensions-contrib/prometheus.md
index d5660e2e54b..2b4b042be48 100644
--- a/docs/development/extensions-contrib/prometheus.md
+++ b/docs/development/extensions-contrib/prometheus.md
@@ -44,7 +44,7 @@ All the configuration parameters for the Prometheus emitter
are under `druid.emi
| `druid.emitter.prometheus.addHostAsLabel` | Flag to include the hostname
as a prometheus label.
| no | false
|
| `druid.emitter.prometheus.addServiceAsLabel` | Flag to include the druid
service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus
label.
| no | false
|
| `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address.
Required if using `pushgateway` strategy.
| no | none
|
-| `druid.emitter.prometheus.flushPeriod` | When using the `pushgateway`
strategy metrics are emitted every `flushPeriod` seconds. <br/>When using the
`exporter` strategy this configures the metric TTL such that if the metric
value is not updated within `flushPeriod` seconds then it will stop being
emitted. Note that unique label combinations per metric are currently not
subject to TTL expiration. It is recommended to set this to at least 3 *
`scrape_interval`. | Required if `pushg [...]
+| `druid.emitter.prometheus.flushPeriod` | When using the `pushgateway`
strategy metrics are emitted every `flushPeriod` seconds. <br/>When using the
`exporter` strategy this configures the metric TTL such that if the metric
value is not updated within `flushPeriod` seconds then it will stop being
emitted. Note that the TTL applies for each unique label combination per
metric. It is recommended to set this to at least 3 * `scrape_interval`. |
Required if `pushgateway` strategy is [...]
| `druid.emitter.prometheus.extraLabels` | JSON key-value pairs for
additional labels on all metrics. Keys (label names) must match the regex
`[a-zA-Z_:][a-zA-Z0-9_:]*`. Example: `{"cluster_name": "druid_cluster1", "env":
"staging"}`. | no | none
|
| `druid.emitter.prometheus.deletePushGatewayMetricsOnShutdown` | Flag to
delete metrics from Pushgateway on task shutdown. Works only if `pushgateway`
strategy is used. This feature allows deleting stale metrics from batch
executed tasks. Otherwise, the Pushgateway will store these stale metrics
indefinitely as there is [no time to live
mechanism](https://github.com/prometheus/pushgateway/issues/117), using the
memory to hold data that was already scraped by Prometheus. | no | false |
| `druid.emitter.prometheus.waitForShutdownDelay` | Time in milliseconds to
wait for peon tasks to delete metrics from the Pushgateway on shutdown (e.g.
60_000). Applicable only when `pushgateway` strategy is used and
`deletePushGatewayMetricsOnShutdown` is set to true. There is no guarantee that
a peon task will delete metrics from the gateway if the configured delay is
more than the [Peon's
`druid.indexer.task.gracefulShutdownTimeout`](https://druid.apache.org/docs/latest/configuration
[...]
diff --git
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
index 73be1eed205..0d75d45034b 100644
---
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
+++
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
@@ -20,11 +20,15 @@
package org.apache.druid.emitter.prometheus;
import io.prometheus.client.SimpleCollector;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.logger.Logger;
import org.joda.time.Duration;
import javax.annotation.Nullable;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class DimensionsAndCollector
{
@@ -33,7 +37,7 @@ public class DimensionsAndCollector
private final SimpleCollector collector;
private final double conversionFactor;
private final double[] histogramBuckets;
- private final Stopwatch updateTimer;
+ private final ConcurrentHashMap<List<String>, Stopwatch>
labelValuesToStopwatch;
private final Duration ttlSeconds;
DimensionsAndCollector(String[] dimensions, SimpleCollector collector,
double conversionFactor, double[] histogramBuckets, @Nullable Integer
ttlSeconds)
@@ -42,7 +46,7 @@ public class DimensionsAndCollector
this.collector = collector;
this.conversionFactor = conversionFactor;
this.histogramBuckets = histogramBuckets;
- this.updateTimer = Stopwatch.createStarted();
+ this.labelValuesToStopwatch = new ConcurrentHashMap<>();
this.ttlSeconds = ttlSeconds != null ?
Duration.standardSeconds(ttlSeconds) : null;
}
@@ -66,22 +70,42 @@ public class DimensionsAndCollector
return histogramBuckets;
}
- public void resetLastUpdateTime()
+ /**
+ * For each unique set of labelValues, keeps track of the amount of time
that has elapsed since its metric
+ * value was last updated.
+ */
+ public void resetLastUpdateTime(List<String> labelValues)
{
- updateTimer.restart();
+ labelValuesToStopwatch.compute(labelValues, (k, v) -> {
+ if (v != null) {
+ v.restart();
+ return v;
+ } else {
+ return Stopwatch.createStarted();
+ }
+ });
}
- public long getMillisSinceLastUpdate()
+ public ConcurrentMap<List<String>, Stopwatch> getLabelValuesToStopwatch()
{
- return updateTimer.millisElapsed();
+ return labelValuesToStopwatch;
}
- public boolean isExpired()
+ /**
+ * For the given labelValues, checks if the metric value has been updated
within the configured {@link #ttlSeconds}.
+ * Returns true if the entry has expired (it is then removed from the map) or if
no entry exists for the given labelValues, otherwise
+ * returns false.
+ */
+ public boolean shouldRemoveIfExpired(List<String> labelValues)
{
if (ttlSeconds == null) {
- log.error("Invalid usage of isExpired(), TTL has not been set");
- return false;
+ throw DruidException.defensive("Invalid usage of shouldRemoveIfExpired,
ttlSeconds has not been set");
}
- return updateTimer.hasElapsed(ttlSeconds);
+ return labelValuesToStopwatch.computeIfPresent(labelValues, (k, v) -> {
+ if (v.hasElapsed(ttlSeconds)) {
+ return null;
+ }
+ return v;
+ }) == null;
}
}
diff --git
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
index 87583b1ef82..4cad38a511a 100644
---
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
+++
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
@@ -20,6 +20,7 @@
package org.apache.druid.emitter.prometheus;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
@@ -36,6 +37,8 @@ import
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -72,6 +75,12 @@ public class PrometheusEmitter implements Emitter
metrics = new Metrics(config);
}
+ public PrometheusEmitter(PrometheusEmitterConfig config,
ScheduledExecutorService exec)
+ {
+ this(config);
+ this.exec = exec;
+ }
+
@Override
public void start()
{
@@ -87,7 +96,7 @@ public class PrometheusEmitter implements Emitter
log.error("HTTPServer is already started");
}
// Start TTL scheduler if TTL is configured
- if (config.getFlushPeriod() != null) {
+ if (config.getFlushPeriod() != null && exec == null) {
exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
exec.scheduleAtFixedRate(
this::cleanUpStaleMetrics,
@@ -171,14 +180,14 @@ public class PrometheusEmitter implements Emitter
if (metric.getCollector() instanceof Counter) {
((Counter)
metric.getCollector()).labels(labelValues).inc(value.doubleValue());
- metric.resetLastUpdateTime();
+ metric.resetLastUpdateTime(Arrays.asList(labelValues));
} else if (metric.getCollector() instanceof Gauge) {
((Gauge)
metric.getCollector()).labels(labelValues).set(value.doubleValue());
- metric.resetLastUpdateTime();
+ metric.resetLastUpdateTime(Arrays.asList(labelValues));
} else if (metric.getCollector() instanceof Histogram) {
((Histogram) metric.getCollector()).labels(labelValues)
.observe(value.doubleValue() /
metric.getConversionFactor());
- metric.resetLastUpdateTime();
+ metric.resetLastUpdateTime(Arrays.asList(labelValues));
} else {
log.error("Unrecognized metric type [%s]",
metric.getCollector().getClass());
}
@@ -277,7 +286,8 @@ public class PrometheusEmitter implements Emitter
* This method is called periodically by the TTL scheduler when using the
'exporter' strategy with
* a configured flushPeriod.
*/
- private void cleanUpStaleMetrics()
+ @VisibleForTesting
+ protected void cleanUpStaleMetrics()
{
if (config.getFlushPeriod() == null) {
return;
@@ -286,13 +296,15 @@ public class PrometheusEmitter implements Emitter
Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
DimensionsAndCollector metric = entry.getValue();
- if (metric.isExpired()) {
- log.debug(
- "Metric [%s] has expired (last updated [%d] ms ago)",
- entry.getKey(),
- metric.getMillisSinceLastUpdate()
- );
- metric.getCollector().clear();
+ for (List<String> labelValues :
metric.getLabelValuesToStopwatch().keySet()) {
+ if (metric.shouldRemoveIfExpired(labelValues)) {
+ log.debug(
+ "Metric [%s] with labels [%s] has expired",
+ entry.getKey(),
+ labelValues
+ );
+ metric.getCollector().remove(labelValues.toArray(new String[0]));
+ }
}
}
}
diff --git
a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
index 69da07ff46b..d7e1bcfcb94 100644
---
a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
+++
b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
@@ -24,6 +24,7 @@ import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.exporter.PushGateway;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.easymock.EasyMock;
@@ -31,8 +32,11 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.anyObject;
@@ -433,15 +437,17 @@ public class PrometheusEmitterTest
}
@Test
- public void testMetricTtlExpiration()
+ public void testMetricTtlExpiration() throws ExecutionException,
InterruptedException
{
int flushPeriod = 3;
- PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, false, true, flushPeriod, null, false, null);
- PrometheusEmitter emitter = new PrometheusEmitter(config);
+ PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, true, true, flushPeriod, null, false, null);
+ ScheduledExecutorService exec = ScheduledExecutors.fixed(1,
"PrometheusTTLExecutor-%s");
+ PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
emitter.start();
ServiceMetricEvent event = ServiceMetricEvent.builder()
.setMetric("segment/loadQueue/count", 10)
+ .setDimension("server",
"historical1")
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
emitter.emit(event);
@@ -452,34 +458,29 @@ public class PrometheusEmitterTest
Assert.assertNotNull("Test metric should be registered", testMetric);
Assert.assertFalse(
"Metric should not be expired initially",
- testMetric.isExpired()
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
);
+ Assert.assertEquals(1,
testMetric.getCollector().collect().get(0).samples.size());
// Wait for the metric to expire (ttl + 1 second buffer)
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(flushPeriod) + 1000);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- Assert.assertTrue(
- "Metric should be expired after TTL",
- testMetric.isExpired()
- );
+ Thread.sleep(TimeUnit.SECONDS.toMillis(flushPeriod) + 1000);
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(0,
testMetric.getCollector().collect().get(0).samples.size());
emitter.close();
}
@Test
- public void testMetricTtlUpdate()
+ public void testMetricTtlUpdate() throws ExecutionException,
InterruptedException
{
int flushPeriod = 3;
- PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, false, true, flushPeriod, null, false, null);
- PrometheusEmitter emitter = new PrometheusEmitter(config);
+ PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, true, true, flushPeriod, null, false, null);
+ ScheduledExecutorService exec = ScheduledExecutors.fixed(1,
"PrometheusTTLExecutor-%s");
+ PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
emitter.start();
ServiceMetricEvent event = ServiceMetricEvent.builder()
.setMetric("segment/loadQueue/count", 10)
+ .setDimension("server",
"historical1")
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
emitter.emit(event);
@@ -493,29 +494,85 @@ public class PrometheusEmitterTest
);
Assert.assertFalse(
"Metric should not be expired initially",
- testMetric.isExpired()
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
);
+ Assert.assertEquals(1,
testMetric.getCollector().collect().get(0).samples.size());
// Wait for a little, but not long enough for the metric to expire
long waitTime = TimeUnit.SECONDS.toMillis(flushPeriod) / 5;
- try {
- Thread.sleep(waitTime);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ Thread.sleep(waitTime);
Assert.assertFalse(
"Metric should not be expired",
- testMetric.isExpired()
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
);
- emitter.emit(event);
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(1,
testMetric.getCollector().collect().get(0).samples.size());
+ emitter.close();
+ }
+
+ @Test
+ public void testMetricTtlUpdateWithDifferentLabels() throws
ExecutionException, InterruptedException
+ {
+ int flushPeriod = 3;
+ PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, true, true, flushPeriod, null, false, null);
+ ScheduledExecutorService exec = ScheduledExecutors.fixed(1,
"PrometheusTTLExecutor-%s");
+ PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
+ emitter.start();
+
+ ServiceMetricEvent event1 = ServiceMetricEvent.builder()
+
.setMetric("segment/loadQueue/count", 10)
+ .setDimension("server",
"historical1")
+
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ ServiceMetricEvent event2 = ServiceMetricEvent.builder()
+
.setMetric("segment/loadQueue/count", 10)
+ .setDimension("server",
"historical2")
+
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ emitter.emit(event1);
+ emitter.emit(event2);
+
+ // Get the metric and check that neither label set is expired initially
+ Map<String, DimensionsAndCollector> registeredMetrics =
emitter.getMetrics().getRegisteredMetrics();
+ DimensionsAndCollector testMetric =
registeredMetrics.get("segment/loadQueue/count");
+
+ Assert.assertNotNull(
+ "Test metric should be registered",
+ testMetric
+ );
+ Assert.assertFalse(
+ "Metric should not be expired initially",
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
+ );
+ Assert.assertFalse(
+ "Metric should not be expired initially",
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical2"))
+ );
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(2,
testMetric.getCollector().collect().get(0).samples.size());
- long timeSinceLastUpdate = testMetric.getMillisSinceLastUpdate();
- Assert.assertTrue(
- "Update time should have been refreshed",
- timeSinceLastUpdate < waitTime
+ // Wait for a little, but not long enough for the metric to expire
+ long waitTime = TimeUnit.SECONDS.toMillis(flushPeriod) / 5;
+ Thread.sleep(waitTime);
+
+ Assert.assertFalse(
+ "Metric should not be expired",
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
);
+ Assert.assertFalse(
+ "Metric should not be expired",
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical2"))
+ );
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(2,
testMetric.getCollector().collect().get(0).samples.size());
+ // Reset update time only for event2
+ emitter.emit(event2);
+
+ // Wait for the remainder of the TTL to allow event1 to expire
+ Thread.sleep(waitTime * 4);
+
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(1,
testMetric.getCollector().collect().get(0).samples.size());
emitter.close();
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]