This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 12a6cc46bcd Move refreshTopicPoliciesCache and notifyListener into future stage to catch the exception (#17556)
12a6cc46bcd is described below

commit 12a6cc46bcde943bfb08fa83a9822b81f20508d9
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat Sep 10 13:48:08 2022 +0800

    Move refreshTopicPoliciesCache and notifyListener into future stage to catch the exception (#17556)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index ed69428386a..93f97bbce07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -380,23 +380,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     }
 
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
-        reader.readNextAsync().whenComplete((msg, ex) -> {
-            if (ex == null) {
-                refreshTopicPoliciesCache(msg);
-                notifyListener(msg);
-                readMorePolicies(reader);
-            } else {
-                Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                if (cause instanceof PulsarClientException.AlreadyClosedException) {
-                    log.error("Read more topic policies exception, close the read now!", ex);
-                    cleanCacheAndCloseReader(
-                            reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                } else {
-                    log.warn("Read more topic polices exception, read again.", ex);
-                    readMorePolicies(reader);
-                }
-            }
-        });
+        reader.readNextAsync()
+              .thenAccept(msg -> {
+                  refreshTopicPoliciesCache(msg);
+                  notifyListener(msg);
+              })
+              .whenComplete((__, ex) -> {
+                  if (ex == null) {
+                      readMorePolicies(reader);
+                  } else {
+                      Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                      if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                          log.error("Read more topic policies exception, close the read now!", ex);
+                          cleanCacheAndCloseReader(
+                                  reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+                      } else {
+                          log.warn("Read more topic polices exception, read again.", ex);
+                          readMorePolicies(reader);
+                      }
+                  }
+              });
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {

Reply via email to