This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff7989  [Metrics] Reduce CPU consumption of metrics creation (#9735)
3ff7989 is described below

commit 3ff798909239ee4c2e31f96721888f4984ce2548
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Feb 27 08:00:44 2021 +0200

    [Metrics] Reduce CPU consumption of metrics creation (#9735)
    
    * [Metrics] Reduce CPU consumption of metrics creation
    
    * Use Map.compute to improve aggregation logic
---
 .../broker/stats/metrics/AbstractMetrics.java      | 72 +++++++++++++---------
 .../broker/stats/metrics/ManagedLedgerMetrics.java | 27 +++++---
 2 files changed, 62 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index 21b941e..ed6e79f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -59,6 +59,50 @@ abstract class AbstractMetrics {
         }
     }
 
+    // simple abstract for the buckets, their boundaries and pre-calculated 
keys
+    // pre-calculating the keys avoids a lot of object allocations during 
metric collection
+    static class Buckets {
+        private final double[] boundaries;
+        private final String[] bucketKeys;
+
+        Buckets(String metricKey, double[] boundaries) {
+            this.boundaries = boundaries;
+            this.bucketKeys = generateBucketKeys(metricKey, boundaries);
+        }
+
+        private static String[] generateBucketKeys(String mkey, double[] 
boundaries) {
+            String[] keys = new String[boundaries.length + 1];
+            for (int i = 0; i < boundaries.length + 1; i++) {
+                String bucketKey;
+                double value;
+
+                // example of key : "<metric_key>_0.0_0.5"
+                if (i == 0 && boundaries.length > 0) {
+                    bucketKey = String.format("%s_0.0_%1.1f", mkey, 
boundaries[i]);
+                } else if (i < boundaries.length) {
+                    bucketKey = String.format("%s_%1.1f_%1.1f", mkey, 
boundaries[i - 1], boundaries[i]);
+                } else {
+                    bucketKey = String.format("%s_OVERFLOW", mkey);
+                }
+                keys[i] = bucketKey;
+            }
+            return keys;
+        }
+
+        public void populateBucketEntries(Map<String, Double> map, long[] 
bucketValues, int period) {
+            // bucket values should be one more that the boundaries to have 
the last element as OVERFLOW
+            if (bucketValues != null && bucketValues.length != 
boundaries.length + 1) {
+                throw new RuntimeException("Bucket boundary and value array 
length mismatch");
+            }
+
+            for (int i = 0; i < boundaries.length + 1; i++) {
+                double value = (bucketValues == null) ? 0.0D : ((double) 
bucketValues[i] / (period > 0 ? period : 1));
+                map.compute(bucketKeys[i], (key, currentValue) -> 
(currentValue == null ? 0.0d : currentValue) + value);
+            }
+        }
+    }
+
+
     protected final PulsarService pulsar;
 
     abstract List<Metrics> generate();
@@ -169,34 +213,6 @@ abstract class AbstractMetrics {
         return createMetrics(dimensionMap);
     }
 
-    protected void populateBucketEntries(Map<String, Double> map, String mkey, 
double[] boundaries,
-            long[] bucketValues, int period) {
-
-        // bucket values should be one more that the boundaries to have the 
last element as OVERFLOW
-        if (bucketValues != null && bucketValues.length != boundaries.length + 
1) {
-            throw new RuntimeException("Bucket boundary and value array length 
mismatch");
-        }
-
-        for (int i = 0; i < boundaries.length + 1; i++) {
-            String bucketKey;
-            double value;
-
-            // example of key : "<metric_key>_0.0_0.5"
-            if (i == 0 && boundaries.length > 0) {
-                bucketKey = String.format("%s_0.0_%1.1f", mkey, boundaries[i]);
-            } else if (i < boundaries.length) {
-                bucketKey = String.format("%s_%1.1f_%1.1f", mkey, boundaries[i 
- 1], boundaries[i]);
-            } else {
-                bucketKey = String.format("%s_OVERFLOW", mkey);
-            }
-
-            value = (bucketValues == null) ? 0.0D : ((double) bucketValues[i] 
/ (period > 0 ? period : 1));
-
-            Double val = map.getOrDefault(bucketKey, 0.0);
-            map.put(bucketKey, val + value);
-        }
-    }
-
     protected void populateAggregationMap(Map<String, List<Double>> map, 
String mkey, double value) {
         if (!map.containsKey(mkey)) {
             map.put(mkey, Lists.newArrayList(value));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
index 273447c..8e15aff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java
@@ -35,6 +35,16 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
     private Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap;
     // temp map to prepare aggregation metrics
     private Map<String, Double> tempAggregatedMetricsMap;
+    private static final Buckets
+            BRK_ML_ADDENTRYLATENCYBUCKETS = new 
Buckets("brk_ml_AddEntryLatencyBuckets",
+            ENTRY_LATENCY_BUCKETS_MS);
+    private static final Buckets BRK_ML_LEDGERADDENTRYLATENCYBUCKETS = new 
Buckets(
+            "brk_ml_LedgerAddEntryLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS);
+    private static final Buckets BRK_ML_LEDGERSWITCHLATENCYBUCKETS = new 
Buckets(
+            "brk_ml_LedgerSwitchLatencyBuckets", ENTRY_LATENCY_BUCKETS_MS);
+
+    private static final Buckets
+            BRK_ML_ENTRYSIZEBUCKETS = new Buckets("brk_ml_EntrySizeBuckets", 
ENTRY_SIZE_BUCKETS_BYTES);
 
     public ManagedLedgerMetrics(PulsarService pulsar) {
         super(pulsar);
@@ -98,18 +108,17 @@ public class ManagedLedgerMetrics extends AbstractMetrics {
                         (double) lStats.getStoredMessagesSize());
 
                 // handle bucket entries initialization here
-                populateBucketEntries(tempAggregatedMetricsMap, 
"brk_ml_AddEntryLatencyBuckets",
-                        ENTRY_LATENCY_BUCKETS_MS, 
lStats.getAddEntryLatencyBuckets(),
+                
BRK_ML_ADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getAddEntryLatencyBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-                populateBucketEntries(tempAggregatedMetricsMap, 
"brk_ml_LedgerAddEntryLatencyBuckets",
-                        ENTRY_LATENCY_BUCKETS_MS, 
lStats.getLedgerAddEntryLatencyBuckets(),
+                
BRK_ML_LEDGERADDENTRYLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getLedgerAddEntryLatencyBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-                populateBucketEntries(tempAggregatedMetricsMap, 
"brk_ml_LedgerSwitchLatencyBuckets",
-                        ENTRY_LATENCY_BUCKETS_MS, 
lStats.getLedgerSwitchLatencyBuckets(),
+                
BRK_ML_LEDGERSWITCHLATENCYBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getLedgerSwitchLatencyBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
-
-                populateBucketEntries(tempAggregatedMetricsMap, 
"brk_ml_EntrySizeBuckets",
-                        ENTRY_SIZE_BUCKETS_BYTES, lStats.getEntrySizeBuckets(),
+                
BRK_ML_ENTRYSIZEBUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                        lStats.getEntrySizeBuckets(),
                         ManagedLedgerFactoryImpl.StatsPeriodSeconds);
                 populateAggregationMapWithSum(tempAggregatedMetricsMap, 
"brk_ml_MarkDeleteRate",
                         lStats.getMarkDeleteRate());

Reply via email to