This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7b3d29a9b11c9ab3df5d50cbfc2a138c0bd2c957
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jun 13 16:49:05 2024 +0800

    [fix][broker] The topic might reference a closed ledger (#22860)
    
    (cherry picked from commit a91a172b4ee6d8b974a3fa905e435975557fcc57)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../pulsar/broker/service/BrokerService.java       | 155 ++++++++++-----------
 .../pulsar/broker/service/ReplicatorTest.java      |  10 +-
 .../client/api/OrphanPersistentTopicTest.java      |  68 +++++++++
 4 files changed, 151 insertions(+), 87 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 47842e4722e..60af2a6f127 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1896,6 +1896,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return new BrokerService(pulsar, ioEventLoopGroup);
     }
 
+    /**
+     * Replaces the {@link TransactionBufferProvider} held by this service. Intended for tests only.
+     *
+     * <p>Note: despite its name, this method sets the transaction <em>buffer</em> provider
+     * ({@code transactionBufferProvider}); it does not configure any executor.
+     * TODO(review): consider a correctly named setter; this name is kept because tests already call it.
+     *
+     * @param transactionBufferProvider the transaction buffer provider to install
+     */
+    @VisibleForTesting
+    public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
+        this.transactionBufferProvider = transactionBufferProvider;
+    }
+
     private CompactionServiceFactory loadCompactionServiceFactory() {
         String compactionServiceFactoryClassName = 
config.getCompactionServiceFactoryClassName();
         var compactionServiceFactory =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5839ee30eb2..5ce9519d9c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -942,38 +942,38 @@ public class BrokerService implements Closeable {
         return getTopic(TopicName.get(topic), createIfMissing, properties);
     }
 
+    /**
+     * Retrieves or creates a topic based on the specified parameters.
+     * 0. If the topic's domain is disabled in the broker config, returns a future failed with NotAllowedException.
+     * 1. A cached topic future is returned as-is (checked before step 0), even if it has failed or timed out.
+     * 2. If the topic metadata exists, the topic is loaded regardless of {@code createIfMissing}.
+     * 3. If the topic metadata does not exist and {@code createIfMissing} is false,
+     *    returns a future completed with an empty Optional; that future is not added to the map.
+     * 4. Otherwise, uses computeIfAbsent, returning the existing topic future or creating and adding a new one.
+     *    A topic future that completes exceptionally is removed from the map.
+     *
+     * @param topicName The name of the topic, potentially including partition information.
+     * @param createIfMissing If true, creates the topic if it does not exist.
+     * @param properties Topic configuration properties used during creation.
+     * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
+     */
     public CompletableFuture<Optional<Topic>> getTopic(final TopicName 
topicName, boolean createIfMissing,
                                                        Map<String, String> 
properties) {
         try {
-            CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
-            if (topicFuture != null) {
-                if (topicFuture.isCompletedExceptionally()
-                        || (topicFuture.isDone() && 
!topicFuture.getNow(Optional.empty()).isPresent())) {
-                    // Exceptional topics should be recreated.
-                    topics.remove(topicName.toString(), topicFuture);
-                } else {
-                    // a non-existing topic in the cache shouldn't prevent 
creating a topic
-                    if (createIfMissing) {
-                        if (topicFuture.isDone() && 
topicFuture.getNow(Optional.empty()).isPresent()) {
-                            return topicFuture;
-                        } else {
-                            return topicFuture.thenCompose(value -> {
-                                if (!value.isPresent()) {
-                                    // retry and create topic
-                                    return getTopic(topicName, 
createIfMissing, properties);
-                                } else {
-                                    // in-progress future completed 
successfully
-                                    return 
CompletableFuture.completedFuture(value);
-                                }
-                            });
-                        }
-                    } else {
-                        return topicFuture;
-                    }
-                }
+            // If topic future exists in the cache returned directly 
regardless of whether it fails or timeout.
+            CompletableFuture<Optional<Topic>> tp = 
topics.get(topicName.toString());
+            if (tp != null) {
+                return tp;
             }
             final boolean isPersistentTopic = 
topicName.getDomain().equals(TopicDomain.persistent);
             if (isPersistentTopic) {
+                if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Broker is unable to load persistent topic 
{}", topicName);
+                    }
+                    return FutureUtil.failedFuture(new NotAllowedException(
+                            "Broker is unable to load persistent topic"));
+                }
                 return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
                         .thenCompose(exists -> {
                     if (!exists && !createIfMissing) {
@@ -988,44 +988,48 @@ public class BrokerService implements Closeable {
                         throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
                     }).thenCompose(optionalTopicPolicies -> {
                         final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
-                        return topics.computeIfAbsent(topicName.toString(), 
(tpName) -> {
-                            if (topicName.isPartitioned()) {
-                                final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
-                                return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
-                                        .thenCompose((metadata) -> {
-                                            // Allow crate non-partitioned 
persistent topic that name includes
-                                            // `partition`
-                                            if (metadata.partitions == 0
-                                                    || 
topicName.getPartitionIndex() < metadata.partitions) {
-                                                return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
-                                                        properties, 
topicPolicies);
-                                            }
+                        if (topicName.isPartitioned()) {
+                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
+                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+                                    .thenCompose((metadata) -> {
+                                        // Allow crate non-partitioned 
persistent topic that name includes
+                                        // `partition`
+                                        if (metadata.partitions == 0
+                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
+                                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
+                                                    
loadOrCreatePersistentTopic(tpName,
+                                                            createIfMissing, 
properties, topicPolicies));
+                                        } else {
                                             final String errorMsg =
                                                     String.format("Illegal 
topic partition name %s with max allowed "
                                                             + "%d partitions", 
topicName, metadata.partitions);
                                             log.warn(errorMsg);
                                             return FutureUtil.failedFuture(
                                                     new 
BrokerServiceException.NotAllowedException(errorMsg));
-                                        });
-                            }
-                            return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
-                        }).thenCompose(optionalTopic -> {
-                            if (!optionalTopic.isPresent() && createIfMissing) 
{
-                                log.warn("[{}] Try to recreate the topic with 
createIfMissing=true "
-                                        + "but the returned topic is empty", 
topicName);
-                                return getTopic(topicName, createIfMissing, 
properties);
-                            }
-                            return 
CompletableFuture.completedFuture(optionalTopic);
-                        });
+                                        }
+                                    });
+                        } else {
+                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
+                                    loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies));
+                        }
                     });
                 });
             } else {
-                return topics.computeIfAbsent(topicName.toString(), (name) -> {
+                if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Broker is unable to load non-persistent 
topic {}", topicName);
+                    }
+                    return FutureUtil.failedFuture(new NotAllowedException(
+                            "Broker is unable to load persistent topic"));
+                }
+                if (!topics.containsKey(topicName.toString())) {
                     topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.BEFORE);
-                    if (topicName.isPartitioned()) {
-                        final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-                        return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
-                            if (topicName.getPartitionIndex() < 
metadata.partitions) {
+                }
+                if (topicName.isPartitioned()) {
+                    final TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+                    return 
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
 -> {
+                        if (topicName.getPartitionIndex() < 
metadata.partitions) {
+                            return 
topics.computeIfAbsent(topicName.toString(), (name) -> {
                                 topicEventsDispatcher
                                         .notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
 
@@ -1036,11 +1040,13 @@ public class BrokerService implements Closeable {
                                 topicEventsDispatcher
                                         .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
                                 return res;
-                            }
-                            topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
-                            return 
CompletableFuture.completedFuture(Optional.empty());
-                        });
-                    } else if (createIfMissing) {
+                            });
+                        }
+                        topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
+                        return 
CompletableFuture.completedFuture(Optional.empty());
+                    });
+                } else if (createIfMissing) {
+                    return topics.computeIfAbsent(topicName.toString(), (name) 
-> {
                         topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.CREATE, EventStage.BEFORE);
 
                         CompletableFuture<Optional<Topic>> res = 
createNonPersistentTopic(name);
@@ -1050,11 +1056,15 @@ public class BrokerService implements Closeable {
                         topicEventsDispatcher
                                 .notifyOnCompletion(eventFuture, 
topicName.toString(), TopicEvent.LOAD);
                         return res;
-                    } else {
+                    });
+                } else {
+                    CompletableFuture<Optional<Topic>> topicFuture = 
topics.get(topicName.toString());
+                    if (topicFuture == null) {
                         topicEventsDispatcher.notify(topicName.toString(), 
TopicEvent.LOAD, EventStage.FAILURE);
-                        return 
CompletableFuture.completedFuture(Optional.empty());
+                        topicFuture = 
CompletableFuture.completedFuture(Optional.empty());
                     }
-                });
+                    return topicFuture;
+                }
             }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", 
