This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e6133ee014 [fix][broker] fix flaky test in SystemTopicBasedTopicPoliciesServiceTest (#25098)
3e6133ee014 is described below

commit 3e6133ee014b0195476e9e25a71194ed86f4bd18
Author: ken <[email protected]>
AuthorDate: Fri Jan 2 22:39:38 2026 +0800

    [fix][broker] fix flaky test in SystemTopicBasedTopicPoliciesServiceTest (#25098)
    
    Co-authored-by: fanjianye <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 70 +++++++---------------
 .../org/apache/pulsar/utils/TestLogAppender.java   | 21 +++++++
 2 files changed, 43 insertions(+), 48 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 2f503e5512a..6b050e8b421 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -28,7 +28,6 @@ import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 import static org.testng.AssertJUnit.assertTrue;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -43,10 +42,6 @@ import java.util.concurrent.Executors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.Logger;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -57,6 +52,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.utils.TestLogAppender;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
@@ -479,19 +475,12 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
 
     @Test
     public void testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws Exception {
-        // catch the log output in SystemTopicBasedTopicPoliciesService
-        Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
-        List<String> logMessages = new ArrayList<>();
-        AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
-            @Override
-            public void append(LogEvent event) {
-                logMessages.add(event.getMessage().getFormattedMessage());
-            }
-        };
-        appender.start();
-        logger.addAppender(appender);
+        // catch the log output
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
 
         // create namespace-5 and topic
+        pulsar.getTopicPoliciesService().close();
         SystemTopicBasedTopicPoliciesService spyService =
                 Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
         FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
@@ -527,8 +516,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         reader.close();
         log.info("successfully close spy reader");
         Awaitility.await().untilAsserted(() -> {
-            boolean logFound = logMessages.stream()
-                    .anyMatch(msg -> msg.contains("Closing the topic policies reader for"));
+            boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains("Closing the topic policies reader for"));
             assertTrue(logFound);
         });
 
@@ -565,39 +554,28 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanPoliciesCacheInitMap() is 2.
         // in previous code, the time would be 3
-        boolean logFound = logMessages.stream()
-                .anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic"));
+        boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
         assertFalse(logFound);
-        boolean logFound2 = logMessages.stream()
-                .anyMatch(msg -> msg.contains("Failed to check the move events for the system topic"));
+        boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                logEvent.getMessage().toString().contains("Failed to check the move events for the system topic"));
         assertTrue(logFound2);
         verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(), anyBoolean());
 
         // make sure not occur Recursive update
-        boolean logFound3 = logMessages.stream()
-                .anyMatch(msg -> msg.contains("Recursive update"));
+        boolean logFound3 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                logEvent.getMessage().toString().contains("Recursive update"));
         assertFalse(logFound3);
-
-        // clean log appender
-        appender.stop();
-        logger.removeAppender(appender);
     }
 
     @Test
     public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws Exception {
-        // catch the log output in SystemTopicBasedTopicPoliciesService
-        Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
-        List<String> logMessages = new ArrayList<>();
-        AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
-            @Override
-            public void append(LogEvent event) {
-                logMessages.add(event.getMessage().getFormattedMessage());
-            }
-        };
-        appender.start();
-        logger.addAppender(appender);
+        // catch the log output
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
 
         // create namespace-5 and topic
+        pulsar.getTopicPoliciesService().close();
         SystemTopicBasedTopicPoliciesService spyService =
                 Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
         FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
@@ -644,17 +622,13 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
 
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
-        boolean logFound = logMessages.stream()
-                .anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic"));
+        boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
         assertTrue(logFound);
-        boolean logFound2 = logMessages.stream()
-                .anyMatch(msg -> msg.contains("Failed to check the move events for the system topic")
-                        || msg.contains("Failed to read event from the system topic"));
+        boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
+                        || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
-
-        // clean log appender
-        appender.stop();
-        logger.removeAppender(appender);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
index cfb07913b53..a8fecaeb655 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
@@ -32,6 +32,7 @@ import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
 
 /**
  * Log4J appender that captures all log events for a specified logger.
@@ -57,6 +58,25 @@ public class TestLogAppender extends AbstractAppender implements AutoCloseable {
         context.updateLoggers();
         return testAppender;
     }
+    /**
+     * Create a new TestLogAppender for a given logger. Use the {@link #close()} method to stop it and unregister it
+     * from Log4J.
+     * @param log The name of the logger instance will be used as the logger name to register the appender to.
+     * @return return the new TestLogAppender instance.
+     */
+    public static TestLogAppender create(Logger log) {
+        return create(Optional.of(log.getName()));
+    }
+
+    /**
+     * Create a new TestLogAppender for a given class. Use the {@link #close()} method to stop it and unregister it
+     * from Log4J.
+     * @param clazz The name of the class will be used as the logger name to register the appender to.
+     * @return return the new TestLogAppender instance.
+     */
+    public static TestLogAppender create(Class<?> clazz) {
+        return create(Optional.of(clazz.getName()));
+    }
 
     TestLogAppender(LoggerConfig loggerConfig, Runnable onConfigurationChange) {
         super("TestAppender" + idGenerator.incrementAndGet(), null, 
PatternLayout.createDefaultLayout(), false, null);
@@ -64,6 +84,7 @@ public class TestLogAppender extends AbstractAppender implements AutoCloseable {
         this.onConfigurationChange = onConfigurationChange;
     }
 
+
     @Override
     public void append(LogEvent event) {
         events.add(event.toImmutable());

Reply via email to