This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0fab9ed7a1713e42bc506f5592a3930b26b60f1b
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 10 10:28:47 2024 +0800

    [fix][admin] Fix half deletion when attempt to topic with a incorrect API 
(#23002)
    
    (cherry picked from commit 1f3449736e614428ea4d625e48cafa09b35e608d)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 12 ++++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 14 ++++-
 .../pulsar/broker/admin/AdminTopicApiTest.java     | 62 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 61b915259af..1259ca690ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -815,7 +815,17 @@ public class PersistentTopicsBase extends AdminResource {
                         .thenCompose(partitionedMeta -> {
                             final int numPartitions = 
partitionedMeta.partitions;
                             if (numPartitions < 1) {
-                                return CompletableFuture.completedFuture(null);
+                                return 
pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
+                                        .thenApply(exists -> {
+                                    if (exists) {
+                                        throw new 
RestException(Response.Status.CONFLICT,
+                                                String.format("%s is a 
non-partitioned topic. Instead of calling"
+                                                        + " 
delete-partitioned-topic please call delete.", topicName));
+                                    } else {
+                                        throw new 
RestException(Status.NOT_FOUND,
+                                                String.format("Topic %s not 
found.", topicName));
+                                    }
+                                });
                             }
                             return 
internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)
                                     .thenCompose(unused -> 
internalRemovePartitionsTopicAsync(numPartitions, force));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 037513dc073..2e1b943a2cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1153,7 +1153,17 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeleteTopicAsync(authoritative, force)
+
+        
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .partitionedTopicExistsAsync(topicName).thenAccept(exists -> {
+            if (exists) {
+                RestException restException = new 
RestException(Response.Status.CONFLICT,
+                        String.format("%s is a partitioned topic, instead of 
calling delete topic, please call"
+                                + " delete-partitioned-topic.", topicName));
+                resumeAsyncResponseExceptionally(asyncResponse, restException);
+                return;
+            }
+            internalDeleteTopicAsync(authoritative, force)
                 .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
                 .exceptionally(ex -> {
                     Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -1172,6 +1182,8 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
                 });
+        });
+
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
index 93bf2349103..45bbb3a2912 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
@@ -19,21 +19,27 @@
 package org.apache.pulsar.broker.admin;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -59,6 +65,62 @@ public class AdminTopicApiTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test
+    public void testDeleteNonExistTopic() throws Exception {
+        // Case 1: call delete for a partitioned topic.
+        final String topic1 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createPartitionedTopic(topic1, 2);
+        admin.schemas().createSchemaAsync(topic1, 
Schema.STRING.getSchemaInfo());
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1);
+        });
+        try {
+            admin.topics().delete(topic1);
+            fail("expected a 409 error");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("please call 
delete-partitioned-topic"));
+        }
+        Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(() 
-> {
+            assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1);
+        });
+        // cleanup.
+        admin.topics().deletePartitionedTopic(topic1, false);
+
+        // Case 2: call delete-partitioned-topi for a non-partitioned topic.
+        final String topic2 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic2);
+        admin.schemas().createSchemaAsync(topic2, 
Schema.STRING.getSchemaInfo());
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1);
+        });
+        try {
+            admin.topics().deletePartitionedTopic(topic2);
+            fail("expected a 409 error");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("Instead of calling 
delete-partitioned-topic please call delete"));
+        }
+        Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(() 
-> {
+            assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1);
+        });
+        // cleanup.
+        admin.topics().delete(topic2, false);
+
+        // Case 3: delete topic does not exist.
+        final String topic3 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        try {
+            admin.topics().delete(topic3);
+            fail("expected a 404 error");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("not found"));
+        }
+        try {
+            admin.topics().deletePartitionedTopic(topic3);
+            fail("expected a 404 error");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("not found"));
+        }
+    }
+
     @Test
     public void testPeekMessages() throws Exception {
         @Cleanup

Reply via email to