This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 406aa4e472af612acf5377ad7def4f5e2e64de2f
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Oct 22 11:03:40 2021 +0800

    Fix the null point caused by deleting the system topic policy (#12367)
    
    Signed-off-by: Zixuan Liu <[email protected]>
    
    ### Motivation
    
    `Message<PulsarEvent>.getValue()` sometimes returns `null` in 
`SystemTopicBasedTopicPoliciesService.notifyListener()`, so we need to skip the 
messages that do not belong to the policy type, this problem can cause the 
policy service to fail to work.
    
    ### Modifications
    
    - Checks the `Message<PulsarEvent>.getValue()` value.
    - Uses the `event` instead of `null` as message value when delete policy.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API: no
      - The schema: no
      - The default values of configurations: no
      - The wire protocol: no
      - The rest endpoints: no
      - The admin cli options: no
      - Anything that affects deployment: no
    
    ### Documentation
    
    Check the box below and label this PR (if you have committer privilege).
    
    Need to update docs?
    
    - [ ] doc-required
    - [x] no-need-doc
    - [ ] doc
    
    (cherry picked from commit d310e79f360f41ddabc820fd0e45af67a8a4db82)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 27 +++++++++++++++-------
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 23 ++++++++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index bf27736..a6c786b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -126,7 +126,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                     }
                                 }
                             });
-                    })
+                        })
                 );
             }
         });
@@ -149,6 +149,17 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private void notifyListener(Message<PulsarEvent> msg) {
+        // delete policies
+        if (msg.getValue() == null) {
+            TopicName topicName =  
TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
+            if (listeners.get(topicName) != null) {
+                for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
+                    listener.onUpdate(null);
+                }
+            }
+            return;
+        }
+
         if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
             return;
         }
@@ -296,12 +307,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         removeOwnedNamespaceBundleAsync(bundle);
                     }
 
-            @Override
-            public boolean test(NamespaceBundle namespaceBundle) {
-                return true;
-            }
-
-        });
+                    @Override
+                    public boolean test(NamespaceBundle namespaceBundle) {
+                        return true;
+                    }
+                });
     }
 
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
@@ -449,7 +459,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     if (e != null) {
                         future.completeExceptionally(e);
                     }
-                    if 
(EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
+                    if (msg.getValue() != null
+                            && 
EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
                         TopicPoliciesEvent topicPoliciesEvent = 
msg.getValue().getTopicPoliciesEvent();
                         if (topicName.equals(TopicName.get(
                                 topicPoliciesEvent.getDomain(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 5047c0c..7af136a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2688,4 +2688,27 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         });
     }
 
+    @Test
+    public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+
+        int n = 0;
+        while (n < 2) {
+            n++;
+            pulsarClient.newProducer().topic(topic).create().close();
+            Awaitility.await().untilAsserted(() -> {
+                
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
+            });
+
+            admin.topics().setMaxConsumersPerSubscription(topic, 1);
+            Awaitility.await().untilAsserted(() -> {
+                
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
+            });
+
+            admin.topics().delete(topic);
+            Awaitility.await().untilAsserted(() -> {
+                
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
+            });
+        }
+    }
 }

Reply via email to