abhishekrb19 commented on code in PR #18689:
URL: https://github.com/apache/druid/pull/18689#discussion_r2488095850
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -66,22 +70,42 @@ public double[] getHistogramBuckets()
return histogramBuckets;
}
- public void resetLastUpdateTime()
+ /**
+ * For each unique set of labelValues, keeps track of the amount of time
that has elapsed since its metric
+ * value has been updated.
+ */
+ public void resetLastUpdateTime(List<String> labelValues)
{
- updateTimer.restart();
+ labelValuesToStopwatch.compute(labelValues, (k, v) -> {
+ if (v != null) {
+ v.restart();
+ return v;
+ } else {
+ return Stopwatch.createStarted();
+ }
+ });
}
- public long getMillisSinceLastUpdate()
+ public ConcurrentMap<List<String>, Stopwatch> getLabelValuesToStopwatch()
{
- return updateTimer.millisElapsed();
+ return labelValuesToStopwatch;
}
- public boolean isExpired()
+ /**
+ * For the given labelValues, checks if the metric value has been updated
within the configured flushPeriod.
+ * Returns true and removes the entry from the map if it has expired or if
the entry doesn't exist, otherwise
+ * returns false.
+ */
+ public boolean shouldRemoveIfExpired(List<String> labelValues)
{
if (ttlSeconds == null) {
- log.error("Invalid usage of isExpired(), TTL has not been set");
- return false;
+ throw DruidException.defensive("Invalid usage of removeIfExpired,
flushPeriod has not been set");
Review Comment:
```suggestion
throw DruidException.defensive("Invalid usage of
shouldRemoveIfExpired, ttlSeconds has not been set");
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/DimensionsAndCollector.java:
##########
@@ -66,22 +70,42 @@ public double[] getHistogramBuckets()
return histogramBuckets;
}
- public void resetLastUpdateTime()
+ /**
+ * For each unique set of labelValues, keeps track of the amount of time
that has elapsed since its metric
+ * value has been updated.
+ */
+ public void resetLastUpdateTime(List<String> labelValues)
{
- updateTimer.restart();
+ labelValuesToStopwatch.compute(labelValues, (k, v) -> {
+ if (v != null) {
+ v.restart();
+ return v;
+ } else {
+ return Stopwatch.createStarted();
+ }
+ });
}
- public long getMillisSinceLastUpdate()
+ public ConcurrentMap<List<String>, Stopwatch> getLabelValuesToStopwatch()
{
- return updateTimer.millisElapsed();
+ return labelValuesToStopwatch;
}
- public boolean isExpired()
+ /**
+ * For the given labelValues, checks if the metric value has been updated
within the configured flushPeriod.
Review Comment:
```suggestion
* For the given labelValues, checks if the metric value has been updated
within the configured {@link #ttlSeconds}.
```
##########
extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java:
##########
@@ -277,7 +285,7 @@ public void setPushGateway(PushGateway pushGateway)
* This method is called periodically by the TTL scheduler when using the
'exporter' strategy with
* a configured flushPeriod.
*/
- private void cleanUpStaleMetrics()
+ protected void cleanUpStaleMetrics()
Review Comment:
Could we make this method `public` and add a `@VisibleForTesting` annotation?
Alternatively, we could pass the `WrappingScheduledExecutorService` in from the
tests and keep this method private.
##########
extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java:
##########
@@ -493,29 +494,94 @@ public void testMetricTtlUpdate()
);
Assert.assertFalse(
"Metric should not be expired initially",
- testMetric.isExpired()
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
);
+ Assert.assertEquals(1,
testMetric.getCollector().collect().get(0).samples.size());
// Wait for a little, but not long enough for the metric to expire
long waitTime = TimeUnit.SECONDS.toMillis(flushPeriod) / 5;
- try {
- Thread.sleep(waitTime);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ sleep(waitTime);
Assert.assertFalse(
"Metric should not be expired",
- testMetric.isExpired()
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
);
- emitter.emit(event);
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(1,
testMetric.getCollector().collect().get(0).samples.size());
+ emitter.close();
+ }
+
+ @Test
+ public void testMetricTtlUpdateWithDifferentLabels() throws
ExecutionException, InterruptedException
+ {
+ int flushPeriod = 3;
+ PrometheusEmitterConfig config = new
PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "test",
null, 0, null, true, true, flushPeriod, null, false, null);
+ ScheduledExecutorService exec = ScheduledExecutors.fixed(1,
"PrometheusTTLExecutor-%s");
+ PrometheusEmitter emitter = new PrometheusEmitter(config, exec);
+ emitter.start();
+
+ ServiceMetricEvent event1 = ServiceMetricEvent.builder()
+
.setMetric("segment/loadQueue/count", 10)
+ .setDimension("server",
"historical1")
+
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ ServiceMetricEvent event2 = ServiceMetricEvent.builder()
+
.setMetric("segment/loadQueue/count", 10)
+ .setDimension("server",
"historical2")
+
.build(ImmutableMap.of("service", "historical", "host", "druid.test.cn"));
+ emitter.emit(event1);
+ emitter.emit(event2);
+
+ // Get the metrics and check that it's not expired initially
+ Map<String, DimensionsAndCollector> registeredMetrics =
emitter.getMetrics().getRegisteredMetrics();
+ DimensionsAndCollector testMetric =
registeredMetrics.get("segment/loadQueue/count");
+
+ Assert.assertNotNull(
+ "Test metric should be registered",
+ testMetric
+ );
+ Assert.assertFalse(
+ "Metric should not be expired initially",
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical1"))
+ );
+ Assert.assertFalse(
+ "Metric should not be expired initially",
+ testMetric.shouldRemoveIfExpired(Arrays.asList("historical",
"druid.test.cn", "historical2"))
+ );
+ exec.submit(emitter::cleanUpStaleMetrics).get();
+ Assert.assertEquals(2,
testMetric.getCollector().collect().get(0).samples.size());
- long timeSinceLastUpdate = testMetric.getMillisSinceLastUpdate();
- Assert.assertTrue(
- "Update time should have been refreshed",
- timeSinceLastUpdate < waitTime
+ // Wait for a little, but not long enough for the metric to expire
+ long waitTime = TimeUnit.SECONDS.toMillis(flushPeriod) / 5;
+ sleep(waitTime);
Review Comment:
I think we can call `Thread.sleep()` directly, since the test methods already
declare `throws InterruptedException`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]