asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r983467283


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} 
implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, 
PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 
"prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS 
= 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new 
ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = 
new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = 
new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new 
ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> 
gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> 
opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> 
threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> 
threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new 
StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new 
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, 
statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = 
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, 
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, 
scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> 
PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> 
PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> 
PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   > Yes, there are many cases. For example, a metric named publishrLatency can 
appear with labels for different topic names; each one represents a different metric.
   
   Well, @hangc0276, in that case you must use `PrometheusMetricsStreams` - this 
class guarantees that the output will be grouped by metric name, as the 
Prometheus text format dictates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to