gianm commented on code in PR #17919:
URL: https://github.com/apache/druid/pull/17919#discussion_r2046096750


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java:
##########
@@ -60,18 +131,33 @@ public boolean doMonitor(final ServiceEmitter emitter)
   {
     for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
       final MetricName metricName = entry.getKey();
+      final KafkaConsumerMetric kafkaConsumerMetric = METRICS.get(metricName.name());
+
+      if (kafkaConsumerMetric != null &&
+          kafkaConsumerMetric.getDimensions().equals(metricName.tags().keySet())) {
+        final Number newValue = (Number) entry.getValue().metricValue();
+        final Number emitValue;
 
-      if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
-        final String topic = metricName.tags().get(TOPIC_TAG);
-        final long newValue = ((Number) entry.getValue().metricValue()).longValue();
-        final long priorValue =
-            counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
-                    .getAndSet(newValue);
+        if (kafkaConsumerMetric.getMetricType() == KafkaConsumerMetric.MetricType.GAUGE || newValue == null) {
+          emitValue = newValue;
+        } else if (kafkaConsumerMetric.getMetricType() == KafkaConsumerMetric.MetricType.COUNTER) {
+          final long newValueAsLong = newValue.longValue();
+          final long priorValue =
+              counters.computeIfAbsent(metricName, ignored -> new AtomicLong())
+                      .getAndSet(newValueAsLong);
+          emitValue = newValueAsLong - priorValue;
+        } else {
+          throw DruidException.defensive("Unexpected metric type[%s]", kafkaConsumerMetric.getMetricType());

Review Comment:
   This branch would only be reached if `KafkaConsumerMetric.MetricType` gained 
an enum value that isn't handled here. Since `KafkaConsumerMetric` is only used 
by this monitor, that seems very unlikely to ever happen, which is why I chose 
to use a defensive check.
   
   Tracing through the code, it looks like exceptions here are ultimately 
caught in `ScheduledExecutors.scheduleAtFixedRate` and logged at `ERROR` level.
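
   To make the branch structure concrete, here is a minimal standalone sketch of the gauge/counter logic with the defensive fallback. It is not the actual monitor code: `EmitValueSketch`, its `MetricType` enum, the `String` counter key, and the use of `IllegalStateException` in place of `DruidException.defensive` are all assumptions made so the example runs without Druid on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class EmitValueSketch
{
  // Hypothetical stand-in for KafkaConsumerMetric.MetricType.
  enum MetricType { GAUGE, COUNTER }

  // Last seen value per counter metric; the real code keys this by MetricName.
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  Number emitValue(final String key, final MetricType type, final Number newValue)
  {
    if (type == MetricType.GAUGE || newValue == null) {
      // Gauges (and missing values) are passed through unchanged.
      return newValue;
    } else if (type == MetricType.COUNTER) {
      // Counters are cumulative, so emit the delta since the previous poll.
      final long newValueAsLong = newValue.longValue();
      final long priorValue =
          counters.computeIfAbsent(key, ignored -> new AtomicLong())
                  .getAndSet(newValueAsLong);
      return newValueAsLong - priorValue;
    } else {
      // Defensive: only reachable if a type is added without updating this method.
      // IllegalStateException is an assumed substitute for DruidException.defensive.
      throw new IllegalStateException("Unexpected metric type[" + type + "]");
    }
  }

  public static void main(final String[] args)
  {
    final EmitValueSketch sketch = new EmitValueSketch();
    System.out.println(sketch.emitValue("records-consumed-total", MetricType.COUNTER, 10L));
    System.out.println(sketch.emitValue("records-consumed-total", MetricType.COUNTER, 25L));
    System.out.println(sketch.emitValue("records-lag", MetricType.GAUGE, 3.5));
  }
}
```

   In this sketch the first counter poll emits the full value (the prior value defaults to 0), later polls emit only the increase, and gauges are emitted as-is. The final `else` is the part under discussion: with only two enum values it cannot be hit today, so it exists purely to fail loudly if a third value is added later.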



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

