This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new ef61fb3  Fix missing replicator metrics (#11264)
ef61fb3 is described below

commit ef61fb35d0b0fcf5ab0dccc7e82622e2314a8571
Author: Tboy <[email protected]>
AuthorDate: Fri Jul 9 06:40:14 2021 +0800

    Fix missing replicator metrics (#11264)
    
    Fix missing replicator metrics [msgRateExpired, connected, 
replicationDelayInSeconds] in Prometheus.
    
    1. Fix the missing metrics.
    2. Update the documentation.
    
    - [X] Make sure that the change passes the CI checks.
    
    (cherry picked from commit d811606e6baffc4bcf5d8ad69630e1f489ccf2d7)
---
 .../broker/stats/prometheus/AggregatedNamespaceStats.java   |  3 +++
 .../broker/stats/prometheus/AggregatedReplicationStats.java | 13 ++++++++++++-
 .../broker/stats/prometheus/NamespaceStatsAggregator.java   | 11 +++++++++++
 .../apache/pulsar/broker/stats/prometheus/TopicStats.java   |  6 ++++++
 .../stats/prometheus/AggregatedNamespaceStatsTest.java      |  9 +++++++++
 site2/docs/reference-metrics.md                             |  4 ++++
 6 files changed, 45 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index ec7ebfd..0b64a68 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -99,6 +99,9 @@ public class AggregatedNamespaceStats {
             replStats.msgThroughputIn += as.msgThroughputIn;
             replStats.msgThroughputOut += as.msgThroughputOut;
             replStats.replicationBacklog += as.replicationBacklog;
+            replStats.msgRateExpired += as.msgRateExpired;
+            replStats.connectedCount += as.connectedCount;
+            replStats.replicationDelayInSeconds += 
as.replicationDelayInSeconds;
         });
 
         stats.subscriptionStats.forEach((n, as) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
index ac0b41a..ca92d55 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.stats.prometheus;
 
 public class AggregatedReplicationStats {
+
+    /** Total rate of messages received from the remote cluster (msg/s). */
     public double msgRateIn;
 
     /** Total throughput received from the remote cluster. bytes/s */
@@ -30,7 +32,16 @@ public class AggregatedReplicationStats {
     /** Total throughput delivered to the replication-subscriber. bytes/s */
     public double msgThroughputOut;
 
-    /** Number of messages pending to be replicated to remote cluster */
+    /** Total rate of messages expired (msg/s). */
+    public double msgRateExpired;
+
+    /** Number of messages pending to be replicated to remote cluster. */
     public long replicationBacklog;
 
+    /** The count of replication-subscriber up and running to replicate to 
remote cluster. */
+    public long connectedCount;
+
+    /** Time in seconds from the time a message was produced to the time when 
it is about to be replicated. */
+    public long replicationDelayInSeconds;
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index cf89ea2..fcdbbce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -196,6 +196,11 @@ public class NamespaceStatsAggregator {
             aggReplStats.msgRateOut += replStats.msgRateOut;
             aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
             aggReplStats.replicationBacklog += replStats.replicationBacklog;
+            aggReplStats.msgRateIn += replStats.msgRateIn;
+            aggReplStats.msgThroughputIn += replStats.msgThroughputIn;
+            aggReplStats.msgRateExpired += replStats.msgRateExpired;
+            aggReplStats.connectedCount += replStats.connected ? 1 : 0;
+            aggReplStats.replicationDelayInSeconds += 
replStats.replicationDelayInSeconds;
         });
     }
 
@@ -309,6 +314,12 @@ public class NamespaceStatsAggregator {
                         replStats.msgThroughputOut);
                 metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_backlog", remoteCluster,
                         replStats.replicationBacklog);
+                metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_connected_count", remoteCluster,
+                        replStats.connectedCount);
+                metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_rate_expired", remoteCluster,
+                        replStats.msgRateExpired);
+                metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_replication_delay_in_seconds",
+                        remoteCluster, replStats.replicationDelayInSeconds);
             });
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 1d39d58..5bb37ff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -215,6 +215,12 @@ class TopicStats {
                         replStats.msgThroughputOut);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_backlog", remoteCluster,
                         replStats.replicationBacklog);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_connected_count",
+                        remoteCluster, replStats.connectedCount);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_rate_expired",
+                        remoteCluster, replStats.msgRateExpired);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_delay_in_seconds",
+                        remoteCluster, replStats.replicationDelayInSeconds);
             });
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index c01173e..07c8f24 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -48,6 +48,9 @@ public class AggregatedNamespaceStatsTest {
         replStats1.msgRateOut = 2.0;
         replStats1.msgThroughputOut = 256.0;
         replStats1.replicationBacklog = 1;
+        replStats1.connectedCount = 0;
+        replStats1.msgRateExpired = 3.0;
+        replStats1.replicationDelayInSeconds = 20;
         topicStats1.replicationStats.put(namespace, replStats1);
 
         AggregatedSubscriptionStats subStats1 = new 
AggregatedSubscriptionStats();
@@ -76,6 +79,9 @@ public class AggregatedNamespaceStatsTest {
         replStats2.msgRateOut = 10.5;
         replStats2.msgThroughputOut = 1536.0;
         replStats2.replicationBacklog = 99;
+        replStats2.connectedCount = 1;
+        replStats2.msgRateExpired = 3.0;
+        replStats2.replicationDelayInSeconds = 20;
         topicStats2.replicationStats.put(namespace, replStats2);
 
         AggregatedSubscriptionStats subStats2 = new 
AggregatedSubscriptionStats();
@@ -109,6 +115,9 @@ public class AggregatedNamespaceStatsTest {
         assertEquals(nsReplStats.msgRateOut, 12.5);
         assertEquals(nsReplStats.msgThroughputOut, 1792.0);
         assertEquals(nsReplStats.replicationBacklog, 100);
+        assertEquals(nsReplStats.connectedCount, 1);
+        assertEquals(nsReplStats.msgRateExpired, 6.0);
+        assertEquals(nsReplStats.replicationDelayInSeconds, 40);
 
         AggregatedSubscriptionStats nsSubStats = 
nsStats.subscriptionStats.get(namespace);
         assertNotNull(nsSubStats);
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 02780bc..08f9a35 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -149,6 +149,10 @@ All the replication metrics are also labelled with 
`remoteCluster=${pulsar_remot
 | pulsar_replication_throughput_in | Gauge | The total throughput of the 
namespace replicating from remote cluster (bytes/second). |
 | pulsar_replication_throughput_out | Gauge | The total throughput of the 
namespace replicating to remote cluster (bytes/second). |
 | pulsar_replication_backlog | Gauge | The total backlog of the namespace 
replicating to remote cluster (messages). |
+| pulsar_replication_rate_expired | Gauge | Total rate of messages expired 
(messages/second). |
+| pulsar_replication_connected_count | Gauge | The count of 
replication-subscriber up and running to replicate to remote cluster. |
+| pulsar_replication_delay_in_seconds | Gauge | Time in seconds from the time 
a message was produced to the time when it is about to be replicated. |
+~~~~
 
 ### Topic metrics
 

Reply via email to