This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1524112d98f [fix] [broker] Fix metrics pulsar_topic_load_failed_count
is 0 when load non-persistent topic fails and fix the flaky test
testBrokerStatsTopicLoadFailed (#22580)
1524112d98f is described below
commit 1524112d98feb3b082c7984dfa23cf43fb0ae317
Author: fengyubiao <[email protected]>
AuthorDate: Thu May 2 23:50:38 2024 +0800
[fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load
non-persistent topic fails and fix the flaky test
testBrokerStatsTopicLoadFailed (#22580)
(cherry picked from commit 340d60df0be32ed26586f292a8d24a8a6663aba2)
---
.../pulsar/broker/service/BrokerService.java | 3 +-
.../pulsar/broker/service/BrokerServiceTest.java | 184 +++++++++++++--------
2 files changed, 118 insertions(+), 69 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index dc66750fd96..9654f198d7e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1197,7 +1197,8 @@ public class BrokerService implements Closeable {
nonPersistentTopic = newTopic(topic, null, this,
NonPersistentTopic.class);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
- return FutureUtil.failedFuture(e);
+ topicFuture.completeExceptionally(e);
+ return topicFuture;
}
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index b6a73274f44..fcf11fad708 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -33,7 +36,6 @@ import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -41,6 +43,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.StringReader;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -69,6 +72,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -112,7 +116,11 @@ import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -1587,82 +1595,122 @@ public class BrokerServiceTest extends BrokerTestBase {
});
}
- // this test is disabled since it is flaky
- @Test(enabled = false)
- public void testBrokerStatsTopicLoadFailed() throws Exception {
- admin.namespaces().createNamespace("prop/ns-test");
-
- String persistentTopic = "persistent://prop/ns-test/topic1_" +
UUID.randomUUID();
- String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" +
UUID.randomUUID();
+ @Test
+ public void testMetricsPersistentTopicLoadFails() throws Exception {
+ final String namespace = "prop/" +
UUID.randomUUID().toString().replaceAll("-", "");
+ String topic = "persistent://" + namespace + "/topic1_" +
UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().unload(topic);
+
+ // Inject an error that makes the topic load fail.
+ AtomicBoolean failMarker = new AtomicBoolean(true);
+ mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op,
path) -> {
+ if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
+
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
+ return true;
+ }
+ return false;
+ });
- BrokerService brokerService = pulsar.getBrokerService();
- brokerService = Mockito.spy(brokerService);
- // mock create persistent topic failed
- Mockito
- .doAnswer(invocation -> {
- CompletableFuture<ManagedLedgerConfig> f = new
CompletableFuture<>();
- f.completeExceptionally(new RuntimeException("This is an
exception"));
- return f;
- })
-
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
-
- // mock create non-persistent topic failed
- Mockito
- .doAnswer(inv -> {
- CompletableFuture<Void> f = new CompletableFuture<>();
- f.completeExceptionally(new RuntimeException("This is an
exception"));
- return f;
- })
-
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
-
-
- PulsarService pulsarService = pulsar;
- Field field = PulsarService.class.getDeclaredField("brokerService");
- field.setAccessible(true);
- field.set(pulsarService, brokerService);
-
- CompletableFuture<Producer<String>> producer =
pulsarClient.newProducer(Schema.STRING)
- .topic(persistentTopic)
- .createAsync();
- CompletableFuture<Producer<String>> producer1 =
pulsarClient.newProducer(Schema.STRING)
- .topic(nonPersistentTopic)
- .createAsync();
-
- producer.whenComplete((v, t) -> {
- if (t == null) {
- try {
- v.close();
- } catch (PulsarClientException e) {
- // ignore
+ // Do test
+ Thread.sleep(1000 * 3);
+ CompletableFuture<Producer<byte[]>> producer =
pulsarClient.newProducer().topic(topic).createAsync();
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().until(() -> {
+ String response =
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ BufferedReader reader = new BufferedReader(new
StringReader(response));
+ String line;
+ String metricsLine = null;
+ while ((line = reader.readLine()) != null) {
+ if (StringUtils.isBlank(line)) {
+ continue;
+ }
+ if (line.startsWith("#")) {
+ continue;
+ }
+ if (line.contains("topic_load_failed")) {
+ metricsLine = line;
+ break;
}
}
+ log.info("topic_load_failed: {}", metricsLine);
+ if (metricsLine == null) {
+ return false;
+ }
+ reader.close();
+ String[] parts = metricsLine.split(" ");
+ Double value = Double.valueOf(parts[parts.length - 1]);
+ return value >= 1D;
});
- producer1.whenComplete((v, t) -> {
- if (t == null) {
- try {
- v.close();
- } catch (PulsarClientException e) {
- // ignore
+
+ // Remove the injection.
+ failMarker.set(false);
+ // cleanup.
+ httpClient.close();
+ producer.join().close();
+ admin.topics().delete(topic);
+ admin.namespaces().deleteNamespace(namespace);
+ }
+
+ @Test
+ public void testMetricsNonPersistentTopicLoadFails() throws Exception {
+ final String namespace = "prop/" +
UUID.randomUUID().toString().replaceAll("-", "");
+ String topic = "non-persistent://" + namespace + "/topic1_" +
UUID.randomUUID();
+ admin.namespaces().createNamespace(namespace);
+
+ // Inject an error that makes the topic load fail.
+ // Since we did not set a topic factory name, the "topicFactory" variable is null,
+ // so inject a mocked "topicFactory".
+ Field fieldTopicFactory =
BrokerService.class.getDeclaredField("topicFactory");
+ fieldTopicFactory.setAccessible(true);
+ TopicFactory originalTopicFactory = (TopicFactory)
fieldTopicFactory.get(pulsar.getBrokerService());
+ assertNull(originalTopicFactory);
+ TopicFactory mockedTopicFactory = mock(TopicFactory.class);
+ when(mockedTopicFactory.create(anyString(), any(), any(), any()))
+ .thenThrow(new RuntimeException("mocked error"));
+ fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory);
+
+ // Do test.
+ CompletableFuture<Producer<byte[]>> producer =
pulsarClient.newProducer().topic(topic).createAsync();
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().until(() -> {
+ String response =
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ BufferedReader reader = new BufferedReader(new
StringReader(response));
+ String line;
+ String metricsLine = null;
+ while ((line = reader.readLine()) != null) {
+ if (StringUtils.isBlank(line)) {
+ continue;
+ }
+ if (line.startsWith("#")) {
+ continue;
}
+ if (line.contains("topic_load_failed")) {
+ metricsLine = line;
+ break;
+ }
+ }
+ log.info("topic_load_failed: {}", metricsLine);
+ if (metricsLine == null) {
+ return false;
}
+ reader.close();
+ String[] parts = metricsLine.split(" ");
+ Double value = Double.valueOf(parts[parts.length - 1]);
+ return value >= 1D;
});
- Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
- String json = admin.brokerStats().getMetrics();
- JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
- AtomicBoolean flag = new AtomicBoolean(false);
-
- metrics.forEach(ele -> {
- JsonObject obj = ((JsonObject) ele);
- JsonObject metrics0 = (JsonObject) obj.get("metrics");
- JsonPrimitive v = (JsonPrimitive)
metrics0.get("brk_topic_load_failed_count");
- if (null != v && v.getAsDouble() >= 2D) {
- flag.set(true);
- }
- });
+ // Remove the injection.
+ fieldTopicFactory.set(pulsar.getBrokerService(), null);
- return flag.get();
- });
+ // cleanup.
+ httpClient.close();
+ producer.join().close();
+ admin.topics().delete(topic);
+ admin.namespaces().deleteNamespace(namespace);
}
@Test