asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977944721


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} 
implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, 
PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 
"prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS 
= 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new 
ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = 
new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = 
new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new 
ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> 
gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> 
opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> 
threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> 
threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new 
StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new 
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, 
statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = 
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, 
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, 
scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> 
PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> 
PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> 
PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   Reiterating my question; the only caveat I see is this: is there any chance 
that we will end up with two different (scopeContext, gauge/metric/...) pairs 
that share the same metric name, perhaps differing only in their labels?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to