This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new be9cbc3b286 [fix][broker]Consumer can't consume messages because there 
has two sames topics in one broker (#17526)
be9cbc3b286 is described below

commit be9cbc3b28665615fb913d02c67cf9e4d232afa0
Author: fengyubiao <[email protected]>
AuthorDate: Wed Sep 21 21:28:38 2022 +0800

    [fix][broker]Consumer can't consume messages because there has two sames 
topics in one broker (#17526)
---
 .../pulsar/broker/service/BrokerService.java       | 56 ++++++++++++++++++++--
 .../service/nonpersistent/NonPersistentTopic.java  |  4 +-
 .../broker/service/persistent/PersistentTopic.java |  6 +--
 3 files changed, 56 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 48f3713f5cb..b704044e139 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1945,7 +1945,7 @@ public class BrokerService implements Closeable {
             TopicName topicName = TopicName.get(topic);
             if (serviceUnit.includes(topicName) && 
getTopicReference(topic).isPresent()) {
                 log.info("[{}][{}] Clean unloaded topic from cache.", 
serviceUnit.toString(), topic);
-                
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), 
serviceUnit);
+                
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), 
serviceUnit, null);
             }
         }
     }
@@ -1954,15 +1954,56 @@ public class BrokerService implements Closeable {
         return authorizationService;
     }
 
-    public CompletableFuture<Void> removeTopicFromCache(String topic) {
+    public CompletableFuture<Void> removeTopicFromCache(String topicName) {
+        return removeTopicFutureFromCache(topicName, null);
+    }
+
+    public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
+        Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = 
findTopicFutureInCache(topic);
+        if (createTopicFuture.isEmpty()){
+            return CompletableFuture.completedFuture(null);
+        }
+        return removeTopicFutureFromCache(topic.getName(), 
createTopicFuture.get());
+    }
+
+    private Optional<CompletableFuture<Optional<Topic>>> 
findTopicFutureInCache(Topic topic){
+        if (topic == null){
+            return Optional.empty();
+        }
+        final CompletableFuture<Optional<Topic>> createTopicFuture = 
topics.get(topic.getName());
+        // If not exists in cache, do nothing.
+        if (createTopicFuture == null){
+            return Optional.empty();
+        }
+        // If the future in cache is not yet complete, the topic instance in 
the cache is not the same with the topic.
+        if (!createTopicFuture.isDone()){
+            return Optional.empty();
+        }
+        // If the future in cache has exception complete,
+        // the topic instance in the cache is not the same with the topic.
+        if (createTopicFuture.isCompletedExceptionally()){
+            return Optional.empty();
+        }
+        Optional<Topic> optionalTopic = createTopicFuture.join();
+        Topic topicInCache = optionalTopic.orElse(null);
+        if (topicInCache == null || topicInCache != topic){
+            return Optional.empty();
+        } else {
+            return Optional.of(createTopicFuture);
+        }
+    }
+
+    private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
+                                                        
CompletableFuture<Optional<Topic>> createTopicFuture) {
         TopicName topicName = TopicName.get(topic);
         return pulsar.getNamespaceService().getBundleAsync(topicName)
                 .thenAccept(namespaceBundle -> {
-                    removeTopicFromCache(topic, namespaceBundle);
+                    removeTopicFromCache(topic, namespaceBundle, 
createTopicFuture);
                 });
     }
 
-    public void removeTopicFromCache(String topic, NamespaceBundle 
namespaceBundle) {
+    private void removeTopicFromCache(String topic, NamespaceBundle 
namespaceBundle,
+                                     CompletableFuture<Optional<Topic>> 
createTopicFuture) {
         String bundleName = namespaceBundle.toString();
         String namespaceName = 
TopicName.get(topic).getNamespaceObject().toString();
 
@@ -1989,7 +2030,12 @@ public class BrokerService implements Closeable {
                 }
             }
         }
-        topics.remove(topic);
+
+        if (createTopicFuture == null) {
+            topics.remove(topic);
+        } else {
+            topics.remove(topic, createTopicFuture);
+        }
 
         Compactor compactor = pulsar.getNullableCompactor();
         if (compactor != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index db77d12e713..b178e99b5c8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -439,7 +439,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                             // topic GC iterates over topics map and removing 
from the map with the same thread creates
                             // deadlock. so, execute it in different thread
                             brokerService.executor().execute(() -> {
-                                brokerService.removeTopicFromCache(topic);
+                                
brokerService.removeTopicFromCache(NonPersistentTopic.this);
                                 unregisterTopicPolicyListener();
                                 log.info("[{}] Topic deleted", topic);
                                 deleteFuture.complete(null);
@@ -506,7 +506,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             // unload topic iterates over topics map and removing from the map 
with the same thread creates deadlock.
             // so, execute it in different thread
             brokerService.executor().execute(() -> {
-                brokerService.removeTopicFromCache(topic);
+                brokerService.removeTopicFromCache(NonPersistentTopic.this);
                 unregisterTopicPolicyListener();
                 closeFuture.complete(null);
             });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 60b08413235..5114ddcee54 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1190,7 +1190,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                     ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
                                         @Override
                                         public void 
deleteLedgerComplete(Object ctx) {
-                                            
brokerService.removeTopicFromCache(topic);
+                                            
brokerService.removeTopicFromCache(PersistentTopic.this);
 
                                             
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
 
@@ -1290,7 +1290,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 @Override
                 public void closeComplete(Object ctx) {
                     // Everything is now closed, remove the topic from map
-                    brokerService.removeTopicFromCache(topic)
+                    brokerService.removeTopicFromCache(PersistentTopic.this)
                             .thenRun(() -> {
                                 
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
 
@@ -1312,7 +1312,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 @Override
                 public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
                     log.error("[{}] Failed to close managed ledger, proceeding 
anyway.", topic, exception);
-                    brokerService.removeTopicFromCache(topic);
+                    brokerService.removeTopicFromCache(PersistentTopic.this);
                     unregisterTopicPolicyListener();
                     closeFuture.complete(null);
                 }

Reply via email to