This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 22e0a976fd3 [fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24978)
22e0a976fd3 is described below

commit 22e0a976fd3f073a6f9bbfd8b5540afb4b337cbe
Author: ken <[email protected]>
AuthorDate: Wed Dec 10 22:57:41 2025 +0800

    [fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24978)
    
    Co-authored-by: fanjianye <[email protected]>
---
 .../SystemTopicBasedTopicPoliciesService.java      |  78 +++++----
 .../pulsar/broker/admin/TopicPoliciesTest.java     |   5 +
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 190 +++++++++++++++++++++
 3 files changed, 241 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9b8fe6c7bdd..0fe78e03d04 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -407,30 +407,35 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull 
NamespaceName namespace) {
         requireNonNull(namespace);
         return 
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
-                        .thenCompose(namespacePolicies -> {
-                            if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
-                                log.info("[{}] skip prepare init policies 
cache since the namespace is deleted",
-                                        namespace);
-                                return CompletableFuture.completedFuture(null);
-                            }
+                .thenCompose(namespacePolicies -> {
+                    if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
+                        log.info("[{}] skip prepare init policies cache since 
the namespace is deleted",
+                                namespace);
+                        return CompletableFuture.completedFuture(null);
+                    }
 
-                            return 
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
-                                final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                                        createSystemTopicClient(namespace);
-                                readerCaches.put(namespace, 
readerCompletableFuture);
-                                
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-                                final CompletableFuture<Void> initFuture = 
readerCompletableFuture
-                                        .thenCompose(reader -> {
-                                            final CompletableFuture<Void> 
stageFuture = new CompletableFuture<>();
-                                            initPolicesCache(reader, 
stageFuture);
-                                            return stageFuture
-                                                    // Read policies in 
background
-                                                    .thenAccept(__ -> 
readMorePoliciesAsync(reader));
-                                        });
-                                initFuture.exceptionallyAsync(ex -> {
+                    CompletableFuture<Void> initNamespacePolicyFuture = new 
CompletableFuture<>();
+                    CompletableFuture<Void> existingFuture =
+                            policyCacheInitMap.putIfAbsent(namespace, 
initNamespacePolicyFuture);
+                    if (existingFuture == null) {
+                        final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+                                newReader(namespace);
+                        ownedBundlesCountPerNamespace.putIfAbsent(namespace, 
new AtomicInteger(1));
+                        readerCompletableFuture
+                                .thenCompose(reader -> {
+                                    final CompletableFuture<Void> stageFuture 
= new CompletableFuture<>();
+                                    initPolicesCache(reader, stageFuture);
+                                    return stageFuture
+                                            // Read policies in background
+                                            .thenAccept(__ -> 
readMorePoliciesAsync(reader));
+                                }).thenApply(__ -> {
+                                    initNamespacePolicyFuture.complete(null);
+                                    return null;
+                                }).exceptionally(ex -> {
                                     try {
-                                        log.error("[{}] Failed to create 
reader on __change_events topic",
-                                                namespace, ex);
+                                        log.error("[{}] occur exception on 
reader of __change_events topic. "
+                                                + "try to clean the reader.", 
namespace, ex);
+                                        
initNamespacePolicyFuture.completeExceptionally(ex);
                                         cleanCacheAndCloseReader(namespace, 
false);
                                     } catch (Throwable cleanupEx) {
                                         // Adding this catch to avoid break 
callback chain
@@ -438,11 +443,22 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                                                 namespace, cleanupEx);
                                     }
                                     return null;
-                                }, pulsarService.getExecutor());
-                                // let caller know we've got an exception.
-                                return initFuture;
-                            });
-                        });
+                                });
+
+                        return initNamespacePolicyFuture;
+                    } else {
+                        return existingFuture;
+                    }
+                });
+    }
+
+    private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
newReader(NamespaceName ns) {
+        return readerCaches.compute(ns, (__, existingFuture) -> {
+            if (existingFuture == null) {
+                return createSystemTopicClient(ns);
+            }
+            return existingFuture;
+        });
     }
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClient(
@@ -501,7 +517,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
         if (closed.get()) {
             future.completeExceptionally(new 
BrokerServiceException(getClass().getName() + " is closed."));
-            
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
             return;
         }
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
@@ -509,7 +524,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 log.error("[{}] Failed to check the move events for the system 
topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
                 return;
             }
             if (hasMore) {
@@ -524,7 +538,6 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     log.error("[{}] Failed to read event from the system 
topic.",
                             reader.getSystemTopic().getTopicName(), e);
                     future.completeExceptionally(e);
-                    
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
 false);
                     return null;
                 });
             } else {
@@ -550,11 +563,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
-    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, 
boolean cleanOwnedBundlesCount) {
+    void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean 
cleanOwnedBundlesCount) {
         cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
     }
 
