This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d6f17d06a934720e2cc8fa0fc372c155d3502882
Author: Lari Hotari <lhot...@apache.org>
AuthorDate: Wed Apr 23 13:51:33 2025 +0300

    Revert "[fix][broker] Directly query single topic existence when the topic is partitioned (#24154)"
    
    This reverts commit b619f9cbe875737ff94a95162ccad3f45050c6bf.
---
 .../pulsar/broker/namespace/NamespaceService.java  |  16 ++-
 .../broker/namespace/NamespaceServiceTest.java     | 107 ++++-----------------
 .../broker/service/BrokerServiceChaosTest.java     |   4 +-
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |   4 +-
 .../pulsar/client/api/ConsumerCreationTest.java    |   2 +-
 .../pulsar/client/api/ProducerCreationTest.java    |   2 +-
 .../pulsar/client/api/PulsarClientException.java   |   4 +-
 7 files changed, 32 insertions(+), 107 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e6025ccceb1..8f5cef1bdff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1416,22 +1416,18 @@ public class NamespaceService implements AutoCloseable {
         return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
                         topic.isPartitioned() ? 
TopicName.get(topic.getPartitionedTopicName()) : topic)
                 .thenCompose(metadata -> {
-                    // When the topic has metadata:
-                    // - The topic name is non-partitioned, which means that 
the topic exists.
-                    // - The topic name is partitioned, please check the 
specific partition.
                     if (metadata.partitions > 0) {
                         if (!topic.isPartitioned()) {
                             return CompletableFuture.completedFuture(
                                     
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
-                        }
-                        if (!topic.isPersistent()) {
-                            // A non-persistent partitioned topic contains 
only metadata.
-                            // Since no actual partitions are created, there's 
no need to check under /managed-ledgers.
-                            return 
CompletableFuture.completedFuture(topic.getPartitionIndex() < 
metadata.partitions
-                                    ? 
TopicExistsInfo.newNonPartitionedTopicExists()
-                                    : TopicExistsInfo.newTopicNotExists());
+                        } else {
+                            if (topic.getPartitionIndex() < 
metadata.partitions) {
+                                return CompletableFuture.completedFuture(
+                                        
TopicExistsInfo.newNonPartitionedTopicExists());
+                            }
                         }
                     }
+                    // Direct query the single topic.
                     return checkNonPartitionedTopicExists(topic).thenApply(
                             b -> b ? 
TopicExistsInfo.newNonPartitionedTopicExists() :
                                     TopicExistsInfo.newTopicNotExists());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 2a5989eb53d..951247bd688 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.namespace;
 
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -87,7 +86,6 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -818,6 +816,23 @@ public class NamespaceServiceTest extends BrokerTestBase {
         };
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testCheckTopicExists(String topicDomain) throws Exception {
+        String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
+        admin.topics().createNonPartitionedTopic(topic);
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get().isExists());
+        });
+
+        String partitionedTopic = topicDomain + "://prop/ns-abc/" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(partitionedTopic, 5);
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get().isExists());
+            assertTrue(pulsar.getNamespaceService()
+                    .checkTopicExists(TopicName.get(partitionedTopic + 
"-partition-2")).get().isExists());
+        });
+    }
+
     @Test
     public void 
testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClustersAtTenantLevel()
 throws Exception {
         // 1. Setup
@@ -939,94 +954,6 @@ public class NamespaceServiceTest extends BrokerTestBase {
         pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
     }
 
-
-    @Test(dataProvider = "topicDomain")
-    public void checkTopicExistsForNonPartitionedTopic(String topicDomain) 
throws Exception {
-        TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", 
"topic-" + UUID.randomUUID());
-        admin.topics().createNonPartitionedTopic(topicName.toString());
-        CompletableFuture<TopicExistsInfo> result = 
pulsar.getNamespaceService().checkTopicExists(topicName);
-        assertThat(result)
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .satisfies(n -> {
-                    assertTrue(n.isExists());
-                    assertEquals(n.getPartitions(), 0);
-                    assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED);
-                    n.recycle();
-                });
-    }
-
-    @Test(dataProvider = "topicDomain")
-    public void checkTopicExistsForPartitionedTopic(String topicDomain) throws 
Exception {
-        TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", 
"topic-" + UUID.randomUUID());
-        admin.topics().createPartitionedTopic(topicName.toString(), 3);
-
-        // Check the topic exists by the partitions.
-        CompletableFuture<TopicExistsInfo> result = 
pulsar.getNamespaceService().checkTopicExists(topicName);
-        assertThat(result)
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .satisfies(n -> {
-                    assertTrue(n.isExists());
-                    assertEquals(n.getPartitions(), 3);
-                    assertEquals(n.getTopicType(), TopicType.PARTITIONED);
-                    n.recycle();
-                });
-
-        // Check the specific partition.
-        result = 
pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(2));
-        assertThat(result)
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .satisfies(n -> {
-                    assertTrue(n.isExists());
-                    assertEquals(n.getPartitions(), 0);
-                    assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED);
-                    n.recycle();
-                });
-
-        // Partition index is out of range.
-        result = 
pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(10));
-        assertThat(result)
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .satisfies(n -> {
-                    assertFalse(n.isExists());
-                    assertEquals(n.getPartitions(), 0);
-                    assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED);
-                    n.recycle();
-                });
-    }
-
-    @Test(dataProvider = "topicDomain")
-    public void checkTopicExistsForNonExistentNonPartitionedTopic(String 
topicDomain) {
-        TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", 
"topic-" + UUID.randomUUID());
-        CompletableFuture<TopicExistsInfo> result = 
pulsar.getNamespaceService().checkTopicExists(topicName);
-        assertThat(result)
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .satisfies(n -> {
-                    // when using the pulsar client to check non_persistent 
topic, always return true, so ignore to
-                    // check that.
-                    if (topicDomain.equals(TopicDomain.persistent)) {
-                        assertFalse(n.isExists());
-                    }
-                    n.recycle();
-                });
-    }
-
-    @Test(dataProvider = "topicDomain")
-    public void checkTopicExistsForNonExistentPartitionTopic(String 
topicDomain) {
-        TopicName topicName =
-                TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + 
UUID.randomUUID() + "-partition-10");
-        CompletableFuture<TopicExistsInfo> result = 
pulsar.getNamespaceService().checkTopicExists(topicName);
-        assertThat(result)
-                .succeedsWithin(3, TimeUnit.SECONDS)
-                .satisfies(n -> {
-                    // when using the pulsar client to check non_persistent 
topic, always return true, so ignore to
-                    // check that.
-                    if (topicDomain.equals(TopicDomain.persistent)) {
-                        assertFalse(n.isExists());
-                    }
-                    n.recycle();
-                });
-    }
-
     /**
      * 1. Manually trigger "LoadReportUpdaterTask"
      * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
index 5650fe6e72f..6313d72329d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
@@ -37,7 +37,9 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "broker")
+// TODO: This test is in flaky group until CI is fixed.
+// To be addressed as part of https://github.com/apache/pulsar/pull/24154
+@Test(groups = "flaky")
 public class BrokerServiceChaosTest extends 
CanReconnectZKClientPulsarServiceBaseTest {
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index 2a2a1befd16..a22067101c3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -41,7 +41,9 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-@Test(groups = "broker")
+// TODO: This test is in flaky group until CI is fixed.
+// To be addressed as part of https://github.com/apache/pulsar/pull/24154
+@Test(groups = "flaky")
 public class OneWayReplicatorUsingGlobalPartitionedTest extends 
OneWayReplicatorTest {
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
index 195485739e0..a81dbe02b34 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
@@ -117,7 +117,7 @@ public class ConsumerCreationTest extends 
ProducerConsumerBase {
             Consumer<byte[]> ignored =
                     
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
         } else {
-            assertThrows(PulsarClientException.NotFoundException.class, () -> {
+            
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
                 @Cleanup
                 Consumer<byte[]> ignored =
                         
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index 48ee112d750..cd75383a487 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -258,7 +258,7 @@ public class ProducerCreationTest extends 
ProducerConsumerBase {
             @Cleanup
             Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
         } else {
-            assertThrows(PulsarClientException.NotFoundException.class, () -> {
+            
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
                 @Cleanup
                 Producer<byte[]> ignored = 
pulsarClient.newProducer().topic(partitionedTopic).create();
             });
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 6bcba51defd..32cec950a69 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -961,9 +961,7 @@ public class PulsarClientException extends IOException {
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
         // wrap an exception with new message info
-        if (t instanceof NotFoundException) {
-            return new NotFoundException(msg);
-        } else if (t instanceof TopicDoesNotExistException) {
+        if (t instanceof TopicDoesNotExistException) {
             return new TopicDoesNotExistException(msg);
         } else if (t instanceof TimeoutException) {
             return new TimeoutException(msg);

Reply via email to