This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 05d88f71e179184f5862a3ee4990f0b31b751468 Author: Yunze Xu <[email protected]> AuthorDate: Thu Sep 25 17:02:53 2025 +0800 [improve][broker] Replace isServiceUnitActiveAsync with checkTopicNsOwnership (#24780) (cherry picked from commit 46a76e98d6a6f2a86839d93467c5337bb205f851) --- .../pulsar/broker/namespace/NamespaceService.java | 31 ------------ .../pulsar/broker/service/BrokerService.java | 55 +++++++++------------- .../PersistentDispatcherFailoverConsumerTest.java | 2 - .../service/PersistentTopicConcurrentTest.java | 2 - .../pulsar/broker/service/PersistentTopicTest.java | 2 - .../pulsar/broker/service/ServerCnxTest.java | 6 +-- .../client/api/OrphanPersistentTopicTest.java | 6 +-- 7 files changed, 28 insertions(+), 76 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a580c7500b7..b5dbddc2ea3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -43,9 +43,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1225,35 +1223,6 @@ public class NamespaceService implements AutoCloseable { new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName())); } - /** - * @deprecated This method is only used in test now. - */ - @Deprecated - public boolean isServiceUnitActive(TopicName topicName) { - try { - return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig() - .getMetadataStoreOperationTimeoutSeconds(), SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e); - throw new RuntimeException(e); - } - } - - public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) { - // TODO: Add unit tests cover it. - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return getBundleAsync(topicName) - .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); - } - return getBundleAsync(topicName).thenCompose(bundle -> { - Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle); - if (optionalFuture.isEmpty()) { - return CompletableFuture.completedFuture(false); - } - return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive()); - }); - } - private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) { // TODO: Add unit tests cover it. if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2a8467c68d3..29528942369 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1704,38 +1704,29 @@ public class BrokerService implements Closeable { CompletableFuture<Optional<Topic>> topicFuture, Map<String, String> properties) { TopicName topicName = TopicName.get(topic); - pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) - .thenAccept(isActive -> { - if (isActive) { - CompletableFuture<Map<String, String>> propertiesFuture; - if (properties == null) { - //Read properties from storage when loading topic. - propertiesFuture = fetchTopicPropertiesAsync(topicName); - } else { - propertiesFuture = CompletableFuture.completedFuture(properties); - } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic0(topic, createIfMissing, topicFuture, - finalProperties) - ).exceptionally(throwable -> { - log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); - return null; - }); - } else { - // namespace is being unloaded - String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); - log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - }).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); - return null; - }); + checkTopicNsOwnership(topic).thenRun(() -> { + CompletableFuture<Map<String, String>> propertiesFuture; + if (properties == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName); + } else { + propertiesFuture = CompletableFuture.completedFuture(properties); + } + propertiesFuture.thenAccept(finalProperties -> + //TODO add topicName in properties? + createPersistentTopic0(topic, createIfMissing, topicFuture, + finalProperties) + ).exceptionally(throwable -> { + log.warn("[{}] Read topic property failed", topic, throwable); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(throwable); + return null; + }); + }).exceptionally(e -> { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(e.getCause()); + return null; + }); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 96ca2d90f06..37cf75d84ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -79,7 +79,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -162,7 +161,6 @@ public class PersistentDispatcherFailoverConsumerTest { NamespaceService nsSvc = pulsarTestContext.getPulsarService().getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 20f58f277a3..2f8a9246351 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -51,7 +51,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +102,6 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase { NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); - doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 42defbe293f..eaec93f78a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -225,8 +225,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { NamespaceBundle bundle = mock(NamespaceBundle.class); doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any()); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); - doReturn(true).when(nsSvc).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class))).when(nsSvc).getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkBundleOwnership(any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4d734081e43..2cfbac35bfc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -231,8 +231,6 @@ public class ServerCnxTest { .getBundleAsync(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkBundleOwnership(any(), any()); doReturn(true).when(namespaceService).isServiceUnitOwned(any()); - doReturn(true).when(namespaceService).isServiceUnitActive(any()); - doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( @@ -1601,8 +1599,8 @@ public class ServerCnxTest { setChannelConnected(); // Force the case where the broker doesn't own any topic - doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) - .isServiceUnitActiveAsync(any(TopicName.class)); + doReturn(CompletableFuture.failedFuture(new ServiceUnitNotReadyException("failed"))).when(brokerService) + .checkTopicNsOwnership(any(String.class)); // test PRODUCER failure case ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index b7c323af5bc..3613ba51625 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -245,7 +245,7 @@ public class OrphanPersistentTopicTest extends ProducerConsumerBase { admin.topics().createNonPartitionedTopic(tpName); admin.namespaces().unload(ns); - // Inject an error when calling "NamespaceService.isServiceUnitActiveAsync". + // Inject an error when loading the topic AtomicInteger failedTimes = new AtomicInteger(); NamespaceService namespaceService = pulsar.getNamespaceService(); doAnswer(invocation -> { @@ -258,7 +258,7 @@ public class OrphanPersistentTopicTest extends ProducerConsumerBase { return CompletableFuture.failedFuture(new RuntimeException("mocked error")); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer can create successfully eventually. Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); @@ -295,7 +295,7 @@ public class OrphanPersistentTopicTest extends ProducerConsumerBase { pulsar.getDefaultManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); } return invocation.callRealMethod(); - }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + }).when(namespaceService).checkBundleOwnership(any(TopicName.class), any()); // Verify: the consumer create failed due to pulsar does not allow to create topic automatically. try {
