This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 2e5fd26050a [fix][broker] Fix a deadlock in SystemTopicBasedTopicPoliciesService during NamespaceEventsSystemTopicFactory init (#22528)
2e5fd26050a is described below

commit 2e5fd26050a1799367bcc89f72f68f1824450b85
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Apr 17 23:59:36 2024 -0700

    [fix][broker] Fix a deadlock in SystemTopicBasedTopicPoliciesService during NamespaceEventsSystemTopicFactory init (#22528)
    
    (cherry picked from commit 72474d7a2dabdf7acf0b158bd07f1bc8b69b790e)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 49 ++++++++++++++--------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4e9e875bcf4..0449e5c885c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -34,6 +34,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -70,7 +72,19 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
     private final PulsarService pulsarService;
     private final HashSet localCluster;
     private final String clusterName;
-    private volatile NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
+
+    private final ConcurrentInitializer<NamespaceEventsSystemTopicFactory>
+            namespaceEventsSystemTopicFactoryLazyInitializer = new 
LazyInitializer<>() {
+        @Override
+        protected NamespaceEventsSystemTopicFactory initialize() {
+            try {
+                return new 
NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+            } catch (PulsarServerException e) {
+                log.error("Create namespace event system topic factory 
error.", e);
+                throw new RuntimeException(e);
+            }
+        }
+    };
 
     @VisibleForTesting
     final Map<TopicName, TopicPolicies> policiesCache = new 
ConcurrentHashMap<>();
@@ -102,7 +116,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     });
                 })
                 .buildAsync((namespaceName, executor) -> {
-                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                    SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                             
.createTopicPoliciesSystemTopicClient(namespaceName);
                     return systemTopicClient.newWriterAsync();
                 });
@@ -301,7 +315,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             result.complete(null);
             return result;
         }
-        SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+        SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                 
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
         systemTopicClient.newReaderAsync().thenAccept(r ->
                 fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, 
result));
@@ -373,7 +387,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         } catch (PulsarServerException ex) {
             return FutureUtil.failedFuture(ex);
         }
-        final SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+        final SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                 .createTopicPoliciesSystemTopicClient(namespace);
         return systemTopicClient.newReaderAsync();
     }
@@ -561,7 +575,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                         log.error("Failed to create system topic factory");
                         break;
                     }
-                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                    SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                             
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer
                             -> writer.deleteAsync(getEventKey(topicName),
@@ -595,18 +609,19 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private void createSystemTopicFactoryIfNeeded() throws 
PulsarServerException {
-        if (namespaceEventsSystemTopicFactory == null) {
-            synchronized (this) {
-                if (namespaceEventsSystemTopicFactory == null) {
-                    try {
-                        namespaceEventsSystemTopicFactory =
-                                new 
NamespaceEventsSystemTopicFactory(pulsarService.getClient());
-                    } catch (PulsarServerException e) {
-                        log.error("Create namespace event system topic factory 
error.", e);
-                        throw e;
-                    }
-                }
-            }
+        try {
+            getNamespaceEventsSystemTopicFactory();
+        } catch (Exception e) {
+            throw new PulsarServerException(e);
+        }
+    }
+
+    private NamespaceEventsSystemTopicFactory 
getNamespaceEventsSystemTopicFactory() {
+        try {
+            return namespaceEventsSystemTopicFactoryLazyInitializer.get();
+        } catch (Exception e) {
+            log.error("Create namespace event system topic factory error.", e);
+            throw new RuntimeException(e);
         }
     }
 

Reply via email to