This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 22e0a976fd3 [fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24978)
22e0a976fd3 is described below
commit 22e0a976fd3f073a6f9bbfd8b5540afb4b337cbe
Author: ken <[email protected]>
AuthorDate: Wed Dec 10 22:57:41 2025 +0800
[fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24978)
Co-authored-by: fanjianye <[email protected]>
---
.../SystemTopicBasedTopicPoliciesService.java | 78 +++++----
.../pulsar/broker/admin/TopicPoliciesTest.java | 5 +
.../SystemTopicBasedTopicPoliciesServiceTest.java | 190 +++++++++++++++++++++
3 files changed, 241 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9b8fe6c7bdd..0fe78e03d04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -407,30 +407,35 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull
NamespaceName namespace) {
requireNonNull(namespace);
return
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
- .thenCompose(namespacePolicies -> {
- if (namespacePolicies.isEmpty() ||
namespacePolicies.get().deleted) {
- log.info("[{}] skip prepare init policies
cache since the namespace is deleted",
- namespace);
- return CompletableFuture.completedFuture(null);
- }
+ .thenCompose(namespacePolicies -> {
+ if (namespacePolicies.isEmpty() ||
namespacePolicies.get().deleted) {
+ log.info("[{}] skip prepare init policies cache since
the namespace is deleted",
+ namespace);
+ return CompletableFuture.completedFuture(null);
+ }
- return
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
- final
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
- createSystemTopicClient(namespace);
- readerCaches.put(namespace,
readerCompletableFuture);
-
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
- final CompletableFuture<Void> initFuture =
readerCompletableFuture
- .thenCompose(reader -> {
- final CompletableFuture<Void>
stageFuture = new CompletableFuture<>();
- initPolicesCache(reader,
stageFuture);
- return stageFuture
- // Read policies in
background
- .thenAccept(__ ->
readMorePoliciesAsync(reader));
- });
- initFuture.exceptionallyAsync(ex -> {
+ CompletableFuture<Void> initNamespacePolicyFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> existingFuture =
+ policyCacheInitMap.putIfAbsent(namespace,
initNamespacePolicyFuture);
+ if (existingFuture == null) {
+ final
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+ newReader(namespace);
+ ownedBundlesCountPerNamespace.putIfAbsent(namespace,
new AtomicInteger(1));
+ readerCompletableFuture
+ .thenCompose(reader -> {
+ final CompletableFuture<Void> stageFuture
= new CompletableFuture<>();
+ initPolicesCache(reader, stageFuture);
+ return stageFuture
+ // Read policies in background
+ .thenAccept(__ ->
readMorePoliciesAsync(reader));
+ }).thenApply(__ -> {
+ initNamespacePolicyFuture.complete(null);
+ return null;
+ }).exceptionally(ex -> {
try {
- log.error("[{}] Failed to create
reader on __change_events topic",
- namespace, ex);
+ log.error("[{}] occur exception on
reader of __change_events topic. "
+ + "try to clean the reader.",
namespace, ex);
+
initNamespacePolicyFuture.completeExceptionally(ex);
cleanCacheAndCloseReader(namespace,
false);
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break
callback chain
@@ -438,11 +443,22 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
namespace, cleanupEx);
}
return null;
- }, pulsarService.getExecutor());
- // let caller know we've got an exception.
- return initFuture;
- });
- });
+ });
+
+ return initNamespacePolicyFuture;
+ } else {
+ return existingFuture;
+ }
+ });
+ }
+
+ private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
newReader(NamespaceName ns) {
+ return readerCaches.compute(ns, (__, existingFuture) -> {
+ if (existingFuture == null) {
+ return createSystemTopicClient(ns);
+ }
+ return existingFuture;
+ });
}
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClient(
@@ -501,7 +517,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent>
reader, CompletableFuture<Void> future) {
if (closed.get()) {
future.completeExceptionally(new
BrokerServiceException(getClass().getName() + " is closed."));
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
return;
}
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
@@ -509,7 +524,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to check the move events for the system
topic",
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
return;
}
if (hasMore) {
@@ -524,7 +538,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to read event from the system
topic.",
reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
return null;
});
} else {
@@ -550,11 +563,12 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount) {
+ void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
cleanOwnedBundlesCount) {
cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
}
- private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace,
boolean cleanOwnedBundlesCount,
+ @VisibleForTesting
+ void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
cleanOwnedBundlesCount,
boolean cleanWriterCache) {
if (cleanWriterCache) {
writerCaches.synchronous().invalidate(namespace);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index dc905085e59..f36cb146ed3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -61,6 +61,7 @@ import
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -74,6 +75,7 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -3382,10 +3384,13 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
WhiteboxImpl.getInternalState(topicPoliciesService,
"policiesCache");
Map<TopicName, TopicPolicies> globalPoliciesCache =
WhiteboxImpl.getInternalState(topicPoliciesService,
"globalPoliciesCache");
+ Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches =
+ WhiteboxImpl.getInternalState(topicPoliciesService,
"readerCaches");
policyCacheInitMap.clear();
policiesCache.clear();
globalPoliciesCache.clear();
+ readerCaches.clear();
}
@DataProvider(name = "reloadPolicyTypes")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index ab0f23d455d..f3b11120cb0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -19,13 +19,19 @@
package org.apache.pulsar.broker.service;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
import java.lang.reflect.Field;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -42,6 +48,10 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -532,4 +542,184 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new
SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanCacheAndCloseReader()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+ // Since cleanCacheAndCloseReader() is executed, should add the failed
reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<Void> future1 =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future1);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+ });
+
+
+ // make sure not do cleanCacheAndCloseReader() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("occur exception on reader of
__change_events topic"));
+ assertTrue(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ assertTrue(logFound2);
+ verify(spyService, times(2)).cleanCacheAndCloseReader(any(),
anyBoolean(), anyBoolean());
+
+ // make sure not occur Recursive update
+ boolean logFound3 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Recursive update"));
+ assertFalse(logFound3);
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and put a failed readerCreateFuture in readerCache.
+ // simulate that when trigger prepareInitPoliciesCacheAsync(),
+ // it would use this failed readerFuture and go into corresponding
logic
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture = new CompletableFuture<>();
+ readerCompletableFuture.completeExceptionally(new Exception("create
reader fail"));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // trigger prepareInitPoliciesCacheAsync()
+ CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
createReader,
+ // would clean readerCache and policyCacheInitMap.
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<Void> future1 =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future1);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+ });
+
+
+ // make sure not do cleanCacheAndCloseReader() twice
+ // totally trigger prepareInitPoliciesCacheAsync() once, so the time
of cleanCacheAndCloseReader() is 1.
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("occur exception on reader of
__change_events topic"));
+ assertTrue(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic")
+ || msg.contains("Failed to read event from the system
topic"));
+ assertFalse(logFound2);
+ verify(spyService, times(1)).cleanCacheAndCloseReader(any(),
anyBoolean(), anyBoolean());
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
}