This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 340d60df0be [fix] [broker] Fix metrics pulsar_topic_load_failed_count
is 0 when load non-persistent topic fails and fix the flaky test
testBrokerStatsTopicLoadFailed (#22580)
340d60df0be is described below
commit 340d60df0be32ed26586f292a8d24a8a6663aba2
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 20:46:43 2024 +0800
[fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load
non-persistent topic fails and fix the flaky test
testBrokerStatsTopicLoadFailed (#22580)
---
.../pulsar/broker/service/BrokerService.java | 3 +-
.../pulsar/broker/service/BrokerServiceTest.java | 159 ++++++++++++---------
2 files changed, 91 insertions(+), 71 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1f0cb12258e..b08b1a472ca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1265,7 +1265,8 @@ public class BrokerService implements Closeable {
nonPersistentTopic = newTopic(topic, null, this,
NonPersistentTopic.class);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
- return FutureUtil.failedFuture(e);
+ topicFuture.completeExceptionally(e);
+ return topicFuture;
}
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 8ebba5c9aea..5fbe1476380 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -20,20 +20,23 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -79,6 +82,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -113,7 +117,11 @@ import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -1589,82 +1597,93 @@ public class BrokerServiceTest extends BrokerTestBase {
});
}
- // this test is disabled since it is flaky
- @Test(enabled = false)
- public void testBrokerStatsTopicLoadFailed() throws Exception {
- admin.namespaces().createNamespace("prop/ns-test");
-
- String persistentTopic = "persistent://prop/ns-test/topic1_" +
UUID.randomUUID();
- String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" +
UUID.randomUUID();
-
- BrokerService brokerService = pulsar.getBrokerService();
- brokerService = Mockito.spy(brokerService);
- // mock create persistent topic failed
- Mockito
- .doAnswer(invocation -> {
- CompletableFuture<ManagedLedgerConfig> f = new
CompletableFuture<>();
- f.completeExceptionally(new RuntimeException("This is an
exception"));
- return f;
- })
-
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
-
- // mock create non-persistent topic failed
- Mockito
- .doAnswer(inv -> {
- CompletableFuture<Void> f = new CompletableFuture<>();
- f.completeExceptionally(new RuntimeException("This is an
exception"));
- return f;
- })
-
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
-
-
- PulsarService pulsarService = pulsar;
- Field field = PulsarService.class.getDeclaredField("brokerService");
- field.setAccessible(true);
- field.set(pulsarService, brokerService);
-
- CompletableFuture<Producer<String>> producer =
pulsarClient.newProducer(Schema.STRING)
- .topic(persistentTopic)
- .createAsync();
- CompletableFuture<Producer<String>> producer1 =
pulsarClient.newProducer(Schema.STRING)
- .topic(nonPersistentTopic)
- .createAsync();
-
- producer.whenComplete((v, t) -> {
- if (t == null) {
- try {
- v.close();
- } catch (PulsarClientException e) {
- // ignore
- }
+ @Test
+ public void testMetricsPersistentTopicLoadFails() throws Exception {
+ final String namespace = "prop/" +
UUID.randomUUID().toString().replaceAll("-", "");
+ String topic = "persistent://" + namespace + "/topic1_" +
UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().unload(topic);
+
+ // Inject an error that makes the topic load fail.
+ AtomicBoolean failMarker = new AtomicBoolean(true);
+ mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op,
path) -> {
+ if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
+
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
+ return true;
}
+ return false;
});
- producer1.whenComplete((v, t) -> {
- if (t == null) {
- try {
- v.close();
- } catch (PulsarClientException e) {
- // ignore
- }
+
+ // Do test
+ CompletableFuture<Producer<byte[]>> producer =
pulsarClient.newProducer().topic(topic).createAsync();
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().until(() -> {
+ String response =
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
+ return false;
+ }
+ double topic_load_failed_count = 0;
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_topic_load_failed_count")) {
+ topic_load_failed_count += metric.value;
}
+ return topic_load_failed_count >= 1D;
});
- Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
- String json = admin.brokerStats().getMetrics();
- JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
- AtomicBoolean flag = new AtomicBoolean(false);
-
- metrics.forEach(ele -> {
- JsonObject obj = ((JsonObject) ele);
- JsonObject metrics0 = (JsonObject) obj.get("metrics");
- JsonPrimitive v = (JsonPrimitive)
metrics0.get("brk_topic_load_failed_count");
- if (null != v && v.getAsDouble() >= 2D) {
- flag.set(true);
- }
- });
+ // Remove the injection.
+ failMarker.set(false);
+ // cleanup.
+ httpClient.close();
+ producer.join().close();
+ admin.topics().delete(topic);
+ admin.namespaces().deleteNamespace(namespace);
+ }
- return flag.get();
+ @Test
+ public void testMetricsNonPersistentTopicLoadFails() throws Exception {
+ final String namespace = "prop/" +
UUID.randomUUID().toString().replaceAll("-", "");
+ String topic = "non-persistent://" + namespace + "/topic1_" +
UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace);
+
+ // Inject an error that makes the topic load fail.
+ // Since we did not set a topic factory name, the "topicFactory"
variable is null, so inject a mocked
+ // "topicFactory".
+ Field fieldTopicFactory =
BrokerService.class.getDeclaredField("topicFactory");
+ fieldTopicFactory.setAccessible(true);
+ TopicFactory originalTopicFactory = (TopicFactory)
fieldTopicFactory.get(pulsar.getBrokerService());
+ assertNull(originalTopicFactory);
+ TopicFactory mockedTopicFactory = mock(TopicFactory.class);
+ when(mockedTopicFactory.create(anyString(), any(), any(), any()))
+ .thenThrow(new RuntimeException("mocked error"));
+ fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory);
+
+ // Do test.
+ CompletableFuture<Producer<byte[]>> producer =
pulsarClient.newProducer().topic(topic).createAsync();
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().until(() -> {
+ String response =
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
+ return false;
+ }
+ double topic_load_failed_count = 0;
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_topic_load_failed_count")) {
+ topic_load_failed_count += metric.value;
+ }
+ return topic_load_failed_count >= 1D;
});
+
+ // Remove the injection.
+ fieldTopicFactory.set(pulsar.getBrokerService(), null);
+
+ // cleanup.
+ httpClient.close();
+ producer.join().close();
+ admin.topics().delete(topic);
+ admin.namespaces().deleteNamespace(namespace);
}
@Test