This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 6f96d688701 [fix][broker] Topic policy can not be work well if replay 
policy message has any exception. (#20613)
6f96d688701 is described below

commit 6f96d6887014dff1b7430fb2055f5eaf248d2c46
Author: Hao Zhang <[email protected]>
AuthorDate: Fri Jun 30 12:28:32 2023 +0800

    [fix][broker] Topic policy can not be work well if replay policy message 
has any exception. (#20613)
---
 .../service/SystemTopicBasedTopicPoliciesService.java  | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9e5e1b4378c..de7dd3e6044 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -176,7 +176,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             TopicName topicName =  
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
             if (listeners.get(topicName) != null) {
                 for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
-                    listener.onUpdate(null);
+                    try {
+                        listener.onUpdate(null);
+                    } catch (Throwable error) {
+                        log.error("[{}] call listener error.", topicName, 
error);
+                    }
                 }
             }
             return;
@@ -191,7 +195,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         if (listeners.get(topicName) != null) {
             TopicPolicies policies = event.getPolicies();
             for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
-                listener.onUpdate(policies);
+                try {
+                    listener.onUpdate(policies);
+                } catch (Throwable error) {
+                    log.error("[{}] call listener error.", topicName, error);
+                }
             }
         }
     }
@@ -361,7 +369,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
                         for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
-                            listener.onUpdate(topicPolicies);
+                            try {
+                                listener.onUpdate(topicPolicies);
+                            } catch (Throwable error) {
+                                log.error("[{}] call listener error.", 
topicName, error);
+                            }
                         }
                     }
                 }));

Reply via email to