This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a03543a4e4867835152ccae9a0ce5bb7b9042c42 Author: Tao Jiuming <[email protected]> AuthorDate: Sat Jun 3 21:14:16 2023 +0800 [improve][broker] Add `topic_load_failed` metric (#19236) Co-authored-by: daojun <[email protected]> (cherry picked from commit 43b3622cc7e7f746ca9920fdc704dc7448767ac7) --- .../pulsar/broker/service/BrokerService.java | 9 +++ .../apache/pulsar/broker/service/PulsarStats.java | 4 ++ .../broker/stats/BrokerOperabilityMetrics.java | 10 ++- .../pulsar/broker/service/BrokerServiceTest.java | 79 ++++++++++++++++++++++ .../pulsar/broker/stats/PrometheusMetricsTest.java | 4 ++ 5 files changed, 105 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b921c4b5dc6..65366f7eb22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1183,6 +1183,10 @@ public class BrokerService implements Closeable { private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); @@ -1538,6 +1542,11 @@ public class BrokerService implements Closeable { TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); + if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index ff74cf839ae..69cfc68129c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -239,6 +239,10 @@ public class PulsarStats implements Closeable { } } + public void recordTopicLoadFailed() { + brokerOperabilityMetrics.recordTopicLoadFailed(); + } + public void recordConnectionCreate() { brokerOperabilityMetrics.recordConnectionCreate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 4fd9e35dd7b..c0810f47278 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.stats; import com.google.common.collect.Maps; +import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,6 +30,7 @@ import org.apache.pulsar.common.stats.Metrics; /** */ public class BrokerOperabilityMetrics { + private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register(); private final List<Metrics> metricsList; private final String localCluster; private final DimensionStats topicLoadStats; @@ -90,7 +92,9 @@ public class BrokerOperabilityMetrics { } Metrics getTopicLoadMetrics() { - return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get()); + return metrics; } Metrics getZkWriteLatencyMetrics() { @@ -136,6 +140,10 @@ public class BrokerOperabilityMetrics { zkReadLatencyStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS); } + public void recordTopicLoadFailed() { + this.TOPIC_LOAD_FAILED.inc(); + } + public void recordConnectionCreate() { this.connectionTotalCreatedCount.increment(); this.connectionActive.increment(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 5a54473247f..ea10b24b385 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -34,6 +34,7 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -1628,4 +1629,82 @@ public class BrokerServiceTest extends BrokerTestBase { assertTrue(conf.isForceDeleteTenantAllowed()); }); } + + + @Test + public void testBrokerStatsTopicLoadFailed() throws Exception { + admin.namespaces().createNamespace("prop/ns-test"); + + String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID(); + String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID(); + + BrokerService brokerService = pulsar.getBrokerService(); + brokerService = Mockito.spy(brokerService); + // mock create persistent topic failed + Mockito + .doAnswer(invocation -> { + CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic))); + + // mock create non-persistent topic failed + Mockito + .doAnswer(inv -> { + CompletableFuture<Void> f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic)); + + + PulsarService pulsarService = pulsar; + Field field = PulsarService.class.getDeclaredField("brokerService"); + field.setAccessible(true); + field.set(pulsarService, brokerService); + + CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING) + .topic(persistentTopic) + .createAsync(); + CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING) + .topic(nonPersistentTopic) + .createAsync(); + + producer.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + producer1.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + + Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> { + String json = admin.brokerStats().getMetrics(); + JsonArray metrics = new Gson().fromJson(json, JsonArray.class); + AtomicBoolean flag = new AtomicBoolean(false); + + metrics.forEach(ele -> { + JsonObject obj = ((JsonObject) ele); + JsonObject metrics0 = (JsonObject) obj.get("metrics"); + JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count"); + if (null != v && v.getAsDouble() >= 2D) { + flag.set(true); + } + }); + + return flag.get(); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 1fa523b7be9..8a07038c043 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -323,6 +323,10 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List<Metric>) metrics.get("topic_load_failed_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List<Metric>) metrics.get("pulsar_in_bytes_total"); assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
