This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new af24849b84a [fix][broker] fix wrong method name checkTopicExists. (#24293) af24849b84a is described below commit af24849b84a0273228d192939976d23e12168541 Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Tue May 13 16:45:54 2025 +0800 [fix][broker] fix wrong method name checkTopicExists. (#24293) ### Motivation The current method `checkTopicExists` is an asynchronous method but follows a synchronous naming convention (lacking the `Async` suffix). This naming inconsistency can mislead developers into assuming it's a blocking operation, potentially causing misuse in client code. Since this method is `public`, we cannot remove it directly without breaking backward compatibility. ### Modifications 1. **Introduce a new asynchronous method**: - Added `checkTopicExistsAsync()` with the correct asynchronous naming convention. - Internally delegates to the original `checkTopicExists()` method to retain existing logic. 2. **Deprecate the original method**: - Marked `checkTopicExists()` as `@Deprecated` with a note directing users to the new `checkTopicExistsAsync()`. 3. **Refactor internal usages**: - Updated all internal calls to use `checkTopicExistsAsync()` instead of the deprecated method. 4. **Documentation updates**: - Added Javadoc to `checkTopicExists()` clarifying its deprecated status and replacement. This approach maintains backward compatibility while aligning method names with their asynchronous behavior. --- .../org/apache/pulsar/broker/admin/AdminResource.java | 2 +- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++-- .../org/apache/pulsar/broker/lookup/TopicLookupBase.java | 2 +- .../apache/pulsar/broker/namespace/NamespaceService.java | 15 ++++++++++++++- .../org/apache/pulsar/broker/service/BrokerService.java | 2 +- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +- .../java/org/apache/pulsar/broker/admin/TopicsTest.java | 4 ++-- .../pulsar/broker/lookup/http/HttpTopicLookupv2Test.java | 6 +++--- .../pulsar/broker/namespace/NamespaceServiceTest.java | 12 ++++++------ .../OneWayReplicatorUsingGlobalPartitionedTest.java | 2 +- .../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 4 ++-- 11 files changed, 34 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index fb7679ff269..841d37fff3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -983,7 +983,7 @@ public abstract class AdminResource extends PulsarWebResource { } protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) { - return pulsar().getNamespaceService().checkTopicExists(topicName) + return pulsar().getNamespaceService().checkTopicExistsAsync(topicName) .thenAccept(info -> { boolean exists = info.isExists(); info.recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 169a1b750c2..5e3e99020ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -740,7 +740,7 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) { - return pulsar().getNamespaceService().checkTopicExists(topicName) + return pulsar().getNamespaceService().checkTopicExistsAsync(topicName) .thenAccept(info -> { boolean exists = info.isExists(); info.recycle(); @@ -5412,7 +5412,7 @@ public class PersistentTopicsBase extends AdminResource { return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Only persistent topic can be set as shadow topic")); } - futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName) + futures.add(pulsar().getNamespaceService().checkTopicExistsAsync(shadowTopicName) .thenAccept(info -> { boolean exists = info.isExists(); info.recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index e01ec83c860..92f045ad90d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -77,7 +77,7 @@ public class TopicLookupBase extends PulsarWebResource { return CompletableFuture.completedFuture(true); } // Case-2: Persistent topic. - return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> { + return pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenCompose(info -> { boolean exists = info.isExists(); info.recycle(); if (exists) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 529d0968ebe..a8e140cef52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1409,8 +1409,21 @@ public class NamespaceService implements AutoCloseable { } /*** - * Check topic exists( partitioned or non-partitioned ). + * Checks whether the topic exists( partitioned or non-partitioned ). */ + public CompletableFuture<TopicExistsInfo> checkTopicExistsAsync(TopicName topic) { + return checkTopicExists(topic); + } + + /** + * Checks whether the topic exists( partitioned or non-partitioned ). + * + * @deprecated This method uses a misleading synchronous name for an asynchronous operation. + * Use {@link #checkTopicExistsAsync(TopicName topic)} instead. + * + * @see #checkTopicExistsAsync(TopicName topic) + */ + @Deprecated public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) { // For non-persistent/persistent partitioned topic, which has metadata. return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f46455bacb7..ac076715daa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3243,7 +3243,7 @@ public class BrokerService implements Closeable { if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } - return pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo -> { + return pulsar.getNamespaceService().checkTopicExistsAsync(topicName).thenComposeAsync(topicExistsInfo -> { final boolean topicExists = topicExistsInfo.isExists(); final TopicType topicType = topicExistsInfo.getTopicType(); final Integer partitions = topicExistsInfo.getPartitions(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f9605449b1f..8bfcd9ca82b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -638,7 +638,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { && brokerAllowAutoCreate; if (!autoCreateIfNotExist) { NamespaceService namespaceService = getBrokerService().getPulsar().getNamespaceService(); - namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> { + namespaceService.checkTopicExistsAsync(topicName).thenAccept(topicExistsInfo -> { lookupSemaphore.release(); if (!topicExistsInfo.isExists()) { writeAndFlush(Commands.newPartitionMetadataResponse( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index 8940fe4a1f3..68d0bcbe152 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -363,7 +363,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { CompletableFuture existFuture = new CompletableFuture(); existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists()); doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any()); - doReturn(existFuture).when(nameSpaceService).checkTopicExists(any()); + doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any()); CompletableFuture existBooleanFuture = new CompletableFuture(); existBooleanFuture.complete(false); doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any()); @@ -388,7 +388,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { existFuture.complete(TopicExistsInfo.newTopicNotExists()); CompletableFuture existBooleanFuture = new CompletableFuture(); existBooleanFuture.complete(false); - doReturn(existFuture).when(nameSpaceService).checkTopicExists(any()); + doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any()); doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any()); doReturn(nameSpaceService).when(pulsar).getNamespaceService(); AsyncResponse asyncResponse = mock(AsyncResponse.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index ab492de055b..9d0851c951f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -152,7 +152,7 @@ public class HttpTopicLookupv2Test { NamespaceService namespaceService = pulsar.getNamespaceService(); CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>(); future.complete(TopicExistsInfo.newTopicNotExists()); - doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class)); + doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class)); CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>(); booleanFuture.complete(false); doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class)); @@ -266,7 +266,7 @@ public class HttpTopicLookupv2Test { NamespaceService namespaceService = pulsar.getNamespaceService(); CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>(); future.complete(TopicExistsInfo.newTopicNotExists()); - doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class)); + doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class)); CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>(); booleanFuture.complete(false); doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class)); @@ -303,7 +303,7 @@ public class HttpTopicLookupv2Test { NamespaceService namespaceService = pulsar.getNamespaceService(); CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>(); future.complete(TopicExistsInfo.newTopicNotExists()); - doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class)); + doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class)); // Get the current semaphore first Integer state1 = pulsar.getBrokerService().getLookupRequestSemaphore().availablePermits(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 2a5989eb53d..bd80f05bcf1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -944,7 +944,7 @@ public class NamespaceServiceTest extends BrokerTestBase { public void checkTopicExistsForNonPartitionedTopic(String topicDomain) throws Exception { TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID()); admin.topics().createNonPartitionedTopic(topicName.toString()); - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); + CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExistsAsync(topicName); assertThat(result) .succeedsWithin(3, TimeUnit.SECONDS) .satisfies(n -> { @@ -961,7 +961,7 @@ public class NamespaceServiceTest extends BrokerTestBase { admin.topics().createPartitionedTopic(topicName.toString(), 3); // Check the topic exists by the partitions. - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); + CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExistsAsync(topicName); assertThat(result) .succeedsWithin(3, TimeUnit.SECONDS) .satisfies(n -> { @@ -972,7 +972,7 @@ public class NamespaceServiceTest extends BrokerTestBase { }); // Check the specific partition. - result = pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(2)); + result = pulsar.getNamespaceService().checkTopicExistsAsync(topicName.getPartition(2)); assertThat(result) .succeedsWithin(3, TimeUnit.SECONDS) .satisfies(n -> { @@ -983,7 +983,7 @@ public class NamespaceServiceTest extends BrokerTestBase { }); // Partition index is out of range. - result = pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(10)); + result = pulsar.getNamespaceService().checkTopicExistsAsync(topicName.getPartition(10)); assertThat(result) .succeedsWithin(3, TimeUnit.SECONDS) .satisfies(n -> { @@ -997,7 +997,7 @@ public class NamespaceServiceTest extends BrokerTestBase { @Test(dataProvider = "topicDomain") public void checkTopicExistsForNonExistentNonPartitionedTopic(String topicDomain) { TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID()); - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); + CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExistsAsync(topicName); assertThat(result) .succeedsWithin(3, TimeUnit.SECONDS) .satisfies(n -> { @@ -1014,7 +1014,7 @@ public class NamespaceServiceTest extends BrokerTestBase { public void checkTopicExistsForNonExistentPartitionTopic(String topicDomain) { TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID() + "-partition-10"); - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); + CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExistsAsync(topicName); assertThat(result) .succeedsWithin(3, TimeUnit.SECONDS) .satisfies(n -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index 2592845cbd2..585fe6ececa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -218,7 +218,7 @@ public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicator assertFalse(tps.containsKey(topicP1)); assertFalse(tps.containsKey(topicChangeEvents)); assertFalse(pulsar1.getNamespaceService() - .checkTopicExists(TopicName.get(topicChangeEvents)) + .checkTopicExistsAsync(TopicName.get(topicChangeEvents)) .get(5, TimeUnit.SECONDS).isExists()); // Verify: schema will be removed in local cluster, and remote cluster will not. List<CompletableFuture<StoredSchema>> schemaList13 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 7914ba5aebb..5cbea8df129 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -211,10 +211,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest { Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics(); assertFalse(tps.containsKey(topic)); assertFalse(tps.containsKey(topicChangeEvents)); - assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)) + assertFalse(pulsar1.getNamespaceService().checkTopicExistsAsync(TopicName.get(topic)) .get(5, TimeUnit.SECONDS).isExists()); assertFalse(pulsar1.getNamespaceService() - .checkTopicExists(TopicName.get(topicChangeEvents)) + .checkTopicExistsAsync(TopicName.get(topicChangeEvents)) .get(5, TimeUnit.SECONDS).isExists()); });