This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09a16c26974 [improve] [broker] Part 2 of PIP-370: add metrics
"pulsar_replication_disconnected_count" (#23213)
09a16c26974 is described below
commit 09a16c26974408de270bcaaf6162b0e2a9a6d203
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 26 10:05:21 2024 +0800
[improve] [broker] Part 2 of PIP-370: add metrics
"pulsar_replication_disconnected_count" (#23213)
---
pip/pip-370.md | 7 +-
.../stats/prometheus/AggregatedNamespaceStats.java | 1 +
.../prometheus/AggregatedReplicationStats.java | 3 +
.../stats/prometheus/NamespaceStatsAggregator.java | 8 +-
.../pulsar/broker/stats/prometheus/TopicStats.java | 2 +
.../broker/service/OneWayReplicatorTest.java | 121 +++++++++++++++++++++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 +
.../prometheus/AggregatedNamespaceStatsTest.java | 2 +
8 files changed, 145 insertions(+), 5 deletions(-)
diff --git a/pip/pip-370.md b/pip/pip-370.md
index 6699846cee1..a29d5561432 100644
--- a/pip/pip-370.md
+++ b/pip/pip-370.md
@@ -85,10 +85,9 @@ For each metric provide:
* Attributes (labels)
* Unit
-->
-| Name | Description | Attributes | Units|
-| --- | --- | --- | --- |
-| `pulsar_broker_replication_count` | Counter. The number of topics enabled replication. | cluster | - |
-| `pulsar_broker_replication_disconnected_count` | Counter. The number of topics that enabled replication and its replicator failed to connect | cluster | - |
+| Name | Description | Attributes | Units |
+| --- | --- | --- | --- |
+| `pulsar_replication_disconnected_count` | Gauge. The number of replicators that are not connected to (failed to start replicating to) the remote cluster. | cluster, namespace, topic | - |
# Monitoring
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 3975cd89cfa..85ff15c915a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -133,6 +133,7 @@ public class AggregatedNamespaceStats {
replStats.replicationBacklog += as.replicationBacklog;
replStats.msgRateExpired += as.msgRateExpired;
replStats.connectedCount += as.connectedCount;
+ replStats.disconnectedCount += as.disconnectedCount;
replStats.replicationDelayInSeconds +=
as.replicationDelayInSeconds;
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
index 78f33f874e9..82668de6c35 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedReplicationStats.java
@@ -41,6 +41,9 @@ public class AggregatedReplicationStats {
/** The count of replication-subscriber up and running to replicate to
remote cluster. */
public long connectedCount;
+    /** The count of replication-subscribers that failed to start replicating to the remote cluster. */
+ public long disconnectedCount;
+
/** Time in seconds from the time a message was produced to the time when
it is about to be replicated. */
public long replicationDelayInSeconds;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a229ef54c79..f0d11167e65 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -303,7 +303,11 @@ public class NamespaceStatsAggregator {
aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
aggReplStats.replicationBacklog += replStats.replicationBacklog;
aggReplStats.msgRateExpired += replStats.msgRateExpired;
- aggReplStats.connectedCount += replStats.connected ? 1 : 0;
+ if (replStats.connected) {
+ aggReplStats.connectedCount += 1;
+ } else {
+ aggReplStats.disconnectedCount += 1;
+ }
aggReplStats.replicationDelayInSeconds +=
replStats.replicationDelayInSeconds;
});
@@ -510,6 +514,8 @@ public class NamespaceStatsAggregator {
replStats -> replStats.replicationBacklog, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_connected_count",
stats,
replStats -> replStats.connectedCount, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_disconnected_count",
stats,
+ replStats -> replStats.disconnectedCount, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
replStats -> replStats.msgRateExpired, cluster, namespace);
writeReplicationStat(stream, "pulsar_replication_delay_in_seconds",
stats,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 9eb4077225c..013b5287310 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -408,6 +408,8 @@ class TopicStats {
cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_connected_count",
replStats.connectedCount,
cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_disconnected_count",
replStats.disconnectedCount,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_rate_expired",
replStats.msgRateExpired,
cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
writeMetric(stream, "pulsar_replication_delay_in_seconds",
replStats.replicationDelayInSeconds,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 1745d4dc90f..74604dd990c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
@@ -67,6 +68,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -91,6 +93,8 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -1160,4 +1164,121 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}
+
+    @Test
+    public void testReplicationCountMetrics() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+        // 1.Create topic, does not enable replication now.
+        admin1.topics().createNonPartitionedTopic(topicName);
+        assertTrue(pulsar1.getBrokerService().getTopic(topicName, false).join().get() instanceof PersistentTopic);
+
+        // Inject an error so that, while "failedCreateProducer" is true, every attempt to create the
+        // replicator's internal producer for this topic fails.
+        final AtomicInteger createProducerCounter = new AtomicInteger();
+        final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
+        Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+            if (topicName.equals(producerCnf.getTopicName()) && failedCreateProducer.get()) {
+                log.info("Retry create replicator.producer count: {}", createProducerCounter.incrementAndGet());
+                // Release producer and fail callback.
+                originalProducer.closeAsync();
+                throw new RuntimeException("mock error");
+            }
+            return originalProducer;
+        });
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        try {
+            // 2.Enable replication.
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+
+            // Verify: while the replicator cannot create its producer, the topic-level metrics report
+            // 0 connected and 1 disconnected replicator.
+            Awaitility.await().untilAsserted(() -> assertTopicReplicationCountMetrics(httpClient, topicName, 0, 1));
+
+            // 3.Let replicator connect successfully.
+            failedCreateProducer.set(false);
+            // Verify: once the producer is created, the topic-level metrics report 1 connected and
+            // 0 disconnected replicators. NOTE(review): the generous timeout presumably covers the
+            // replicator's producer-retry backoff - confirm.
+            Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(
+                    () -> assertTopicReplicationCountMetrics(httpClient, topicName, 1, 0));
+        } finally {
+            // Undo the injection and release the HTTP client even when an assertion above fails,
+            // so the mocked producer builder does not leak into later tests.
+            taskToClearInjection.run();
+            httpClient.close();
+        }
+
+        // cleanup.
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        waitReplicatorStopped(topicName);
+        admin1.topics().delete(topicName, false);
+        admin2.topics().delete(topicName, false);
+    }
+
+    /**
+     * Scrapes {@code /metrics/} of {@code pulsar1} and asserts the topic-level values of
+     * {@code pulsar_replication_connected_count} and {@code pulsar_replication_disconnected_count}
+     * for {@code topicName} (cluster {@code cluster1}, namespace {@code nonReplicatedNamespace}).
+     */
+    private void assertTopicReplicationCountMetrics(JerseyClient httpClient, String topicName,
+                                                    int expectedConnected, int expectedDisconnected) {
+        String response = httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+                .request().get(String.class);
+        Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
+        if (!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+            fail("Metric pulsar_replication_disconnected_count is missing.");
+        }
+        int topicConnected = sumTopicMetric(metricMap, "pulsar_replication_connected_count", topicName);
+        int topicDisconnected = sumTopicMetric(metricMap, "pulsar_replication_disconnected_count", topicName);
+        log.info("Replication metrics of {}: connected={}, disconnected={}", topicName, topicConnected,
+                topicDisconnected);
+        assertEquals(topicConnected, expectedConnected);
+        assertEquals(topicDisconnected, expectedDisconnected);
+    }
+
+    /**
+     * Sums the samples of {@code metricName} whose cluster, namespace and topic labels equal
+     * {@code cluster1}, {@code nonReplicatedNamespace} and {@code topicName} respectively.
+     */
+    private int sumTopicMetric(Multimap<String, PrometheusMetricsClient.Metric> metricMap, String metricName,
+                               String topicName) {
+        int sum = 0;
+        for (PrometheusMetricsClient.Metric metric : metricMap.get(metricName)) {
+            if (cluster1.equals(metric.tags.get("cluster"))
+                    && nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+                    && topicName.equals(metric.tags.get("topic"))) {
+                sum += Double.valueOf(metric.value).intValue();
+            }
+        }
+        return sum;
+    }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 34810bbe905..d99969fbaa7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -167,4 +167,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
public void testDifferentTopicCreationRule(ReplicationMode
replicationMode) throws Exception {
super.testDifferentTopicCreationRule(replicationMode);
}
+
+ @Test(enabled = false) // NOTE(review): skipped for the global-ZK variant; reason not stated - confirm & document.
+ @Override
+ public void testReplicationCountMetrics() throws Exception {
+ super.testReplicationCountMetrics();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index 0e12d75f74f..11358eb1e2c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -101,6 +101,7 @@ public class AggregatedNamespaceStatsTest {
replStats2.msgThroughputOut = 1536.0;
replStats2.replicationBacklog = 99;
replStats2.connectedCount = 1;
+ replStats2.disconnectedCount = 2;
replStats2.msgRateExpired = 3.0;
replStats2.replicationDelayInSeconds = 20;
topicStats2.replicationStats.put(namespace, replStats2);
@@ -148,6 +149,7 @@ public class AggregatedNamespaceStatsTest {
assertEquals(nsReplStats.msgThroughputOut, 1792.0);
assertEquals(nsReplStats.replicationBacklog, 100);
assertEquals(nsReplStats.connectedCount, 1);
+ assertEquals(nsReplStats.disconnectedCount, 2);
assertEquals(nsReplStats.msgRateExpired, 6.0);
assertEquals(nsReplStats.replicationDelayInSeconds, 40);