This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbf526848a15153727d12ab1ae4956847ec3f50f
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 26 10:05:21 2024 +0800

    [improve] [broker] Part 2 of PIP-370: add metrics "pulsar_replication_disconnected_count" (#23213)
    
    (cherry picked from commit 09a16c26974408de270bcaaf6162b0e2a9a6d203)
---
 .../stats/prometheus/AggregatedNamespaceStats.java |   1 +
 .../prometheus/AggregatedReplicationStats.java     |   3 +
 .../stats/prometheus/NamespaceStatsAggregator.java |   8 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java |   2 +
 .../broker/service/OneWayReplicatorTest.java       | 121 +++++++++++++++++++++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |   6 +
 .../prometheus/AggregatedNamespaceStatsTest.java   |   2 +
 7 files changed, 142 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 3975cd89cfa..85ff15c915a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -133,6 +133,7 @@ public class AggregatedNamespaceStats {
             replStats.replicationBacklog += as.replicationBacklog;
             replStats.msgRateExpired += as.msgRateExpired;
             replStats.connectedCount += as.connectedCount;
+            replStats.disconnectedCount += as.disconnectedCount;
             replStats.replicationDelayInSeconds += 
as.replicationDelayInSeconds;
         });
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
index 78f33f874e9..82668de6c35 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
@@ -41,6 +41,9 @@ public class AggregatedReplicationStats {
     /** The count of replication-subscriber up and running to replicate to 
remote cluster. */
     public long connectedCount;
 
+    /** The count of replication-subscriber that failed to start to replicate 
to remote cluster. */
+    public long disconnectedCount;
+
     /** Time in seconds from the time a message was produced to the time when 
it is about to be replicated. */
     public long replicationDelayInSeconds;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3728c3edd1e..d25af8d289c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -301,7 +301,11 @@ public class NamespaceStatsAggregator {
             aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
             aggReplStats.replicationBacklog += replStats.replicationBacklog;
             aggReplStats.msgRateExpired += replStats.msgRateExpired;
-            aggReplStats.connectedCount += replStats.connected ? 1 : 0;
+            if (replStats.connected) {
+                aggReplStats.connectedCount += 1;
+            } else {
+                aggReplStats.disconnectedCount += 1;
+            }
             aggReplStats.replicationDelayInSeconds += 
replStats.replicationDelayInSeconds;
         });
 
@@ -497,6 +501,8 @@ public class NamespaceStatsAggregator {
                 replStats -> replStats.replicationBacklog, cluster, namespace);
         writeReplicationStat(stream, "pulsar_replication_connected_count", 
stats,
                 replStats -> replStats.connectedCount, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_disconnected_count", 
stats,
+                replStats -> replStats.disconnectedCount, cluster, namespace);
         writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
                 replStats -> replStats.msgRateExpired, cluster, namespace);
         writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", 
stats,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 27288291d29..e907760d9d9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -389,6 +389,8 @@ class TopicStats {
                         cluster, namespace, topic, remoteCluster, 
splitTopicAndPartitionIndexLabel);
                 writeMetric(stream, "pulsar_replication_connected_count", 
replStats.connectedCount,
                         cluster, namespace, topic, remoteCluster, 
splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_disconnected_count", 
replStats.disconnectedCount,
+                        cluster, namespace, topic, remoteCluster, 
splitTopicAndPartitionIndexLabel);
                 writeMetric(stream, "pulsar_replication_rate_expired", 
replStats.msgRateExpired,
                         cluster, namespace, topic, remoteCluster, 
splitTopicAndPartitionIndexLabel);
                 writeMetric(stream, "pulsar_replication_delay_in_seconds", 
replStats.replicationDelayInSeconds,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 4f0f28d0e9d..627a9d82f13 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
@@ -64,6 +65,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -89,6 +91,8 @@ import 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.junit.Assert;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -1128,4 +1132,121 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin1.namespaces().deleteNamespace(ns);
         admin2.namespaces().deleteNamespace(ns);
     }
