This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d6f17d06a934720e2cc8fa0fc372c155d3502882 Author: Lari Hotari <lhot...@apache.org> AuthorDate: Wed Apr 23 13:51:33 2025 +0300 Revert "[fix][broker] Directly query single topic existence when the topic is partitioned (#24154)" This reverts commit b619f9cbe875737ff94a95162ccad3f45050c6bf. --- .../pulsar/broker/namespace/NamespaceService.java | 16 ++- .../broker/namespace/NamespaceServiceTest.java | 107 ++++----------------- .../broker/service/BrokerServiceChaosTest.java | 4 +- ...OneWayReplicatorUsingGlobalPartitionedTest.java | 4 +- .../pulsar/client/api/ConsumerCreationTest.java | 2 +- .../pulsar/client/api/ProducerCreationTest.java | 2 +- .../pulsar/client/api/PulsarClientException.java | 4 +- 7 files changed, 32 insertions(+), 107 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index e6025ccceb1..8f5cef1bdff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1416,22 +1416,18 @@ public class NamespaceService implements AutoCloseable { return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync( topic.isPartitioned() ? TopicName.get(topic.getPartitionedTopicName()) : topic) .thenCompose(metadata -> { - // When the topic has metadata: - // - The topic name is non-partitioned, which means that the topic exists. - // - The topic name is partitioned, please check the specific partition. if (metadata.partitions > 0) { if (!topic.isPartitioned()) { return CompletableFuture.completedFuture( TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); - } - if (!topic.isPersistent()) { - // A non-persistent partitioned topic contains only metadata. - // Since no actual partitions are created, there's no need to check under /managed-ledgers. - return CompletableFuture.completedFuture(topic.getPartitionIndex() < metadata.partitions - ? TopicExistsInfo.newNonPartitionedTopicExists() - : TopicExistsInfo.newTopicNotExists()); + } else { + if (topic.getPartitionIndex() < metadata.partitions) { + return CompletableFuture.completedFuture( + TopicExistsInfo.newNonPartitionedTopicExists()); + } } } + // Direct query the single topic. return checkNonPartitionedTopicExists(topic).thenApply( b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() : TopicExistsInfo.newTopicNotExists()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 2a5989eb53d..951247bd688 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.namespace; import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -87,7 +86,6 @@ import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; @@ -818,6 +816,23 @@ public class NamespaceServiceTest extends BrokerTestBase { }; } + @Test(dataProvider = "topicDomain") + public void testCheckTopicExists(String topicDomain) throws Exception { + String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get().isExists()); + }); + + String partitionedTopic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(partitionedTopic, 5); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get().isExists()); + assertTrue(pulsar.getNamespaceService() + .checkTopicExists(TopicName.get(partitionedTopic + "-partition-2")).get().isExists()); + }); + } + @Test public void testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClustersAtTenantLevel() throws Exception { // 1. Setup @@ -939,94 +954,6 @@ public class NamespaceServiceTest extends BrokerTestBase { pulsar.getConfiguration().setForceDeleteTenantAllowed(false); } - - @Test(dataProvider = "topicDomain") - public void checkTopicExistsForNonPartitionedTopic(String topicDomain) throws Exception { - TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID()); - admin.topics().createNonPartitionedTopic(topicName.toString()); - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); - assertThat(result) - .succeedsWithin(3, TimeUnit.SECONDS) - .satisfies(n -> { - assertTrue(n.isExists()); - assertEquals(n.getPartitions(), 0); - assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED); - n.recycle(); - }); - } - - @Test(dataProvider = "topicDomain") - public void checkTopicExistsForPartitionedTopic(String topicDomain) throws Exception { - TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID()); - admin.topics().createPartitionedTopic(topicName.toString(), 3); - - // Check the topic exists by the partitions. - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); - assertThat(result) - .succeedsWithin(3, TimeUnit.SECONDS) - .satisfies(n -> { - assertTrue(n.isExists()); - assertEquals(n.getPartitions(), 3); - assertEquals(n.getTopicType(), TopicType.PARTITIONED); - n.recycle(); - }); - - // Check the specific partition. - result = pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(2)); - assertThat(result) - .succeedsWithin(3, TimeUnit.SECONDS) - .satisfies(n -> { - assertTrue(n.isExists()); - assertEquals(n.getPartitions(), 0); - assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED); - n.recycle(); - }); - - // Partition index is out of range. - result = pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(10)); - assertThat(result) - .succeedsWithin(3, TimeUnit.SECONDS) - .satisfies(n -> { - assertFalse(n.isExists()); - assertEquals(n.getPartitions(), 0); - assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED); - n.recycle(); - }); - } - - @Test(dataProvider = "topicDomain") - public void checkTopicExistsForNonExistentNonPartitionedTopic(String topicDomain) { - TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID()); - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); - assertThat(result) - .succeedsWithin(3, TimeUnit.SECONDS) - .satisfies(n -> { - // when using the pulsar client to check non_persistent topic, always return true, so ignore to - // check that. - if (topicDomain.equals(TopicDomain.persistent)) { - assertFalse(n.isExists()); - } - n.recycle(); - }); - } - - @Test(dataProvider = "topicDomain") - public void checkTopicExistsForNonExistentPartitionTopic(String topicDomain) { - TopicName topicName = - TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID() + "-partition-10"); - CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName); - assertThat(result) - .succeedsWithin(3, TimeUnit.SECONDS) - .satisfies(n -> { - // when using the pulsar client to check non_persistent topic, always return true, so ignore to - // check that. - if (topicDomain.equals(TopicDomain.persistent)) { - assertFalse(n.isExists()); - } - n.recycle(); - }); - } - /** * 1. Manually trigger "LoadReportUpdaterTask" * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice". diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java index 5650fe6e72f..6313d72329d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java @@ -37,7 +37,9 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Slf4j -@Test(groups = "broker") +// TODO: This test is in flaky group until CI is fixed. +// To be addressed as part of https://github.com/apache/pulsar/pull/24154 +@Test(groups = "flaky") public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBaseTest { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index 2a2a1befd16..a22067101c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -41,7 +41,9 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Slf4j -@Test(groups = "broker") +// TODO: This test is in flaky group until CI is fixed. +// To be addressed as part of https://github.com/apache/pulsar/pull/24154 +@Test(groups = "flaky") public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicatorTest { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java index 195485739e0..a81dbe02b34 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java @@ -117,7 +117,7 @@ public class ConsumerCreationTest extends ProducerConsumerBase { Consumer<byte[]> ignored = pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe(); } else { - assertThrows(PulsarClientException.NotFoundException.class, () -> { + assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> { @Cleanup Consumer<byte[]> ignored = pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java index 48ee112d750..cd75383a487 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java @@ -258,7 +258,7 @@ public class ProducerCreationTest extends ProducerConsumerBase { @Cleanup Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create(); } else { - assertThrows(PulsarClientException.NotFoundException.class, () -> { + assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> { @Cleanup Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create(); }); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 6bcba51defd..32cec950a69 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -961,9 +961,7 @@ public class PulsarClientException extends IOException { public static Throwable wrap(Throwable t, String msg) { msg += "\n" + t.getMessage(); // wrap an exception with new message info - if (t instanceof NotFoundException) { - return new NotFoundException(msg); - } else if (t instanceof TopicDoesNotExistException) { + if (t instanceof TopicDoesNotExistException) { return new TopicDoesNotExistException(msg); } else if (t instanceof TimeoutException) { return new TimeoutException(msg);