This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 47b8d5d86f1 [fix][broker] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24980)
47b8d5d86f1 is described below

commit 47b8d5d86f1d19c324483d25ad8ed01679389eb9
Author: ken <[email protected]>
AuthorDate: Wed Dec 10 22:56:28 2025 +0800

    [fix][broker] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService (#24980)
    
    Co-authored-by: fanjianye <[email protected]>
---
 .../SystemTopicBasedTopicPoliciesService.java      |  77 +++++----
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 192 +++++++++++++++++++++
 2 files changed, 232 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 8287583a3d7..c3d88b9c723 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -570,42 +570,53 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             return CompletableFuture.completedFuture(false);
         }
         return 
pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
-                        .thenCompose(namespacePolicies -> {
-                            if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
-                                log.info("[{}] skip prepare init policies 
cache since the namespace is deleted",
-                                        namespace);
-                                return 
CompletableFuture.completedFuture(false);
-                            }
+                .thenCompose(namespacePolicies -> {
+                    if (namespacePolicies.isEmpty() || 
namespacePolicies.get().deleted) {
+                        log.info("[{}] skip prepare init policies cache since 
the namespace is deleted",
+                                namespace);
+                        return CompletableFuture.completedFuture(false);
+                    }
 
-                            return 
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
-                                final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                                        newReader(namespace);
-                                final CompletableFuture<Void> initFuture = 
readerCompletableFuture
-                                        .thenCompose(reader -> {
-                                            final CompletableFuture<Void> 
stageFuture = new CompletableFuture<>();
-                                            initPolicesCache(reader, 
stageFuture);
-                                            return stageFuture
-                                                    // Read policies in 
background
-                                                    .thenAccept(__ -> 
readMorePoliciesAsync(reader));
-                                        });
-                                initFuture.exceptionallyAsync(ex -> {
+                    CompletableFuture<Void> initNamespacePolicyFuture = new 
CompletableFuture<>();
+                    CompletableFuture<Void> existingFuture =
+                            policyCacheInitMap.putIfAbsent(namespace, 
initNamespacePolicyFuture);
+                    if (existingFuture == null) {
+                        final 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+                                newReader(namespace);
+                        readerCompletableFuture
+                                .thenCompose(reader -> {
+                                    final CompletableFuture<Void> stageFuture 
= new CompletableFuture<>();
+                                    initPolicesCache(reader, stageFuture);
+                                    return stageFuture
+                                            // Read policies in background
+                                            .thenAccept(__ -> 
readMorePoliciesAsync(reader));
+                                }).thenApply(__ -> {
+                                    initNamespacePolicyFuture.complete(null);
+                                    return null;
+                                }).exceptionally(ex -> {
                                     try {
-                                        if (closed.get()) {
-                                            return null;
+                                        if 
(readerCompletableFuture.isCompletedExceptionally()) {
+                                            log.error("[{}] Failed to create 
reader on __change_events topic",
+                                                    namespace, ex);
+                                            
initNamespacePolicyFuture.completeExceptionally(ex);
+                                            
cleanPoliciesCacheInitMap(namespace, true);
+                                        } else {
+                                            
initNamespacePolicyFuture.completeExceptionally(ex);
+                                            
cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex));
                                         }
-                                        cleanPoliciesCacheInitMap(
-                                                namespace, 
readerCompletableFuture.isCompletedExceptionally());
                                     } catch (Throwable cleanupEx) {
                                         // Adding this catch to avoid break 
callback chain
                                         log.error("[{}] Failed to cleanup 
reader on __change_events topic",
                                                 namespace, cleanupEx);
                                     }
                                     return null;
-                                }, pulsarService.getExecutor());
-                                // let caller know we've got an exception.
-                                return initFuture;
-                            }).thenApply(__ -> true);
-                        });
+                                });
+
+                        return initNamespacePolicyFuture.thenApply(__ -> true);
+                    } else {
+                        return existingFuture.thenApply(__ -> true);
+                    }
+                });
     }
 
     private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
newReader(NamespaceName ns) {
@@ -614,10 +625,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 return createSystemTopicClient(ns);
             }
 
-            if (existingFuture.isDone() && 
existingFuture.isCompletedExceptionally()) {
-                return existingFuture.exceptionallyCompose(ex ->
-                        isAlreadyClosedException(ex) ? existingFuture : 
createSystemTopicClient(ns));
-            }
             return existingFuture;
         });
     }