-    private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, 
boolean cleanOwnedBundlesCount,
+    @VisibleForTesting
+    void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean 
cleanOwnedBundlesCount,
                                           boolean cleanWriterCache) {
         if (cleanWriterCache) {
             writerCaches.synchronous().invalidate(namespace);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index dc905085e59..f36cb146ed3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -61,6 +61,7 @@ import 
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -74,6 +75,7 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -3382,10 +3384,13 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 WhiteboxImpl.getInternalState(topicPoliciesService, 
"policiesCache");
         Map<TopicName, TopicPolicies> globalPoliciesCache =
                 WhiteboxImpl.getInternalState(topicPoliciesService, 
"globalPoliciesCache");
+        Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches =
+                WhiteboxImpl.getInternalState(topicPoliciesService, 
"readerCaches");
 
         policyCacheInitMap.clear();
         policiesCache.clear();
         globalPoliciesCache.clear();
+        readerCaches.clear();
     }
 
     @DataProvider(name = "reloadPolicyTypes")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index ab0f23d455d..f3b11120cb0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -19,13 +19,19 @@
 package org.apache.pulsar.broker.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 import static org.testng.AssertJUnit.assertTrue;
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +48,10 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -532,4 +542,184 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         Assert.assertNotNull(topicPolicies);
         Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
     }
