This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a44c801f86 [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown (#22589)
7a44c801f86 is described below

commit 7a44c801f86c4276533b0f008e768fb8deba4abc
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 26 23:17:18 2024 +0300

    [improve][broker] Close TopicPoliciesService to allow Pulsar broker graceful shutdown (#22589)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  5 ++
 .../SystemTopicBasedTopicPoliciesService.java      | 63 +++++++++++++++++++---
 .../SystemTopicTxnBufferSnapshotService.java       | 20 ++++++-
 .../broker/service/TopicPoliciesService.java       |  7 ++-
 4 files changed, 87 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c21c7dc771e..51dffc20d07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -565,6 +565,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 transactionBufferClient.close();
             }
 
+            if (topicPoliciesService != null) {
+                topicPoliciesService.close();
+                topicPoliciesService = null;
+            }
+
             if (client != null) {
                 client.close();
                 client = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0449e5c885c..6d18d6d61b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
     private final PulsarService pulsarService;
     private final HashSet localCluster;
     private final String clusterName;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private final ConcurrentInitializer<NamespaceEventsSystemTopicFactory>
             namespaceEventsSystemTopicFactoryLazyInitializer = new 
LazyInitializer<>() {
@@ -110,12 +112,18 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         this.writerCaches = Caffeine.newBuilder()
                 .expireAfterAccess(5, TimeUnit.MINUTES)
                 .removalListener((namespaceName, writer, cause) -> {
-                    ((SystemTopicClient.Writer) 
writer).closeAsync().exceptionally(ex -> {
-                        log.error("[{}] Close writer error.", namespaceName, 
ex);
-                        return null;
-                    });
+                    try {
+                        ((SystemTopicClient.Writer) writer).close();
+                    } catch (Exception e) {
+                        log.error("[{}] Close writer error.", namespaceName, 
e);
+                    }
                 })
+                .executor(pulsarService.getExecutor())
                 .buildAsync((namespaceName, executor) -> {
+                    if (closed.get()) {
+                        return CompletableFuture.failedFuture(
+                                new 
BrokerServiceException(getClass().getName() + " is closed."));
+                    }
                     SystemTopicClient<PulsarEvent> systemTopicClient = 
getNamespaceEventsSystemTopicFactory()
                             
.createTopicPoliciesSystemTopicClient(namespaceName);
                     return systemTopicClient.newWriterAsync();
@@ -382,6 +390,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClient(
             NamespaceName namespace) {
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException(getClass().getName() + " is 
closed."));
+        }
         try {
             createSystemTopicFactoryIfNeeded();
         } catch (PulsarServerException ex) {
@@ -430,6 +442,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
+        if (closed.get()) {
+            future.completeExceptionally(new 
BrokerServiceException(getClass().getName() + " is closed."));
+            
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+            return;
+        }
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
             if (ex != null) {
                 log.error("[{}] Failed to check the move events for the system 
topic",
@@ -511,6 +528,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
      * #{@link 
SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method 
to block loading topic.
      */
     private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> 
reader) {
+        if (closed.get()) {
+            
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
+            return;
+        }
         reader.readNextAsync()
                 .thenAccept(msg -> {
                     refreshTopicPoliciesCache(msg);
@@ -628,11 +649,20 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void 
fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent> 
reader,
                                                        TopicName topicName, 
TopicPolicies policies,
                                                        
CompletableFuture<TopicPolicies> future) {
+        if (closed.get()) {
+            future.completeExceptionally(new 
BrokerServiceException(getClass().getName() + " is closed."));
+            reader.closeAsync().whenComplete((v, e) -> {
+                if (e != null) {
+                    log.error("[{}] Close reader error.", topicName, e);
+                }
+            });
+            return;
+        }
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
             }
-            if (hasMore) {
+            if (hasMore != null && hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
                     if (e != null) {
                         future.completeExceptionally(e);
@@ -656,7 +686,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     }
                 });
             } else {
-                future.complete(policies);
+                if (!future.isDone()) {
+                    future.complete(policies);
+                }
                 reader.closeAsync().whenComplete((v, e) -> {
                     if (e != null) {
                         log.error("[{}] Close reader error.", topicName, e);
@@ -740,4 +772,23 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
+
+    @Override
+    public void close() throws Exception {
+        if (closed.compareAndSet(false, true)) {
+            writerCaches.synchronous().invalidateAll();
+            readerCaches.values().forEach(future -> {
+                if (future != null && !future.isCompletedExceptionally()) {
+                    future.thenAccept(reader -> {
+                        try {
+                            reader.close();
+                        } catch (Exception e) {
+                            log.error("Failed to close reader.", e);
+                        }
+                    });
+                }
+            });
+            readerCaches.clear();
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index 332d754cf97..bd1b9098169 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -142,8 +142,26 @@ public class SystemTopicTxnBufferSnapshotService<T> {
 
     public void close() throws Exception {
         for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry : 
clients.entrySet()) {
-            entry.getValue().close();
+            try {
+                entry.getValue().close();
+            } catch (Exception e) {
+                log.error("Failed to close system topic client for namespace 
{}", entry.getKey(), e);
+            }
+        }
+        clients.clear();
+        for (Map.Entry<NamespaceName, ReferenceCountedWriter<T>> entry : 
refCountedWriterMap.entrySet()) {
+            CompletableFuture<SystemTopicClient.Writer<T>> future = 
entry.getValue().getFuture();
+            if (!future.isCompletedExceptionally()) {
+                future.thenAccept(writer -> {
+                    try {
+                        writer.close();
+                    } catch (Exception e) {
+                        log.error("Failed to close writer for namespace {}", 
entry.getKey(), e);
+                    }
+                });
+            }
         }
+        refCountedWriterMap.clear();
     }
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index aa3a6aaeff2..41fecb3b87e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -38,7 +38,7 @@ import org.jetbrains.annotations.NotNull;
  * Topic policies service.
  */
 @InterfaceStability.Evolving
-public interface TopicPoliciesService {
+public interface TopicPoliciesService extends AutoCloseable {
 
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
     long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
@@ -239,5 +239,10 @@ public interface TopicPoliciesService {
         public void unregisterListener(TopicName topicName, 
TopicPolicyListener<TopicPolicies> listener) {
             //No-op
         }
+
+        @Override
+        public void close() {
+            //No-op
+        }
     }
 }

Reply via email to