Copilot commented on code in PR #24980:
URL: https://github.com/apache/pulsar/pull/24980#discussion_r2527364163
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -465,4 +476,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanPoliciesCacheInitMap()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+
+ // Since cleanPoliciesCacheInitMap() is executed, should add the
failed reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanPoliciesCacheInitMap() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
Review Comment:
The comment on line 566 says "totally trigger
prepareInitPoliciesCacheAsync() twice, so the time of
cleanCacheAndCloseReader() is 2", but the code actually verifies
`cleanPoliciesCacheInitMap` is called 2 times on line 574. The comment should
refer to `cleanPoliciesCacheInitMap` instead of `cleanCacheAndCloseReader` for
clarity.
```suggestion
// make sure not to call cleanPoliciesCacheInitMap() more than twice
// totally trigger prepareInitPoliciesCacheAsync() twice, so the
number of times cleanPoliciesCacheInitMap() is called is 2.
// in previous code, the number would be 3
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -465,4 +476,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanPoliciesCacheInitMap()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+
+ // Since cleanPoliciesCacheInitMap() is executed, should add the
failed reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanPoliciesCacheInitMap() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to create reader on
__change_events topic"));
+ assertFalse(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ assertTrue(logFound2);
+ verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(),
anyBoolean());
+
+ // make sure not occur Recursive update
+ boolean logFound3 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Recursive update"));
+ assertFalse(logFound3);
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and put a failed readerCreateFuture in readerCache.
+ // simulate that when trigger prepareInitPoliciesCacheAsync(),
+ // it would use this failed readerFuture and go into corresponding
logic
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture = new CompletableFuture<>();
+ readerCompletableFuture.completeExceptionally(new Exception("create
reader fail"));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // trigger prepareInitPoliciesCacheAsync()
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
createReader,
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanPoliciesCacheInitMap() twice
+ // totally trigger prepareInitPoliciesCacheAsync() once, so the time
of cleanCacheAndCloseReader() is 1.
Review Comment:
The comment on line 647 says "totally trigger
prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader()
is 1", but the code actually verifies `cleanPoliciesCacheInitMap` is called 1
time on line 655. The comment should refer to `cleanPoliciesCacheInitMap`
instead of `cleanCacheAndCloseReader` for clarity.
```suggestion
// totally trigger prepareInitPoliciesCacheAsync() once, so the time
of cleanPoliciesCacheInitMap() is 1.
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -465,4 +476,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanPoliciesCacheInitMap()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+
+ // Since cleanPoliciesCacheInitMap() is executed, should add the
failed reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
Review Comment:
The test uses `Thread.sleep(500)` to wait for cleanup operations to finish,
which makes the test non-deterministic and slower than necessary. Consider
using `Awaitility.await()` with appropriate conditions (like checking if the
future is completed exceptionally or if caches are cleared) to make the test
more reliable and potentially faster.
```suggestion
// Await until cleanup operation finishes.
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
CompletableFuture<Boolean> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
});
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -465,4 +476,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanPoliciesCacheInitMap()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+
+ // Since cleanPoliciesCacheInitMap() is executed, should add the
failed reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanPoliciesCacheInitMap() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to create reader on
__change_events topic"));
+ assertFalse(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ assertTrue(logFound2);
+ verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(),
anyBoolean());
+
+ // make sure not occur Recursive update
+ boolean logFound3 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Recursive update"));
+ assertFalse(logFound3);
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and put a failed readerCreateFuture in readerCache.
+ // simulate that when trigger prepareInitPoliciesCacheAsync(),
+ // it would use this failed readerFuture and go into corresponding
logic
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture = new CompletableFuture<>();
+ readerCompletableFuture.completeExceptionally(new Exception("create
reader fail"));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // trigger prepareInitPoliciesCacheAsync()
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
createReader,
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
Review Comment:
The test uses `Thread.sleep(500)` to wait for cleanup operations to finish,
which makes the test non-deterministic and slower than necessary. Consider
using `Awaitility.await()` with appropriate conditions (like checking if the
future is completed exceptionally or if caches are cleared) to make the test
more reliable and potentially faster.
```suggestion
// Await cleanup operation to finish.
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
CompletableFuture<Boolean> futureCheck =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(futureCheck);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
});
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -465,4 +476,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanPoliciesCacheInitMap()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+
+ // Since cleanPoliciesCacheInitMap() is executed, should add the
failed reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanPoliciesCacheInitMap() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to create reader on
__change_events topic"));
+ assertFalse(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ assertTrue(logFound2);
+ verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(),
anyBoolean());
+
+ // make sure not occur Recursive update
+ boolean logFound3 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Recursive update"));
+ assertFalse(logFound3);
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
Review Comment:
Line 598 calls `logger.get()` which is unnecessary. The `logger` variable is
already of type `Logger`, so calling `addAppender(appender)` directly on line
599 is sufficient. The call on line 598 should be removed as it's redundant and
`Logger.get()` may not be the intended API.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]