This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a03543a4e4867835152ccae9a0ce5bb7b9042c42
Author: Tao Jiuming <[email protected]>
AuthorDate: Sat Jun 3 21:14:16 2023 +0800

    [improve][broker] Add `topic_load_failed` metric (#19236)
    
    Co-authored-by: daojun <[email protected]>
    (cherry picked from commit 43b3622cc7e7f746ca9920fdc704dc7448767ac7)
---
 .../pulsar/broker/service/BrokerService.java       |  9 +++
 .../apache/pulsar/broker/service/PulsarStats.java  |  4 ++
 .../broker/stats/BrokerOperabilityMetrics.java     | 10 ++-
 .../pulsar/broker/service/BrokerServiceTest.java   | 79 ++++++++++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  4 ++
 5 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b921c4b5dc6..65366f7eb22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1183,6 +1183,10 @@ public class BrokerService implements Closeable {
 
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
         CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
+        topicFuture.exceptionally(t -> {
+            pulsarStats.recordTopicLoadFailed();
+            return null;
+        });
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
                 log.debug("Broker is unable to load non-persistent topic {}", topic);
@@ -1538,6 +1542,11 @@ public class BrokerService implements Closeable {
         TopicName topicName = TopicName.get(topic);
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
+        topicFuture.exceptionally(t -> {
+            pulsarStats.recordTopicLoadFailed();
+            return null;
+        });
+
         if (isTransactionInternalName(topicName)) {
             String msg = String.format("Can not create transaction system topic %s", topic);
             log.warn(msg);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index ff74cf839ae..69cfc68129c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -239,6 +239,10 @@ public class PulsarStats implements Closeable {
         }
     }
 
+    public void recordTopicLoadFailed() {
+        brokerOperabilityMetrics.recordTopicLoadFailed();
+    }
+
     public void recordConnectionCreate() {
         brokerOperabilityMetrics.recordConnectionCreate();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 4fd9e35dd7b..c0810f47278 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.stats;
 
 import com.google.common.collect.Maps;
+import io.prometheus.client.Counter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@ import org.apache.pulsar.common.stats.Metrics;
 /**
  */
 public class BrokerOperabilityMetrics {
+    private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
     private final List<Metrics> metricsList;
     private final String localCluster;
     private final DimensionStats topicLoadStats;
@@ -90,7 +92,9 @@ public class BrokerOperabilityMetrics {
     }
 
     Metrics getTopicLoadMetrics() {
-        return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
+        Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
+        metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
+        return metrics;
     }
 
     Metrics getZkWriteLatencyMetrics() {
@@ -136,6 +140,10 @@ public class BrokerOperabilityMetrics {
         zkReadLatencyStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
     }
 
+    public void recordTopicLoadFailed() {
+        this.TOPIC_LOAD_FAILED.inc();
+    }
+
     public void recordConnectionCreate() {
         this.connectionTotalCreatedCount.increment();
         this.connectionActive.increment();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 5a54473247f..ea10b24b385 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -1628,4 +1629,82 @@ public class BrokerServiceTest extends BrokerTestBase {
             assertTrue(conf.isForceDeleteTenantAllowed());
         });
     }
+
+
+    @Test
+    public void testBrokerStatsTopicLoadFailed() throws Exception {
+        admin.namespaces().createNamespace("prop/ns-test");
+
+        String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID();
+        String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID();
+
+        BrokerService brokerService = pulsar.getBrokerService();
+        brokerService = Mockito.spy(brokerService);
+        // mock create persistent topic failed
+        Mockito
+                .doAnswer(invocation -> {
+                    CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>();
+                    f.completeExceptionally(new RuntimeException("This is an exception"));
+                    return f;
+                })
+                .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
+
+        // mock create non-persistent topic failed
+        Mockito
+                .doAnswer(inv -> {
+                    CompletableFuture<Void> f = new CompletableFuture<>();
+                    f.completeExceptionally(new RuntimeException("This is an exception"));
+                    return f;
+                })
+                .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
+
+
+        PulsarService pulsarService = pulsar;
+        Field field = PulsarService.class.getDeclaredField("brokerService");
+        field.setAccessible(true);
+        field.set(pulsarService, brokerService);
+
+        CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(persistentTopic)
+                .createAsync();
+        CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(nonPersistentTopic)
+                .createAsync();
+
+        producer.whenComplete((v, t) -> {
+            if (t == null) {
+                try {
+                    v.close();
+                } catch (PulsarClientException e) {
+                    // ignore
+                }
+            }
+        });
+        producer1.whenComplete((v, t) -> {
+            if (t == null) {
+                try {
+                    v.close();
+                } catch (PulsarClientException e) {
+                    // ignore
+                }
+            }
+        });
+
+        Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
+            String json = admin.brokerStats().getMetrics();
+            JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
+            AtomicBoolean flag = new AtomicBoolean(false);
+
+            metrics.forEach(ele -> {
+                JsonObject obj = ((JsonObject) ele);
+                JsonObject metrics0 = (JsonObject) obj.get("metrics");
+                JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count");
+                if (null != v && v.getAsDouble() >= 2D) {
+                    flag.set(true);
+                }
+            });
+
+            return flag.get();
+        });
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 1fa523b7be9..8a07038c043 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -323,6 +323,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.size(), 1);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
 
+        cm = (List<Metric>) metrics.get("topic_load_failed_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
         cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
         assertEquals(cm.size(), 2);
         assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");

Reply via email to