This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new b1137e8e495 [fix][broker] Topic policy can not be work well if replay
policy message has any exception. (#20613)
b1137e8e495 is described below
commit b1137e8e495776cae854ff410701111e73ffda9f
Author: Hao Zhang <[email protected]>
AuthorDate: Fri Jun 30 12:28:32 2023 +0800
[fix][broker] Topic policy can not be work well if replay policy message
has any exception. (#20613)
(cherry picked from commit 200fb562dd4437857ccaba3850bd64b0a9a50b3c)
---
.../service/SystemTopicBasedTopicPoliciesService.java | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f01ee7c9636..162e77cec32 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -157,7 +157,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
TopicName topicName =
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener :
listeners.get(topicName)) {
- listener.onUpdate(null);
+ try {
+ listener.onUpdate(null);
+ } catch (Throwable error) {
+ log.error("[{}] call listener error.", topicName,
error);
+ }
}
}
return;
@@ -172,7 +176,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (listeners.get(topicName) != null) {
TopicPolicies policies = event.getPolicies();
for (TopicPolicyListener<TopicPolicies> listener :
listeners.get(topicName)) {
- listener.onUpdate(policies);
+ try {
+ listener.onUpdate(policies);
+ } catch (Throwable error) {
+ log.error("[{}] call listener error.", topicName, error);
+ }
}
}
}
@@ -336,7 +344,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener :
listeners.get(topicName)) {
- listener.onUpdate(topicPolicies);
+ try {
+ listener.onUpdate(topicPolicies);
+ } catch (Throwable error) {
+ log.error("[{}] call listener error.",
topicName, error);
+ }
}
}
}));