This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 086d7109a213ca6d1ea362b28a9e24a62f890141 Author: Qiang Zhao <[email protected]> AuthorDate: Wed Mar 9 09:18:39 2022 +0800 [Broker] Fixed wrong behaviour caused by not cleaning up topic policy service state. (#14503) (cherry picked from commit f32154c06c6475fac1cd89d105d3c31d5d8713dc) --- .../SystemTopicBasedTopicPoliciesService.java | 39 +++++++++++----------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 9b8c69e..bbb0257 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -255,8 +256,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic if (ex != null) { log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); result.completeExceptionally(ex); - readerCaches.remove(namespace); - reader.closeAsync(); + cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); } else { initPolicesCache(reader, result); result.thenRun(() -> readMorePolicies(reader)); @@ -290,14 +290,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) { - CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - 
readerCaches.remove(namespace); - if (readerCompletableFuture != null) { - readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync); - ownedBundlesCountPerNamespace.remove(namespace); - policyCacheInitMap.remove(namespace); - policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace)); - } + cleanCacheAndCloseReader(namespace, true); } return CompletableFuture.completedFuture(null); } @@ -331,9 +324,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic log.error("[{}] Failed to check the move events for the system topic", reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(ex); - readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); - policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); - reader.closeAsync(); + cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); return; } if (hasMore) { @@ -342,9 +333,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic log.error("[{}] Failed to read event from the system topic.", reader.getSystemTopic().getTopicName(), e); future.completeExceptionally(e); - readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); - policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); - reader.closeAsync(); + cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); return; } refreshTopicPoliciesCache(msg); @@ -373,6 +362,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic }); } + private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) { + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace); + policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), 
namespace)); + if (cleanOwnedBundlesCount) { + ownedBundlesCountPerNamespace.remove(namespace); + } + if (readerFuture != null && !readerFuture.isCompletedExceptionally()) { + readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync); + } + policyCacheInitMap.remove(namespace); + } + private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) { reader.readNextAsync().whenComplete((msg, ex) -> { if (ex == null) { @@ -382,10 +383,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } else { if (ex instanceof PulsarClientException.AlreadyClosedException) { log.error("Read more topic policies exception, close the read now!", ex); - NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject(); - ownedBundlesCountPerNamespace.remove(namespace); - readerCaches.remove(namespace); + cleanCacheAndCloseReader( + reader.getSystemTopic().getTopicName().getNamespaceObject(), false); } else { + log.warn("Read more topic polices exception, read again.", ex); readMorePolicies(reader); } }
