This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f584be1f6eff6231b6be0e85fa62837f79df601a Author: Tao Jiuming <[email protected]> AuthorDate: Sat Jun 3 21:14:16 2023 +0800 [improve][broker] Add `topic_load_failed` metric (#19236) Co-authored-by: daojun <[email protected]> (cherry picked from commit 43b3622cc7e7f746ca9920fdc704dc7448767ac7) --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 9 +++++++++ .../java/org/apache/pulsar/broker/service/PulsarStats.java | 4 ++++ .../apache/pulsar/broker/stats/BrokerOperabilityMetrics.java | 10 +++++++++- .../org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 4 ++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c53085347db..4e5b15240db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1184,6 +1184,10 @@ public class BrokerService implements Closeable { private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); @@ -1498,6 +1502,11 @@ public class BrokerService implements Closeable { TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); + if (isTransactionSystemTopic(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index 6101d669932..315c0a1852a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -240,6 +240,10 @@ public class PulsarStats implements Closeable { } } + public void recordTopicLoadFailed() { + brokerOperabilityMetrics.recordTopicLoadFailed(); + } + public void recordConnectionCreate() { brokerOperabilityMetrics.recordConnectionCreate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 4fd9e35dd7b..c0810f47278 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.stats; import com.google.common.collect.Maps; +import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.pulsar.common.stats.Metrics; /** */ public class BrokerOperabilityMetrics { + private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register(); private final List<Metrics> metricsList; private final String localCluster; private final DimensionStats topicLoadStats; @@ -90,7 +92,9 @@ public class BrokerOperabilityMetrics { } Metrics getTopicLoadMetrics() { - return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get()); + return metrics; } Metrics getZkWriteLatencyMetrics() { @@ -136,6 +140,10 @@ public class BrokerOperabilityMetrics { zkReadLatencyStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS); } + public void recordTopicLoadFailed() { + this.TOPIC_LOAD_FAILED.inc(); + } + public void recordConnectionCreate() { this.connectionTotalCreatedCount.increment(); this.connectionActive.increment(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 103b8f94df1..245a05acda7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -316,6 +316,10 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List<Metric>) metrics.get("topic_load_failed_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List<Metric>) metrics.get("pulsar_in_bytes_total"); assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
