This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch bewaremypower/fix-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 05d767cbdcf4d948043f378019d0e392246deb51 Author: Yunze Xu <[email protected]> AuthorDate: Wed Nov 5 19:30:14 2025 +0800 [branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts --- .../pulsar/broker/service/BrokerService.java | 51 +++++++++++----------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 0ee7f97b0a0..168f0fccd3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1117,31 +1117,7 @@ public class BrokerService implements Closeable { // The topic level policies are not needed now, but the meaning of calling // "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization. getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY) - .thenCompose(optionalTopicPolicies -> { - if (topicName.isPartitioned()) { - final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(topicNameEntity) - .thenCompose((metadata) -> { - // Allow creating non-partitioned persistent topic that name includes - // `partition` - if (metadata.partitions == 0 - || topicName.getPartitionIndex() < metadata.partitions) { - return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(context)); - } else { - final String errorMsg = - String.format("Illegal topic partition name %s with max allowed " - + "%d partitions", topicName, metadata.partitions); - log.warn(errorMsg); - return FutureUtil.failedFuture( - new BrokerServiceException.NotAllowedException(errorMsg)); - } - }); - } else { - return topics.computeIfAbsent(topicName.toString(), (tpName) -> - loadOrCreatePersistentTopic(context)); - } - }).thenRun(() -> { + .thenRun(() -> { final var inserted = new MutableBoolean(false); final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> { inserted.setTrue(); @@ -1678,9 +1654,32 @@ public class BrokerService implements Closeable { * loading and puts them into queue once in-process topics are created. */ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(TopicLoadingContext context) { + final var topicName = context.getTopicName(); final var topic = context.getTopicName().toString(); + final CompletableFuture<Void> ownedFuture; + if (topicName.isPartitioned()) { + final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); + ownedFuture = fetchPartitionedTopicMetadataAsync(topicNameEntity) + .thenCompose((metadata) -> { + // Allow creating non-partitioned persistent topic that name includes + // `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return checkTopicNsOwnership(topic); + } else { + final String errorMsg = + String.format("Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, metadata.partitions); + log.warn(errorMsg); + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException(errorMsg)); + } + }); + } else { + ownedFuture = checkTopicNsOwnership(topic); + } final var topicFuture = context.getTopicFuture(); - checkTopicNsOwnership(topic) + ownedFuture .thenRun(() -> { final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
