This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 94b5ff5191918a6fff2cafc95d4df2758de0d049
Author: Tboy <[email protected]>
AuthorDate: Fri Jul 9 06:40:14 2021 +0800

    Fix missing replicator metrics (#11264)
    
    Fix missing replicator metrics [msgRateExpired, connected, replicationDelayInSeconds] in Prometheus.
    
    1. Fix the missing metrics.
    2. Update the documentation.
    
    - [X] Make sure that the change passes the CI checks.
    
    (cherry picked from commit d811606e6baffc4bcf5d8ad69630e1f489ccf2d7)
---
 .../broker/stats/prometheus/AggregatedNamespaceStats.java |  3 +++
 .../stats/prometheus/AggregatedReplicationStats.java      | 15 ++++++++++++---
 .../broker/stats/prometheus/NamespaceStatsAggregator.java | 11 +++++++++++
 .../apache/pulsar/broker/stats/prometheus/TopicStats.java |  6 ++++++
 .../stats/prometheus/AggregatedNamespaceStatsTest.java    |  9 +++++++++
 site2/docs/reference-metrics.md                           |  4 ++++
 6 files changed, 45 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index c88063f..05f1b15 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -88,6 +88,9 @@ public class AggregatedNamespaceStats {
             replStats.msgThroughputIn += as.msgThroughputIn;
             replStats.msgThroughputOut += as.msgThroughputOut;
             replStats.replicationBacklog += as.replicationBacklog;
+            replStats.msgRateExpired += as.msgRateExpired;
+            replStats.connectedCount += as.connectedCount;
+            replStats.replicationDelayInSeconds += 
as.replicationDelayInSeconds;
         });
 
         stats.subscriptionStats.forEach((n, as) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
index b132e7e..ca92d55 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.stats.prometheus;
 
 public class AggregatedReplicationStats {
+
+    /** Total rate of messages received from the remote cluster (msg/s). */
     public double msgRateIn;
 
     /** Total throughput received from the remote cluster. bytes/s */
@@ -30,9 +32,16 @@ public class AggregatedReplicationStats {
     /** Total throughput delivered to the replication-subscriber. bytes/s */
     public double msgThroughputOut;
 
-    /**
-     * Number of messages pending to be replicated to remote cluster.
-     */
+    /** Total rate of messages expired (msg/s). */
+    public double msgRateExpired;
+
+    /** Number of messages pending to be replicated to remote cluster. */
     public long replicationBacklog;
 
+    /** The count of replication-subscriber up and running to replicate to 
remote cluster. */
+    public long connectedCount;
+
+    /** Time in seconds from the time a message was produced to the time when 
it is about to be replicated. */
+    public long replicationDelayInSeconds;
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 034e83e..18c4f97 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -210,6 +210,11 @@ public class NamespaceStatsAggregator {
             aggReplStats.msgRateOut += replStats.msgRateOut;
             aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
             aggReplStats.replicationBacklog += replStats.replicationBacklog;
+            aggReplStats.msgRateIn += replStats.msgRateIn;
+            aggReplStats.msgThroughputIn += replStats.msgThroughputIn;
+            aggReplStats.msgRateExpired += replStats.msgRateExpired;
+            aggReplStats.connectedCount += replStats.connected ? 1 : 0;
+            aggReplStats.replicationDelayInSeconds += 
replStats.replicationDelayInSeconds;
         });
     }
 
@@ -327,6 +332,12 @@ public class NamespaceStatsAggregator {
                         replStats.msgThroughputOut);
                 metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_backlog", remoteCluster,
                         replStats.replicationBacklog);
+                metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_connected_count", remoteCluster,
+                        replStats.connectedCount);
+                metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_rate_expired", remoteCluster,
+                        replStats.msgRateExpired);
+                metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_delay_in_seconds",
+                        remoteCluster, replStats.replicationDelayInSeconds);
             });
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 557d7ff..0649235 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -237,6 +237,12 @@ class TopicStats {
                         replStats.msgThroughputOut);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_backlog", remoteCluster,
                         replStats.replicationBacklog);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_connected_count",
+                        remoteCluster, replStats.connectedCount);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_rate_expired",
+                        remoteCluster, replStats.msgRateExpired);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_delay_in_seconds",
+                        remoteCluster, replStats.replicationDelayInSeconds);
             });
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index 5fec7a6..2d0a5f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -49,6 +49,9 @@ public class AggregatedNamespaceStatsTest {
         replStats1.msgRateOut = 2.0;
         replStats1.msgThroughputOut = 256.0;
         replStats1.replicationBacklog = 1;
+        replStats1.connectedCount = 0;
+        replStats1.msgRateExpired = 3.0;
+        replStats1.replicationDelayInSeconds = 20;
         topicStats1.replicationStats.put(namespace, replStats1);
 
         AggregatedSubscriptionStats subStats1 = new 
AggregatedSubscriptionStats();
@@ -77,6 +80,9 @@ public class AggregatedNamespaceStatsTest {
         replStats2.msgRateOut = 10.5;
         replStats2.msgThroughputOut = 1536.0;
         replStats2.replicationBacklog = 99;
+        replStats2.connectedCount = 1;
+        replStats2.msgRateExpired = 3.0;
+        replStats2.replicationDelayInSeconds = 20;
         topicStats2.replicationStats.put(namespace, replStats2);
 
         AggregatedSubscriptionStats subStats2 = new 
AggregatedSubscriptionStats();
@@ -110,6 +116,9 @@ public class AggregatedNamespaceStatsTest {
         assertEquals(nsReplStats.msgRateOut, 12.5);
         assertEquals(nsReplStats.msgThroughputOut, 1792.0);
         assertEquals(nsReplStats.replicationBacklog, 100);
+        assertEquals(nsReplStats.connectedCount, 1);
+        assertEquals(nsReplStats.msgRateExpired, 6.0);
+        assertEquals(nsReplStats.replicationDelayInSeconds, 40);
 
         AggregatedSubscriptionStats nsSubStats = 
nsStats.subscriptionStats.get(namespace);
         assertNotNull(nsSubStats);
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index cbb4693..f9be3bc 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -165,6 +165,10 @@ All the replication metrics are also labelled with 
`remoteCluster=${pulsar_remot
 | pulsar_replication_throughput_in | Gauge | The total throughput of the 
namespace replicating from remote cluster (bytes/second). |
 | pulsar_replication_throughput_out | Gauge | The total throughput of the 
namespace replicating to remote cluster (bytes/second). |
 | pulsar_replication_backlog | Gauge | The total backlog of the namespace 
replicating to remote cluster (messages). |
+| pulsar_replication_rate_expired | Gauge | Total rate of messages expired 
(messages/second). |
+| pulsar_replication_connected_count | Gauge | The count of 
replication-subscriber up and running to replicate to remote cluster. |
+| pulsar_replication_delay_in_seconds | Gauge | Time in seconds from the time 
a message was produced to the time when it is about to be replicated. |
+~~~~
 
 ### Topic metrics
 

Reply via email to