This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f32154c  [Broker] Fixed wrong behaviour caused by not cleaning up 
topic policy service state. (#14503)
f32154c is described below

commit f32154c06c6475fac1cd89d105d3c31d5d8713dc
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Mar 9 09:18:39 2022 +0800

    [Broker] Fixed wrong behaviour caused by not cleaning up topic policy 
service state. (#14503)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 39 +++++++++++-----------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9b8c69e..bbb0257 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -255,8 +256,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 if (ex != null) {
                     log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
                     result.completeExceptionally(ex);
-                    readerCaches.remove(namespace);
-                    reader.closeAsync();
+                    
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
                 } else {
                     initPolicesCache(reader, result);
                     result.thenRun(() -> readMorePolicies(reader));
@@ -290,14 +290,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
         AtomicInteger bundlesCount = 
ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
-            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                    readerCaches.remove(namespace);
-            if (readerCompletableFuture != null) {
-                
readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
-                ownedBundlesCountPerNamespace.remove(namespace);
-                policyCacheInitMap.remove(namespace);
-                policiesCache.entrySet().removeIf(entry -> 
entry.getKey().getNamespaceObject().equals(namespace));
-            }
+            cleanCacheAndCloseReader(namespace, true);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -331,9 +324,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 log.error("[{}] Failed to check the move events for the system 
topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                reader.closeAsync();
+                
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
                 return;
             }
             if (hasMore) {
@@ -342,9 +333,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         log.error("[{}] Failed to read event from the system 
topic.",
                                 reader.getSystemTopic().getTopicName(), e);
                         future.completeExceptionally(e);
-                        
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                        
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                        reader.closeAsync();
+                        
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
                         return;
                     }
                     refreshTopicPoliciesCache(msg);
@@ -373,6 +362,18 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
+    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean 
cleanOwnedBundlesCount) {
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture 
= readerCaches.remove(namespace);
+        policiesCache.entrySet().removeIf(entry -> 
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+        if (cleanOwnedBundlesCount) {
+            ownedBundlesCountPerNamespace.remove(namespace);
+        }
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+        }
+        policyCacheInitMap.remove(namespace);
+    }
+
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         reader.readNextAsync().whenComplete((msg, ex) -> {
             if (ex == null) {
@@ -382,10 +383,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             } else {
                 if (ex instanceof 
PulsarClientException.AlreadyClosedException) {
                     log.error("Read more topic policies exception, close the 
read now!", ex);
-                    NamespaceName namespace = 
reader.getSystemTopic().getTopicName().getNamespaceObject();
-                    ownedBundlesCountPerNamespace.remove(namespace);
-                    readerCaches.remove(namespace);
+                    cleanCacheAndCloseReader(
+                            
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 } else {
+                    log.warn("Read more topic polices exception, read again.", 
ex);
                     readMorePolicies(reader);
                 }
             }

Reply via email to