This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a8d6ef06af4619f41049e2184283c77c5c0bb3c
Author: Yong Zhang <[email protected]>
AuthorDate: Thu Dec 23 19:27:23 2021 +0800

    Fixes the NPE in system topics policies service (#13469)
    
    ---
    
    *Motivation*
    
    The `namespaceEventsSystemTopicFactory` is created when you will
    use it. But the `createSystemTopicFactoryIfNeeded()` may failed
    which will cause the `namespaceEventsSystemTopicFactory` is null
    and throw a NPE error from the method.
    
    *Modifications*
    
    - throw the error and failed the method when there has exceptions in
    `createSystemTopicFactoryIfNeeded()`
    
    (cherry picked from commit 4022b2884f46bb5e1593da419bb226ad1e0fc768)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 6662202..bf27736 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -88,9 +88,13 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
                                                          TopicPolicies 
policies) {
-        createSystemTopicFactoryIfNeeded();
-
         CompletableFuture<Void> result = new CompletableFuture<>();
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
+            result.completeExceptionally(e);
+            return result;
+        }
 
         SystemTopicClient<PulsarEvent> systemTopicClient =
                 
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
@@ -180,8 +184,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     @Override
     public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
         CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
-        createSystemTopicFactoryIfNeeded();
-        if (namespaceEventsSystemTopicFactory == null) {
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
             result.complete(null);
             return result;
         }
@@ -201,7 +206,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             result.complete(null);
             return result;
         }
-        createSystemTopicFactoryIfNeeded();
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
@@ -235,9 +239,15 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
creatSystemTopicClientWithRetry(
             NamespaceName namespace) {
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new 
CompletableFuture<>();
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
+            result.completeExceptionally(e);
+            return result;
+        }
         SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
                 .createTopicPoliciesSystemTopicClient(namespace);
-        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new 
CompletableFuture<>();
         Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, 
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
         RetryUtil.retryAsynchronously(() -> {
             try {
@@ -386,6 +396,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     // However, due to compatibility, it is temporarily 
retained here
                     // and can be deleted in the future.
                     policiesCache.remove(topicName);
+                    try {
+                        createSystemTopicFactoryIfNeeded();
+                    } catch (PulsarServerException e) {
+                        log.error("Failed to create system topic factory");
+                        break;
+                    }
                     SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
                             
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer
@@ -405,7 +421,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    private void createSystemTopicFactoryIfNeeded() {
+    private void createSystemTopicFactoryIfNeeded() throws 
PulsarServerException {
         if (namespaceEventsSystemTopicFactory == null) {
             synchronized (this) {
                 if (namespaceEventsSystemTopicFactory == null) {
@@ -414,6 +430,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                 new 
NamespaceEventsSystemTopicFactory(pulsarService.getClient());
                     } catch (PulsarServerException e) {
                         log.error("Create namespace event system topic factory 
error.", e);
+                        throw e;
                     }
                 }
             }

Reply via email to