This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 704630b35fcf23d802ad1fc9bc27d592189a53a4 Author: fengyubiao <[email protected]> AuthorDate: Mon Apr 29 20:46:43 2024 +0800 [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed (#22580) (cherry picked from commit 340d60df0be32ed26586f292a8d24a8a6663aba2) --- .../pulsar/broker/service/BrokerService.java | 4 +- .../pulsar/broker/service/BrokerServiceTest.java | 119 +++++++++++++++++++++ 2 files changed, 121 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4e5b15240db..4b86df9e379 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1192,8 +1192,8 @@ public class BrokerService implements Closeable { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); } - return FutureUtil.failedFuture( - new NotAllowedException("Broker is not unable to load non-persistent topic")); + topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load non-persistent topic")); + return topicFuture; } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 34d1d15764d..a22418ad8e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -41,6 +41,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.StringReader; import java.lang.reflect.Array; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -67,6 +68,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; @@ -102,6 +104,10 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.MockZooKeeper; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1461,4 +1467,117 @@ public class BrokerServiceTest extends BrokerTestBase { assertEquals(admin.topics().getStats(topicName).getSubscriptions() .get("sub-1").getUnackedMessages(), 0); } + + @Test + public void testMetricsPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().unload(topic); + + // Inject an error that makes the topic load fails. + AtomicBoolean failMarker = new AtomicBoolean(true); + mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> { + if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) && + path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) { + return true; + } + return false; + }); + + // Do test + Thread.sleep(1000 * 3); + CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + BufferedReader reader = new BufferedReader(new StringReader(response)); + String line; + String metricsLine = null; + while ((line = reader.readLine()) != null) { + if (StringUtils.isBlank(line)) { + continue; + } + if (line.startsWith("#")) { + continue; + } + if (line.contains("topic_load_failed")) { + metricsLine = line; + break; + } + } + log.info("topic_load_failed: {}", metricsLine); + if (metricsLine == null) { + return false; + } + reader.close(); + String[] parts = metricsLine.split(" "); + Double value = Double.valueOf(parts[parts.length - 1]); + return value >= 1D; + }); + + // Remove the injection. + failMarker.set(false); + // cleanup. + httpClient.close(); + producer.join().close(); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } + + @Test + public void testMetricsNonPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + + // Inject an error that makes the topic load fails. + pulsar.getConfiguration().setEnableNonPersistentTopics(false); + + // Do test. + CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + BufferedReader reader = new BufferedReader(new StringReader(response)); + String line; + String metricsLine = null; + while ((line = reader.readLine()) != null) { + if (StringUtils.isBlank(line)) { + continue; + } + if (line.startsWith("#")) { + continue; + } + if (line.contains("topic_load_failed")) { + metricsLine = line; + break; + } + } + log.info("topic_load_failed: {}", metricsLine); + if (metricsLine == null) { + return false; + } + reader.close(); + String[] parts = metricsLine.split(" "); + Double value = Double.valueOf(parts[parts.length - 1]); + return value >= 1D; + }); + + // Remove the injection. + pulsar.getConfiguration().setEnableNonPersistentTopics(true); + + // cleanup. + httpClient.close(); + try { + producer.join().close(); + } catch (Exception ex) { + // The producer creation failed, so skip to close it. + } + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } }
