This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new ca4a5d18969 [Broker] Do not create missing topic when loading namespace (#13948)
ca4a5d18969 is described below

commit ca4a5d18969add06c0ae0bc495e48835b28602a9
Author: Michael Marshall <[email protected]>
AuthorDate: Fri Jan 28 02:25:51 2022 -0600

    [Broker] Do not create missing topic when loading namespace (#13948)
    
    ### Motivation
    
    Based on my reading of the `PulsarService#loadNamespaceTopics` method, there
    appears to be a race condition that could lead the broker to recreate a topic
    that was just deleted. Currently, we do the following when loading a
    namespace bundle:
    
    1. Get all topics for a namespace bundle from the metadata store.
    2. Asynchronously call `getOrCreateTopic` for each of the topics.
    
    If a topic was deleted after step 1 and before step 2, the topic will get
    recreated (assuming that auto topic creation is enabled).
    
    ### Modifications
    
    * Use `getTopicIfExists` to load a namespace bundle instead of
      `getOrCreateTopic`.
    * Only log the count of actually loaded topics.
    
    ### Verifying this change
    
    From my perspective, this is a minor change that should remove the
    possibility of a very specific race condition. I don't see an easy way to
    test this behavior. However, the `getTopics` method, which backs both
    `getTopicIfExists` and `getOrCreateTopic`, is well tested, so I think it is
    okay to skip adding a test here. Let me know if you disagree.
    
    (cherry picked from commit 740268832101fa8653a627dd3e4e1f629822e2f4)
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index acbd1ea33e4..a95ba68344a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1069,7 +1069,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             LOG.info("Loading all topics on bundle: {}", bundle);
 
             NamespaceName nsName = bundle.getNamespaceObject();
-            List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
+            List<CompletableFuture<Optional<Topic>>> persistentTopics = Lists.newArrayList();
             long topicLoadStart = System.nanoTime();
 
             for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)
@@ -1077,7 +1077,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 try {
                     TopicName topicName = TopicName.get(topic);
                     if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
-                        CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
+                        CompletableFuture<Optional<Topic>> future = brokerService.getTopicIfExists(topic);
                         if (future != null) {
                             persistentTopics.add(future);
                         }
@@ -1091,7 +1091,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 FutureUtil.waitForAll(persistentTopics).thenRun(() -> {
                     double topicLoadTimeSeconds = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - topicLoadStart)
                             / 1000.0;
-                    LOG.info("Loaded {} topics on {} -- time taken: {} seconds", persistentTopics.size(), bundle,
+                    long numTopicsLoaded = persistentTopics.stream()
+                            .filter(optionalTopicFuture -> optionalTopicFuture.getNow(Optional.empty()).isPresent())
+                            .count();
+                    LOG.info("Loaded {} topics on {} -- time taken: {} seconds", numTopicsLoaded, bundle,
                             topicLoadTimeSeconds);
                 });
             }
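
The race described in the commit message, and the counting logic added in the last hunk, can be illustrated with a small standalone sketch. This is not Pulsar code: `BundleLoadSketch` and its in-memory map are hypothetical stand-ins for the broker and the metadata store, and only the method names `getOrCreate` and `getIfExists` are chosen to echo `getOrCreateTopic` and `getTopicIfExists`. It assumes all futures complete normally, since `getNow` rethrows a failure as a `CompletionException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker plus metadata store; not a Pulsar class.
final class BundleLoadSketch {
    private final Map<String, String> topics = new ConcurrentHashMap<>();

    public void create(String name) { topics.put(name, "topic:" + name); }

    public void delete(String name) { topics.remove(name); }

    // Step 1 of the load: snapshot the topic names known at this moment.
    public List<String> list() { return new ArrayList<>(topics.keySet()); }

    // Old behavior: a name missing from the store is silently created again.
    public CompletableFuture<String> getOrCreate(String name) {
        return CompletableFuture.completedFuture(
                topics.computeIfAbsent(name, n -> "topic:" + n));
    }

    // New behavior: a missing name yields an empty Optional and creates nothing.
    public CompletableFuture<Optional<String>> getIfExists(String name) {
        return CompletableFuture.completedFuture(Optional.ofNullable(topics.get(name)));
    }

    // Mirrors the patched log statement: count only futures holding a topic.
    public static long countLoaded(List<CompletableFuture<Optional<String>>> futures) {
        return futures.stream()
                .filter(f -> f.getNow(Optional.empty()).isPresent())
                .count();
    }

    public static void main(String[] args) {
        BundleLoadSketch store = new BundleLoadSketch();
        store.create("a");
        store.create("b");

        List<String> listed = store.list(); // step 1: list topics
        store.delete("b");                  // deletion lands between the two steps

        // Step 2 with the patched lookup: "b" is not recreated.
        List<CompletableFuture<Optional<String>>> futures = new ArrayList<>();
        for (String name : listed) {
            futures.add(store.getIfExists(name));
        }
        System.out.println("loaded=" + countLoaded(futures)
                + " requested=" + futures.size()
                + " bExists=" + store.list().contains("b"));

        // Step 2 with the old lookup: "b" comes back.
        store.getOrCreate("b");
        System.out.println("after getOrCreate, bExists=" + store.list().contains("b"));
    }
}
```

In this sketch the list of futures has two entries but only one holds a topic, which is why the patch logs the filtered count rather than `persistentTopics.size()`.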
