This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c3921bad1ec86868325f980b88e5f7860cad62e8 Author: Yunze Xu <[email protected]> AuthorDate: Wed Oct 8 23:42:11 2025 +0800 [fix][broker] Allow intermittent error from topic policies service when loading topics (#24829) (cherry picked from commit 7b6f9fceeb56a1e85bdd917e97393ab8cb19544b) --- .../pulsar/broker/service/BrokerService.java | 18 ++++++++------- .../pulsar/broker/service/BrokerServiceTest.java | 27 +++++++++++++++++++++- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bd38283ae41..5c19de44341 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1168,14 +1168,7 @@ public class BrokerService implements Closeable { // The topic level policies are not needed now, but the meaning of calling // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) - .exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, - rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenRun(() -> { + .thenRun(() -> { final var inserted = new MutableBoolean(false); final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> { inserted.setTrue(); @@ -1195,6 +1188,15 @@ public class BrokerService implements Closeable { } }); } + }).exceptionally(e -> { + pulsar.getExecutor().execute(() -> topics.remove(topicName.toString(), topicFuture)); + final Throwable rc = FutureUtil.unwrapCompletionException(e); + final String errorInfo = String.format("Topic creation encountered an exception by initialize" + + " topic policies service. topic_name=%s error_message=%s", topicName, + rc.getMessage()); + log.error(errorInfo, rc); + topicFuture.completeExceptionally(rc); + return null; }); }); return topicFuture; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 62dcc37f38e..56a08eac209 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -125,6 +125,7 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -148,7 +149,8 @@ public class BrokerServiceTest extends BrokerTestBase { @Override protected void setup() throws Exception { conf.setSystemTopicEnabled(false); - conf.setTopicLevelPoliciesEnabled(false); + conf.setTopicLevelPoliciesEnabled(true); + conf.setTopicPoliciesServiceClassName(MockTopicPoliciesService.class.getName()); super.baseSetup(); } @@ -2036,5 +2038,28 @@ public class BrokerServiceTest extends BrokerTestBase { retryStrategically((test) -> sync.getProducer() != null, 1000, 10); assertNotNull(sync.getProducer()); } + + @Test + public void testGetTopicWhenTopicPoliciesFail() throws Exception { + final var topicName = TopicName.get("prop/ns-abc/test-get-topic-when-topic-policies-fail"); + MockTopicPoliciesService.FAILED_TOPICS.add(topicName); + @Cleanup final var producer = pulsarClient.newProducer().topic(topicName.toString()).create(); + assertFalse(MockTopicPoliciesService.FAILED_TOPICS.contains(topicName)); + } + + static class MockTopicPoliciesService extends TopicPoliciesService.TopicPoliciesServiceDisabled { + + static final Set<TopicName> FAILED_TOPICS = ConcurrentHashMap.newKeySet(); + + @Override + public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type) { + if (FAILED_TOPICS.contains(topicName)) { + // Only fail once + FAILED_TOPICS.remove(topicName); + return CompletableFuture.failedFuture(new RuntimeException("injected failure for " + topicName)); + } + return CompletableFuture.completedFuture(Optional.empty()); + } + } }
