This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 12f9f7cbbaf [fix][broker] Fix potential exception cause the policy service init fail. (#19746)
12f9f7cbbaf is described below

commit 12f9f7cbbaffb598cbd7712abee3e482c5487295
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Mar 9 16:18:00 2023 +0800

    [fix][broker] Fix potential exception cause the policy service init fail. (#19746)
---
 .../service/SystemTopicBasedTopicPoliciesService.java     | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 703f5bb2589..9e5e1b4378c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -337,20 +337,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 return;
             }
             if (hasMore) {
-                reader.readNextAsync().whenComplete((msg, e) -> {
-                    if (e != null) {
-                        log.error("[{}] Failed to read event from the system 
topic.",
-                                reader.getSystemTopic().getTopicName(), e);
-                        future.completeExceptionally(e);
-                        
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
-                        return;
-                    }
+                reader.readNextAsync().thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Loop next event reading for system 
topic.",
                                 
reader.getSystemTopic().getTopicName().getNamespaceObject());
                     }
                     initPolicesCache(reader, future);
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to read event from the system 
topic.",
+                            reader.getSystemTopic().getTopicName(), e);
+                    future.completeExceptionally(e);
+                    
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+                    return null;
                 });
             } else {
                 if (log.isDebugEnabled()) {

Reply via email to