This is an automated email from the ASF dual-hosted git repository.

rgoers pushed a commit to branch upgrade-prometheus-1.7.0
in repository https://gitbox.apache.org/repos/asf/logging-flume.git


The following commit(s) were added to refs/heads/upgrade-prometheus-1.7.0 by 
this push:
     new 5d7f92dbf Refactor PrometheusHTTPMetricsServer to use Prometheus 1.7.0 
API
5d7f92dbf is described below

commit 5d7f92dbfa385a1925fc13605317034eb4ae8709
Author: Ralph Goers <[email protected]>
AuthorDate: Mon Jun 8 15:05:07 2026 -0700

    Refactor PrometheusHTTPMetricsServer to use Prometheus 1.7.0 API
---
 .../http/PrometheusHTTPMetricsServer.java          | 190 +++++++++------------
 1 file changed, 79 insertions(+), 111 deletions(-)

diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
index 3512050eb..42c33989b 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
@@ -17,10 +17,10 @@
 package org.apache.flume.instrumentation.http;
 
 import com.google.common.base.Throwables;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import io.prometheus.client.GaugeMetricFamily;
-import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.metrics.core.metrics.Counter;
+import io.prometheus.metrics.core.metrics.Gauge;
+import io.prometheus.metrics.core.registry.PrometheusRegistry;
+import io.prometheus.metrics.exporter.servlet.MetricsServlet;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -51,13 +51,9 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A Monitor service implementation that runs a web server on a configurable
- * port and returns the metrics for components in JSON format. <p> Optional
+ * port and returns the metrics for components in Prometheus format. <p> 
Optional
  * parameters: <p> <tt>port</tt> : The port on which the server should listen
- * to.<p> Returns metrics in the following format: <p>
- *
- * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"}
- * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"}
- * <p> }
+ * to.<p> Returns metrics in Prometheus text format via /metrics endpoint
  */
 public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements 
MonitorService {
 
@@ -66,12 +62,13 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
     private static Logger LOG = 
LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class);
     private static MBeanServer mbeanServer = 
ManagementFactory.getPlatformMBeanServer();
 
