This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a44c801f86 [improve][broker] Close TopicPoliciesService to allow
Pulsar broker graceful shutdown (#22589)
7a44c801f86 is described below
commit 7a44c801f86c4276533b0f008e768fb8deba4abc
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 26 23:17:18 2024 +0300
[improve][broker] Close TopicPoliciesService to allow Pulsar broker
graceful shutdown (#22589)
---
.../org/apache/pulsar/broker/PulsarService.java | 5 ++
.../SystemTopicBasedTopicPoliciesService.java | 63 +++++++++++++++++++---
.../SystemTopicTxnBufferSnapshotService.java | 20 ++++++-
.../broker/service/TopicPoliciesService.java | 7 ++-
4 files changed, 87 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c21c7dc771e..51dffc20d07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -565,6 +565,11 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
transactionBufferClient.close();
}
+ if (topicPoliciesService != null) {
+ topicPoliciesService.close();
+ topicPoliciesService = null;
+ }
+
if (client != null) {
client.close();
client = null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0449e5c885c..6d18d6d61b0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesService implements
TopicPoliciesServic
private final PulsarService pulsarService;
private final HashSet localCluster;
private final String clusterName;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private final ConcurrentInitializer<NamespaceEventsSystemTopicFactory>
namespaceEventsSystemTopicFactoryLazyInitializer = new
LazyInitializer<>() {
@@ -110,12 +112,18 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
this.writerCaches = Caffeine.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.removalListener((namespaceName, writer, cause) -> {
- ((SystemTopicClient.Writer)
writer).closeAsync().exceptionally(ex -> {
- log.error("[{}] Close writer error.", namespaceName,
ex);
- return null;
- });
+ try {
+ ((SystemTopicClient.Writer) writer).close();
+ } catch (Exception e) {
+ log.error("[{}] Close writer error.", namespaceName,
e);
+ }
})
+ .executor(pulsarService.getExecutor())
.buildAsync((namespaceName, executor) -> {
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(
+ new
BrokerServiceException(getClass().getName() + " is closed."));
+ }
SystemTopicClient<PulsarEvent> systemTopicClient =
getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(namespaceName);
return systemTopicClient.newWriterAsync();
@@ -382,6 +390,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClient(
NamespaceName namespace) {
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(
+ new BrokerServiceException(getClass().getName() + " is
closed."));
+ }
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException ex) {
@@ -430,6 +442,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent>
reader, CompletableFuture<Void> future) {
+ if (closed.get()) {
+ future.completeExceptionally(new
BrokerServiceException(getClass().getName() + " is closed."));
+
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+ return;
+ }
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
log.error("[{}] Failed to check the move events for the system
topic",
@@ -511,6 +528,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
* #{@link
SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method
to block loading topic.
*/
private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent>
reader) {
+ if (closed.get()) {
+
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+ return;
+ }
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
@@ -628,11 +649,20 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void
fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent>
reader,
TopicName topicName,
TopicPolicies policies,
CompletableFuture<TopicPolicies> future) {
+ if (closed.get()) {
+ future.completeExceptionally(new
BrokerServiceException(getClass().getName() + " is closed."));
+ reader.closeAsync().whenComplete((v, e) -> {
+ if (e != null) {
+ log.error("[{}] Close reader error.", topicName, e);
+ }
+ });
+ return;
+ }
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
- if (hasMore) {
+ if (hasMore != null && hasMore) {
reader.readNextAsync().whenComplete((msg, e) -> {
if (e != null) {
future.completeExceptionally(e);
@@ -656,7 +686,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
});
} else {
- future.complete(policies);
+ if (!future.isDone()) {
+ future.complete(policies);
+ }
reader.closeAsync().whenComplete((v, e) -> {
if (e != null) {
log.error("[{}] Close reader error.", topicName, e);
@@ -740,4 +772,23 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
private static final Logger log =
LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
+
+ @Override
+ public void close() throws Exception {
+ if (closed.compareAndSet(false, true)) {
+ writerCaches.synchronous().invalidateAll();
+ readerCaches.values().forEach(future -> {
+ if (future != null && !future.isCompletedExceptionally()) {
+ future.thenAccept(reader -> {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ log.error("Failed to close reader.", e);
+ }
+ });
+ }
+ });
+ readerCaches.clear();
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index 332d754cf97..bd1b9098169 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -142,8 +142,26 @@ public class SystemTopicTxnBufferSnapshotService<T> {
public void close() throws Exception {
for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry :
clients.entrySet()) {
- entry.getValue().close();
+ try {
+ entry.getValue().close();
+ } catch (Exception e) {
+ log.error("Failed to close system topic client for namespace
{}", entry.getKey(), e);
+ }
+ }
+ clients.clear();
+ for (Map.Entry<NamespaceName, ReferenceCountedWriter<T>> entry :
refCountedWriterMap.entrySet()) {
+ CompletableFuture<SystemTopicClient.Writer<T>> future =
entry.getValue().getFuture();
+ if (!future.isCompletedExceptionally()) {
+ future.thenAccept(writer -> {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ log.error("Failed to close writer for namespace {}",
entry.getKey(), e);
+ }
+ });
+ }
}
+ refCountedWriterMap.clear();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index aa3a6aaeff2..41fecb3b87e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -38,7 +38,7 @@ import org.jetbrains.annotations.NotNull;
* Topic policies service.
*/
@InterfaceStability.Evolving
-public interface TopicPoliciesService {
+public interface TopicPoliciesService extends AutoCloseable {
TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
@@ -239,5 +239,10 @@ public interface TopicPoliciesService {
public void unregisterListener(TopicName topicName,
TopicPolicyListener<TopicPolicies> listener) {
//No-op
}
+
+ @Override
+ public void close() {
+ //No-op
+ }
}
}