This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43b3622cc7e [improve][broker] Add `topic_load_failed` metric (#19236)
43b3622cc7e is described below
commit 43b3622cc7e7f746ca9920fdc704dc7448767ac7
Author: Tao Jiuming <[email protected]>
AuthorDate: Sat Jun 3 21:14:16 2023 +0800
[improve][broker] Add `topic_load_failed` metric (#19236)
Co-authored-by: daojun <[email protected]>
---
.../pulsar/broker/service/BrokerService.java | 9 +++
.../apache/pulsar/broker/service/PulsarStats.java | 4 ++
.../broker/stats/BrokerOperabilityMetrics.java | 10 ++-
.../pulsar/broker/service/BrokerServiceTest.java | 80 ++++++++++++++++++++++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 4 ++
5 files changed, 106 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 663d013dc74..3dc7718c32a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1226,6 +1226,10 @@ public class BrokerService implements Closeable {
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic) {
CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
+ topicFuture.exceptionally(t -> {
+ pulsarStats.recordTopicLoadFailed();
+ return null;
+ });
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}",
topic);
@@ -1618,6 +1622,11 @@ public class BrokerService implements Closeable {
TopicName topicName = TopicName.get(topic);
final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ topicFuture.exceptionally(t -> {
+ pulsarStats.recordTopicLoadFailed();
+ return null;
+ });
+
if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system
topic %s", topic);
log.warn(msg);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index e959e9bbda2..db14892d266 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -265,6 +265,10 @@ public class PulsarStats implements Closeable {
}
}
+ public void recordTopicLoadFailed() {
+ brokerOperabilityMetrics.recordTopicLoadFailed();
+ }
+
public void recordConnectionCreate() {
brokerOperabilityMetrics.recordConnectionCreate();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 90913333871..400dbd3335a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;
+import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.pulsar.common.stats.Metrics;
/**
*/
public class BrokerOperabilityMetrics {
+ private static final Counter TOPIC_LOAD_FAILED =
Counter.build("topic_load_failed", "-").register();
private final List<Metrics> metricsList;
private final String localCluster;
private final DimensionStats topicLoadStats;
@@ -84,7 +86,9 @@ public class BrokerOperabilityMetrics {
}
Metrics getTopicLoadMetrics() {
- return getDimensionMetrics("topic_load_times", "topic_load",
topicLoadStats);
+ Metrics metrics = getDimensionMetrics("topic_load_times",
"topic_load", topicLoadStats);
+ metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
+ return metrics;
}
Metrics getDimensionMetrics(String metricsName, String dimensionName,
DimensionStats stats) {
@@ -112,6 +116,10 @@ public class BrokerOperabilityMetrics {
topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs,
TimeUnit.MILLISECONDS);
}
+ public void recordTopicLoadFailed() {
+ this.TOPIC_LOAD_FAILED.inc();
+ }
+
public void recordConnectionCreate() {
this.connectionTotalCreatedCount.increment();
this.connectionActive.increment();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index e9b7ddb991e..1d45c87b2f2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -107,6 +108,7 @@ import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -1513,4 +1515,82 @@ public class BrokerServiceTest extends BrokerTestBase {
assertTrue(conf.isForceDeleteTenantAllowed());
});
}
+
+
+ @Test
+ public void testBrokerStatsTopicLoadFailed() throws Exception {
+ admin.namespaces().createNamespace("prop/ns-test");
+
+ String persistentTopic = "persistent://prop/ns-test/topic1_" +
UUID.randomUUID();
+ String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" +
UUID.randomUUID();
+
+ BrokerService brokerService = pulsar.getBrokerService();
+ brokerService = Mockito.spy(brokerService);
+ // mock create persistent topic failed
+ Mockito
+ .doAnswer(invocation -> {
+ CompletableFuture<ManagedLedgerConfig> f = new
CompletableFuture<>();
+ f.completeExceptionally(new RuntimeException("This is an
exception"));
+ return f;
+ })
+
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
+
+ // mock create non-persistent topic failed
+ Mockito
+ .doAnswer(inv -> {
+ CompletableFuture<Void> f = new CompletableFuture<>();
+ f.completeExceptionally(new RuntimeException("This is an
exception"));
+ return f;
+ })
+
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
+
+
+ PulsarService pulsarService = pulsar;
+ Field field = PulsarService.class.getDeclaredField("brokerService");
+ field.setAccessible(true);
+ field.set(pulsarService, brokerService);
+
+ CompletableFuture<Producer<String>> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic(persistentTopic)
+ .createAsync();
+ CompletableFuture<Producer<String>> producer1 =
pulsarClient.newProducer(Schema.STRING)
+ .topic(nonPersistentTopic)
+ .createAsync();
+
+ producer.whenComplete((v, t) -> {
+ if (t == null) {
+ try {
+ v.close();
+ } catch (PulsarClientException e) {
+ // ignore
+ }
+ }
+ });
+ producer1.whenComplete((v, t) -> {
+ if (t == null) {
+ try {
+ v.close();
+ } catch (PulsarClientException e) {
+ // ignore
+ }
+ }
+ });
+
+ Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
+ String json = admin.brokerStats().getMetrics();
+ JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
+ AtomicBoolean flag = new AtomicBoolean(false);
+
+ metrics.forEach(ele -> {
+ JsonObject obj = ((JsonObject) ele);
+ JsonObject metrics0 = (JsonObject) obj.get("metrics");
+ JsonPrimitive v = (JsonPrimitive)
metrics0.get("brk_topic_load_failed_count");
+ if (null != v && v.getAsDouble() >= 2D) {
+ flag.set(true);
+ }
+ });
+
+ return flag.get();
+ });
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index a7a28afd8ac..6cb7378330f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -333,6 +333,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");
+ cm = (List<Metric>) metrics.get("topic_load_failed_total");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("cluster"), "test");
+
cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("topic"),
"persistent://my-property/use/my-ns/my-topic2");