This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f32154c [Broker] Fixed wrong behaviour caused by not cleaning up
topic policy service state. (#14503)
f32154c is described below
commit f32154c06c6475fac1cd89d105d3c31d5d8713dc
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Mar 9 09:18:39 2022 +0800
[Broker] Fixed wrong behaviour caused by not cleaning up topic policy
service state. (#14503)
---
.../SystemTopicBasedTopicPoliciesService.java | 39 +++++++++++-----------
1 file changed, 20 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9b8c69e..bbb0257 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -255,8 +256,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (ex != null) {
log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
result.completeExceptionally(ex);
- readerCaches.remove(namespace);
- reader.closeAsync();
+
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
} else {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
@@ -290,14 +290,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
AtomicInteger bundlesCount =
ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
- CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
- readerCaches.remove(namespace);
- if (readerCompletableFuture != null) {
-
readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
- ownedBundlesCountPerNamespace.remove(namespace);
- policyCacheInitMap.remove(namespace);
- policiesCache.entrySet().removeIf(entry ->
entry.getKey().getNamespaceObject().equals(namespace));
- }
+ cleanCacheAndCloseReader(namespace, true);
}
return CompletableFuture.completedFuture(null);
}
@@ -331,9 +324,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to check the move events for the system
topic",
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
-
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
- reader.closeAsync();
+
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
return;
}
if (hasMore) {
@@ -342,9 +333,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to read event from the system
topic.",
reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
-
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
- reader.closeAsync();
+
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
return;
}
refreshTopicPoliciesCache(msg);
@@ -373,6 +362,18 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
+ private void cleanCacheAndCloseReader(NamespaceName namespace, boolean
cleanOwnedBundlesCount) {
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture
= readerCaches.remove(namespace);
+ policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+ if (cleanOwnedBundlesCount) {
+ ownedBundlesCountPerNamespace.remove(namespace);
+ }
+ if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+ readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+ }
+ policyCacheInitMap.remove(namespace);
+ }
+
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent>
reader) {
reader.readNextAsync().whenComplete((msg, ex) -> {
if (ex == null) {
@@ -382,10 +383,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
} else {
if (ex instanceof
PulsarClientException.AlreadyClosedException) {
log.error("Read more topic policies exception, close the
read now!", ex);
- NamespaceName namespace =
reader.getSystemTopic().getTopicName().getNamespaceObject();
- ownedBundlesCountPerNamespace.remove(namespace);
- readerCaches.remove(namespace);
+ cleanCacheAndCloseReader(
+
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
+ log.warn("Read more topic polices exception, read again.",
ex);
readMorePolicies(reader);
}
}