thetumbled commented on code in PR #21030:
URL: https://github.com/apache/pulsar/pull/21030#discussion_r1299505736
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -420,11 +427,26 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
});
}
+ private void completeResults(TopicName topicName) {
+ if (results.get(topicName) != null) {
+ BlockingDeque<CompletableFuture> futures = results.get(topicName);
+ CompletableFuture<Void> future;
+ while (true) {
+ future = futures.poll();
+ if (future == null) {
+ break;
+ }
+ future.complete(null);
+ }
+ }
+ }
+
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
+ completeResults(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));
Review Comment:
Yes, this is the corner case for this solution: it cannot handle concurrent
requests that modify a topic policy. This solution only fixes the sequential
(synchronous) case.
I wonder whether we need to implement strong concurrency control. There are
two directions:
1. Add a warning for users that the topic policy setting API cannot handle
concurrent requests.
2. Or I can implement stronger concurrency control.
What do you think? @AnonHxy @Demogorgon314 @codelipenghui
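
To make direction 2 more concrete, here is a minimal sketch of one possible
form of stronger control: serializing policy updates per topic by chaining
each request behind the previous one. This is not code from the PR or from
Pulsar. `PerTopicSerializer`, `submit`, and the `String` topic key are
hypothetical names chosen for illustration, and the sketch assumes that all
updates for a topic pass through the same in-memory instance, so it would not
by itself cover requests handled by different brokers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical helper (not part of Pulsar): runs async operations for the
// same topic one at a time, in submission order.
class PerTopicSerializer {
    // Last submitted operation per topic; new requests chain behind it.
    private final ConcurrentHashMap<String, CompletableFuture<?>> tails =
            new ConcurrentHashMap<>();

    public <T> CompletableFuture<T> submit(String topic,
                                           Supplier<CompletableFuture<T>> op) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // put() atomically installs the new tail and returns the previous one.
        CompletableFuture<?> prev = tails.put(topic, result);
        if (prev == null) {
            prev = CompletableFuture.completedFuture(null);
        }
        prev.handle((v, e) -> null)           // wait for prev, ignore its outcome
            .thenCompose(ignored -> op.get()) // only then start this operation
            .whenComplete((v, e) -> {
                // Drop the map entry if no newer request has replaced it.
                tails.remove(topic, result);
                if (e != null) {
                    result.completeExceptionally(e);
                } else {
                    result.complete(v);
                }
            });
        return result;
    }
}
```

With something like this, the second of two concurrent updates to the same
topic would not start until the first one's future has completed, while
updates to different topics would still proceed independently.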