This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 617d61314a8 [fix][admin] Fix half deletion when attempting to delete a topic with
an incorrect API (#23002)
617d61314a8 is described below
commit 617d61314a8f60bbd95ad88f135a3907f67a6f07
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 10 10:28:47 2024 +0800
[fix][admin] Fix half deletion when attempting to delete a topic with an incorrect API
(#23002)
---
.../broker/admin/impl/PersistentTopicsBase.java | 12 ++++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++++-
.../pulsar/broker/admin/AdminTopicApiTest.java | 61 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 887591c0feb..36a8394eedb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -741,7 +741,17 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(partitionedMeta -> {
final int numPartitions =
partitionedMeta.partitions;
if (numPartitions < 1) {
- return CompletableFuture.completedFuture(null);
+ return
pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
+ .thenApply(exists -> {
+ if (exists) {
+ throw new
RestException(Response.Status.CONFLICT,
+ String.format("%s is a
non-partitioned topic. Instead of calling"
+ + "
delete-partitioned-topic please call delete.", topicName));
+ } else {
+ throw new
RestException(Status.NOT_FOUND,
+ String.format("Topic %s not
found.", topicName));
+ }
+ });
}
return
internalRemovePartitionsAuthenticationPoliciesAsync()
.thenCompose(unused ->
internalRemovePartitionsTopicAsync(numPartitions, force));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 8a1f4e0dc56..d0fe3c7c745 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1167,7 +1167,17 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to
this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- internalDeleteTopicAsync(authoritative, force)
+
+
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ .partitionedTopicExistsAsync(topicName).thenAccept(exists -> {
+ if (exists) {
+ RestException restException = new
RestException(Response.Status.CONFLICT,
+ String.format("%s is a partitioned topic, instead of
calling delete topic, please call"
+ + " delete-partitioned-topic.", topicName));
+ resumeAsyncResponseExceptionally(asyncResponse, restException);
+ return;
+ }
+ internalDeleteTopicAsync(authoritative, force)
.thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -1186,6 +1196,8 @@ public class PersistentTopics extends
PersistentTopicsBase {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
+ });
+
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
index a1ed4271616..0a334cd7e81 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java
@@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -40,12 +43,14 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.TopicStats;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -71,6 +76,62 @@ public class AdminTopicApiTest extends ProducerConsumerBase {
super.internalCleanup();
}
+ @Test
+ public void testDeleteNonExistTopic() throws Exception {
+ // Case 1: call delete for a partitioned topic.
+ final String topic1 =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createPartitionedTopic(topic1, 2);
+ admin.schemas().createSchemaAsync(topic1,
Schema.STRING.getSchemaInfo());
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1);
+ });
+ try {
+ admin.topics().delete(topic1);
+ fail("expected a 409 error");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("please call
delete-partitioned-topic"));
+ }
+ Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(()
-> {
+ assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1);
+ });
+ // cleanup.
+ admin.topics().deletePartitionedTopic(topic1, false);
+
+ // Case 2: call delete-partitioned-topic for a non-partitioned topic.
+ final String topic2 =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topic2);
+ admin.schemas().createSchemaAsync(topic2,
Schema.STRING.getSchemaInfo());
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1);
+ });
+ try {
+ admin.topics().deletePartitionedTopic(topic2);
+ fail("expected a 409 error");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("Instead of calling
delete-partitioned-topic please call delete"));
+ }
+ Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(()
-> {
+ assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1);
+ });
+ // cleanup.
+ admin.topics().delete(topic2, false);
+
+ // Case 3: delete a topic that does not exist.
+ final String topic3 =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ try {
+ admin.topics().delete(topic3);
+ fail("expected a 404 error");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("not found"));
+ }
+ try {
+ admin.topics().deletePartitionedTopic(topic3);
+ fail("expected a 404 error");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("not found"));
+ }
+ }
+
@Test
public void testPeekMessages() throws Exception {
@Cleanup