This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6d9df8  [Broker] Correct param of delete method for v1 topic (#12936)
e6d9df8 is described below

commit e6d9df81c446870107dbb8d8e454b11b71cc9255
Author: Ruguo Yu <[email protected]>
AuthorDate: Wed Nov 24 08:37:18 2021 +0800

    [Broker] Correct param of delete method for v1 topic (#12936)
---
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 10 ++-
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 90 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 2917482..09bb34b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -264,10 +264,11 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("deleteSchema") @DefaultValue("false") boolean 
deleteSchema) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalDeletePartitionedTopic(asyncResponse, authoritative, 
force, false);
+            internalDeletePartitionedTopic(asyncResponse, authoritative, 
force, deleteSchema);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
@@ -302,9 +303,10 @@ public class PersistentTopics extends PersistentTopicsBase 
{
     public void deleteTopic(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("deleteSchema") @DefaultValue("false") boolean 
deleteSchema) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeleteTopic(authoritative, force);
+        internalDeleteTopic(authoritative, force, deleteSchema);
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 0a4512d..5c83361 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -776,6 +776,96 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testDeleteTopicAndSchemaForV1() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String cluster = CLUSTER_NAME;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "not-partitioned-topic";
+        final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + 
namespace + "/partitioned-topic";
+
+        // persistent, not-partitioned v1/topic
+        final String topic1 = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                cluster,
+                namespace,
+                topicOne).toString();
+
+        // persistent, partitioned v1/topic
+        admin.topics().createPartitionedTopic(topic2, 1);
+
+        @Cleanup
+        Producer<Schemas.PersonOne> p1_1 = 
pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topic1)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> p1_2 = 
pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic1)
+                .create();
+        @Cleanup
+        Producer<Schemas.PersonThree> p2_1 = 
pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic2)
+                .create();
+
+        List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> 
schemaFutures1 =
+                
this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic1).getSchemaName()).get();
+        FutureUtil.waitForAll(schemaFutures1).get();
+        List<SchemaRegistry.SchemaAndMetadata> schemas1 = 
schemaFutures1.stream().map(future -> {
+            try {
+                return future.get();
+            } catch (Exception e) {
+                return null;
+            }
+        }).collect(Collectors.toList());
+        assertEquals(schemas1.size(), 2);
+        for (SchemaRegistry.SchemaAndMetadata schema : schemas1) {
+            assertNotNull(schema);
+        }
+
+        List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> 
schemaFutures2 =
+                
this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic2).getSchemaName()).get();
+        FutureUtil.waitForAll(schemaFutures2).get();
+        List<SchemaRegistry.SchemaAndMetadata> schemas2 = 
schemaFutures2.stream().map(future -> {
+            try {
+                return future.get();
+            } catch (Exception e) {
+                return null;
+            }
+        }).collect(Collectors.toList());
+        assertEquals(schemas2.size(), 1);
+        for (SchemaRegistry.SchemaAndMetadata schema : schemas2) {
+            assertNotNull(schema);
+        }
+
+        // not-force and not-delete-schema when delete topic
+        try {
+            admin.topics().delete(topic1, false, false);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith("Topic has active 
producers/subscriptions"));
+        }
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(),
 2);
+        try {
+            admin.topics().deletePartitionedTopic(topic2, false, false);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith("Topic has active 
producers/subscriptions"));
+        }
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                
.trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(),
 1);
+
+        // force and delete-schema when delete topic
+        admin.topics().delete(topic1, true, true);
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(),
 0);
+        admin.topics().deletePartitionedTopic(topic2, true, true);
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                
.trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(),
 0);
+    }
+
+    @Test
     public void testProducerMultipleSchemaMessages() throws Exception {
         final String tenant = PUBLIC_TENANT;
         final String namespace = "test-namespace-" + randomName(16);

Reply via email to