This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6e3d5d8a9dc [fix][broker][branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts (#24947)
6e3d5d8a9dc is described below

commit 6e3d5d8a9dc62db0864f852a87295bf1e75a6e07
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 5 21:31:50 2025 +0800

    [fix][broker][branch-4.0] Fix failed testFinishTakeSnapshotWhenTopicLoading due to topic future cache conflicts (#24947)
---
 .../pulsar/broker/service/BrokerService.java       | 51 +++++++++++-----------
 1 file changed, 25 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0ee7f97b0a0..168f0fccd3e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1117,31 +1117,7 @@ public class BrokerService implements Closeable {
                     // The topic level policies are not needed now, but the 
meaning of calling
                     // "getTopicPoliciesBypassSystemTopic" will wait for 
system topic policies initialization.
                     getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY)
-                            .thenCompose(optionalTopicPolicies -> {
-                        if (topicName.isPartitioned()) {
-                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
-                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
-                                    .thenCompose((metadata) -> {
-                                        // Allow creating non-partitioned 
persistent topic that name includes
-                                        // `partition`
-                                        if (metadata.partitions == 0
-                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
-                                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-                                                    
loadOrCreatePersistentTopic(context));
-                                        } else {
-                                            final String errorMsg =
-                                                    String.format("Illegal 
topic partition name %s with max allowed "
-                                                            + "%d partitions", 
topicName, metadata.partitions);
-                                            log.warn(errorMsg);
-                                            return FutureUtil.failedFuture(
-                                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
-                                        }
-                                    });
-                        } else {
-                            return 
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-                                    loadOrCreatePersistentTopic(context));
-                        }
-                    }).thenRun(() -> {
+                            .thenRun(() -> {
                         final var inserted = new MutableBoolean(false);
                         final var cachedFuture = 
topics.computeIfAbsent(topicName.toString(), ___ -> {
                             inserted.setTrue();
@@ -1678,9 +1654,32 @@ public class BrokerService implements Closeable {
      * loading and puts them into queue once in-process topics are created.
      */
     protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(TopicLoadingContext context) {
+        final var topicName = context.getTopicName();
         final var topic = context.getTopicName().toString();
+        final CompletableFuture<Void> ownedFuture;
+        if (topicName.isPartitioned()) {
+            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
+            ownedFuture = fetchPartitionedTopicMetadataAsync(topicNameEntity)
+                    .thenCompose((metadata) -> {
+                        // Allow creating non-partitioned persistent topic 
that name includes
+                        // `partition`
+                        if (metadata.partitions == 0
+                                || topicName.getPartitionIndex() < 
metadata.partitions) {
+                            return checkTopicNsOwnership(topic);
+                        } else {
+                            final String errorMsg =
+                                    String.format("Illegal topic partition 
name %s with max allowed "
+                                            + "%d partitions", topicName, 
metadata.partitions);
+                            log.warn(errorMsg);
+                            return FutureUtil.failedFuture(
+                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
+                        }
+                    });
+        } else {
+            ownedFuture = checkTopicNsOwnership(topic);
+        }
         final var topicFuture = context.getTopicFuture();
-        checkTopicNsOwnership(topic)
+        ownedFuture
                 .thenRun(() -> {
                     final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
 

Reply via email to