This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6430c541dc3971f58a26b915a8458812a646fe40
Author: lipenghui <[email protected]>
AuthorDate: Sun Jun 27 20:30:15 2021 +0800

    Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097)
    
    ### Motivation
    
    Currently, we trigger the reader to read more messages without waiting 
for the policies cache initialization to complete.
    As a result, the init process may see hasMessages=true but be unable to 
read the message, because the message has already been
    consumed by the read-more-entries process, which leads to the `topic policy 
cache not init` exception.
    
    Here are the details of the race condition:
    
    
https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190
    
    
https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248
    
    
https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275
    
    (cherry picked from commit 81063c04870ba7fa26222e57e4d4e94145e0a1e0)
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java  | 5 +++--
 .../test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java  | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 87ff3b8..e2d2e74 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -188,7 +188,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         result.completeExceptionally(ex);
                     } else {
                         initPolicesCache(reader, result);
-                        readMorePolicies(reader);
+                        result.thenRun(() -> readMorePolicies(reader));
                     }
                 });
             }
@@ -254,6 +254,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                     }
                     refreshTopicPoliciesCache(msg);
+                    notifyListener(msg);
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Loop next event reading for system 
topic.",
                                 
reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -264,9 +265,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Reach the end of the system topic.", 
reader.getSystemTopic().getTopicName());
                 }
-                future.complete(null);
                 policyCacheInitMap.computeIfPresent(
                         
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
+                future.complete(null);
             }
         });
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index f66f464..ecd53b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -947,8 +947,9 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
         admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
 
+        InactiveTopicPolicies finalInactiveTopicPolicies = 
inactiveTopicPolicies;
         Awaitility.await()
-                .untilAsserted(() -> 
Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));
+                .untilAsserted(() -> 
Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), 
finalInactiveTopicPolicies));
 
         // restart broker, policy should still take effect
         restartBroker();
@@ -956,7 +957,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         // Trigger the cache init.
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
 
-        InactiveTopicPolicies finalInactiveTopicPolicies = 
inactiveTopicPolicies;
+
         Awaitility.await()
                 .untilAsserted(() -> {
                     PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();

Reply via email to