This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19b4a05f888 [improve][broker] Improve the log when namespace bundle is 
not available (#24434)
19b4a05f888 is described below

commit 19b4a05f888da35a584cf7ff6d594dd9df5e03d1
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Jun 21 20:14:44 2025 +0800

    [improve][broker] Improve the log when namespace bundle is not available 
(#24434)
---
 .../apache/pulsar/broker/namespace/NamespaceService.java    | 13 ++++++++-----
 .../org/apache/pulsar/broker/service/BrokerService.java     | 12 ++++++------
 .../service/PersistentDispatcherFailoverConsumerTest.java   |  3 ++-
 .../broker/service/PersistentTopicConcurrentTest.java       |  3 ++-
 .../apache/pulsar/broker/service/PersistentTopicTest.java   |  3 ++-
 .../org/apache/pulsar/broker/service/ServerCnxTest.java     |  6 ++++--
 6 files changed, 24 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 7aa43fc8383..32fe4ca1449 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1274,13 +1274,16 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) 
{
-        // TODO: Add unit tests cover it.
+        return getBundleAsync(topicName).thenCompose(bundle -> 
checkBundleOwnership(topicName, bundle));
+    }
+
+    public CompletableFuture<Boolean> checkBundleOwnership(TopicName 
topicName, NamespaceBundle bundle) {
         if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
-            return getBundleAsync(topicName)
-                    .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
+            // TODO: Add unit tests cover it.
+            return 
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle);
+        } else {
+            return ownershipCache.checkOwnershipAsync(bundle);
         }
-        return getBundleAsync(topicName)
-                .thenCompose(ownershipCache::checkOwnershipAsync);
     }
 
     public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle 
nsBundle) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c8032712d71..359c0daf5b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2364,19 +2364,19 @@ public class BrokerService implements Closeable {
 
     public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
         TopicName topicName = TopicName.get(topic);
+        final var namespaceService = pulsar.getNamespaceService();
 
-        return pulsar.getNamespaceService().checkTopicOwnership(topicName)
-                .thenCompose(ownedByThisInstance -> {
+        return namespaceService.getBundleAsync(topicName).thenCompose(bundle ->
+                namespaceService.checkBundleOwnership(topicName, 
bundle).thenCompose(ownedByThisInstance -> {
                     if (ownedByThisInstance) {
                         return CompletableFuture.completedFuture(null);
                     } else {
-                        String msg = String.format("Namespace bundle for topic 
(%s) not served by this instance:%s. "
-                                        + "Please redo the lookup. Request is 
denied: namespace=%s",
-                                topic, pulsar.getBrokerId(), 
topicName.getNamespace());
+                        String msg = String.format("Namespace bundle (%s) for 
topic (%s) not served by this instance:"
+                                + "%s. Please redo the lookup.", bundle, 
topic, pulsar.getBrokerId());
                         log.warn(msg);
                         return FutureUtil.failedFuture(new 
ServiceUnitNotReadyException(msg));
                     }
-                });
+                }));
     }
 
     public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle 
serviceUnit,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 2899e9f2d67..3e31435b1bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -163,7 +163,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         NamespaceService nsSvc = 
pulsarTestContext.getPulsarService().getNamespaceService();
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
         doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
-        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
+        
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
 
         setupMLAsyncCallbackMocks();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index f75a3256747..2b2f0ed6d75 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -104,7 +104,8 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
         doReturn(nsSvc).when(pulsar).getNamespaceService();
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
         doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
-        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
+        
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
 
         final List<Position> addedEntries = new ArrayList<>();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a22e090451e..e36a940b4ea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -226,7 +226,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
-        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());
+        
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
 any());
 
         setupMLAsyncCallbackMocks();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b10c0023d06..a4349bace8a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -144,6 +144,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -226,11 +227,12 @@ public class ServerCnxTest {
         brokerService = pulsarTestContext.getBrokerService();
 
         namespaceService = pulsar.getNamespaceService();
-        
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
+        
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(namespaceService)
+                .getBundleAsync(any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(),
 any());
         doReturn(true).when(namespaceService).isServiceUnitOwned(any());
         doReturn(true).when(namespaceService).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
-        
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(

Reply via email to