This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19b4a05f888 [improve][broker] Improve the log when namespace bundle is
not available (#24434)
19b4a05f888 is described below
commit 19b4a05f888da35a584cf7ff6d594dd9df5e03d1
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Jun 21 20:14:44 2025 +0800
[improve][broker] Improve the log when namespace bundle is not available
(#24434)
---
.../apache/pulsar/broker/namespace/NamespaceService.java | 13 ++++++++-----
.../org/apache/pulsar/broker/service/BrokerService.java | 12 ++++++------
.../service/PersistentDispatcherFailoverConsumerTest.java | 3 ++-
.../broker/service/PersistentTopicConcurrentTest.java | 3 ++-
.../apache/pulsar/broker/service/PersistentTopicTest.java | 3 ++-
.../org/apache/pulsar/broker/service/ServerCnxTest.java | 6 ++++--
6 files changed, 24 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 7aa43fc8383..32fe4ca1449 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1274,13 +1274,16 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName)
{
- // TODO: Add unit tests cover it.
+ return getBundleAsync(topicName).thenCompose(bundle ->
checkBundleOwnership(topicName, bundle));
+ }
+
+ public CompletableFuture<Boolean> checkBundleOwnership(TopicName
topicName, NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
- return getBundleAsync(topicName)
- .thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
+ // TODO: Add unit tests cover it.
+ return
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle);
+ } else {
+ return ownershipCache.checkOwnershipAsync(bundle);
}
- return getBundleAsync(topicName)
- .thenCompose(ownershipCache::checkOwnershipAsync);
}
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle
nsBundle) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c8032712d71..359c0daf5b8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2364,19 +2364,19 @@ public class BrokerService implements Closeable {
public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
TopicName topicName = TopicName.get(topic);
+ final var namespaceService = pulsar.getNamespaceService();
- return pulsar.getNamespaceService().checkTopicOwnership(topicName)
- .thenCompose(ownedByThisInstance -> {
+ return namespaceService.getBundleAsync(topicName).thenCompose(bundle ->
+ namespaceService.checkBundleOwnership(topicName,
bundle).thenCompose(ownedByThisInstance -> {
if (ownedByThisInstance) {
return CompletableFuture.completedFuture(null);
} else {
- String msg = String.format("Namespace bundle for topic
(%s) not served by this instance:%s. "
- + "Please redo the lookup. Request is
denied: namespace=%s",
- topic, pulsar.getBrokerId(),
topicName.getNamespace());
+ String msg = String.format("Namespace bundle (%s) for
topic (%s) not served by this instance:"
+ + "%s. Please redo the lookup.", bundle,
topic, pulsar.getBrokerId());
log.warn(msg);
return FutureUtil.failedFuture(new
ServiceUnitNotReadyException(msg));
}
- });
+ }));
}
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle
serviceUnit,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 2899e9f2d67..3e31435b1bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -163,7 +163,8 @@ public class PersistentDispatcherFailoverConsumerTest {
NamespaceService nsSvc =
pulsarTestContext.getPulsarService().getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
+
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
setupMLAsyncCallbackMocks();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index f75a3256747..2b2f0ed6d75 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -104,7 +104,8 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
doReturn(nsSvc).when(pulsar).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
+
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
final List<Position> addedEntries = new ArrayList<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a22e090451e..e36a940b4ea 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -226,7 +226,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
-
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any());
+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any());
+
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(),
any());
setupMLAsyncCallbackMocks();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b10c0023d06..a4349bace8a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -144,6 +144,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -226,11 +227,12 @@ public class ServerCnxTest {
brokerService = pulsarTestContext.getBrokerService();
namespaceService = pulsar.getNamespaceService();
-
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
+
doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(namespaceService)
+ .getBundleAsync(any());
+
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(),
any());
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
doReturn(true).when(namespaceService).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
-
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"),
CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(