Copilot commented on code in PR #25295:
URL: https://github.com/apache/pulsar/pull/25295#discussion_r2904199585
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -78,6 +84,18 @@
  */
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
+            "pulsar_topic_policies_cache_init_failures_total",
+            "Total number of topic policies cache initialization failures after all retries exhausted")
+            .labelNames("namespace")
Review Comment:
Labeling these counters by full `namespace` can create unbounded metric
cardinality (potentially one time series per namespace per broker), which is a
common source of Prometheus memory/CPU pressure. Consider reducing cardinality
(e.g., remove the label, use tenant-only, or gate per-namespace labeling behind
a config) while still meeting the “emit a metric on failure” requirement.
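For illustration, tenant-only labeling could be sketched as below. The class and helper names are hypothetical; in the actual service, Pulsar's `NamespaceName#getTenant()` would be the natural choice rather than string parsing, which is used here only to keep the sketch self-contained (it assumes Pulsar's `tenant/namespace` naming convention):

```java
// Hypothetical sketch: derive a bounded "tenant" label instead of labeling by
// full namespace. Assumes namespace strings of the form "tenant/namespace".
public class TenantLabelSketch {
    static String tenantLabel(String namespace) {
        int slash = namespace.indexOf('/');
        // Fall back to the full value if it does not look like tenant/namespace
        return slash > 0 ? namespace.substring(0, slash) : namespace;
    }

    public static void main(String[] args) {
        // e.g. FAILURES_COUNTER.labels(tenantLabel(namespace.toString())).inc();
        System.out.println(tenantLabel("public/default")); // prints "public"
    }
}
```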
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -619,6 +626,123 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         });
     }
+    /**
+     * Initializes the topic policies cache with timeout and retry support.
+     * On each attempt, a new reader is created, and {@link #initPolicesCache} is called with a timeout.
+     * If the initialization times out, the reader is closed and a new attempt is made.
+     * After all retries are exhausted, namespace bundles are unloaded from this broker so they can be
+     * reassigned to a different broker.
+     *
+     * @param namespace the namespace to initialize policies for
+     * @param retriesLeft number of retries remaining
+     * @return a future that completes when initialization succeeds or fails after all retries
+     */
+    private CompletableFuture<Void> initPoliciesCacheWithTimeoutAndRetry(NamespaceName namespace, int retriesLeft) {
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException(getClass().getName() + " is closed."));
+        }
+
+        long timeoutSeconds = pulsarService.getConfiguration().getTopicPoliciesCacheInitTimeoutSeconds();
+        final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = newReader(namespace);
+
+        CompletableFuture<Void> attempt = readerFuture.thenCompose(reader -> {
+            final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
+            initPolicesCache(reader, stageFuture);
+
+            CompletableFuture<Void> timedFuture = timeoutSeconds > 0
+                    ? stageFuture.orTimeout(timeoutSeconds, TimeUnit.SECONDS)
+                    : stageFuture;
+
+            return timedFuture.thenAccept(__ -> readMorePoliciesAsync(reader));
+        });
+
+        return attempt
+                .thenApply(v -> CompletableFuture.completedFuture(v))
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof TimeoutException) {
+                        TOPIC_POLICIES_CACHE_INIT_TIMEOUTS.labels(namespace.toString()).inc();
+                        // Close the stuck reader and remove from cache so a new one can be created
+                        closeAndRemoveReaderForNamespace(namespace);
+
+                        if (retriesLeft > 0) {
+                            log.warn("[{}] Topic policies cache initialization timed out after {}s. "
+                                            + "Retrying... ({} retries left)",
+                                    namespace, timeoutSeconds, retriesLeft);
+                            return initPoliciesCacheWithTimeoutAndRetry(namespace, retriesLeft - 1);
+                        } else {
+                            log.error("[{}] Topic policies cache initialization failed after all retries "
+                                            + "(timed out after {}s per attempt). Unloading namespace bundles "
+                                            + "from this broker.",
+                                    namespace, timeoutSeconds);
+                            TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(namespace.toString()).inc();
+                            unloadNamespaceBundlesAsync(namespace);
+                            return CompletableFuture.<Void>failedFuture(
+                                    new BrokerServiceException(
+                                            "Topic policies cache initialization failed after all retries "
+                                                    + "for namespace " + namespace));
+                        }
+                    }
+                    // For non-timeout exceptions (e.g. reader creation failure), propagate directly
+                    return CompletableFuture.<Void>failedFuture(cause);
+                })
+                .thenCompose(Function.identity());
Review Comment:
The `thenApply(v -> completedFuture(v))` + `exceptionally(...)` +
`thenCompose(identity())` pattern is harder to read and maintain than
necessary. Consider rewriting using a single `handle`/`whenComplete`-style
branch that returns either a value or a next-stage future (and then
`thenCompose` once), or using `exceptionallyCompose` if the project’s Java
target supports it—this will avoid nested futures and make the retry/failure
flow clearer.
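For illustration, the flattened retry flow could look like the self-contained sketch below. `RetryFlowSketch`, `attempt()`, and the simulated timeouts are stand-ins for the PR's reader/cache initialization, not its actual code, and `CompletableFuture.exceptionallyCompose` requires a Java 12+ target:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: attempt() simulates one cache-init attempt that
// times out a configurable number of times before succeeding.
public class RetryFlowSketch {
    private final AtomicInteger attempts = new AtomicInteger();
    private final int timeoutsBeforeSuccess;

    public RetryFlowSketch(int timeoutsBeforeSuccess) {
        this.timeoutsBeforeSuccess = timeoutsBeforeSuccess;
    }

    private CompletableFuture<Void> attempt() {
        if (attempts.incrementAndGet() <= timeoutsBeforeSuccess) {
            return CompletableFuture.failedFuture(new TimeoutException("init timed out"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Single exceptionallyCompose branch: retry on timeout, propagate anything
    // else. No thenApply(completedFuture) / exceptionally / thenCompose nesting.
    public CompletableFuture<Void> initWithRetry(int retriesLeft) {
        return attempt().exceptionallyCompose(ex -> {
            Throwable cause = ex;
            while (cause instanceof CompletionException && cause.getCause() != null) {
                cause = cause.getCause();
            }
            if (cause instanceof TimeoutException && retriesLeft > 0) {
                return initWithRetry(retriesLeft - 1);
            }
            return CompletableFuture.failedFuture(cause);
        });
    }

    public int attemptCount() {
        return attempts.get();
    }
}
```

With two simulated timeouts and two retries allowed, `initWithRetry(2)` completes normally after three attempts; with retries exhausted it fails with the underlying `TimeoutException`.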
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -78,6 +84,18 @@
  */
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
+            "pulsar_topic_policies_cache_init_failures_total",
+            "Total number of topic policies cache initialization failures after all retries exhausted")
+            .labelNames("namespace")
+            .register();
+
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
+            "pulsar_topic_policies_cache_init_timeouts_total",
+            "Total number of topic policies cache initialization timeouts (including retried attempts)")
+            .labelNames("namespace")
+            .register();
Review Comment:
Static `Counter.build(...).register()` registers into the global default
Prometheus registry at class-load time. In long-running brokers and especially
in unit/integration tests that may load/reload components in the same JVM, this
pattern can trigger duplicate-collector registration errors and makes metric
lifecycle hard to control. Prefer wiring metrics through Pulsar’s existing
metrics/registry facilities (or a registry owned by the broker instance) rather
than static global registration in the service class.
```suggestion
            .create();
    private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
            "pulsar_topic_policies_cache_init_timeouts_total",
            "Total number of topic policies cache initialization timeouts (including retried attempts)")
            .labelNames("namespace")
            .create();
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -581,29 +599,18 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
                 CompletableFuture<Void> existingFuture = policyCacheInitMap.putIfAbsent(namespace,
                         initNamespacePolicyFuture);
                 if (existingFuture == null) {
-                    final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
-                            newReader(namespace);
-                    readerCompletableFuture
-                            .thenCompose(reader -> {
-                                final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
-                                initPolicesCache(reader, stageFuture);
-                                return stageFuture
-                                        // Read policies in background
-                                        .thenAccept(__ -> readMorePoliciesAsync(reader));
-                            }).thenApply(__ -> {
+                    int maxRetries = pulsarService.getConfiguration()
+                            .getTopicPoliciesCacheInitMaxRetries();
+                    initPoliciesCacheWithTimeoutAndRetry(namespace, maxRetries)
+                            .thenApply(__ -> {
                                 initNamespacePolicyFuture.complete(null);
                                 return null;
                             }).exceptionally(ex -> {
                                 try {
-                                    if (readerCompletableFuture.isCompletedExceptionally()) {
-                                        log.error("[{}] Failed to create reader on __change_events topic",
-                                                namespace, ex);
-                                        initNamespacePolicyFuture.completeExceptionally(ex);
-                                        cleanPoliciesCacheInitMap(namespace, true);
-                                    } else {
-                                        initNamespacePolicyFuture.completeExceptionally(ex);
-                                        cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex));
-                                    }
+                                    log.error("[{}] Failed to initialize topic policies cache",
+                                            namespace, ex);
+                                    initNamespacePolicyFuture.completeExceptionally(ex);
+                                    cleanPoliciesCacheInitMap(namespace, true);
Review Comment:
This changes cleanup behavior from the prior logic that conditioned the
boolean argument on the exception type (e.g., `isAlreadyClosedException(ex)`),
and also previously treated reader-creation failures differently. Always
passing `true` can alter shutdown/cleanup semantics and may cause incorrect
cleanup decisions for non-close-related failures. Suggest restoring the prior
decision logic (e.g., pass `isAlreadyClosedException(ex)` where appropriate,
and keep the more specific branching for reader creation vs. later-stage
failures if it impacts cleanup).
```suggestion
                                    cleanPoliciesCacheInitMap(namespace,
                                            ex instanceof PulsarClientException.AlreadyClosedException
                                                    || ex.getCause() instanceof PulsarClientException.AlreadyClosedException);
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -623,12 +624,148 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
         boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
+                logEvent.getMessage().toString().contains("Failed to initialize topic policies cache"));
         assertTrue(logFound);
         boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                 logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
                         || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
     }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and allow 2 retries
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(2);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        // First call: never complete (will timeout). Second call: return false (no more events)
+        CompletableFuture<Boolean> neverCompleteFuture = new CompletableFuture<>();
+        Mockito.when(mockReader.hasMoreEventsAsync())
+                .thenReturn(neverCompleteFuture)
+                .thenReturn(CompletableFuture.completedFuture(false));
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // Put the mock reader in reader cache
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        // On retry (after the stuck reader is removed), create a real reader
+        Mockito.doAnswer(invocation -> {
+            NamespaceName ns = invocation.getArgument(0);
+            // Return a real reader for the retry
+            return spyReaderCaches.compute(ns, (k, v) -> {
+                if (v == null) {
+                    return CompletableFuture.completedFuture(mockReader);
+                }
+                return v;
+            });
+        }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
+        // returns false on second call)
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Retry may or may not succeed depending on mock setup; the important thing is
+            // the timeout was detected
+        }
+
Review Comment:
This test swallows all exceptions and explicitly allows the retry to “may or
may not succeed”, which makes the test non-deterministic and not aligned with
its name (“SuccessfulRetry”). It should assert a clear expected outcome (e.g.,
`prepareFuture` completes successfully and `hasMoreEventsAsync()` is invoked
twice, or at minimum that a retry occurred via verifiable interactions),
otherwise the test can pass even if the retry logic is broken.
```suggestion
        // The first attempt should time out, and the second attempt should succeed (since
        // hasMoreEventsAsync returns false on the second call)
        Boolean initResult = prepareFuture.get(30, TimeUnit.SECONDS);
        assertTrue(initResult);
        // Verify that hasMoreEventsAsync was invoked twice (initial attempt + retry)
        verify(mockReader, times(2)).hasMoreEventsAsync();
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -623,12 +624,148 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
         boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
+                logEvent.getMessage().toString().contains("Failed to initialize topic policies cache"));
         assertTrue(logFound);
         boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                 logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
                         || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
     }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and allow 2 retries
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(2);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        // First call: never complete (will timeout). Second call: return false (no more events)
+        CompletableFuture<Boolean> neverCompleteFuture = new CompletableFuture<>();
+        Mockito.when(mockReader.hasMoreEventsAsync())
+                .thenReturn(neverCompleteFuture)
+                .thenReturn(CompletableFuture.completedFuture(false));
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // Put the mock reader in reader cache
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        // On retry (after the stuck reader is removed), create a real reader
+        Mockito.doAnswer(invocation -> {
+            NamespaceName ns = invocation.getArgument(0);
+            // Return a real reader for the retry
+            return spyReaderCaches.compute(ns, (k, v) -> {
+                if (v == null) {
+                    return CompletableFuture.completedFuture(mockReader);
+                }
+                return v;
+            });
+        }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
+        // returns false on second call)
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Retry may or may not succeed depending on mock setup; the important thing is
+            // the timeout was detected
+        }
+
+        // Verify that the timeout was detected and retry was attempted
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains(
+                            "Topic policies cache initialization timed out"));
+            assertTrue(timeoutLogFound);
+        });
+
+        // Reset config
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
+        conf.setTopicPoliciesCacheInitMaxRetries(3);
+    }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutExhaustsRetries() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and 0 retries so it fails immediately after first timeout
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(0);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        Mockito.when(mockReader.hasMoreEventsAsync()).thenReturn(new CompletableFuture<>());
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+            Assert.fail("Should have failed after retries exhausted");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause().getMessage().contains(
+                    "Topic policies cache initialization failed after all retries"));
+        }
+
+        // Verify the failure log was emitted
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            boolean failureLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains(
+                            "Topic policies cache initialization failed after all retries"));
+            assertTrue(failureLogFound);
+        });
+
+        // Verify that the unloading log was emitted (may be "No owned bundles" or "Unloading")
+        boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
+            String msg = logEvent.getMessage().toString();
+            return msg.contains("Unloading") && msg.contains("namespace bundles")
+                    || msg.contains("No owned bundles found to unload");
+        });
+        assertTrue(unloadLogFound);
Review Comment:
This assertion is potentially racy because unloading is initiated
asynchronously; unlike the failure-log assertion above, it doesn’t wait for the
unload log to appear. Use `Awaitility` (or otherwise synchronize) around this
check to make the test deterministic.
```suggestion
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
                String msg = logEvent.getMessage().toString();
                return (msg.contains("Unloading") && msg.contains("namespace bundles"))
                        || msg.contains("No owned bundles found to unload");
            });
            assertTrue(unloadLogFound);
        });
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -78,6 +84,18 @@
  */
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
+            "pulsar_topic_policies_cache_init_failures_total",
+            "Total number of topic policies cache initialization failures after all retries exhausted")
+            .labelNames("namespace")
+            .register();
+
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
+            "pulsar_topic_policies_cache_init_timeouts_total",
+            "Total number of topic policies cache initialization timeouts (including retried attempts)")
+            .labelNames("namespace")
Review Comment:
Labeling these counters by full `namespace` can create unbounded metric
cardinality (potentially one time series per namespace per broker), which is a
common source of Prometheus memory/CPU pressure. Consider reducing cardinality
(e.g., remove the label, use tenant-only, or gate per-namespace labeling behind
a config) while still meeting the “emit a metric on failure” requirement.
```suggestion
            "Total number of topic policies cache initialization failures after all retries exhausted, per tenant")
            .labelNames("tenant")
            .register();
    private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
            "pulsar_topic_policies_cache_init_timeouts_total",
            "Total number of topic policies cache initialization timeouts (including retried attempts), per tenant")
            .labelNames("tenant")
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -623,12 +624,148 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
         boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
+                logEvent.getMessage().toString().contains("Failed to initialize topic policies cache"));
         assertTrue(logFound);
         boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                 logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
                         || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
     }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and allow 2 retries
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(2);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        // First call: never complete (will timeout). Second call: return false (no more events)
+        CompletableFuture<Boolean> neverCompleteFuture = new CompletableFuture<>();
+        Mockito.when(mockReader.hasMoreEventsAsync())
+                .thenReturn(neverCompleteFuture)
+                .thenReturn(CompletableFuture.completedFuture(false));
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // Put the mock reader in reader cache
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        // On retry (after the stuck reader is removed), create a real reader
+        Mockito.doAnswer(invocation -> {
+            NamespaceName ns = invocation.getArgument(0);
+            // Return a real reader for the retry
+            return spyReaderCaches.compute(ns, (k, v) -> {
+                if (v == null) {
+                    return CompletableFuture.completedFuture(mockReader);
+                }
+                return v;
+            });
+        }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
+        // returns false on second call)
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Retry may or may not succeed depending on mock setup; the important thing is
+            // the timeout was detected
+        }
+
+        // Verify that the timeout was detected and retry was attempted
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains(
+                            "Topic policies cache initialization timed out"));
+            assertTrue(timeoutLogFound);
+        });
+
+        // Reset config
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
+        conf.setTopicPoliciesCacheInitMaxRetries(3);
Review Comment:
The config reset is not in a `finally` block. If the test fails before
reaching these lines, it can leak altered broker config into subsequent tests
and cause cascading failures. Consider wrapping the body in try/finally (or use
a test framework hook) to guarantee restoration.
```suggestion
        try {
            prepareFuture.get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Retry may or may not succeed depending on mock setup; the important thing is
            // the timeout was detected
        }
        // Verify that the timeout was detected and retry was attempted
        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                    logEvent.getMessage().toString().contains(
                            "Topic policies cache initialization timed out"));
            assertTrue(timeoutLogFound);
        });
    } finally {
        // Reset config
        conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
        conf.setTopicPoliciesCacheInitMaxRetries(3);
    }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]