This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6430c541dc3971f58a26b915a8458812a646fe40 Author: lipenghui <[email protected]> AuthorDate: Sun Jun 27 20:30:15 2021 +0800 Fix race condition of the SystemTopicBasedTopicPoliciesService (#11097) ### Motivation Currently, we trigger the reader to read more messages without waiting for the policies cache initialization to complete. As a result, the init process may see hasMessages=true but then be unable to read the message, because the message has already been consumed by the read-more-entries process; this leads to the `topic policy cache not init` exception. Here are the details of the race condition: https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L190 https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L248 https://github.com/apache/pulsar/blob/0b67438d23bbbc46b500e896a18aad715a514fd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L275 (cherry picked from commit 81063c04870ba7fa26222e57e4d4e94145e0a1e0) --- .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java | 5 +++-- .../test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 87ff3b8..e2d2e74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -188,7 +188,7 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic result.completeExceptionally(ex); } else { initPolicesCache(reader, result); - readMorePolicies(reader); + result.thenRun(() -> readMorePolicies(reader)); } }); } @@ -254,6 +254,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); } refreshTopicPoliciesCache(msg); + notifyListener(msg); if (log.isDebugEnabled()) { log.debug("[{}] Loop next event reading for system topic.", reader.getSystemTopic().getTopicName().getNamespaceObject()); @@ -264,9 +265,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic if (log.isDebugEnabled()) { log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName()); } - future.complete(null); policyCacheInitMap.computeIfPresent( reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); + future.complete(null); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f66f464..ecd53b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -947,8 +947,9 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false); admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies; Awaitility.await() - .untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic))); + .untilAsserted(() -> Assert.assertEquals(admin.topics().getInactiveTopicPolicies(topic), finalInactiveTopicPolicies)); // restart broker, policy should still take effect restartBroker(); @@ -956,7 
+957,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { // Trigger the cache init. Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); - InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies; + Awaitility.await() .untilAsserted(() -> { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
