This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 6f96d688701 [fix][broker] Topic policies do not work properly if replaying
a policy message throws an exception. (#20613)
6f96d688701 is described below
commit 6f96d6887014dff1b7430fb2055f5eaf248d2c46
Author: Hao Zhang <[email protected]>
AuthorDate: Fri Jun 30 12:28:32 2023 +0800
[fix][broker] Topic policies do not work properly if replaying a policy
message throws an exception. (#20613)
---
.../service/SystemTopicBasedTopicPoliciesService.java | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9e5e1b4378c..de7dd3e6044 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -176,7 +176,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
TopicName topicName =
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener :
listeners.get(topicName)) {
- listener.onUpdate(null);
+ try {
+ listener.onUpdate(null);
+ } catch (Throwable error) {
+ log.error("[{}] call listener error.", topicName,
error);
+ }
}
}
return;
@@ -191,7 +195,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (listeners.get(topicName) != null) {
TopicPolicies policies = event.getPolicies();
for (TopicPolicyListener<TopicPolicies> listener :
listeners.get(topicName)) {
- listener.onUpdate(policies);
+ try {
+ listener.onUpdate(policies);
+ } catch (Throwable error) {
+ log.error("[{}] call listener error.", topicName, error);
+ }
}
}
}
@@ -361,7 +369,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener :
listeners.get(topicName)) {
- listener.onUpdate(topicPolicies);
+ try {
+ listener.onUpdate(topicPolicies);
+ } catch (Throwable error) {
+ log.error("[{}] call listener error.",
topicName, error);
+ }
}
}
}));