This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2e63911 Added backlog and offloaded size in Prometheus stats (#4150)
2e63911 is described below
commit 2e6391168dc1835a109689b2af1db15b89244e27
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 30 12:37:28 2019 -0700
Added backlog and offloaded size in Prometheus stats (#4150)
---
.../java/org/apache/bookkeeper/mledger/ManagedLedger.java | 5 +++++
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 +++++++++++++-
.../broker/stats/prometheus/AggregatedNamespaceStats.java | 6 ++++++
.../broker/stats/prometheus/NamespaceStatsAggregator.java | 12 +++++++++---
.../apache/pulsar/broker/stats/prometheus/TopicStats.java | 11 +++++++++++
5 files changed, 44 insertions(+), 4 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d51b0d8..4f629c5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -323,6 +323,11 @@ public interface ManagedLedger {
long getEstimatedBacklogSize();
/**
+ * Return the size of all ledgers offloaded to 2nd tier storage
+ */
+ long getOffloadedSize();
+
+ /**
* Activate cursors those caught up backlog-threshold entries and
deactivate slow cursors which are creating
* backlog.
*/
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a3795d3..940fb7f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -235,7 +235,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
private static final AtomicLongFieldUpdater<ManagedLedgerImpl>
READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
- // last read-operation's callback to check read-timeout on it.
+ // last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
/**
@@ -3139,6 +3139,18 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+ @Override
+ public long getOffloadedSize() {
+ long offloadedSize = 0;
+ for (LedgerInfo li : ledgers.values()) {
+ if (li.hasOffloadContext() &&
li.getOffloadContext().getComplete()) {
+ offloadedSize += li.getSize();
+ }
+ }
+
+ return offloadedSize;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedLedgerImpl.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index cb485c2..b2a343a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -37,6 +37,10 @@ public class AggregatedNamespaceStats {
public long storageSize;
public long msgBacklog;
+ long backlogSize;
+ long offloadedStorageUsed;
+ long backlogQuotaLimit;
+
public StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(
ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
public StatsBuckets entrySizeBuckets = new
StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
@@ -61,6 +65,8 @@ public class AggregatedNamespaceStats {
throughputOut += stats.throughputOut;
storageSize += stats.storageSize;
+ backlogSize += stats.backlogSize;
+ offloadedStorageUsed += stats.offloadedStorageUsed;
storageWriteRate += stats.storageWriteRate;
storageReadRate += stats.storageReadRate;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 91ceadf..bf04b19 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -18,7 +18,11 @@
*/
package org.apache.pulsar.broker.stats.prometheus;
+import io.netty.util.concurrent.FastThreadLocal;
+
import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
@@ -26,8 +30,6 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import io.netty.util.concurrent.FastThreadLocal;
-
public class NamespaceStatsAggregator {
private static FastThreadLocal<AggregatedNamespaceStats>
localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() {
@@ -84,9 +86,13 @@ public class NamespaceStatsAggregator {
if (topic instanceof PersistentTopic) {
// Managed Ledger stats
- ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)
((PersistentTopic) topic).getManagedLedger().getStats();
+ ManagedLedger ml = ((PersistentTopic) topic).getManagedLedger();
+ ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)
ml.getStats();
stats.storageSize = mlStats.getStoredMessagesSize();
+ stats.backlogSize = ml.getEstimatedBacklogSize();
+ stats.offloadedStorageUsed = ml.getOffloadedSize();
+ stats.backlogQuotaLimit = topic.getBacklogQuota().getLimit();
stats.storageWriteLatencyBuckets.addAll(mlStats.getInternalAddEntryLatencyBuckets());
stats.storageWriteLatencyBuckets.refresh();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 6658023..e887cae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -38,6 +38,11 @@ class TopicStats {
long storageSize;
public long msgBacklog;
+ long backlogSize;
+ long offloadedStorageUsed;
+
+ long backlogQuotaLimit;
+
StatsBuckets storageWriteLatencyBuckets = new
StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
StatsBuckets entrySizeBuckets = new
StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES);
double storageWriteRate;
@@ -59,6 +64,9 @@ class TopicStats {
msgBacklog = 0;
storageWriteRate = 0;
storageReadRate = 0;
+ backlogSize = 0;
+ offloadedStorageUsed = 0;
+ backlogQuotaLimit = 0;
replicationStats.clear();
subscriptionStats.clear();
@@ -79,6 +87,9 @@ class TopicStats {
metric(stream, cluster, namespace, topic, "pulsar_storage_size",
stats.storageSize);
metric(stream, cluster, namespace, topic, "pulsar_msg_backlog",
stats.msgBacklog);
+ metric(stream, cluster, namespace, topic,
"pulsar_storage_backlog_size", stats.backlogSize);
+ metric(stream, cluster, namespace, topic,
"pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
+ metric(stream, cluster, namespace, topic,
"pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit);
long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);