+
+    @Test
+    public void testReplicationCountMetrics() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+        // 1.Create topic, does not enable replication now.
+        admin1.topics().createNonPartitionedTopic(topicName);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+        // We inject an error to make the internal producer fail to connect.
+        final AtomicInteger createProducerCounter = new AtomicInteger();
+        final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
+        Runnable taskToClearInjection = 
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+            if (topicName.equals(producerCnf.getTopicName())) {
+                // There is a switch to determine create producer successfully 
or not.
+                if (failedCreateProducer.get()) {
+                    log.info("Retry create replicator.producer count: {}", 
createProducerCounter);
+                    // Release producer and fail callback.
+                    originalProducer.closeAsync();
+                    throw new RuntimeException("mock error");
+                }
+                return originalProducer;
+            }
+            return originalProducer;
+        });
+
+        // 2.Enable replication.
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+
+        // Verify: metrics.
+        // Cluster level:
+        //   - pulsar_replication_connected_count
+        //   - pulsar_replication_disconnected_count
+        // Namespace level:
+        //   - pulsar_replication_connected_count
+        //   - pulsar_replication_disconnected_count
+        // Topic level:
+        //   - pulsar_replication_connected_count
+        //   - pulsar_replication_disconnected_count
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        Awaitility.await().untilAsserted(() -> {
+            int topicConnected = 0;
+            int topicDisconnected = 0;
+
+            String response = 
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+                    .request().get(String.class);
+            Multimap<String, PrometheusMetricsClient.Metric> metricMap = 
PrometheusMetricsClient.parseMetrics(response);
+            if 
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+                fail("Expected 1 disconnected replicator.");
+            }
+            for (PrometheusMetricsClient.Metric metric : 
metricMap.get("pulsar_replication_connected_count")) {
+                if (cluster1.equals(metric.tags.get("cluster"))
+                        && 
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+                        && topicName.equals(metric.tags.get("topic"))) {
+                    topicConnected += Double.valueOf(metric.value).intValue();
+                }
+            }
+            for (PrometheusMetricsClient.Metric metric : 
metricMap.get("pulsar_replication_disconnected_count")) {
+                if (cluster1.equals(metric.tags.get("cluster"))
+                        && 
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+                        && topicName.equals(metric.tags.get("topic"))) {
+                    topicDisconnected += 
Double.valueOf(metric.value).intValue();
+                }
+            }
+            log.info("{}, {},", topicConnected, topicDisconnected);
+            assertEquals(topicConnected, 0);
+            assertEquals(topicDisconnected, 1);
+        });
+
+        // Let replicator connect successfully.
+        failedCreateProducer.set(false);
+        // Verify: metrics.
+        // Cluster level:
+        //   - pulsar_replication_connected_count
+        //   - pulsar_replication_disconnected_count
+        // Namespace level:
+        //   - pulsar_replication_connected_count
+        //   - pulsar_replication_disconnected_count
+        // Topic level:
+        //   - pulsar_replication_connected_count
+        //   - pulsar_replication_disconnected_count
+        Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(() -> 
{
+            int topicConnected = 0;
+            int topicDisconnected = 0;
+
+            String response = 
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+                    .request().get(String.class);
+            Multimap<String, PrometheusMetricsClient.Metric> metricMap = 
PrometheusMetricsClient.parseMetrics(response);
+            if 
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+                fail("Expected 1 disconnected replicator.");
+            }
+            for (PrometheusMetricsClient.Metric metric : 
metricMap.get("pulsar_replication_connected_count")) {
+                if (cluster1.equals(metric.tags.get("cluster"))
+                        && 
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+                        && topicName.equals(metric.tags.get("topic"))) {
+                    topicConnected += Double.valueOf(metric.value).intValue();
+                }
+            }
+            for (PrometheusMetricsClient.Metric metric : 
metricMap.get("pulsar_replication_disconnected_count")) {
+                if (cluster1.equals(metric.tags.get("cluster"))
+                        && 
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+                        && topicName.equals(metric.tags.get("topic"))) {
+                    topicDisconnected += 
Double.valueOf(metric.value).intValue();
+                }
+            }
+            log.info("{}, {}", topicConnected, topicDisconnected);
+            assertEquals(topicConnected, 1);
+            assertEquals(topicDisconnected, 0);
+        });
+
+        // cleanup.
+        taskToClearInjection.run();
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        waitReplicatorStopped(topicName);
+        admin1.topics().delete(topicName, false);
+        admin2.topics().delete(topicName, false);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 34810bbe905..d99969fbaa7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -167,4 +167,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
     public void testDifferentTopicCreationRule(ReplicationMode 
replicationMode) throws Exception {
         super.testDifferentTopicCreationRule(replicationMode);
     }
+
+    @Test(enabled = false)
+    @Override
+    public void testReplicationCountMetrics() throws Exception {
+        super.testReplicationCountMetrics();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index 0e12d75f74f..11358eb1e2c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -101,6 +101,7 @@ public class AggregatedNamespaceStatsTest {
         replStats2.msgThroughputOut = 1536.0;
         replStats2.replicationBacklog = 99;
         replStats2.connectedCount = 1;
+        replStats2.disconnectedCount = 2;
         replStats2.msgRateExpired = 3.0;
         replStats2.replicationDelayInSeconds = 20;
         topicStats2.replicationStats.put(namespace, replStats2);
@@ -148,6 +149,7 @@ public class AggregatedNamespaceStatsTest {
         assertEquals(nsReplStats.msgThroughputOut, 1792.0);
         assertEquals(nsReplStats.replicationBacklog, 100);
         assertEquals(nsReplStats.connectedCount, 1);
+        assertEquals(nsReplStats.disconnectedCount, 2);
         assertEquals(nsReplStats.msgRateExpired, 6.0);
         assertEquals(nsReplStats.replicationDelayInSeconds, 40);
 

Reply via email to