This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new e220279f4f1 [fix][broker][branch-2.11] The topic might reference a 
closed ledger (#22860) (#23054)
e220279f4f1 is described below

commit e220279f4f1d02c9658ddbdf95a737d5baef9a60
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 17:33:40 2024 +0800

    [fix][broker][branch-2.11] The topic might reference a closed ledger 
(#22860) (#23054)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  5 ++
 .../pulsar/broker/service/BrokerService.java       | 13 ++++
 .../client/api/OrphanPersistentTopicTest.java      | 71 ++++++++++++++++++++++
 3 files changed, 89 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 15704d49ad3..1aac0ba5e7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1836,4 +1836,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
         return new BrokerService(pulsar, ioEventLoopGroup);
     }
+
+    @VisibleForTesting
+    public void setTransactionExecutorProvider(TransactionBufferProvider 
transactionBufferProvider) {
+        this.transactionBufferProvider = transactionBufferProvider;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 090d5ce0b54..c0da47755b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -988,6 +988,17 @@ public class BrokerService implements Closeable {
         try {
             CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
             if (topicFuture != null) {
+                if (topicFuture.isCompletedExceptionally()) {
+                    try {
+                        topicFuture.join();
+                    } catch (Exception ex) {
+                        Throwable actEx = 
FutureUtil.unwrapCompletionException(ex);
+                        if (actEx == FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION) {
+                            return CompletableFuture.failedFuture(new 
TimeoutException("The previous loading task"
+                                    + " has not finished yet even through it 
has timeout, please retry again."));
+                        }
+                    }
+                }
                 if (topicFuture.isCompletedExceptionally()
                         || (topicFuture.isDone() && 
!topicFuture.getNow(Optional.empty()).isPresent())) {
                     // Exceptional topics should be recreated.
@@ -1608,6 +1619,7 @@ public class BrokerService implements Closeable {
                                                         + " topic", topic, 
FutureUtil.getException(topicFuture));
                                                 executor().submit(() -> {
                                                     
persistentTopic.close().whenComplete((ignore, ex) -> {
+                                                        topics.remove(topic, 
topicFuture);
                                                         if (ex != null) {
                                                             log.warn("[{}] Get 
an error when closing topic.",
                                                                     topic, ex);
@@ -1645,6 +1657,7 @@ public class BrokerService implements Closeable {
                             if (!createIfMissing && exception instanceof 
ManagedLedgerNotFoundException) {
                                 // We were just trying to load a topic and the 
topic doesn't exist
                                 topicFuture.complete(Optional.empty());
+                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                             } else {
                                 log.warn("Failed to create topic {}", topic, 
exception);
                                 pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index 4f5ab783374..76e4be48b2f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.api;
 
+import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
+import static org.junit.Assert.assertNotNull;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import java.util.List;
@@ -25,13 +27,18 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TopicPolicyListener;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.awaitility.Awaitility;
@@ -114,4 +121,68 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
         admin.topics().delete(tpName, false);
         
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
     }
+
+    @Test
+    public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
+        // Make the topic loading timeout faster.
+        long originalTopicLoadTimeoutSeconds = 
pulsar.getConfig().getTopicLoadTimeoutSeconds();
+        int topicLoadTimeoutSeconds = 1;
+        pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+        pulsar.getConfig().setBrokerDeduplicationEnabled(true);
+        pulsar.getConfig().setTransactionCoordinatorEnabled(true);
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
+
+        // Delay access to the deduplication cursor path to make topic loading time out.
+        AtomicBoolean stopDelay = new AtomicBoolean();
+        String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
+                TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + 
DEDUPLICATION_CURSOR_NAME;
+        mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
+            if (mlPath.equals(path) && !stopDelay.get()) {
+                log.info("Topic load timeout: " + path);
+                return true;
+            }
+            return false;
+        });
+
+        // First load topic will trigger timeout
+        // The first topic load will trigger a timeout. When the topic closes, 
it will call transactionBuffer.close.
+        // Here, we simulate a sleep to ensure that the ledger is not 
immediately closed.
+        TransactionBufferProvider mockTransactionBufferProvider = new 
TransactionBufferProvider() {
+            @Override
+            public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+                return new TransactionBufferDisable(originTopic) {
+                    @SneakyThrows
+                    @Override
+                    public CompletableFuture<Void> closeAsync() {
+                        Thread.sleep(500);
+                        return super.closeAsync();
+                    }
+                };
+            }
+        };
+        TransactionBufferProvider originalTransactionBufferProvider = 
pulsar.getTransactionBufferProvider();
+        pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);
+        CompletableFuture<Optional<Topic>> firstLoad = 
pulsar.getBrokerService().getTopic(tpName, true);
+        Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                // assert first create topic timeout
+                .untilAsserted(() -> {
+                    assertTrue(firstLoad.isCompletedExceptionally());
+                });
+
+        // Once the first topic load times out, immediately load the topic 
again.
+        stopDelay.set(true);
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(tpName).create();
+        for (int i = 0; i < 10; i++) {
+            MessageId send = producer.send("msg".getBytes());
+            Thread.sleep(100);
+            assertNotNull(send);
+        }
+
+        // Restore the original settings.
+        
pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
+        
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
+        pulsar.getConfig().setBrokerDeduplicationEnabled(false);
+        pulsar.getConfig().setTransactionCoordinatorEnabled(false);
+    }
 }

Reply via email to