This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new b1137e8e495 [fix][broker] Topic policy cannot work well if replaying a policy message hits any exception. (#20613)
b1137e8e495 is described below

commit b1137e8e495776cae854ff410701111e73ffda9f
Author: Hao Zhang <[email protected]>
AuthorDate: Fri Jun 30 12:28:32 2023 +0800

    [fix][broker] Topic policy cannot work well if replaying a policy message hits any exception. (#20613)
    
    (cherry picked from commit 200fb562dd4437857ccaba3850bd64b0a9a50b3c)
---
 .../service/SystemTopicBasedTopicPoliciesService.java  | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f01ee7c9636..162e77cec32 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -157,7 +157,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             TopicName topicName =  
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
             if (listeners.get(topicName) != null) {
                 for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
-                    listener.onUpdate(null);
+                    try {
+                        listener.onUpdate(null);
+                    } catch (Throwable error) {
+                        log.error("[{}] call listener error.", topicName, 
error);
+                    }
                 }
             }
             return;
@@ -172,7 +176,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         if (listeners.get(topicName) != null) {
             TopicPolicies policies = event.getPolicies();
             for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
-                listener.onUpdate(policies);
+                try {
+                    listener.onUpdate(policies);
+                } catch (Throwable error) {
+                    log.error("[{}] call listener error.", topicName, error);
+                }
             }
         }
     }
@@ -336,7 +344,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
                         for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
-                            listener.onUpdate(topicPolicies);
+                            try {
+                                listener.onUpdate(topicPolicies);
+                            } catch (Throwable error) {
+                                log.error("[{}] call listener error.", 
topicName, error);
+                            }
                         }
                     }
                 }));

Reply via email to