This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a57d9a7a0ee324d191495658aab7305272272faf Author: Tao Jiuming <[email protected]> AuthorDate: Sat Jun 3 21:14:16 2023 +0800 [improve][broker] Add `topic_load_failed` metric (#19236) Co-authored-by: daojun <[email protected]> (cherry picked from commit 43b3622cc7e7f746ca9920fdc704dc7448767ac7) --- .../pulsar/broker/service/BrokerService.java | 9 +++ .../apache/pulsar/broker/service/PulsarStats.java | 4 ++ .../broker/stats/BrokerOperabilityMetrics.java | 10 ++- .../pulsar/broker/service/BrokerServiceTest.java | 78 ++++++++++++++++++++++ .../pulsar/broker/stats/PrometheusMetricsTest.java | 4 ++ 5 files changed, 104 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fdc9e5b33fa..7c4616d9b92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1291,6 +1291,10 @@ public class BrokerService implements Closeable { private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); @@ -1686,6 +1690,11 @@ public class BrokerService implements Closeable { TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); + if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index 2059aa04350..0e8161747ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -266,6 +266,10 @@ public class PulsarStats implements Closeable { } } + public void recordTopicLoadFailed() { + brokerOperabilityMetrics.recordTopicLoadFailed(); + } + public void recordConnectionCreate() { brokerOperabilityMetrics.recordConnectionCreate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 90913333871..400dbd3335a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats; +import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ import org.apache.pulsar.common.stats.Metrics; /** */ public class BrokerOperabilityMetrics { + private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register(); private final List<Metrics> metricsList; private final String localCluster; private final DimensionStats topicLoadStats; @@ -84,7 +86,9 @@ public class BrokerOperabilityMetrics { } Metrics getTopicLoadMetrics() { - return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get()); + return metrics; } Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionStats stats) { @@ -112,6 +116,10 @@ public class BrokerOperabilityMetrics { topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS); } + public void recordTopicLoadFailed() { + this.TOPIC_LOAD_FAILED.inc(); + } + public void recordConnectionCreate() { this.connectionTotalCreatedCount.increment(); this.connectionActive.increment(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 9789c9d725c..935fecc4536 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -1646,4 +1647,81 @@ public class BrokerServiceTest extends BrokerTestBase { fail("Unsubscribe failed"); } } + + @Test + public void testBrokerStatsTopicLoadFailed() throws Exception { + admin.namespaces().createNamespace("prop/ns-test"); + + String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID(); + String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID(); + + BrokerService brokerService = pulsar.getBrokerService(); + brokerService = Mockito.spy(brokerService); + // mock create persistent topic failed + Mockito + .doAnswer(invocation -> { + CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic))); + + // mock create non-persistent topic failed + Mockito + .doAnswer(inv -> { + CompletableFuture<Void> f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic)); + + + PulsarService pulsarService = pulsar; + Field field = PulsarService.class.getDeclaredField("brokerService"); + field.setAccessible(true); + field.set(pulsarService, brokerService); + + CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING) + .topic(persistentTopic) + .createAsync(); + CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING) + .topic(nonPersistentTopic) + .createAsync(); + + producer.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + producer1.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + + Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> { + String json = admin.brokerStats().getMetrics(); + JsonArray metrics = new Gson().fromJson(json, JsonArray.class); + AtomicBoolean flag = new AtomicBoolean(false); + + metrics.forEach(ele -> { + JsonObject obj = ((JsonObject) ele); + JsonObject metrics0 = (JsonObject) obj.get("metrics"); + JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count"); + if (null != v && v.getAsDouble() >= 2D) { + flag.set(true); + } + }); + + return flag.get(); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 77bb3cc8cdc..bbd52de8c34 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -338,6 +338,10 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List<Metric>) metrics.get("topic_load_failed_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List<Metric>) metrics.get("pulsar_in_bytes_total"); assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
