asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r983467283
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
-import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider}
implementation.
*/
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider,
PrometheusRawMetricsProvider {
private ScheduledExecutorService executor;
public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS =
"prometheusStatsLatencyRolloverSeconds";
public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS
= 60;
public static final String CLUSTER_NAME = "cluster";
public static final String DEFAULT_CLUSTER_NAME = "pulsar";
- private String cluster;
- private final CachingStatsProvider cachingStatsProvider;
-
+ private final CollectorRegistry registry;
+ private Map<String, String> labels;
/**
* These acts a registry of the metrics defined in this provider.
*/
- public final ConcurrentMap<String, LongAdderCounter> counters = new
ConcurrentSkipListMap<>();
- public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges =
new ConcurrentSkipListMap<>();
- public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats =
new ConcurrentSkipListMap<>();
+ public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new
ConcurrentHashMap<>();
+ public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>>
gauges = new ConcurrentHashMap<>();
+ public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger>
opStats = new ConcurrentHashMap<>();
+ final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger>
threadScopedOpStats =
+ new ConcurrentHashMap<>();
+ final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter>
threadScopedCounters =
+ new ConcurrentHashMap<>();
+
public PrometheusMetricsProvider() {
- this.cachingStatsProvider = new CachingStatsProvider(new
StatsProvider() {
- @Override
- public void start(Configuration conf) {
- // nop
- }
-
- @Override
- public void stop() {
- // nop
- }
-
- @Override
- public StatsLogger getStatsLogger(String scope) {
- return new
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
- }
-
- @Override
- public String getStatsName(String... statsComponents) {
- String completeName;
- if (statsComponents.length == 0) {
- return "";
- } else if (statsComponents[0].isEmpty()) {
- completeName = StringUtils.join(statsComponents, '_', 1,
statsComponents.length);
- } else {
- completeName = StringUtils.join(statsComponents, '_');
- }
- return Collector.sanitizeMetricName(completeName);
- }
- });
+ this(CollectorRegistry.defaultRegistry);
+ }
+
+ public PrometheusMetricsProvider(CollectorRegistry registry) {
+ this.registry = registry;
}
- @Override
public void start(Configuration conf) {
- executor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("metrics"));
+ executor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("metrics"));
int latencyRolloverSeconds =
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
- cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+ labels = Collections.singletonMap(CLUSTER_NAME,
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+ executor.scheduleAtFixedRate(() -> {
+ rotateLatencyCollection();
+ }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
-
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
- 1, latencyRolloverSeconds, TimeUnit.SECONDS);
}
- @Override
public void stop() {
- executor.shutdownNow();
+ executor.shutdown();
}
- @Override
public StatsLogger getStatsLogger(String scope) {
- return this.cachingStatsProvider.getStatsLogger(scope);
+ return new PrometheusStatsLogger(PrometheusMetricsProvider.this,
scope, labels);
}
@Override
public void writeAllMetrics(Writer writer) throws IOException {
- gauges.forEach((name, gauge) ->
PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
- counters.forEach((name, counter) ->
PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
- opStats.forEach((name, opStatLogger) ->
PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
- opStatLogger));
+ PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
Review Comment:
> Yes, there are many cases. Such as a metric name is publishrLatency, and
the label with a different topic name. It represents different metrics.
Well, @hangc0276, in that case you must use `PrometheusMetricsStreams`: this
class guarantees that the output will be grouped by metric name, as the
Prometheus text format dictates.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]