This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e63911  Added backlog and offloaded size in Prometheus stats (#4150)
2e63911 is described below

commit 2e6391168dc1835a109689b2af1db15b89244e27
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 30 12:37:28 2019 -0700

    Added backlog and offloaded size in Prometheus stats (#4150)
---
 .../java/org/apache/bookkeeper/mledger/ManagedLedger.java  |  5 +++++
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  | 14 +++++++++++++-
 .../broker/stats/prometheus/AggregatedNamespaceStats.java  |  6 ++++++
 .../broker/stats/prometheus/NamespaceStatsAggregator.java  | 12 +++++++++---
 .../apache/pulsar/broker/stats/prometheus/TopicStats.java  | 11 +++++++++++
 5 files changed, 44 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d51b0d8..4f629c5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -323,6 +323,11 @@ public interface ManagedLedger {
     long getEstimatedBacklogSize();
 
     /**
+     * Return the size of all ledgers offloaded to 2nd tier storage
+     */
+    long getOffloadedSize();
+
+    /**
      * Activate cursors those caught up backlog-threshold entries and 
deactivate slow cursors which are creating
      * backlog.
      */
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a3795d3..940fb7f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -235,7 +235,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private static final AtomicLongFieldUpdater<ManagedLedgerImpl> 
READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ManagedLedgerImpl.class, "readOpCount");
     private volatile long readOpCount = 0;
-    // last read-operation's callback to check read-timeout on it. 
+    // last read-operation's callback to check read-timeout on it.
     private volatile ReadEntryCallbackWrapper lastReadCallback = null;
 
     /**
@@ -3139,6 +3139,18 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    @Override
+    public long getOffloadedSize() {
+        long offloadedSize = 0;
+        for (LedgerInfo li : ledgers.values()) {
+            if (li.hasOffloadContext() && 
li.getOffloadContext().getComplete()) {
+                offloadedSize += li.getSize();
+            }
+        }
+
+        return offloadedSize;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index cb485c2..b2a343a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -37,6 +37,10 @@ public class AggregatedNamespaceStats {
     public long storageSize;
     public long msgBacklog;
 
+    long backlogSize;
+    long offloadedStorageUsed;
+    long backlogQuotaLimit;
+
     public StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(
             ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
     public StatsBuckets entrySizeBuckets = new 
StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
@@ -61,6 +65,8 @@ public class AggregatedNamespaceStats {
         throughputOut += stats.throughputOut;
 
         storageSize += stats.storageSize;
+        backlogSize += stats.backlogSize;
+        offloadedStorageUsed += stats.offloadedStorageUsed;
 
         storageWriteRate += stats.storageWriteRate;
         storageReadRate += stats.storageReadRate;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 91ceadf..bf04b19 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
@@ -26,8 +30,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
-import io.netty.util.concurrent.FastThreadLocal;
-
 public class NamespaceStatsAggregator {
 
     private static FastThreadLocal<AggregatedNamespaceStats> 
localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() {
@@ -84,9 +86,13 @@ public class NamespaceStatsAggregator {
 
         if (topic instanceof PersistentTopic) {
             // Managed Ledger stats
-            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) 
((PersistentTopic) topic).getManagedLedger().getStats();
+            ManagedLedger ml = ((PersistentTopic) topic).getManagedLedger();
+            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) 
ml.getStats();
 
             stats.storageSize = mlStats.getStoredMessagesSize();
+            stats.backlogSize = ml.getEstimatedBacklogSize();
+            stats.offloadedStorageUsed = ml.getOffloadedSize();
+            stats.backlogQuotaLimit = topic.getBacklogQuota().getLimit();
 
             
stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
             stats.storageWriteLatencyBuckets.refresh();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 6658023..e887cae 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -38,6 +38,11 @@ class TopicStats {
     long storageSize;
     public long msgBacklog;
 
+    long backlogSize;
+    long offloadedStorageUsed;
+
+    long backlogQuotaLimit;
+
     StatsBuckets storageWriteLatencyBuckets = new 
StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
     StatsBuckets entrySizeBuckets = new 
StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
     double storageWriteRate;
@@ -59,6 +64,9 @@ class TopicStats {
         msgBacklog = 0;
         storageWriteRate = 0;
         storageReadRate = 0;
+        backlogSize = 0;
+        offloadedStorageUsed = 0;
+        backlogQuotaLimit = 0;
 
         replicationStats.clear();
         subscriptionStats.clear();
@@ -79,6 +87,9 @@ class TopicStats {
 
         metric(stream, cluster, namespace, topic, "pulsar_storage_size", 
stats.storageSize);
         metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", 
stats.msgBacklog);
+        metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_size", stats.backlogSize);
+        metric(stream, cluster, namespace, topic, 
"pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
+        metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit);
 
         long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
         metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);

Reply via email to