thetumbled commented on code in PR #21030:
URL: https://github.com/apache/pulsar/pull/21030#discussion_r1299505736


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -420,11 +427,26 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
         });
     }
 
+    private void completeResults(TopicName topicName) {
+        if (results.get(topicName) != null) {
+            BlockingDeque<CompletableFuture> futures = results.get(topicName);
+            CompletableFuture<Void> future;
+            while (true) {
+                future = futures.poll();
+                if (future == null) {
+                    break;
+                }
+                future.complete(null);
+            }
+        }
+    }
+
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
         reader.readNextAsync()
                 .thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
                     notifyListener(msg);
+                    completeResults(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));

Review Comment:
   Yes, this is a corner case that this solution cannot handle: concurrent
requests that modify the policies of the same topic. This solution only fixes
the synchronous case, where requests are issued one after another.
   I wonder whether we need to implement stronger concurrency control. There
are two directions:
   1. Add a warning for users that the topic policy setting API cannot handle
concurrent requests.
   2. Or I can implement stronger concurrency control.
   What do you think? @AnonHxy @Demogorgon314 @codelipenghui
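
   To make direction 2 concrete, here is a minimal sketch of one possible
approach: chaining updates per topic so that each one starts only after the
previous one for the same topic has finished. `PerTopicSerializer` and
`submit` are hypothetical names for illustration, not existing Pulsar APIs,
and the sketch only serializes requests inside a single broker process. It
does not address requests arriving at different brokers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Pulsar): runs async updates for one topic one at a time. */
class PerTopicSerializer {
    // Tail of the pending-update chain for each topic name.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> tails =
            new ConcurrentHashMap<>();

    CompletableFuture<Void> submit(String topic, Supplier<CompletableFuture<Void>> update) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Atomically make this request the new tail and fetch the previous tail.
        CompletableFuture<Void> prev = tails.put(topic, result);

        CompletableFuture<Void> start;
        if (prev == null) {
            start = CompletableFuture.completedFuture(null);
        } else {
            // Wait for the previous update, but do not inherit its failure.
            start = prev.<Void>handle((v, e) -> null);
        }

        start.thenCompose(ignored -> update.get())
                .whenComplete((v, e) -> {
                    // Drop the map entry only if no later request replaced it.
                    tails.remove(topic, result);
                    if (e != null) {
                        result.completeExceptionally(e);
                    } else {
                        result.complete(null);
                    }
                });
        return result;
    }
}
```

   With something like this, the policy update would be passed in as the
`update` supplier, and its returned future would complete only once that
specific update has been applied, so a later request for the same topic could
not be started early. The trade-off is that updates to a single topic are
applied strictly one by one.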


