This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 406aa4e472af612acf5377ad7def4f5e2e64de2f Author: Zixuan Liu <[email protected]> AuthorDate: Fri Oct 22 11:03:40 2021 +0800 Fix the null point caused by deleting the system topic policy (#12367) Signed-off-by: Zixuan Liu <[email protected]> ### Motivation `Message<PulsarEvent>.getValue()` sometimes returns `null` in `SystemTopicBasedTopicPoliciesService.notifyListener()`, so we need to skip the messages that do not belong to the policy type. Left unhandled, this problem can cause the policy service to stop working. ### Modifications - Checks the `Message<PulsarEvent>.getValue()` value. - Uses the `event` instead of `null` as the message value when deleting a policy. ### Verifying this change - [x] Make sure that the change passes the CI checks. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): no - The public API: no - The schema: no - The default values of configurations: no - The wire protocol: no - The rest endpoints: no - The admin cli options: no - Anything that affects deployment: no ### Documentation Check the box below and label this PR (if you have committer privilege). Need to update docs? 
- [ ] doc-required - [x] no-need-doc - [ ] doc (cherry picked from commit d310e79f360f41ddabc820fd0e45af67a8a4db82) --- .../SystemTopicBasedTopicPoliciesService.java | 27 +++++++++++++++------- .../pulsar/broker/admin/TopicPoliciesTest.java | 23 ++++++++++++++++++ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index bf27736..a6c786b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -126,7 +126,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } }); - }) + }) ); } }); @@ -149,6 +149,17 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } private void notifyListener(Message<PulsarEvent> msg) { + // delete policies + if (msg.getValue() == null) { + TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()); + if (listeners.get(topicName) != null) { + for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) { + listener.onUpdate(null); + } + } + return; + } + if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { return; } @@ -296,12 +307,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic removeOwnedNamespaceBundleAsync(bundle); } - @Override - public boolean test(NamespaceBundle namespaceBundle) { - return true; - } - - }); + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return true; + } + }); } private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) { @@ -449,7 +459,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic if (e != null) { future.completeExceptionally(e); } - if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { + if (msg.getValue() != null + && EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { TopicPoliciesEvent topicPoliciesEvent = msg.getValue().getTopicPoliciesEvent(); if (topicName.equals(TopicName.get( topicPoliciesEvent.getDomain(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 5047c0c..7af136a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -2688,4 +2688,27 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { }); } + @Test + public void testLoopCreateAndDeleteTopicPolicies() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + + int n = 0; + while (n < 2) { + n++; + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + }); + + admin.topics().setMaxConsumersPerSubscription(topic, 1); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); + }); + + admin.topics().delete(topic); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + }); + } + } }