+
+    @Test
+    public void 
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws 
Exception {
+        // catch the log output in SystemTopicBasedTopicPoliciesService
+        Logger logger = (Logger) 
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+        List<String> logMessages = new ArrayList<>();
+        AbstractAppender appender = new AbstractAppender("TestAppender", null, 
null) {
+            @Override
+            public void append(LogEvent event) {
+                logMessages.add(event.getMessage().getFormattedMessage());
+            }
+        };
+        appender.start();
+        logger.addAppender(appender);
+
+        // create namespace-5 and topic
+        SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new 
SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, 
true);
+
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/test" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        CompletableFuture<Void> future = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+        Assert.assertNull(future);
+
+        // mock readerCache and new a reader, then put this reader in 
readerCache.
+        // when new reader, would trigger __change_event topic of namespace-5 
created
+        // and would trigger prepareInitPoliciesCacheAsync()
+        ConcurrentHashMap<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+                
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), 
readerCompletableFuture);
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", 
spyReaderCaches, true);
+
+        // set topic policy. create producer for __change_event topic
+        admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+        future = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+        Assert.assertNotNull(future);
+
+        // trigger close reader of __change_event directly, simulate that 
reader
+        // is closed for some reason, such as topic unload or broker restart.
+        // since prepareInitPoliciesCacheAsync() has been executed, it would 
go into readMorePoliciesAsync(),
+        // throw exception, output "Closing the topic policies reader for" and 
do cleanCacheAndCloseReader()
+        SystemTopicClient.Reader<PulsarEvent> reader = 
readerCompletableFuture.get();
+        reader.close();
+        log.info("successfully close spy reader");
+        Awaitility.await().untilAsserted(() -> {
+            boolean logFound = logMessages.stream()
+                    .anyMatch(msg -> msg.contains("Closing the topic policies 
reader for"));
+            assertTrue(logFound);
+        });
+
+        // Since cleanCacheAndCloseReader() is executed, should add the failed 
reader into readerCache again.
+        // Then in SystemTopicBasedTopicPoliciesService, readerCache has a 
closed reader,
+        // and policyCacheInitMap do not contain a future.
+        // To simulate the situation: when getTopicPolicy() execute, it will 
do prepareInitPoliciesCacheAsync() and
+        // use a closed reader to read the __change_event topic. Then throw 
exception
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), 
readerCompletableFuture);
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", 
spyReaderCaches, true);
+
+        CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+        try {
+            prepareFuture = 
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+            prepareFuture.get();
+            Assert.fail();
+        } catch (Exception e) {
+            // that is ok
+        }
+
+
+        // since prepareInitPoliciesCacheAsync() throw exception when 
initPolicesCache(),
+        // would clean readerCache and policyCacheInitMap
+        Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Void> future1 = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(future1);
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture1 =
+                    spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(readerCompletableFuture1);
+        });
+
+
+        // make sure not do cleanCacheAndCloseReader() twice
+        // totally trigger prepareInitPoliciesCacheAsync() twice, so the time 
of cleanCacheAndCloseReader() is 2.
+        // in previous code, the time would be 3
+        boolean logFound = logMessages.stream()
+                .anyMatch(msg -> msg.contains("occur exception on reader of 
__change_events topic"));
+        assertTrue(logFound);
+        boolean logFound2 = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Failed to check the move events 
for the system topic"));
+        assertTrue(logFound2);
+        verify(spyService, times(2)).cleanCacheAndCloseReader(any(), 
anyBoolean(), anyBoolean());
+
+        // make sure not occur Recursive update
+        boolean logFound3 = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Recursive update"));
+        assertFalse(logFound3);
+
+        // clean log appender
+        appender.stop();
+        logger.removeAppender(appender);
+    }
+
+    @Test
+    public void 
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws 
Exception {
+        // catch the log output in SystemTopicBasedTopicPoliciesService
+        Logger logger = (Logger) 
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+        List<String> logMessages = new ArrayList<>();
+        AbstractAppender appender = new AbstractAppender("TestAppender", null, 
null) {
+            @Override
+            public void append(LogEvent event) {
+                logMessages.add(event.getMessage().getFormattedMessage());
+            }
+        };
+        appender.start();
+        logger.addAppender(appender);
+
+        // create namespace-5 and topic
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, 
true);
+
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/test" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        CompletableFuture<Void> future = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+        Assert.assertNull(future);
+
+        // mock readerCache and put a failed readerCreateFuture in readerCache.
+        // simulate that when trigger prepareInitPoliciesCacheAsync(),
+        // it would use this failed readerFuture and go into corresponding 
logic
+        ConcurrentHashMap<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture = new CompletableFuture<>();
+        readerCompletableFuture.completeExceptionally(new Exception("create 
reader fail"));
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), 
readerCompletableFuture);
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", 
spyReaderCaches, true);
+
+        // trigger prepareInitPoliciesCacheAsync()
+        CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+        try {
+            prepareFuture = 
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+            prepareFuture.get();
+            Assert.fail();
+        } catch (Exception e) {
+            // that is ok
+        }
+
+        // since prepareInitPoliciesCacheAsync() throw exception when 
createReader,
+        // would clean readerCache and policyCacheInitMap.
+        Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Void> future1 = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(future1);
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture1 =
+                    spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(readerCompletableFuture1);
+        });
+
+
+        // make sure not do cleanCacheAndCloseReader() twice
+        // totally trigger prepareInitPoliciesCacheAsync() once, so the time 
of cleanCacheAndCloseReader() is 1.
+        boolean logFound = logMessages.stream()
+                .anyMatch(msg -> msg.contains("occur exception on reader of 
__change_events topic"));
+        assertTrue(logFound);
+        boolean logFound2 = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Failed to check the move events 
for the system topic")
+                        || msg.contains("Failed to read event from the system 
topic"));
+        assertFalse(logFound2);
+        verify(spyService, times(1)).cleanCacheAndCloseReader(any(), 
anyBoolean(), anyBoolean());
+
+        // clean log appender
+        appender.stop();
+        logger.removeAppender(appender);
+    }
 }

Reply via email to