This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 704630b35fcf23d802ad1fc9bc27d592189a53a4
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 20:46:43 2024 +0800

    [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load 
non-persistent topic fails and fix the flaky test 
testBrokerStatsTopicLoadFailed (#22580)
    
    (cherry picked from commit 340d60df0be32ed26586f292a8d24a8a6663aba2)
---
 .../pulsar/broker/service/BrokerService.java       |   4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 119 +++++++++++++++++++++
 2 files changed, 121 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4e5b15240db..4b86df9e379 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1192,8 +1192,8 @@ public class BrokerService implements Closeable {
             if (log.isDebugEnabled()) {
                 log.debug("Broker is unable to load non-persistent topic {}", 
topic);
             }
-            return FutureUtil.failedFuture(
-                    new NotAllowedException("Broker is not unable to load 
non-persistent topic"));
+            topicFuture.completeExceptionally(new NotAllowedException("Broker 
is not unable to load non-persistent topic"));
+            return topicFuture;
         }
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, 
this);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 34d1d15764d..a22418ad8e6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -41,6 +41,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.StringReader;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -67,6 +68,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
@@ -102,6 +104,10 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.awaitility.Awaitility;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MockZooKeeper;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -1461,4 +1467,117 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(admin.topics().getStats(topicName).getSubscriptions()
                 .get("sub-1").getUnackedMessages(), 0);
     }
+
+    @Test
+    public void testMetricsPersistentTopicLoadFails() throws Exception {
+        final String namespace = "prop/" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        String topic = "persistent://" + namespace + "/topic1_" + 
UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().unload(topic);
+
+        // Inject an error that makes the topic load fail.
+        AtomicBoolean failMarker = new AtomicBoolean(true);
+        mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, 
path) -> {
+            if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
+                    
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
+                return true;
+            }
+            return false;
+        });
+
+        // Do test
+        Thread.sleep(1000 * 3);
+        CompletableFuture<Producer<byte[]>> producer = 
pulsarClient.newProducer().topic(topic).createAsync();
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        Awaitility.await().until(() -> {
+            String response = 
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+                    .request().get(String.class);
+            BufferedReader reader = new BufferedReader(new 
StringReader(response));
+            String line;
+            String metricsLine = null;
+            while ((line = reader.readLine()) != null) {
+                if (StringUtils.isBlank(line)) {
+                    continue;
+                }
+                if (line.startsWith("#")) {
+                    continue;
+                }
+                if (line.contains("topic_load_failed")) {
+                    metricsLine = line;
+                    break;
+                }
+            }
+            log.info("topic_load_failed: {}", metricsLine);
+            if (metricsLine == null) {
+                return false;
+            }
+            reader.close();
+            String[] parts = metricsLine.split(" ");
+            Double value = Double.valueOf(parts[parts.length - 1]);
+            return value >= 1D;
+        });
+
+        // Remove the injection.
+        failMarker.set(false);
+        // cleanup.
+        httpClient.close();
+        producer.join().close();
+        admin.topics().delete(topic);
+        admin.namespaces().deleteNamespace(namespace);
+    }
+
+    @Test
+    public void testMetricsNonPersistentTopicLoadFails() throws Exception {
+        final String namespace = "prop/" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        String topic = "non-persistent://" + namespace + "/topic1_" + 
UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+
+        // Inject an error that makes the topic load fail.
+        pulsar.getConfiguration().setEnableNonPersistentTopics(false);
+
+        // Do test.
+        CompletableFuture<Producer<byte[]>> producer = 
pulsarClient.newProducer().topic(topic).createAsync();
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        Awaitility.await().until(() -> {
+            String response = 
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+                    .request().get(String.class);
+            BufferedReader reader = new BufferedReader(new 
StringReader(response));
+            String line;
+            String metricsLine = null;
+            while ((line = reader.readLine()) != null) {
+                if (StringUtils.isBlank(line)) {
+                    continue;
+                }
+                if (line.startsWith("#")) {
+                    continue;
+                }
+                if (line.contains("topic_load_failed")) {
+                    metricsLine = line;
+                    break;
+                }
+            }
+            log.info("topic_load_failed: {}", metricsLine);
+            if (metricsLine == null) {
+                return false;
+            }
+            reader.close();
+            String[] parts = metricsLine.split(" ");
+            Double value = Double.valueOf(parts[parts.length - 1]);
+            return value >= 1D;
+        });
+
+        // Remove the injection.
+        pulsar.getConfiguration().setEnableNonPersistentTopics(true);
+
+        // cleanup.
+        httpClient.close();
+        try {
+            producer.join().close();
+        } catch (Exception ex) {
+            // The producer creation failed, so there is nothing to close.
+        }
+        admin.topics().delete(topic);
+        admin.namespaces().deleteNamespace(namespace);
+    }
 }

Reply via email to