topicName, e);
@@ -1193,15 +1203,9 @@ public class BrokerService implements Closeable {
         CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
         topicFuture.exceptionally(t -> {
             pulsarStats.recordTopicLoadFailed();
+            pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
             return null;
         });
-        if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Broker is unable to load non-persistent topic {}", 
topic);
-            }
-            return FutureUtil.failedFuture(
-                    new NotAllowedException("Broker is not unable to load 
non-persistent topic"));
-        }
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic;
         try {
@@ -1224,7 +1228,6 @@ public class BrokerService implements Closeable {
                     }).exceptionally(ex -> {
                 log.warn("Replication check failed. Removing topic from topics 
list {}, {}", topic, ex.getCause());
                 nonPersistentTopic.stopReplProducers().whenComplete((v, 
exception) -> {
-                    pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
                     topicFuture.completeExceptionally(ex);
                 });
                 return null;
@@ -1475,14 +1478,6 @@ public class BrokerService implements Closeable {
         final CompletableFuture<Optional<Topic>> topicFuture = 
FutureUtil.createFutureWithTimeout(
                 
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), 
executor(),
                 () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
-        if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Broker is unable to load persistent topic {}", 
topic);
-            }
-            topicFuture.completeExceptionally(new NotAllowedException(
-                    "Broker is unable to load persistent topic"));
-            return topicFuture;
-        }
 
         checkTopicNsOwnership(topic)
                 .thenRun(() -> {
@@ -1497,6 +1492,7 @@ public class BrokerService implements Closeable {
                             // do not recreate topic if topic is already 
migrated and deleted by broker
                             // so, avoid creating a new topic if migration is 
already started
                             if (ex != null && (ex.getCause() instanceof 
TopicMigratedException)) {
+                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
                                 
topicFuture.completeExceptionally(ex.getCause());
                                 return null;
                             }
@@ -1511,6 +1507,7 @@ public class BrokerService implements Closeable {
                         }
                     }
                 }).exceptionally(ex -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
                     topicFuture.completeExceptionally(ex.getCause());
                     return null;
                 });
@@ -1685,6 +1682,7 @@ public class BrokerService implements Closeable {
                                                         + " topic", topic, 
FutureUtil.getException(topicFuture));
                                                 executor().submit(() -> {
                                                     
persistentTopic.close().whenComplete((ignore, ex) -> {
+                                                        topics.remove(topic, 
topicFuture);
                                                         if (ex != null) {
                                                             log.warn("[{}] Get 
an error when closing topic.",
                                                                     topic, ex);
@@ -1701,6 +1699,7 @@ public class BrokerService implements Closeable {
                                                     + " Removing topic from 
topics list {}, {}", topic, ex);
                                             executor().submit(() -> {
                                                 
persistentTopic.close().whenComplete((ignore, closeEx) -> {
+                                                    topics.remove(topic, 
topicFuture);
                                                     if (closeEx != null) {
                                                         log.warn("[{}] Get an 
error when closing topic.",
                                                                 topic, 
closeEx);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 765727aeac3..b58f416ea1a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -19,12 +19,10 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -1434,13 +1432,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
             // Ok
         }
 
-        final CompletableFuture<Optional<Topic>> timedOutTopicFuture = 
topicFuture;
-        // timeout topic future should be removed from cache
-        retryStrategically((test) -> 
pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
-                1000);
-
-        assertNotEquals(timedOutTopicFuture, 
pulsar1.getBrokerService().getTopics().get(topicName));
-
         try {
             Consumer<byte[]> consumer = 
client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
                     
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, 
TimeUnit.MILLISECONDS);
@@ -1452,6 +1443,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName 
+ "-2");
         mlFuture.complete(ml);
 
+        // Re-create topic will success.
         Consumer<byte[]> consumer = 
client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                 .subscriptionType(SubscriptionType.Shared).subscribeAsync()
                 .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index 7cd9da7574d..d6473efd788 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.api;
 
+import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.List;
@@ -27,6 +29,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -34,6 +37,9 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TopicPolicyListener;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.compaction.CompactionServiceFactory;
@@ -108,6 +114,68 @@ public class OrphanPersistentTopicTest extends 
ProducerConsumerBase {
         
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
     }
 
+    @Test
+    public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
+        // Snapshot every broker setting this test mutates so the finally block can restore it even when an
+        // assertion fails; otherwise this test's configuration would leak into later tests on the same broker.
+        long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds();
+        boolean originalBrokerDeduplicationEnabled = pulsar.getConfig().isBrokerDeduplicationEnabled();
+        boolean originalTransactionCoordinatorEnabled = pulsar.getConfig().isTransactionCoordinatorEnabled();
+        TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
+        try {
+            // Make the topic loading timeout faster.
+            int topicLoadTimeoutSeconds = 1;
+            pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+            pulsar.getConfig().setBrokerDeduplicationEnabled(true);
+            pulsar.getConfig().setTransactionCoordinatorEnabled(true);
+            String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
+
+            // Delay ZooKeeper operations on this topic's deduplication-cursor path by the whole load timeout,
+            // so loading the topic (with deduplication enabled) exceeds topicLoadTimeoutSeconds. The path
+            // embeds the unique topic name, so the delay cannot match paths of other topics.
+            String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/"
+                    + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
+            mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
+                if (mlPath.equals(path)) {
+                    log.info("Topic load timeout: {}", path);
+                    return true;
+                }
+                return false;
+            });
+
+            // The first topic load will time out. When the topic closes, it calls transactionBuffer.closeAsync;
+            // the mocked buffer sleeps there so the ledger is not closed immediately.
+            TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() {
+                @Override
+                public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+                    return new TransactionBufferDisable(originTopic) {
+                        @SneakyThrows
+                        @Override
+                        public CompletableFuture<Void> closeAsync() {
+                            Thread.sleep(500);
+                            return super.closeAsync();
+                        }
+                    };
+                }
+            };
+            pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);
+            CompletableFuture<Optional<Topic>> firstLoad = pulsar.getBrokerService().getTopic(tpName, true);
+            // Assert that the first topic load fails (times out) because of the delayed ZooKeeper operation.
+            Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
+                    .pollInterval(100, TimeUnit.MILLISECONDS)
+                    .untilAsserted(() -> assertTrue(firstLoad.isCompletedExceptionally()));
+
+            // Once the first load has timed out, load the topic again right away; producing must keep working.
+            // try-with-resources closes the producer so it does not outlive this test.
+            try (Producer<byte[]> producer = pulsarClient.newProducer().topic(tpName).create()) {
+                for (int i = 0; i < 10; i++) {
+                    MessageId messageId = producer.send("msg".getBytes());
+                    assertNotNull(messageId);
+                    Thread.sleep(100);
+                }
+            }
+        } finally {
+            // Restore the original broker state, including when an assertion above failed.
+            pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
+            pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
+            pulsar.getConfig().setBrokerDeduplicationEnabled(originalBrokerDeduplicationEnabled);
+            pulsar.getConfig().setTransactionCoordinatorEnabled(originalTransactionCoordinatorEnabled);
+        }
+    }
+
     @Test
     public void testNoOrphanTopicIfInitFailed() throws Exception {
         String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");

Reply via email to