This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 340d60df0be [fix] [broker] Fix metrics pulsar_topic_load_failed_count 
is 0 when load non-persistent topic fails and fix the flaky test 
testBrokerStatsTopicLoadFailed (#22580)
340d60df0be is described below

commit 340d60df0be32ed26586f292a8d24a8a6663aba2
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 20:46:43 2024 +0800

    [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load 
non-persistent topic fails and fix the flaky test 
testBrokerStatsTopicLoadFailed (#22580)
---
 .../pulsar/broker/service/BrokerService.java       |   3 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 159 ++++++++++++---------
 2 files changed, 91 insertions(+), 71 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1f0cb12258e..b08b1a472ca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1265,7 +1265,8 @@ public class BrokerService implements Closeable {
             nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
         } catch (Throwable e) {
             log.warn("Failed to create topic {}", topic, e);
-            return FutureUtil.failedFuture(e);
+            topicFuture.completeExceptionally(e);
+            return topicFuture;
         }
         CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
         isOwner.thenRun(() -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 8ebba5c9aea..5fbe1476380 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -20,20 +20,23 @@ package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -79,6 +82,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -113,7 +117,11 @@ import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MockZooKeeper;
 import org.awaitility.Awaitility;
+import org.glassfish.jersey.client.JerseyClient;
+import org.glassfish.jersey.client.JerseyClientBuilder;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -1589,82 +1597,93 @@ public class BrokerServiceTest extends BrokerTestBase {
         });
     }
 
-    // this test is disabled since it is flaky
-    @Test(enabled = false)
-    public void testBrokerStatsTopicLoadFailed() throws Exception {
-        admin.namespaces().createNamespace("prop/ns-test");
-
-        String persistentTopic = "persistent://prop/ns-test/topic1_" + 
UUID.randomUUID();
-        String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + 
UUID.randomUUID();
-
-        BrokerService brokerService = pulsar.getBrokerService();
-        brokerService = Mockito.spy(brokerService);
-        // mock create persistent topic failed
-        Mockito
-                .doAnswer(invocation -> {
-                    CompletableFuture<ManagedLedgerConfig> f = new 
CompletableFuture<>();
-                    f.completeExceptionally(new RuntimeException("This is an 
exception"));
-                    return f;
-                })
-                
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
-
-        // mock create non-persistent topic failed
-        Mockito
-                .doAnswer(inv -> {
-                    CompletableFuture<Void> f = new CompletableFuture<>();
-                    f.completeExceptionally(new RuntimeException("This is an 
exception"));
-                    return f;
-                })
-                
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
-
-
-        PulsarService pulsarService = pulsar;
-        Field field = PulsarService.class.getDeclaredField("brokerService");
-        field.setAccessible(true);
-        field.set(pulsarService, brokerService);
-
-        CompletableFuture<Producer<String>> producer = 
pulsarClient.newProducer(Schema.STRING)
-                .topic(persistentTopic)
-                .createAsync();
-        CompletableFuture<Producer<String>> producer1 = 
pulsarClient.newProducer(Schema.STRING)
-                .topic(nonPersistentTopic)
-                .createAsync();
-
-        producer.whenComplete((v, t) -> {
-            if (t == null) {
-                try {
-                    v.close();
-                } catch (PulsarClientException e) {
-                    // ignore
-                }
+    @Test
+    public void testMetricsPersistentTopicLoadFails() throws Exception {
+        final String namespace = "prop/" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        String topic = "persistent://" + namespace + "/topic1_" + 
UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().unload(topic);
+
+        // Inject an error that makes the topic load fails.
+        AtomicBoolean failMarker = new AtomicBoolean(true);
+        mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, 
path) -> {
+            if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
+                    
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
+                return true;
             }
+            return false;
         });
-        producer1.whenComplete((v, t) -> {
-            if (t == null) {
-                try {
-                    v.close();
-                } catch (PulsarClientException e) {
-                    // ignore
-                }
+
+        // Do test
+        CompletableFuture<Producer<byte[]>> producer = 
pulsarClient.newProducer().topic(topic).createAsync();
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        Awaitility.await().until(() -> {
+            String response = 
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+                    .request().get(String.class);
+            Multimap<String, PrometheusMetricsClient.Metric> metricMap = 
PrometheusMetricsClient.parseMetrics(response);
+            if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
+                return false;
+            }
+            double topic_load_failed_count = 0;
+            for (PrometheusMetricsClient.Metric metric : 
metricMap.get("pulsar_topic_load_failed_count")) {
+                topic_load_failed_count += metric.value;
             }
+            return topic_load_failed_count >= 1D;
         });
 
-        Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
-            String json = admin.brokerStats().getMetrics();
-            JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
-            AtomicBoolean flag = new AtomicBoolean(false);
-
-            metrics.forEach(ele -> {
-                JsonObject obj = ((JsonObject) ele);
-                JsonObject metrics0 = (JsonObject) obj.get("metrics");
-                JsonPrimitive v = (JsonPrimitive) 
metrics0.get("brk_topic_load_failed_count");
-                if (null != v && v.getAsDouble() >= 2D) {
-                    flag.set(true);
-                }
-            });
+        // Remove the injection.
+        failMarker.set(false);
+        // cleanup.
+        httpClient.close();
+        producer.join().close();
+        admin.topics().delete(topic);
+        admin.namespaces().deleteNamespace(namespace);
+    }
 
-            return flag.get();
+    @Test
+    public void testMetricsNonPersistentTopicLoadFails() throws Exception {
+        final String namespace = "prop/" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        String topic = "non-persistent://" + namespace + "/topic1_" + 
UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+
+        // Inject an error that makes the topic load fails.
+        // Since we did not set a topic factory name, the "topicFactory" 
variable is null, inject a mocked
+        // "topicFactory".
+        Field fieldTopicFactory = 
BrokerService.class.getDeclaredField("topicFactory");
+        fieldTopicFactory.setAccessible(true);
+        TopicFactory originalTopicFactory = (TopicFactory) 
fieldTopicFactory.get(pulsar.getBrokerService());
+        assertNull(originalTopicFactory);
+        TopicFactory mockedTopicFactory = mock(TopicFactory.class);
+        when(mockedTopicFactory.create(anyString(), any(), any(), any()))
+                .thenThrow(new RuntimeException("mocked error"));
+        fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory);
+
+        // Do test.
+        CompletableFuture<Producer<byte[]>> producer = 
pulsarClient.newProducer().topic(topic).createAsync();
+        JerseyClient httpClient = JerseyClientBuilder.createClient();
+        Awaitility.await().until(() -> {
+            String response = 
httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
+                    .request().get(String.class);
+            Multimap<String, PrometheusMetricsClient.Metric> metricMap = 
PrometheusMetricsClient.parseMetrics(response);
+            if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
+                return false;
+            }
+            double topic_load_failed_count = 0;
+            for (PrometheusMetricsClient.Metric metric : 
metricMap.get("pulsar_topic_load_failed_count")) {
+                topic_load_failed_count += metric.value;
+            }
+            return topic_load_failed_count >= 1D;
         });
+
+        // Remove the injection.
+        fieldTopicFactory.set(pulsar.getBrokerService(), null);
+
+        // cleanup.
+        httpClient.close();
+        producer.join().close();
+        admin.topics().delete(topic);
+        admin.namespaces().deleteNamespace(namespace);
     }
 
     @Test

Reply via email to