This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f70c8300d6 Prometheus per label ttl (#18689)
5f70c8300d6 is described below

commit 5f70c8300d6d36f49436138450081e79bb97b654
Author: aho135 <[email protected]>
AuthorDate: Tue Nov 4 14:59:42 2025 -0800

    Prometheus per label ttl (#18689)
    
    Fixes #14638
    
    This is an extension of #18598 that enhances the TTL to track metrics at the
label level instead of only the metric level. This should allow operators to
configure medium-cardinality dimensions on the prometheus-emitter without
putting pressure on the Prometheus collector/sub-system.
---
 docs/development/extensions-contrib/prometheus.md  |   2 +-
 .../emitter/prometheus/DimensionsAndCollector.java |  44 ++++++--
 .../emitter/prometheus/PrometheusEmitter.java      |  36 ++++---
 .../emitter/prometheus/PrometheusEmitterTest.java  | 119 +++++++++++++++------
 4 files changed, 147 insertions(+), 54 deletions(-)

diff --git a/docs/development/extensions-contrib/prometheus.md 
b/docs/development/extensions-contrib/prometheus.md
index d5660e2e54b..2b4b042be48 100644
--- a/docs/development/extensions-contrib/prometheus.md
+++ b/docs/development/extensions-contrib/prometheus.md
@@ -44,7 +44,7 @@ All the configuration parameters for the Prometheus emitter 
are under `druid.emi
 | `druid.emitter.prometheus.addHostAsLabel`     | Flag to include the hostname 
as a prometheus label.                                                          
                                                                                
                                          | no        | false                   
             |
 | `druid.emitter.prometheus.addServiceAsLabel`  | Flag to include the druid 
service name (e.g. `druid/broker`, `druid/coordinator`, etc.) as a prometheus 
label.                                                                          
                                               | no        | false              
                  |
 | `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address. 
Required if using `pushgateway` strategy.                                       
                                                                                
                                                  | no        | none            
                     |
-| `druid.emitter.prometheus.flushPeriod`        | When using the `pushgateway` 
strategy metrics are emitted every `flushPeriod` seconds. <br/>When using the 
`exporter` strategy this configures the metric TTL such that if the metric 
value is not updated within `flushPeriod` seconds then it will stop being 
emitted. Note that unique label combinations per metric are currently not 
subject to TTL expiration. It is recommended to set this to at least 3 * 
`scrape_interval`. | Required if `pushg [...]
+| `druid.emitter.prometheus.flushPeriod`        | When using the `pushgateway` 
strategy metrics are emitted every `flushPeriod` seconds. <br/>When using the 
`exporter` strategy this configures the metric TTL such that if the metric 
value is not updated within `flushPeriod` seconds then it will stop being 
emitted. Note that the TTL applies for each unique label combination per 
metric. It is recommended to set this to at least 3 * `scrape_interval`. | 
Required if `pushgateway` strategy is  [...]
 | `druid.emitter.prometheus.extraLabels`        | JSON key-value pairs for 
additional labels on all metrics. Keys (label names) must match the regex 
`[a-zA-Z_:][a-zA-Z0-9_:]*`. Example: `{"cluster_name": "druid_cluster1", "env": 
"staging"}`.                                        | no        | none          
                       |
 | `druid.emitter.prometheus.deletePushGatewayMetricsOnShutdown` | Flag to 
delete metrics from Pushgateway on task shutdown. Works only if `pushgateway` 
strategy is used. This feature allows to delete a stale metrics from batch 
executed tasks. Otherwise, the Pushgateway will store these stale metrics 
indefinitely as there is [no time to live 
mechanism](https://github.com/prometheus/pushgateway/issues/117), using the 
memory to hold data that was already scraped by Prometheus. | no | false |
 | `druid.emitter.prometheus.waitForShutdownDelay` | Time in milliseconds to 
wait for peon tasks to delete metrics from the Pushgateway on shutdown (e.g. 
60_000). Applicable only when `pushgateway` strategy is used and 
`deletePushGatewayMetricsOnShutdown` is set to true. There is no guarantee that 
a peon task will delete metrics from the gateway if the configured delay is 
more than the [Peon's 
`druid.indexer.task.gracefulShutdownTimeout`](https://druid.apache.org/docs/latest/configuration
 [...]
diff --git 
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
index 73be1eed205..0d75d45034b 100644
--- 
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
+++ 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java
@@ -20,11 +20,15 @@
 package org.apache.druid.emitter.prometheus;
 
 import io.prometheus.client.SimpleCollector;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class DimensionsAndCollector
 {
@@ -33,7 +37,7 @@ public class DimensionsAndCollector
   private final SimpleCollector collector;
   private final double conversionFactor;
   private final double[] histogramBuckets;
-  private final Stopwatch updateTimer;
+  private final ConcurrentHashMap<List<String>, Stopwatch> 
labelValuesToStopwatch;
   private final Duration ttlSeconds;
 
   DimensionsAndCollector(String[] dimensions, SimpleCollector collector, 
double conversionFactor, double[] histogramBuckets, @Nullable Integer 
ttlSeconds)
@@ -42,7 +46,7 @@ public class DimensionsAndCollector
     this.collector = collector;
     this.conversionFactor = conversionFactor;
     this.histogramBuckets = histogramBuckets;
-    this.updateTimer = Stopwatch.createStarted();
+    this.labelValuesToStopwatch = new ConcurrentHashMap<>();
     this.ttlSeconds = ttlSeconds != null ? 
Duration.standardSeconds(ttlSeconds) : null;
   }
 
@@ -66,22 +70,42 @@ public class DimensionsAndCollector
     return histogramBuckets;
   }
 
-  public void resetLastUpdateTime()
+  /**
+   * For each unique set of labelValues, keeps track of the amount of time 
that has elapsed since its metric
+   * value has been updated.
+   */
+  public void resetLastUpdateTime(List<String> labelValues)
   {
-    updateTimer.restart();
+    labelValuesToStopwatch.compute(labelValues, (k, v) -> {
+      if (v != null) {
+        v.restart();
+        return v;
+      } else {
+        return Stopwatch.createStarted();
+      }
+    });
   }
 
-  public long getMillisSinceLastUpdate()
+  public ConcurrentMap<List<String>, Stopwatch> getLabelValuesToStopwatch()
   {
-    return updateTimer.millisElapsed();
+    return labelValuesToStopwatch;
   }
 
-  public boolean isExpired()
+  /**
+   * For the given labelValues, checks if the metric value has been updated 
within the configured {@link #ttlSeconds}.
+   * Returns true and removes the entry from the map if it has expired or if 
the entry doesn't exist, otherwise
+   * returns false.
+   */
+  public boolean shouldRemoveIfExpired(List<String> labelValues)
   {
     if (ttlSeconds == null) {
-      log.error("Invalid usage of isExpired(), TTL has not been set");
-      return false;
+      throw DruidException.defensive("Invalid usage of shouldRemoveIfExpired, 
ttlSeconds has not been set");
     }
-    return updateTimer.hasElapsed(ttlSeconds);
+    return labelValuesToStopwatch.computeIfPresent(labelValues, (k, v) -> {
+      if (v.hasElapsed(ttlSeconds)) {
+        return null;
+      }
+      return v;
+    }) == null;
   }
 }
diff --git 
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
index 87583b1ef82..4cad38a511a 100644
--- 
a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
+++ 
b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java
@@ -20,6 +20,7 @@
 package org.apache.druid.emitter.prometheus;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
@@ -36,6 +37,8 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -72,6 +75,12 @@ public class PrometheusEmitter implements Emitter
     metrics = new Metrics(config);
   }
 
+  public PrometheusEmitter(PrometheusEmitterConfig config, 
ScheduledExecutorService exec)
+  {
+    this(config);
+    this.exec = exec;
+  }
+
   @Override
   public void start()
   {
@@ -87,7 +96,7 @@ public class PrometheusEmitter implements Emitter
         log.error("HTTPServer is already started");
       }
       // Start TTL scheduler if TTL is configured
-      if (config.getFlushPeriod() != null) {
+      if (config.getFlushPeriod() != null && exec == null) {
         exec = ScheduledExecutors.fixed(1, "PrometheusTTLExecutor-%s");
         exec.scheduleAtFixedRate(
             this::cleanUpStaleMetrics,
@@ -171,14 +180,14 @@ public class PrometheusEmitter implements Emitter
 
       if (metric.getCollector() instanceof Counter) {
         ((Counter) 
metric.getCollector()).labels(labelValues).inc(value.doubleValue());
-        metric.resetLastUpdateTime();
+        metric.resetLastUpdateTime(Arrays.asList(labelValues));
       } else if (metric.getCollector() instanceof Gauge) {
         ((Gauge) 
metric.getCollector()).labels(labelValues).set(value.doubleValue());
-        metric.resetLastUpdateTime();
+        metric.resetLastUpdateTime(Arrays.asList(labelValues));
       } else if (metric.getCollector() instanceof Histogram) {
         ((Histogram) metric.getCollector()).labels(labelValues)
                                            .observe(value.doubleValue() / 
metric.getConversionFactor());
-        metric.resetLastUpdateTime();
+        metric.resetLastUpdateTime(Arrays.asList(labelValues));
       } else {
         log.error("Unrecognized metric type [%s]", 
metric.getCollector().getClass());
       }
@@ -277,7 +286,8 @@ public class PrometheusEmitter implements Emitter
    * This method is called periodically by the TTL scheduler when using the 
'exporter' strategy with
    * a configured flushPeriod.
    */
-  private void cleanUpStaleMetrics()
+  @VisibleForTesting
+  protected void cleanUpStaleMetrics()
   {
     if (config.getFlushPeriod() == null) {
       return;
@@ -286,13 +296,15 @@ public class PrometheusEmitter implements Emitter
     Map<String, DimensionsAndCollector> map = metrics.getRegisteredMetrics();
     for (Map.Entry<String, DimensionsAndCollector> entry : map.entrySet()) {
       DimensionsAndCollector metric = entry.getValue();
-      if (metric.isExpired()) {
-        log.debug(
-            "Metric [%s] has expired (last updated [%d] ms ago)",
-            entry.getKey(),
-            metric.getMillisSinceLastUpdate()
-        );
-        metric.getCollector().clear();
+      for (List<String> labelValues : 
metric.getLabelValuesToStopwatch().keySet()) {
+        if (metric.shouldRemoveIfExpired(labelValues)) {
+          log.debug(
+              "Metric [%s] with labels [%s] has expired",
+              entry.getKey(),
+              labelValues
+          );
+          metric.getCollector().remove(labelValues.toArray(new String[0]));
+        }
       }
     }
   }
diff --git 
a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
 
b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
index 69da07ff46b..d7e1bcfcb94 100644
--- 
a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
+++ 
b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java
@@ -24,6 +24,7 @@ import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
 import io.prometheus.client.exporter.PushGateway;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.easymock.EasyMock;
@@ -31,8 +32,11 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.easymock.EasyMock.anyObject;
@@ -433,15 +437,17 @@ public class PrometheusEmitterTest
   }
 
   @Test
-  public void testMetricTtlExpiration()
+  public void testMetricTtlExpiration() throws ExecutionException, 
InterruptedException
   {
     int flushPeriod = 3;
-    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, false, true, flushPeriod, null, false, null);
-    PrometheusEmitter emitter = new PrometheusEmitter(config);
+    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, true, true, flushPeriod, null, false, null);
+    ScheduledExecutorService exec = ScheduledExecutors.fixed(1, 
"PrometheusTTLExecutor-%s");
+    PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
     emitter.start();
 
     ServiceMetricEvent event = ServiceMetricEvent.builder()
                                                  
.setMetric("segment/loadQueue/count", 10)
+                                                 .setDimension("server", 
"historical1")
                                                  
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
     emitter.emit(event);
 
@@ -452,34 +458,29 @@ public class PrometheusEmitterTest
     Assert.assertNotNull("Test metric should be registered", testMetric);
     Assert.assertFalse(
         "Metric should not be expired initially",
-        testMetric.isExpired()
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical1"))
     );
+    Assert.assertEquals(1, 
testMetric.getCollector().collect().get(0).samples.size());
 
     // Wait for the metric to expire (ttl + 1 second buffer)
-    try {
-      Thread.sleep(TimeUnit.SECONDS.toMillis(flushPeriod) + 1000);
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-
-    Assert.assertTrue(
-        "Metric should be expired after TTL",
-        testMetric.isExpired()
-    );
+    Thread.sleep(TimeUnit.SECONDS.toMillis(flushPeriod) + 1000);
+    exec.submit(emitter::cleanUpStaleMetrics).get();
+    Assert.assertEquals(0, 
testMetric.getCollector().collect().get(0).samples.size());
     emitter.close();
   }
 
   @Test
-  public void testMetricTtlUpdate()
+  public void testMetricTtlUpdate() throws ExecutionException, 
InterruptedException
   {
     int flushPeriod = 3;
-    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, false, true, flushPeriod, null, false, null);
-    PrometheusEmitter emitter = new PrometheusEmitter(config);
+    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, true, true, flushPeriod, null, false, null);
+    ScheduledExecutorService exec = ScheduledExecutors.fixed(1, 
"PrometheusTTLExecutor-%s");
+    PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
     emitter.start();
 
     ServiceMetricEvent event = ServiceMetricEvent.builder()
                                                  
.setMetric("segment/loadQueue/count", 10)
+                                                 .setDimension("server", 
"historical1")
                                                  
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
     emitter.emit(event);
 
@@ -493,29 +494,85 @@ public class PrometheusEmitterTest
     );
     Assert.assertFalse(
         "Metric should not be expired initially",
-        testMetric.isExpired()
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical1"))
     );
+    Assert.assertEquals(1, 
testMetric.getCollector().collect().get(0).samples.size());
 
     // Wait for a little, but not long enough for the metric to expire
     long waitTime = TimeUnit.SECONDS.toMillis(flushPeriod) / 5;
-    try {
-      Thread.sleep(waitTime);
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
+    Thread.sleep(waitTime);
 
     Assert.assertFalse(
         "Metric should not be expired",
-        testMetric.isExpired()
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical1"))
     );
-    emitter.emit(event);
+    exec.submit(emitter::cleanUpStaleMetrics).get();
+    Assert.assertEquals(1, 
testMetric.getCollector().collect().get(0).samples.size());
+    emitter.close();
+  }
+
+  @Test
+  public void testMetricTtlUpdateWithDifferentLabels() throws 
ExecutionException, InterruptedException
+  {
+    int flushPeriod = 3;
+    PrometheusEmitterConfig config = new 
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test", 
null, 0, null, true, true, flushPeriod, null, false, null);
+    ScheduledExecutorService exec = ScheduledExecutors.fixed(1, 
"PrometheusTTLExecutor-%s");
+    PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
+    emitter.start();
+
+    ServiceMetricEvent event1 = ServiceMetricEvent.builder()
+                                                 
.setMetric("segment/loadQueue/count", 10)
+                                                 .setDimension("server", 
"historical1")
+                                                 
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+    ServiceMetricEvent event2 = ServiceMetricEvent.builder()
+                                                  
.setMetric("segment/loadQueue/count", 10)
+                                                  .setDimension("server", 
"historical2")
+                                                  
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+    emitter.emit(event1);
+    emitter.emit(event2);
+
+    // Get the metrics and check that it's not expired initially
+    Map<String, DimensionsAndCollector> registeredMetrics = 
emitter.getMetrics().getRegisteredMetrics();
+    DimensionsAndCollector testMetric = 
registeredMetrics.get("segment/loadQueue/count");
+
+    Assert.assertNotNull(
+        "Test metric should be registered",
+        testMetric
+    );
+    Assert.assertFalse(
+        "Metric should not be expired initially",
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical1"))
+    );
+    Assert.assertFalse(
+        "Metric should not be expired initially",
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical2"))
+    );
+    exec.submit(emitter::cleanUpStaleMetrics).get();
+    Assert.assertEquals(2, 
testMetric.getCollector().collect().get(0).samples.size());
 
-    long timeSinceLastUpdate = testMetric.getMillisSinceLastUpdate();
-    Assert.assertTrue(
-        "Update time should have been refreshed",
-        timeSinceLastUpdate < waitTime
+    // Wait for a little, but not long enough for the metric to expire
+    long waitTime = TimeUnit.SECONDS.toMillis(flushPeriod) / 5;
+    Thread.sleep(waitTime);
+
+    Assert.assertFalse(
+        "Metric should not be expired",
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical1"))
     );
+    Assert.assertFalse(
+        "Metric should not be expired",
+        testMetric.shouldRemoveIfExpired(Arrays.asList("historical", 
"druid.test.cn", "historical2"))
+    );
+    exec.submit(emitter::cleanUpStaleMetrics).get();
+    Assert.assertEquals(2, 
testMetric.getCollector().collect().get(0).samples.size());
+    // Reset update time only for event2
+    emitter.emit(event2);
+
+    // Wait for the remainder of the TTL to allow event1 to expire
+    Thread.sleep(waitTime * 4);
+
+    exec.submit(emitter::cleanUpStaleMetrics).get();
+    Assert.assertEquals(1, 
testMetric.getCollector().collect().get(0).samples.size());
     emitter.close();
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to