This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new db8480b66f0 [fix][broker] Topic policy reader can't recover when get
any exception (#17562)
db8480b66f0 is described below
commit db8480b66f0634193e85de01358a1e34648d5595
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Sep 13 09:06:25 2022 +0800
[fix][broker] Topic policy reader can't recover when get any exception
(#17562)
(cherry picked from commit 5aa1f1101c6fef61163b34558cc98bf362dfa969)
---
.../SystemTopicBasedTopicPoliciesService.java | 28 ++++++++++++----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a5bcaa5728d..38ec5d803c6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -230,21 +231,20 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return result;
}
- private void prepareInitPoliciesCache(NamespaceName namespace,
CompletableFuture<Void> result) {
+ private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace,
CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new
AtomicInteger(1));
- readerCompletableFuture.whenComplete((reader, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
- result.completeExceptionally(ex);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
- } else {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }
+ readerCompletableFuture.thenAccept(reader -> {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
+ cleanCacheAndCloseReader(namespace, false);
+ result.completeExceptionally(ex);
+ return null;
});
}
}
@@ -346,14 +346,18 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- private void cleanCacheAndCloseReader(NamespaceName namespace, boolean
cleanOwnedBundlesCount) {
+ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture
= readerCaches.remove(namespace);
policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
if (cleanOwnedBundlesCount) {
ownedBundlesCountPerNamespace.remove(namespace);
}
if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
- readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+ readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
+ .exceptionally(ex -> {
+ log.warn("[{}] Close change_event reader fail.",
namespace, ex);
+ return null;
+ });
}
policyCacheInitMap.remove(namespace);
}