This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 12f9f7cbbaf [fix][broker] Fix potential exception cause the policy
service init fail. (#19746)
12f9f7cbbaf is described below
commit 12f9f7cbbaffb598cbd7712abee3e482c5487295
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Mar 9 16:18:00 2023 +0800
[fix][broker] Fix potential exception cause the policy service init fail.
(#19746)
---
.../service/SystemTopicBasedTopicPoliciesService.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 703f5bb2589..9e5e1b4378c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -337,20 +337,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
return;
}
if (hasMore) {
- reader.readNextAsync().whenComplete((msg, e) -> {
- if (e != null) {
- log.error("[{}] Failed to read event from the system
topic.",
- reader.getSystemTopic().getTopicName(), e);
- future.completeExceptionally(e);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
- return;
- }
+ reader.readNextAsync().thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system
topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
}
initPolicesCache(reader, future);
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to read event from the system
topic.",
+ reader.getSystemTopic().getTopicName(), e);
+ future.completeExceptionally(e);
+
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+ return null;
});
} else {
if (log.isDebugEnabled()) {