This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4a8d6ef06af4619f41049e2184283c77c5c0bb3c Author: Yong Zhang <[email protected]> AuthorDate: Thu Dec 23 19:27:23 2021 +0800 Fixes the NPE in system topics policies service (#13469) --- *Motivation* The `namespaceEventsSystemTopicFactory` is created when you will use it. But the `createSystemTopicFactoryIfNeeded()` may failed which will cause the `namespaceEventsSystemTopicFactory` is null and throw a NPE error from the method. *Modifications* - throw the error and failed the method when there has exceptions in `createSystemTopicFactoryIfNeeded()` (cherry picked from commit 4022b2884f46bb5e1593da419bb226ad1e0fc768) --- .../SystemTopicBasedTopicPoliciesService.java | 31 +++++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 6662202..bf27736 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -88,9 +88,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { - createSystemTopicFactoryIfNeeded(); - CompletableFuture<Void> result = new CompletableFuture<>(); + try { + createSystemTopicFactoryIfNeeded(); + } catch (PulsarServerException e) { + result.completeExceptionally(e); + return result; + } SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); @@ -180,8 +184,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @Override public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) { CompletableFuture<TopicPolicies> result = new CompletableFuture<>(); - createSystemTopicFactoryIfNeeded(); - if (namespaceEventsSystemTopicFactory == null) { + try { + createSystemTopicFactoryIfNeeded(); + } catch (PulsarServerException e) { result.complete(null); return result; } @@ -201,7 +206,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic result.complete(null); return result; } - createSystemTopicFactoryIfNeeded(); synchronized (this) { if (readerCaches.get(namespace) != null) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); @@ -235,9 +239,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry( NamespaceName namespace) { + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>(); + try { + createSystemTopicFactoryIfNeeded(); + } catch (PulsarServerException e) { + result.completeExceptionally(e); + return result; + } SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory .createTopicPoliciesSystemTopicClient(namespace); - CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>(); Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS); RetryUtil.retryAsynchronously(() -> { try { @@ -386,6 +396,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic // However, due to compatibility, it is temporarily retained here // and can be deleted in the future. policiesCache.remove(topicName); + try { + createSystemTopicFactoryIfNeeded(); + } catch (PulsarServerException e) { + log.error("Failed to create system topic factory"); + break; + } SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); systemTopicClient.newWriterAsync().thenAccept(writer @@ -405,7 +421,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } - private void createSystemTopicFactoryIfNeeded() { + private void createSystemTopicFactoryIfNeeded() throws PulsarServerException { if (namespaceEventsSystemTopicFactory == null) { synchronized (this) { if (namespaceEventsSystemTopicFactory == null) { @@ -414,6 +430,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic new NamespaceEventsSystemTopicFactory(pulsarService.getClient()); } catch (PulsarServerException e) { log.error("Create namespace event system topic factory error.", e); + throw e; } } }
