asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r976577025


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();

Review Comment:
   use `EntityUtils.toString`
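   For example (just a sketch; `EntityUtils` is `org.apache.http.util.EntityUtils` from the HttpClient dependency the test already uses):
   ```java
           HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
           String body = EntityUtils.toString(response.getEntity());
           Assert.assertTrue(body.contains("test_metrics"));
   ```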



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} 
implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, 
PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 
"prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS 
= 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new 
ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = 
new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = 
new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new 
ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> 
gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> 
opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> 
threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> 
threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new 
StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new 
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, 
statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = 
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, 
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, 
scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> 
PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> 
PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> 
PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
+        PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry);

Review Comment:
   I don't get this line. Why would we want to emit all of the static default Prometheus collectors into the writer, when we already have that in `PrometheusMetricsGenerator`?
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} 
implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, 
PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 
"prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS 
= 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new 
ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = 
new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = 
new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new 
ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> 
gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> 
opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> 
threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> 
threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new 
StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new 
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, 
statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = 
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, 
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();

Review Comment:
   This removed `catchingAndLoggingThrowables`. Without that wrapper, any exception thrown by `rotateLatencyCollection()` will suppress all subsequent executions of the scheduled task, so please keep it.
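   i.e. keep the scheduling as it was before this change:
   ```java
           // catchingAndLoggingThrowables prevents an exception in rotateLatencyCollection()
           // from cancelling the periodic task.
           executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
                   1, latencyRolloverSeconds, TimeUnit.SECONDS);
   ```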



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java:
##########
@@ -18,121 +18,174 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import java.io.IOException;
-import java.io.Writer;
-import org.apache.bookkeeper.stats.Counter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
  * Logic to write metrics in Prometheus text format.
  */
 public class PrometheusTextFormatUtil {
-    static void writeGauge(Writer w, String name, String cluster, 
SimpleGauge<? extends Number> gauge) {
+    public static void writeGauge(SimpleTextOutputStream w, String name, 
SimpleGauge<? extends Number> gauge) {
         // Example:
-        // # TYPE bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"}
 1044057
-        try {
-            w.append("# TYPE ").append(name).append(" gauge\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' 
').append(gauge.getSample().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // # TYPE bookie_storage_entries_count gauge
+        // bookie_storage_entries_count 519
+        w.write("# TYPE ").write(name).write(" gauge\n");
+        w.write(name);
+        writeLabels(w, gauge.getLabels());
+        w.write(' ').write(gauge.getSample().toString()).write('\n');
+
     }
 
-    static void writeCounter(Writer w, String name, String cluster, Counter 
counter) {
+    public static void writeCounter(SimpleTextOutputStream w, String name, 
LongAdderCounter counter) {
         // Example:
         // # TYPE jvm_threads_started_total counter
-        // jvm_threads_started_total{cluster="test"} 59
-        try {
-            w.append("# TYPE ").append(name).append(" counter\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(counter.get().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // jvm_threads_started_total 59
+        w.write("# TYPE ").write(name).write(" counter\n");
+        w.write(name);
+        writeLabels(w, counter.getLabels());
+        w.write(' ').write(counter.get().toString()).write('\n');
     }
 
-    static void writeOpStat(Writer w, String name, String cluster, 
DataSketchesOpStatsLogger opStat) {
+    public static void writeOpStat(SimpleTextOutputStream w, String name, 
DataSketchesOpStatsLogger opStat) {
         // Example:
-        // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued 
summary
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="0.5"} NaN
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="0.75"} NaN
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="0.95"} NaN
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="0.99"} NaN
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="0.999"} NaN
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="0.9999"} NaN
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="false",
-        // quantile="1.0"} -Infinity
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar",
 success="false"} 0
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", 
success="false"} 0.0
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="0.5"} 0.031
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="0.75"} 0.043
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="0.95"} 0.061
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="0.99"} 0.064
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="0.999"} 0.073
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="0.9999"} 0.073
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", 
success="true",
-        // quantile="1.0"} 0.552
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar",
 success="true"} 40911432
-        // 
pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", 
success="true"} 527.0
-        try {
-            w.append("# TYPE ").append(name).append(" summary\n");
-            writeQuantile(w, opStat, name, cluster, false, 0.5);
-            writeQuantile(w, opStat, name, cluster, false, 0.75);
-            writeQuantile(w, opStat, name, cluster, false, 0.95);
-            writeQuantile(w, opStat, name, cluster, false, 0.99);
-            writeQuantile(w, opStat, name, cluster, false, 0.999);
-            writeQuantile(w, opStat, name, cluster, false, 0.9999);
-            writeQuantile(w, opStat, name, cluster, false, 1.0);
-            writeCount(w, opStat, name, cluster, false);
-            writeSum(w, opStat, name, cluster, false);
-
-            writeQuantile(w, opStat, name, cluster, true, 0.5);
-            writeQuantile(w, opStat, name, cluster, true, 0.75);
-            writeQuantile(w, opStat, name, cluster, true, 0.95);
-            writeQuantile(w, opStat, name, cluster, true, 0.99);
-            writeQuantile(w, opStat, name, cluster, true, 0.999);
-            writeQuantile(w, opStat, name, cluster, true, 0.9999);
-            writeQuantile(w, opStat, name, cluster, true, 1.0);
-            writeCount(w, opStat, name, cluster, true);
-            writeSum(w, opStat, name, cluster, true);
-
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} 
NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} 
NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} 
NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} 
NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} 
NaN
+        // 
bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} 
NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 
1.706
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 
1.89
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 
2.121
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 
10.708
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 
10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 
10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 
10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 
1265.0800000000002
+        w.write("# TYPE ").write(name).write(" summary\n");
+        writeQuantile(w, opStat, name, false, 0.5);
+        writeQuantile(w, opStat, name, false, 0.75);
+        writeQuantile(w, opStat, name, false, 0.95);
+        writeQuantile(w, opStat, name, false, 0.99);
+        writeQuantile(w, opStat, name, false, 0.999);
+        writeQuantile(w, opStat, name, false, 0.9999);
+        writeQuantile(w, opStat, name, false, 1.0);
+        writeCount(w, opStat, name, false);
+        writeSum(w, opStat, name, false);
+
+        writeQuantile(w, opStat, name, true, 0.5);
+        writeQuantile(w, opStat, name, true, 0.75);
+        writeQuantile(w, opStat, name, true, 0.95);
+        writeQuantile(w, opStat, name, true, 0.99);
+        writeQuantile(w, opStat, name, true, 0.999);
+        writeQuantile(w, opStat, name, true, 0.9999);
+        writeQuantile(w, opStat, name, true, 1.0);
+        writeCount(w, opStat, name, true);
+        writeSum(w, opStat, name, true);
+    }
+
+    private static void writeQuantile(SimpleTextOutputStream w, 
DataSketchesOpStatsLogger opStat, String name,
+                                      Boolean success, double quantile) {
+        w.write(name)
+                .write("{success=\"").write(success.toString())
+                .write("\",quantile=\"").write(Double.toString(quantile));
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Double.toString(opStat.getQuantileValue(success, 
quantile))).write('\n');
+    }
+
+    private static void writeCount(SimpleTextOutputStream w, 
DataSketchesOpStatsLogger opStat, String name,
+                                   Boolean success) {
+        w.write(name).write("_count{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Long.toString(opStat.getCount(success))).write('\n');
+    }
+
+    private static void writeSum(SimpleTextOutputStream w, 
DataSketchesOpStatsLogger opStat, String name,
+                                 Boolean success) {
+        w.write(name).write("_sum{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
         }
+        w.write("} ")
+                .write(Double.toString(opStat.getSum(success))).write('\n');
     }
 
-    private static void writeQuantile(Writer w, DataSketchesOpStatsLogger 
opStat, String name, String cluster,
-                                      Boolean success, double quantile) throws 
IOException {
-        w.append(name).append("{cluster=\"").append(cluster).append("\", 
success=\"")
-                .append(success.toString()).append("\", quantile=\"")
-                .append(Double.toString(quantile)).append("\"} ")
-                .append(Double.toString(opStat.getQuantileValue(success, 
quantile))).append('\n');
+    public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {

Review Comment:
   Do we need this? Is it called from anywhere once we remove the call discussed in the earlier comment?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   If you already have the integration built, and you are querying the real metrics endpoint anyway, why not check that a real BK client metric shows up *with* the labels you expect, now that you actually have them?
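   Something along these lines (a sketch — the metric name is taken from the old `writeGauge` Javadoc example and may not be what the BK client emits today, and the cluster value depends on the test base configuration):
   ```java
           String body = EntityUtils.toString(response.getEntity());
           Multimap<String, Metric> parsed = parseMetrics(body);
           // Hypothetical BK client metric; pick one that is actually emitted.
           Collection<Metric> bkMetrics =
                   parsed.get("pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0");
           Assert.assertFalse(bkMetrics.isEmpty());
           bkMetrics.forEach(m -> Assert.assertEquals(m.tags.get("cluster"), "test"));
   ```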
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   Maybe use `parseMetrics()` here instead of the manual string check?
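   i.e. something like (sketch — the exact series name depends on how the scope and stat name are joined):
   ```java
           String body = EntityUtils.toString(response.getEntity());
           Multimap<String, Metric> parsed = parseMetrics(body);
           Assert.assertTrue(parsed.keySet().stream().anyMatch(name -> name.contains("test_metrics")));
   ```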



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} 
implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, 
PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 
"prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS 
= 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new 
ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = 
new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = 
new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new 
ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> 
gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> 
opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> 
threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> 
threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new 
StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new 
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, 
statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = 
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, 
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, 
scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> 
PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> 
PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> 
PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   `writeAllMetrics` is currently called from `PrometheusMetricsGenerator`:
   
   ```java
               generateManagedLedgerBookieClientMetrics(pulsar, stream);
   ```
   
   After that call we have support for `PrometheusRawMetricsProvider`:
   ```java
               if (metricsProviders != null) {
                   for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
                       metricsProvider.generate(stream);
                   }
               }
   ```
   
   I haven't seen any registration of `PrometheusMetricsProvider` as a `PrometheusRawMetricsProvider` other than in the test. If you do register it, you will get duplicate metrics.
   
   Since writing directly to `SimpleTextOutputStream` is more correct, I advise the following:
   1. `writeAllMetrics()` should simply throw an exception and have no implementation, since it shouldn't be called at all.
   2. Fix the implementation of `generateManagedLedgerBookieClientMetrics` to call a method that uses `SimpleTextOutputStream`, and drop the `implements PrometheusRawMetricsProvider` declaration; it can just be a public method.
   
   The only caveat I see: is there any chance that two different (`ScopeContext`, gauge/counter/op-stat) pairs end up with the same metric name, differing only in their labels?
   
   If you ditch that `Writer`-based method, you can also get rid of `PrometheusTextFormat.java`.
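   A rough sketch of what I mean, inside `PrometheusMetricsProvider` (only a sketch — `ScopeContext#getName()` is an assumed accessor, use whatever the class actually exposes):
   ```java
       // Not meant to be called any more; fail loudly if someone does.
       @Override
       public void writeAllMetrics(Writer writer) throws IOException {
           throw new UnsupportedOperationException("Use generate(SimpleTextOutputStream) instead");
       }
   
       // Plain public method, no PrometheusRawMetricsProvider registration needed;
       // call it from generateManagedLedgerBookieClientMetrics.
       public void generate(SimpleTextOutputStream stream) {
           gauges.forEach((sc, gauge) -> PrometheusTextFormatUtil.writeGauge(stream, sc.getName(), gauge));
           counters.forEach((sc, counter) -> PrometheusTextFormatUtil.writeCounter(stream, sc.getName(), counter));
           opStats.forEach((sc, opStat) -> PrometheusTextFormatUtil.writeOpStat(stream, sc.getName(), opStat));
       }
   ```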



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} 
implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, 
PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 
"prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS 
= 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;

Review Comment:
   @hangc0276 Previously there was a caching layer, `cachingStatsProvider`, which kept every `StatsLogger` created via `getStatsLogger` in a map; if a logger had already been created for a scope, the cached instance was returned.
   Now that caching layer is removed.
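   If you want to keep that behaviour without `CachingStatsProvider`, a minimal sketch (the `statsLoggerCache` field here is hypothetical, not something in this PR):
   ```java
       private final ConcurrentMap<String, StatsLogger> statsLoggerCache = new ConcurrentHashMap<>();
   
       public StatsLogger getStatsLogger(String scope) {
           // Reuse one logger instance per scope, as CachingStatsProvider used to do.
           return statsLoggerCache.computeIfAbsent(scope,
                   s -> new PrometheusStatsLogger(PrometheusMetricsProvider.this, s, labels));
       }
   ```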



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   This doesn't test the full functionality, and I think the manual calling and parsing here may be duplicating what already exists.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

