This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch upgrade-prometheus-1.7.0
in repository https://gitbox.apache.org/repos/asf/logging-flume.git
The following commit(s) were added to refs/heads/upgrade-prometheus-1.7.0 by
this push:
new 5d7f92dbf Refactor PrometheusHTTPMetricsServer to use Prometheus 1.7.0
API
5d7f92dbf is described below
commit 5d7f92dbfa385a1925fc13605317034eb4ae8709
Author: Ralph Goers <[email protected]>
AuthorDate: Mon Jun 8 15:05:07 2026 -0700
Refactor PrometheusHTTPMetricsServer to use Prometheus 1.7.0 API
---
.../http/PrometheusHTTPMetricsServer.java | 190 +++++++++------------
1 file changed, 79 insertions(+), 111 deletions(-)
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
index 3512050eb..42c33989b 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
+++
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/PrometheusHTTPMetricsServer.java
@@ -17,10 +17,10 @@
package org.apache.flume.instrumentation.http;
import com.google.common.base.Throwables;
-import io.prometheus.client.Collector;
-import io.prometheus.client.CounterMetricFamily;
-import io.prometheus.client.GaugeMetricFamily;
-import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.metrics.core.metrics.Counter;
+import io.prometheus.metrics.core.metrics.Gauge;
+import io.prometheus.metrics.core.registry.PrometheusRegistry;
+import io.prometheus.metrics.exporter.servlet.MetricsServlet;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -51,13 +51,9 @@ import org.slf4j.LoggerFactory;
/**
* A Monitor service implementation that runs a web server on a configurable
- * port and returns the metrics for components in JSON format. <p> Optional
+ * port and returns the metrics for components in Prometheus format. <p>
Optional
* parameters: <p> <tt>port</tt> : The port on which the server should listen
- * to.<p> Returns metrics in the following format: <p>
- *
- * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"}
- * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"}
- * <p> }
+ * to.<p> Returns metrics in Prometheus text format via the /metrics endpoint.
*/
public class PrometheusHTTPMetricsServer extends HTTPMetricsServer implements
MonitorService {
@@ -66,12 +62,13 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
private static Logger LOG =
LoggerFactory.getLogger(PrometheusHTTPMetricsServer.class);
private static MBeanServer mbeanServer =
ManagementFactory.getPlatformMBeanServer();
- private FlumePrometheusCollector requests;
+ private FlumePrometheusCollector metricsCollector;
@Override
public void start() {
- requests = new FlumePrometheusCollector().register();
+ metricsCollector = new FlumePrometheusCollector();
+ metricsCollector.register();
jettyServer = new Server();
// We can use Contexts etc if we have many urls to handle. For one url,
@@ -91,17 +88,20 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
Thread.sleep(500);
}
} catch (Exception ex) {
- LOG.error("Error starting Jetty. JSON Metrics may not be
available.", ex);
+ LOG.error("Error starting Jetty. Prometheus Metrics may not be
available.", ex);
}
}
- class FlumePrometheusCollector extends Collector {
-
- public List<MetricFamilySamples> collect() {
+ class FlumePrometheusCollector {
+ private final Map<String, Counter> counters = new HashMap<>();
+ private final Map<String, Gauge> gauges = new HashMap<>();
+ private final PrometheusRegistry registry =
PrometheusRegistry.defaultRegistry;
- Map<Object, Map<String, MetricFamilySamples>> counterMetricMap =
new HashMap<>();
- List<Collector.MetricFamilySamples> mfs = new ArrayList<>();
+ public void register() {
+ collectMetrics();
+ }
+ private void collectMetrics() {
Set<ObjectInstance> queryMBeans;
try {
queryMBeans = mbeanServer.queryMBeans(null, null);
@@ -109,57 +109,44 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
for (ObjectInstance obj : queryMBeans) {
try {
if
(obj.getObjectName().toString().startsWith("org.apache.flume")) {
- processFlumeMetric(counterMetricMap, mfs, obj);
+ processFlumeMetric(obj);
} else if
((obj.getObjectName().toString().startsWith("kafka.consumer")
||
obj.getObjectName().toString().startsWith("kafka.producer"))
&&
obj.getObjectName().toString().contains("metrics")) {
- processKafkaMetric(counterMetricMap, mfs, obj);
+ processKafkaMetric(obj);
}
} catch (Exception e) {
LOG.error("Unable to poll JMX for metrics.", e);
}
}
- return mfs;
} catch (Exception ex) {
LOG.error("Could not get Mbeans for monitoring", ex);
Throwables.propagate(ex);
- return null;
}
}
- private void processFlumeMetric(
- Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
- List<MetricFamilySamples> mfs,
- ObjectInstance obj)
+ private void processFlumeMetric(ObjectInstance obj)
throws ClassNotFoundException, InstanceNotFoundException,
IntrospectionException, ReflectionException {
- Class mbeanClass = Class.forName(obj.getClassName());
- Map<String, MetricFamilySamples> metricsMap;
-
- if (!counterMetricMap.containsKey(mbeanClass)) {
- metricsMap = new HashMap<>();
-
- for (Method method : mbeanClass.getMethods()) {
- String methodName = method.getName();
- if (methodName.startsWith("increment") &&
methodName.length() > "increment".length()) {
- String counterName = PROM_DEFAULT_PREFIX +
methodName.substring("increment".length());
- createCounterIfNotExists(mfs, metricsMap, counterName);
- } else if (methodName.startsWith("addTo")) {
- String counterName = PROM_DEFAULT_PREFIX +
methodName.substring("addTo".length());
- createCounterIfNotExists(mfs, metricsMap, counterName);
- } else if (methodName.startsWith("set")) {
- String counterName = PROM_DEFAULT_PREFIX +
methodName.substring("set".length());
- createGaugeIfNotExists(mfs, metricsMap, counterName,
Arrays.asList("component"));
- }
+ Class<?> mbeanClass = Class.forName(obj.getClassName());
+
+ // First pass: create counters and gauges based on method names
+ for (Method method : mbeanClass.getMethods()) {
+ String methodName = method.getName();
+ if (methodName.startsWith("increment") && methodName.length()
> "increment".length()) {
+ String counterName = PROM_DEFAULT_PREFIX +
methodName.substring("increment".length());
+ createCounterIfNotExists(counterName);
+ } else if (methodName.startsWith("addTo")) {
+ String counterName = PROM_DEFAULT_PREFIX +
methodName.substring("addTo".length());
+ createCounterIfNotExists(counterName);
+ } else if (methodName.startsWith("set")) {
+ String gaugeName = PROM_DEFAULT_PREFIX +
methodName.substring("set".length());
+ createGaugeIfNotExists(gaugeName);
}
-
- counterMetricMap.put(mbeanClass, metricsMap);
-
- } else {
- metricsMap = counterMetricMap.get(mbeanClass);
}
+ // Second pass: get attribute values and update metrics
MBeanAttributeInfo[] attrs =
mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
String[] strAtts = new String[attrs.length];
@@ -174,26 +161,25 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
for (Object attr : attrList) {
Attribute localAttr = (Attribute) attr;
if (!localAttr.getName().equalsIgnoreCase("type")) {
- MetricFamilySamples samples =
metricsMap.get(PROM_DEFAULT_PREFIX + localAttr.getName());
- if (samples instanceof CounterMetricFamily) {
- ((CounterMetricFamily) samples)
- .addMetric(
- Arrays.asList(component),
-
Double.valueOf(localAttr.getValue().toString()));
- } else if (samples instanceof GaugeMetricFamily) {
- ((GaugeMetricFamily) samples)
- .addMetric(
- Arrays.asList(component),
-
Double.valueOf(localAttr.getValue().toString()));
+ String metricName = PROM_DEFAULT_PREFIX +
localAttr.getName();
+ double value =
Double.parseDouble(localAttr.getValue().toString());
+
+ Counter counter = counters.get(metricName);
+ if (counter != null) {
+ // For counters, we label by component
+ counter.labelValues(component).inc(value);
+ }
+
+ Gauge gauge = gauges.get(metricName);
+ if (gauge != null) {
+ // For gauges, we label by component
+ gauge.labelValues(component).set(value);
}
}
}
}
- private void processKafkaMetric(
- Map<Object, Map<String, MetricFamilySamples>> counterMetricMap,
- List<MetricFamilySamples> mfs,
- ObjectInstance obj)
+ private void processKafkaMetric(ObjectInstance obj)
throws InstanceNotFoundException, IntrospectionException,
ReflectionException {
ObjectName objectName = obj.getObjectName();
@@ -201,18 +187,12 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
TreeMap<String, String> properties = new TreeMap<>();
for (String key : objectName.getKeyPropertyList().keySet()) {
- properties.put(
- makeStringPromSafe(key),
objectName.getKeyPropertyList().get(key));
+ properties.put(makeStringPromSafe(key),
objectName.getKeyPropertyList().get(key));
}
- // We create a unique name for the metric based on the metric that
came from Kafka, plus
- // all of the properties. Unfortunately Kafka does not have unique
metric names and therefore
- // you can end up with metrics with differing property lists
(which you can't have.
String metricKey = qualifiedType + "_" + String.join("_",
properties.keySet()) + "_";
- Map<String, MetricFamilySamples> metricsMap;
-
- // Get the attribute list now as we'll need it to create the gauge
+ // Get the attribute list now as we'll need it to create gauges
MBeanAttributeInfo[] attrs =
mbeanServer.getMBeanInfo(obj.getObjectName()).getAttributes();
String[] strAtts = new String[attrs.length];
@@ -220,22 +200,10 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
strAtts[i] = attrs[i].getName();
}
- // We pre-create each metric (once) before populating it once for
each matching mbean
- if (!counterMetricMap.containsKey(metricKey)) {
- metricsMap = new HashMap<>();
-
- for (String attr : strAtts) {
- createGaugeIfNotExists(
- mfs,
- metricsMap,
- metricKey + "_" + makeStringPromSafe(attr),
- new ArrayList<>(properties.keySet()));
- }
-
- counterMetricMap.put(metricKey, metricsMap);
-
- } else {
- metricsMap = counterMetricMap.get(metricKey);
+ // Pre-create each metric (once) before populating it
+ for (String attr : strAtts) {
+ String gaugeName = metricKey + "_" + makeStringPromSafe(attr);
+ createGaugeIfNotExists(gaugeName);
}
AttributeList attrList =
mbeanServer.getAttributes(obj.getObjectName(), strAtts);
@@ -244,42 +212,42 @@ public class PrometheusHTTPMetricsServer extends
HTTPMetricsServer implements Mo
Attribute localAttr = (Attribute) attr;
try {
-
- GaugeMetricFamily samples = (GaugeMetricFamily)
- metricsMap.get(metricKey + "_" +
makeStringPromSafe(localAttr.getName()));
- samples.addMetric(
- new ArrayList<>(properties.values()),
- Double.valueOf(localAttr.getValue().toString()));
+ String gaugeName = metricKey + "_" +
makeStringPromSafe(localAttr.getName());
+ Gauge gauge = gauges.get(gaugeName);
+ if (gauge != null) {
+ double value =
Double.parseDouble(localAttr.getValue().toString());
+ gauge.labelValues(new
ArrayList<>(properties.values()).toArray(new String[0]))
+ .set(value);
+ }
} catch (Exception e) {
LOG.warn("Metric {} could not be monitored", metricKey, e);
}
}
}
- // Prometeus is really unhappy with metrics with , or - in, so replace
them
+ // Prometheus is really unhappy with metrics with . or - in, so
replace them
private String makeStringPromSafe(String input) {
return input.replaceAll("[.\\-]", "");
}
- private void createCounterIfNotExists(
- List<MetricFamilySamples> mfs, Map<String,
MetricFamilySamples> metricsMap, String counterName) {
- if (!metricsMap.containsKey(counterName)) {
- CounterMetricFamily labeledCounter =
- new CounterMetricFamily(counterName, counterName,
Arrays.asList("component"));
- metricsMap.put(counterName, labeledCounter);
- mfs.add(labeledCounter);
+ private void createCounterIfNotExists(String counterName) {
+ if (!counters.containsKey(counterName)) {
+ Counter counter = Counter.builder()
+ .name(counterName)
+ .help(counterName)
+ .labelNames("component")
+ .register();
+ counters.put(counterName, counter);
}
}
- private void createGaugeIfNotExists(
- List<MetricFamilySamples> mfs,
- Map<String, MetricFamilySamples> metricsMap,
- String gaugeName,
- List<String> labelNames) {
- if (!metricsMap.containsKey(gaugeName)) {
- GaugeMetricFamily labelledGauge = new
GaugeMetricFamily(gaugeName, gaugeName, labelNames);
- metricsMap.put(gaugeName, labelledGauge);
- mfs.add(labelledGauge);
+ private void createGaugeIfNotExists(String gaugeName) {
+ if (!gauges.containsKey(gaugeName)) {
+ Gauge gauge = Gauge.builder()
+ .name(gaugeName)
+ .help(gaugeName)
+ .register();
+ gauges.put(gaugeName, gauge);
}
}
}