This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 132592223022c9e83377b55f25a4aa21ee7946f5
Author: fengyubiao <[email protected]>
AuthorDate: Sun Aug 18 19:02:23 2024 +0800

    [fix] [broker] Topic can never be loaded because the broker maintains a failed topic creation future (#23184)
    
    (cherry picked from commit 9edaa8569deff9c0cbb41b261fee472603f3df4d)
---
 .../pulsar/broker/service/BrokerService.java       | 15 +++-
 .../client/api/OrphanPersistentTopicTest.java      | 95 ++++++++++++++++++++++
 2 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4c6d2ec4c5a..7d3c893baa9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1645,6 +1645,7 @@ public class BrokerService implements Closeable {
                         topicFuture.completeExceptionally(new 
ServiceUnitNotReadyException(msg));
                     }
                 }).exceptionally(ex -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
                     topicFuture.completeExceptionally(ex);
                     return null;
                 });
@@ -1748,10 +1749,16 @@ public class BrokerService implements Closeable {
                                             long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
                                                                         - 
topicCreateTimeMs;
                                             
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-                                            if 
(topicFuture.isCompletedExceptionally()) {
+                                            if 
(!topicFuture.complete(Optional.of(persistentTopic))) {
                                                 // Check create persistent 
topic timeout.
-                                                log.warn("{} future is already 
completed with failure {}, closing the"
-                                                        + " topic", topic, 
FutureUtil.getException(topicFuture));
+                                                if 
(topicFuture.isCompletedExceptionally()) {
+                                                    log.warn("{} future is 
already completed with failure {}, closing"
+                                                        + " the topic", topic, 
FutureUtil.getException(topicFuture));
+                                                } else {
+                                                    // It should not happen.
+                                                    log.error("{} future is 
already completed by another thread, "
+                                                            + "which is not 
expected. Closing the current one", topic);
+                                                }
                                                 executor().submit(() -> {
                                                     
persistentTopic.close().whenComplete((ignore, ex) -> {
                                                         topics.remove(topic, 
topicFuture);
@@ -1763,7 +1770,6 @@ public class BrokerService implements Closeable {
                                                 });
                                             } else {
                                                 addTopicToStatsMaps(topicName, 
persistentTopic);
-                                                
topicFuture.complete(Optional.of(persistentTopic));
                                             }
                                         })
                                         .exceptionally((ex) -> {
@@ -1792,6 +1798,7 @@ public class BrokerService implements Closeable {
                         public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             if (!createIfMissing && exception instanceof 
ManagedLedgerNotFoundException) {
                                 // We were just trying to load a topic and the 
topic doesn't exist
+                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                                 loadFuture.completeExceptionally(exception);
                                 topicFuture.complete(Optional.empty());
                             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index d6473efd788..de4456c5df3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -19,13 +19,17 @@
 package org.apache.pulsar.client.api;
 
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -33,6 +37,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
@@ -47,6 +52,7 @@ import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -219,4 +225,93 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
         consumer.close();
         admin.topics().delete(tpName, false);
     }
+
+    @DataProvider(name = "whetherTimeoutOrNot")
+    public Object[][] whetherTimeoutOrNot() {
+        return new Object[][] {
+            {true},
+            {false}
+        };
+    }
+
+    @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
+    public void testCheckOwnerShipFails(boolean injectTimeout) throws 
Exception {
+        if (injectTimeout) {
+            pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
+        }
+        String ns = "public" + "/" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp");
+        admin.namespaces().createNamespace(ns);
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.namespaces().unload(ns);
+
+        // Inject an error when calling 
"NamespaceService.isServiceUnitActiveAsync".
+        AtomicInteger failedTimes = new AtomicInteger();
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        doAnswer(invocation -> {
+            TopicName paramTp = (TopicName) invocation.getArguments()[0];
+            if (paramTp.toString().equalsIgnoreCase(tpName) && 
failedTimes.incrementAndGet() <= 2) {
+                if (injectTimeout) {
+                    Thread.sleep(10 * 1000);
+                }
+                log.info("Failed {} times", failedTimes.get());
+                return CompletableFuture.failedFuture(new 
RuntimeException("mocked error"));
+            }
+            return invocation.callRealMethod();
+        
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+
+        // Verify: the consumer can create successfully eventually.
+        Consumer consumer = 
pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
+
+        // cleanup.
+        if (injectTimeout) {
+            pulsar.getConfig().setTopicLoadTimeoutSeconds(60);
+        }
+        consumer.close();
+        admin.topics().delete(tpName);
+    }
+
+    @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
+    public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) 
throws Exception {
+        if (injectTimeout) {
+            pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
+        }
+        String ns = "public" + "/" + 
UUID.randomUUID().toString().replaceAll("-", "");
+        String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp");
+        admin.namespaces().createNamespace(ns);
+        admin.topics().createNonPartitionedTopic(tpName);
+        admin.namespaces().unload(ns);
+
+        // Inject a race condition: load topic and delete topic execute at the 
same time.
+        AtomicInteger mockRaceConditionCounter = new AtomicInteger();
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        doAnswer(invocation -> {
+            TopicName paramTp = (TopicName) invocation.getArguments()[0];
+            if (paramTp.toString().equalsIgnoreCase(tpName) && 
mockRaceConditionCounter.incrementAndGet() <= 1) {
+                if (injectTimeout) {
+                    Thread.sleep(10 * 1000);
+                }
+                log.info("Race condition occurs {} times", 
mockRaceConditionCounter.get());
+                
pulsar.getManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
+            }
+            return invocation.callRealMethod();
+        
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+
+        // Verify: the consumer create failed due to pulsar does not allow to 
create topic automatically.
+        try {
+            pulsar.getBrokerService().getTopic(tpName, false, 
Collections.emptyMap()).join();
+        } catch (Exception ex) {
+            log.warn("Expected error", ex);
+        }
+
+        // Verify: the consumer create successfully after allowing to create 
topic automatically.
+        Consumer consumer = 
pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
+
+        // cleanup.
+        if (injectTimeout) {
+            pulsar.getConfig().setTopicLoadTimeoutSeconds(60);
+        }
+        consumer.close();
+        admin.topics().delete(tpName);
+    }
 }

Reply via email to