-    private FlumePrometheusCollector requests;
+    private FlumePrometheusCollector metricsCollector;
 
     @Override
     public void start() {
 
-        requests = new FlumePrometheusCollector().register();
+        metricsCollector = new FlumePrometheusCollector();
+        metricsCollector.register();
 
         jettyServer = new Server();
         // We can use Contexts etc if we have many urls to handle. For one url,
@@ -91,17 +88,20 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 Thread.sleep(500);
             }
         } catch (Exception ex) {
-            LOG.error("Error starting Jetty. JSON Metrics may not be 
available.", ex);
+            LOG.error("Error starting Jetty. Prometheus Metrics may not be 
available.", ex);
         }
     }
 
-    class FlumePrometheusCollector extends Collector {
-
-        public List<MetricFamilySamples> collect() {
+    class FlumePrometheusCollector {
+        private final Map<String, Counter> counters = new HashMap<>();
+        private final Map<String, Gauge> gauges = new HashMap<>();
+        private final PrometheusRegistry registry = 
PrometheusRegistry.defaultRegistry;
 
-            Map<Object, Map<String, MetricFamilySamples>> counterMetricMap = 
new HashMap<>();
-            List<Collector.MetricFamilySamples> mfs = new ArrayList<>();
+        public void register() {
+            collectMetrics();
+        }
 
+        private void collectMetrics() {
             Set<ObjectInstance> queryMBeans;
             try {
                 queryMBeans = mbeanServer.queryMBeans(null, null);
@@ -109,57 +109,44 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 for (ObjectInstance obj : queryMBeans) {
                     try {
                         if 
(obj.getObjectName().toString().startsWith("org.apache.flume")) {
-                            processFlumeMetric(counterMetricMap, mfs, obj);
+                            processFlumeMetric(obj);
                         } else if 
((obj.getObjectName().toString().startsWith("kafka.consumer")
                                         || 
obj.getObjectName().toString().startsWith("kafka.producer"))
                                 && 
obj.getObjectName().toString().contains("metrics")) {
-                            processKafkaMetric(counterMetricMap, mfs, obj);
+                            processKafkaMetric(obj);
                         }
 
                     } catch (Exception e) {
                         LOG.error("Unable to poll JMX for metrics.", e);
                     }
                 }
-                return mfs;
 
             } catch (Exception ex) {
                 LOG.error("Could not get Mbeans for monitoring", ex);
                 Throwables.propagate(ex);
-                return null;
             }
         }
 
-        private void processFlumeMetric(
-                Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
-                List<MetricFamilySamples> mfs,
-                ObjectInstance obj)
+        private void processFlumeMetric(ObjectInstance obj)
                 throws ClassNotFoundException, InstanceNotFoundException, 
IntrospectionException, ReflectionException {
-            Class mbeanClass = Class.forName(obj.getClassName());
-            Map<String, MetricFamilySamples> metricsMap;
-
-            if (!counterMetricMap.containsKey(mbeanClass)) {
-                metricsMap = new HashMap<>();
-
-                for (Method method : mbeanClass.getMethods()) {
-                    String methodName = method.getName();
-                    if (methodName.startsWith("increment") && 
methodName.length() > "increment".length()) {
-                        String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("increment".length());
-                        createCounterIfNotExists(mfs, metricsMap, counterName);
-                    } else if (methodName.startsWith("addTo")) {
-                        String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("addTo".length());
-                        createCounterIfNotExists(mfs, metricsMap, counterName);
-                    } else if (methodName.startsWith("set")) {
-                        String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("set".length());
-                        createGaugeIfNotExists(mfs, metricsMap, counterName, 
Arrays.asList("component"));
-                    }
+            Class<?> mbeanClass = Class.forName(obj.getClassName());
+
+            // First pass: create counters and gauges based on method names
+            for (Method method : mbeanClass.getMethods()) {
+                String methodName = method.getName();
+                if (methodName.startsWith("increment") && methodName.length() 
> "increment".length()) {
+                    String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("increment".length());
+                    createCounterIfNotExists(counterName);
+                } else if (methodName.startsWith("addTo")) {
+                    String counterName = PROM_DEFAULT_PREFIX + 
methodName.substring("addTo".length());
+                    createCounterIfNotExists(counterName);
+                } else if (methodName.startsWith("set")) {
+                    String gaugeName = PROM_DEFAULT_PREFIX + 
methodName.substring("set".length());
+                    createGaugeIfNotExists(gaugeName);
                 }
-
-                counterMetricMap.put(mbeanClass, metricsMap);
-
-            } else {
-                metricsMap = counterMetricMap.get(mbeanClass);
             }
 
+            // Second pass: get attribute values and update metrics
             MBeanAttributeInfo[] attrs =
                     
mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
             String[] strAtts = new String[attrs.length];
@@ -174,26 +161,25 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
             for (Object attr : attrList) {
                 Attribute localAttr = (Attribute) attr;
                 if (!localAttr.getName().equalsIgnoreCase("type")) {
-                    MetricFamilySamples samples = 
metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName());
-                    if (samples instanceof CounterMetricFamily) {
-                        ((CounterMetricFamily) samples)
-                                .addMetric(
-                                        Arrays.asList(component),
-                                        
Double.valueOf(localAttr.getValue().toString()));
-                    } else if (samples instanceof GaugeMetricFamily) {
-                        ((GaugeMetricFamily) samples)
-                                .addMetric(
-                                        Arrays.asList(component),
-                                        
Double.valueOf(localAttr.getValue().toString()));
+                    String metricName = PROM_DEFAULT_PREFIX + 
localAttr.getName();
+                    double value = 
Double.parseDouble(localAttr.getValue().toString());
+
+                    Counter counter = counters.get(metricName);
+                    if (counter != null) {
+                        // For counters, we label by component
+                        counter.labelValues(component).inc(value);
+                    }
+
+                    Gauge gauge = gauges.get(metricName);
+                    if (gauge != null) {
+                        // For gauges, we label by component
+                        gauge.labelValues(component).set(value);
                     }
                 }
             }
         }
 
-        private void processKafkaMetric(
-                Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
-                List<MetricFamilySamples> mfs,
-                ObjectInstance obj)
+        private void processKafkaMetric(ObjectInstance obj)
                 throws InstanceNotFoundException, IntrospectionException, 
ReflectionException {
 
             ObjectName objectName = obj.getObjectName();
@@ -201,18 +187,12 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
 
             TreeMap<String, String> properties = new TreeMap<>();
             for (String key : objectName.getKeyPropertyList().keySet()) {
-                properties.put(
-                        makeStringPromSafe(key), 
objectName.getKeyPropertyList().get(key));
+                properties.put(makeStringPromSafe(key), 
objectName.getKeyPropertyList().get(key));
             }
 
-            // We create a unique name for the metric based on the metric that 
came from Kafka, plus
-            // all of the properties. Unfortunately Kafka does not have unique 
metric names and therefore
-            // you can end up with metrics with differing property lists 
(which you can't have.
             String metricKey = qualifiedType + "_" + String.join("_", 
properties.keySet()) + "_";
 
-            Map<String, MetricFamilySamples> metricsMap;
-
-            // Get the attribute list now as we'll need it to create the gauge
+            // Get the attribute list now as we'll need it to create gauges
             MBeanAttributeInfo[] attrs =
                     
mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
             String[] strAtts = new String[attrs.length];
@@ -220,22 +200,10 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 strAtts[i] = attrs[i].getName();
             }
 
-            // We pre-create each metric (once) before populating it once for 
each matching mbean
-            if (!counterMetricMap.containsKey(metricKey)) {
-                metricsMap = new HashMap<>();
-
-                for (String attr : strAtts) {
-                    createGaugeIfNotExists(
-                            mfs,
-                            metricsMap,
-                            metricKey + "_" + makeStringPromSafe(attr),
-                            new ArrayList<>(properties.keySet()));
-                }
-
-                counterMetricMap.put(metricKey, metricsMap);
-
-            } else {
-                metricsMap = counterMetricMap.get(metricKey);
+            // Pre-create each metric (once) before populating it
+            for (String attr : strAtts) {
+                String gaugeName = metricKey + "_" + makeStringPromSafe(attr);
+                createGaugeIfNotExists(gaugeName);
             }
 
             AttributeList attrList = 
mbeanServer.getAttributes(obj.getObjectName(), strAtts);
@@ -244,42 +212,42 @@ public class PrometheusHTTPMetricsServer extends 
HTTPMetricsServer implements Mo
                 Attribute localAttr = (Attribute) attr;
 
                 try {
-
-                    GaugeMetricFamily samples = (GaugeMetricFamily)
-                            metricsMap.get(metricKey + "_" + 
makeStringPromSafe(localAttr.getName()));
-                    samples.addMetric(
-                            new ArrayList<>(properties.values()),
-                            Double.valueOf(localAttr.getValue().toString()));
+                    String gaugeName = metricKey + "_" + 
makeStringPromSafe(localAttr.getName());
+                    Gauge gauge = gauges.get(gaugeName);
+                    if (gauge != null) {
+                        double value = 
Double.parseDouble(localAttr.getValue().toString());
+                        gauge.labelValues(new 
ArrayList<>(properties.values()).toArray(new String[0]))
+                                .set(value);
+                    }
                 } catch (Exception e) {
                     LOG.warn("Metric {} could not be monitored", metricKey, e);
                 }
             }
         }
 
-        // Prometeus is really unhappy with metrics with , or - in, so replace 
them
+        // Prometheus is really unhappy with metrics with , or - in, so 
replace them
         private String makeStringPromSafe(String input) {
             return input.replaceAll("[.\\-]", "");
         }
 
-        private void createCounterIfNotExists(
-                List<MetricFamilySamples> mfs, Map<String, 
MetricFamilySamples> metricsMap, String counterName) {
-            if (!metricsMap.containsKey(counterName)) {
-                CounterMetricFamily labeledCounter =
-                        new CounterMetricFamily(counterName, counterName, 
Arrays.asList("component"));
-                metricsMap.put(counterName, labeledCounter);
-                mfs.add(labeledCounter);
+        private void createCounterIfNotExists(String counterName) {
+            if (!counters.containsKey(counterName)) {
+                Counter counter = Counter.builder()
+                        .name(counterName)
+                        .help(counterName)
+                        .labelNames("component")
+                        .register();
+                counters.put(counterName, counter);
             }
         }
 
-        private void createGaugeIfNotExists(
-                List<MetricFamilySamples> mfs,
-                Map<String, MetricFamilySamples> metricsMap,
-                String gaugeName,
-                List<String> labelNames) {
-            if (!metricsMap.containsKey(gaugeName)) {
-                GaugeMetricFamily labelledGauge = new 
GaugeMetricFamily(gaugeName, gaugeName, labelNames);
-                metricsMap.put(gaugeName, labelledGauge);
-                mfs.add(labelledGauge);
+        private void createGaugeIfNotExists(String gaugeName) {
+            if (!gauges.containsKey(gaugeName)) {
+                Gauge gauge = Gauge.builder()
+                        .name(gaugeName)
+                        .help(gaugeName)
+                        .register();
+                gauges.put(gaugeName, gauge);
             }
         }
     }

Reply via email to