This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 053c455c6871071c6918f4fa9119e61ac06b0fb1 Author: fengyubiao <[email protected]> AuthorDate: Mon Apr 29 20:46:43 2024 +0800 [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed (#22580) (cherry picked from commit 340d60df0be32ed26586f292a8d24a8a6663aba2) --- .../pulsar/broker/service/BrokerService.java | 3 +- .../pulsar/broker/service/BrokerServiceTest.java | 182 +++++++++++++-------- 2 files changed, 118 insertions(+), 67 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7c4616d9b92..9d0fea9207f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1308,7 +1308,8 @@ public class BrokerService implements Closeable { nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); - return FutureUtil.failedFuture(e); + topicFuture.completeExceptionally(e); + return topicFuture; } CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic); isOwner.thenRun(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 935fecc4536..699e4ebfb40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -20,20 +20,23 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN; import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -41,6 +44,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.StringReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -69,6 +73,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; @@ -112,7 +117,12 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -1649,79 +1659,119 @@ public class BrokerServiceTest extends BrokerTestBase { } @Test - public void testBrokerStatsTopicLoadFailed() throws Exception { - admin.namespaces().createNamespace("prop/ns-test"); - - String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID(); - String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID(); + public void testMetricsPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().unload(topic); + + // Inject an error that makes the topic load fails. + AtomicBoolean failMarker = new AtomicBoolean(true); + mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> { + if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) && + path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) { + return true; + } + return false; + }); - BrokerService brokerService = pulsar.getBrokerService(); - brokerService = Mockito.spy(brokerService); - // mock create persistent topic failed - Mockito - .doAnswer(invocation -> { - CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>(); - f.completeExceptionally(new RuntimeException("This is an exception")); - return f; - }) - .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic))); - - // mock create non-persistent topic failed - Mockito - .doAnswer(inv -> { - CompletableFuture<Void> f = new CompletableFuture<>(); - f.completeExceptionally(new RuntimeException("This is an exception")); - return f; - }) - .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic)); - - - PulsarService pulsarService = pulsar; - Field field = PulsarService.class.getDeclaredField("brokerService"); - field.setAccessible(true); - field.set(pulsarService, brokerService); - - CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING) - .topic(persistentTopic) - .createAsync(); - CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING) - .topic(nonPersistentTopic) - .createAsync(); - - producer.whenComplete((v, t) -> { - if (t == null) { - try { - v.close(); - } catch (PulsarClientException e) { - // ignore + // Do test + CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + BufferedReader reader = new BufferedReader(new StringReader(response)); + String line; + String metricsLine = null; + while ((line = reader.readLine()) != null) { + if (StringUtils.isBlank(line)) { + continue; } + if (line.startsWith("#")) { + continue; + } + if (line.contains("topic_load_failed")) { + metricsLine = line; + break; + } + } + log.info("topic_load_failed: {}", metricsLine); + if (metricsLine == null) { + return false; } + reader.close(); + String[] parts = metricsLine.split(" "); + Double value = Double.valueOf(parts[parts.length - 1]); + return value >= 1D; }); - producer1.whenComplete((v, t) -> { - if (t == null) { - try { - v.close(); - } catch (PulsarClientException e) { - // ignore + + // Remove the injection. + failMarker.set(false); + // cleanup. + httpClient.close(); + producer.join().close(); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } + + @Test + public void testMetricsNonPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + + // Inject an error that makes the topic load fails. + // Since we did not set a topic factory name, the "topicFactory" variable is null, inject a mocked + // "topicFactory". + Field fieldTopicFactory = BrokerService.class.getDeclaredField("topicFactory"); + fieldTopicFactory.setAccessible(true); + TopicFactory originalTopicFactory = (TopicFactory) fieldTopicFactory.get(pulsar.getBrokerService()); + assertNull(originalTopicFactory); + TopicFactory mockedTopicFactory = mock(TopicFactory.class); + when(mockedTopicFactory.create(anyString(), any(), any(), any())) + .thenThrow(new RuntimeException("mocked error")); + fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory); + + // Do test. + CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + BufferedReader reader = new BufferedReader(new StringReader(response)); + String line; + String metricsLine = null; + while ((line = reader.readLine()) != null) { + if (StringUtils.isBlank(line)) { + continue; + } + if (line.startsWith("#")) { + continue; } + if (line.contains("topic_load_failed")) { + metricsLine = line; + break; + } + } + log.info("topic_load_failed: {}", metricsLine); + if (metricsLine == null) { + return false; } + reader.close(); + String[] parts = metricsLine.split(" "); + Double value = Double.valueOf(parts[parts.length - 1]); + return value >= 1D; }); - Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> { - String json = admin.brokerStats().getMetrics(); - JsonArray metrics = new Gson().fromJson(json, JsonArray.class); - AtomicBoolean flag = new AtomicBoolean(false); - - metrics.forEach(ele -> { - JsonObject obj = ((JsonObject) ele); - JsonObject metrics0 = (JsonObject) obj.get("metrics"); - JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count"); - if (null != v && v.getAsDouble() >= 2D) { - flag.set(true); - } - }); + // Remove the injection. + fieldTopicFactory.set(pulsar.getBrokerService(), null); - return flag.get(); - }); + // cleanup. + httpClient.close(); + producer.join().close(); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); } }