@@ -687,8 +694,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 log.error("[{}] Failed to check the move events for the system 
topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
-                        isAlreadyClosedException(ex));
                 return;
             }
             if (hasMore) {
@@ -707,8 +712,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                     log.error("[{}] Failed to read event from the system 
topic.",
                             reader.getSystemTopic().getTopicName(), e);
                     future.completeExceptionally(e);
-                    
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
-                            isAlreadyClosedException(ex));
                     return null;
                 });
             } else {
@@ -734,8 +737,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
-
-    private void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, 
boolean closeReader) {
+    @VisibleForTesting
+    void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, boolean 
closeReader) {
         if (!closeReader) {
             policyCacheInitMap.remove(namespace);
             return;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 607a8e3f9c2..2f503e5512a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,10 +18,17 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +43,10 @@ import java.util.concurrent.Executors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -465,4 +476,185 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         Assert.assertNotNull(topicPolicies);
         Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
     }
+
+    @Test
+    public void 
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws 
Exception {
+        // catch the log output in SystemTopicBasedTopicPoliciesService
+        Logger logger = (Logger) 
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+        List<String> logMessages = new ArrayList<>();
+        AbstractAppender appender = new AbstractAppender("TestAppender", null, 
null) {
+            @Override
+            public void append(LogEvent event) {
+                logMessages.add(event.getMessage().getFormattedMessage());
+            }
+        };
+        appender.start();
+        logger.addAppender(appender);
+
+        // create namespace-5 and topic
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, 
true);
+
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/test" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        CompletableFuture<Void> future = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+        Assert.assertNull(future);
+
+        // mock readerCache and new a reader, then put this reader in readerCache.
+        // when new reader, would trigger __change_event topic of namespace-5 created
+        // and would trigger prepareInitPoliciesCacheAsync()
+        ConcurrentHashMap<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
+                
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), 
readerCompletableFuture);
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", 
spyReaderCaches, true);
+
+        // set topic policy. create producer for __change_event topic
+        admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+        future = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+        Assert.assertNotNull(future);
+
+        // trigger close reader of __change_event directly, simulate that reader
+        // is closed for some reason, such as topic unload or broker restart.
+        // since prepareInitPoliciesCacheAsync() has been executed, it would go into readMorePoliciesAsync(),
+        // throw exception, output "Closing the topic policies reader for" and do cleanPoliciesCacheInitMap()
+        SystemTopicClient.Reader<PulsarEvent> reader = 
readerCompletableFuture.get();
+        reader.close();
+        log.info("successfully close spy reader");
+        Awaitility.await().untilAsserted(() -> {
+            boolean logFound = logMessages.stream()
+                    .anyMatch(msg -> msg.contains("Closing the topic policies 
reader for"));
+            assertTrue(logFound);
+        });
+
+
+        // Since cleanPoliciesCacheInitMap() is executed, should add the failed reader into readerCache again.
+        // Then in SystemTopicBasedTopicPoliciesService, readerCache has a closed reader,
+        // and policyCacheInitMap do not contain a future.
+        // To simulate the situation: when getTopicPolicy() execute, it will do prepareInitPoliciesCacheAsync() and
+        // use a closed reader to read the __change_event topic. Then throw exception
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), 
readerCompletableFuture);
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", 
spyReaderCaches, true);
+
+        CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+        try {
+            prepareFuture = 
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+            prepareFuture.get();
+            Assert.fail();
+        } catch (Exception e) {
+            // that is ok
+        }
+
+        // since prepareInitPoliciesCacheAsync() throw exception when 
initPolicesCache(),
+        // would clean readerCache and policyCacheInitMap.
+        Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Void> future1 = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(future1);
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture1 =
+                    spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(readerCompletableFuture1);
+        });
+
+
+        // make sure not do cleanPoliciesCacheInitMap() twice
+        // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanPoliciesCacheInitMap() is 2.
+        // in previous code, the time would be 3
+        boolean logFound = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Failed to create reader on 
__change_events topic"));
+        assertFalse(logFound);
+        boolean logFound2 = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Failed to check the move events 
for the system topic"));
+        assertTrue(logFound2);
+        verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(), 
anyBoolean());
+
+        // make sure not occur Recursive update
+        boolean logFound3 = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Recursive update"));
+        assertFalse(logFound3);
+
+        // clean log appender
+        appender.stop();
+        logger.removeAppender(appender);
+    }
+
+    @Test
+    public void 
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws 
Exception {
+        // catch the log output in SystemTopicBasedTopicPoliciesService
+        Logger logger = (Logger) 
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+        List<String> logMessages = new ArrayList<>();
+        AbstractAppender appender = new AbstractAppender("TestAppender", null, 
null) {
+            @Override
+            public void append(LogEvent event) {
+                logMessages.add(event.getMessage().getFormattedMessage());
+            }
+        };
+        appender.start();
+        logger.addAppender(appender);
+
+        // create namespace-5 and topic
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, 
true);
+
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/test" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        CompletableFuture<Void> future = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+        Assert.assertNull(future);
+
+        // mock readerCache and put a failed readerCreateFuture in readerCache.
+        // simulate that when trigger prepareInitPoliciesCacheAsync(),
+        // it would use this failed readerFuture and go into corresponding 
logic
+        ConcurrentHashMap<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture = new CompletableFuture<>();
+        readerCompletableFuture.completeExceptionally(new Exception("create 
reader fail"));
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), 
readerCompletableFuture);
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", 
spyReaderCaches, true);
+
+        // trigger prepareInitPoliciesCacheAsync()
+        CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+        try {
+            prepareFuture = 
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+            prepareFuture.get();
+            Assert.fail();
+        } catch (Exception e) {
+            // that is ok
+        }
+
+        // since prepareInitPoliciesCacheAsync() throw exception when 
createReader,
+        // would clean readerCache and policyCacheInitMap.
+        Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<Void> future1 = 
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(future1);
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture1 =
+                    spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+            Assert.assertNull(readerCompletableFuture1);
+        });
+
+
+        // make sure not do cleanPoliciesCacheInitMap() twice
+        // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
+        boolean logFound = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Failed to create reader on 
__change_events topic"));
+        assertTrue(logFound);
+        boolean logFound2 = logMessages.stream()
+                .anyMatch(msg -> msg.contains("Failed to check the move events 
for the system topic")
+                        || msg.contains("Failed to read event from the system 
topic"));
+        assertFalse(logFound2);
+        verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), 
anyBoolean());
+
+        // clean log appender
+        appender.stop();
+        logger.removeAppender(appender);
+    }
 }

Reply via email to