This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new db8480b66f0 [fix][broker] Topic policy reader can't recover when get any exception (#17562)
db8480b66f0 is described below

commit db8480b66f0634193e85de01358a1e34648d5595
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Sep 13 09:06:25 2022 +0800

    [fix][broker] Topic policy reader can't recover when get any exception (#17562)
    
    (cherry picked from commit 5aa1f1101c6fef61163b34558cc98bf362dfa969)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 28 ++++++++++++----------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a5bcaa5728d..38ec5d803c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -230,21 +231,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         return result;
     }
 
-    private void prepareInitPoliciesCache(NamespaceName namespace, 
CompletableFuture<Void> result) {
+    private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, 
CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
                     createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
-            readerCompletableFuture.whenComplete((reader, ex) -> {
-                if (ex != null) {
-                    log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
-                    result.completeExceptionally(ex);
-                    
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
-                } else {
-                    initPolicesCache(reader, result);
-                    result.thenRun(() -> readMorePolicies(reader));
-                }
+            readerCompletableFuture.thenAccept(reader -> {
+                initPolicesCache(reader, result);
+                result.thenRun(() -> readMorePolicies(reader));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to create reader on __change_events 
topic", namespace, ex);
+                cleanCacheAndCloseReader(namespace, false);
+                result.completeExceptionally(ex);
+                return null;
             });
         }
     }
@@ -346,14 +346,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
-    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean 
cleanOwnedBundlesCount) {
+    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, 
boolean cleanOwnedBundlesCount) {
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture 
= readerCaches.remove(namespace);
         policiesCache.entrySet().removeIf(entry -> 
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
         if (cleanOwnedBundlesCount) {
             ownedBundlesCountPerNamespace.remove(namespace);
         }
         if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
-            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+            readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Close change_event reader fail.", 
namespace, ex);
+                        return null;
+                    });
         }
         policyCacheInitMap.remove(namespace);
     }

Reply via email to