AnonHxy commented on code in PR #21030:
URL: https://github.com/apache/pulsar/pull/21030#discussion_r1299212748
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -420,11 +427,26 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
});
}
+ private void completeResults(TopicName topicName) {
+ if (results.get(topicName) != null) {
+ BlockingDeque<CompletableFuture> futures = results.get(topicName);
+ CompletableFuture<Void> future;
+ while (true) {
+ future = futures.poll();
+ if (future == null) {
+ break;
+ }
+ future.complete(null);
+ }
+ }
+ }
+
private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
+ completeResults(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));
Review Comment:
I don't think this is the right way to solve the problem. For example,
suppose we have three different actions, `setA`, `setB`, and `setC`:
1. `setA` is waiting for futureA to complete. Its event message is eventA.
2. `setB` is waiting for futureB to complete. Its event message is eventB.
3. `setC` is waiting for futureC to complete. Its event message is eventC.
4. The method `completeResults` completes futureA, futureB, and futureC as
soon as eventA is read. However, at that point setB's update has not yet been
refreshed into the policies cache.
5. So `setC` may read the old B value.
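
To make the scenario concrete, here is a minimal, self-contained sketch. All names in it (`PolicyCacheSketch`, `submit`, `onEvent`, `valueOfBSeenBySetC`) are hypothetical and are not Pulsar APIs. The `completeAll = true` branch mirrors the draining loop in `completeResults`. The `completeAll = false` branch is only one possible alternative and assumes events are read in the same order the updates were issued, which the real service would need to guarantee or replace with an explicit event-to-future correlation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the policies service; not Pulsar code.
class PolicyCacheSketch {
    private final Map<String, String> cache = new HashMap<>();
    // Futures of in-flight updates, in the order they were issued.
    private final Queue<CompletableFuture<Void>> pending = new ArrayDeque<>();
    private final boolean completeAll;

    PolicyCacheSketch(boolean completeAll) {
        this.completeAll = completeAll;
    }

    // A caller issues an update and waits on the returned future.
    CompletableFuture<Void> submit() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Invoked when the reader delivers one event: refresh the cache, then complete futures.
    void onEvent(String key, String value) {
        cache.put(key, value);
        if (completeAll) {
            // Same shape as completeResults in the diff: drain everything.
            CompletableFuture<Void> future;
            while ((future = pending.poll()) != null) {
                future.complete(null);
            }
        } else {
            // Assumption: events arrive in submission order, so complete only the oldest.
            CompletableFuture<Void> future = pending.poll();
            if (future != null) {
                future.complete(null);
            }
        }
    }

    // Replays steps 1-5: three updates are in flight, but only eventA has been read.
    // Returns the B value visible once futureC is done, or a marker if it is still pending.
    static String valueOfBSeenBySetC(boolean completeAll) {
        PolicyCacheSketch service = new PolicyCacheSketch(completeAll);
        service.cache.put("B", "old");
        service.submit();                                   // futureA
        service.submit();                                   // futureB
        CompletableFuture<Void> futureC = service.submit(); // futureC
        service.onEvent("A", "new");                        // eventA only
        return futureC.isDone() ? service.cache.get("B") : "futureC pending";
    }

    public static void main(String[] args) {
        System.out.println("complete all: " + valueOfBSeenBySetC(true));
        System.out.println("complete one: " + valueOfBSeenBySetC(false));
    }
}
```

With the draining variant, futureC is already done after eventA, so a caller acting on it sees the stale B value. With the one-future-per-event variant, futureC stays pending until its own event has been applied.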