This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b7d0b7d86 [fix][broker] Correct schema deletion for partitioned 
topic (#21574)
d1b7d0b7d86 is described below

commit d1b7d0b7d86a7a8883407d3dc054bb8e21f96d60
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 22 01:32:10 2023 +0800

    [fix][broker] Correct schema deletion for partitioned topic (#21574)
    
    ### Motivation
    
    Schemas are bound to the partitioned topic, but they are deleted when a 
single partition is deleted.
    
    ### Modifications
    
    Correct the schema deletion behavior:
    - Pulsar deletes the schema when a non-partitioned topic is deleted.
    - Pulsar deletes the schema when a partitioned topic's metadata is deleted.
    - Pulsar does not delete the schema when only some partitions of a 
partitioned topic are deleted.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   4 +-
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../broker/service/persistent/PersistentTopic.java |  10 ++
 .../apache/pulsar/broker/service/TopicGCTest.java  | 112 +++++++++++++++++++
 .../broker/service/schema/TopicSchemaTest.java     | 118 +++++++++++++++++++++
 5 files changed, 244 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eac4335a26d..0ef487c320d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -722,7 +722,9 @@ public class PersistentTopicsBase extends AdminResource {
                                     .thenCompose(unused -> 
internalRemovePartitionsTopicAsync(numPartitions, force));
                         })
                 // Only tries to delete the znode for partitioned topic when 
all its partitions are successfully deleted
-                ).thenCompose(__ -> 
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                ).thenCompose(ignore ->
+                        
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null))
+                .thenCompose(__ -> 
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                         .runWithMarkDeleteAsync(topicName, () -> 
namespaceResources()
                                 
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
                 .thenAccept(__ -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7532bee8c12..43bf60f282e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3569,7 +3569,7 @@ public class BrokerService implements Closeable {
                 
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
-    CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+    public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
         String base = topicName.getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         SchemaRegistryService schemaRegistryService = 
getPulsar().getSchemaRegistryService();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 13bcf769618..eaa57140c9f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -169,6 +169,7 @@ import 
org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.Codec;
@@ -2380,6 +2381,15 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchema() {
+        if (TopicName.get(getName()).isPartitioned()) {
+            // Only delete schema when partitioned metadata is deleting.
+            return CompletableFuture.completedFuture(null);
+        }
+        return brokerService.deleteSchema(TopicName.get(getName()));
+    }
+
     @Override
     public CompletableFuture<PersistentTopicInternalStats> 
getInternalStats(boolean includeLedgerMetadata) {
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
new file mode 100644
index 00000000000..7790940c132
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class TopicGCTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @EqualsAndHashCode.Include
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        
this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10);
+    }
+
+    @Test
+    public void testCreateConsumerAfterOnePartDeleted() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String subscription = "s1";
+        admin.topics().createPartitionedTopic(topic, 2);
+        admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+
+        // create consumers and producers.
+        Producer<String> producer0 = 
pulsarClient.newProducer(Schema.STRING).topic(partition0)
+                .enableBatching(false).create();
+        Producer<String> producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(partition1)
+                .enableBatching(false).create();
+        org.apache.pulsar.client.api.Consumer<String> consumer1 = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
+
+        // Make consume all messages for one topic, do not consume any 
messages for another one.
+        producer0.send("1");
+        producer1.send("2");
+        admin.topics().skipAllMessages(partition0, subscription);
+
+        // Wait for topic GC.
+        // Partition 0 will be deleted about 20s later, left 2min to avoid 
flaky.
+        producer0.close();
+        consumer1.close();
+        Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
+            CompletableFuture<Optional<Topic>> tp1 = 
pulsar.getBrokerService().getTopic(partition0, false);
+            CompletableFuture<Optional<Topic>> tp2 = 
pulsar.getBrokerService().getTopic(partition1, false);
+            assertTrue(tp1 == null || !tp1.get().isPresent());
+            assertTrue(tp2 != null && tp2.get().isPresent());
+        });
+
+        // Verify that the consumer subscribed with partitioned topic can be 
created successful.
+        Consumer<String> consumerAllPartition = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
+        Message<String> msg = consumerAllPartition.receive(2, 
TimeUnit.SECONDS);
+        String receivedMsgValue = msg.getValue();
+        log.info("received msg: {}", receivedMsgValue);
+        consumerAllPartition.acknowledge(msg);
+
+        // cleanup.
+        consumerAllPartition.close();
+        producer0.close();
+        producer1.close();
+        admin.topics().deletePartitionedTopic(topic);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java
new file mode 100644
index 00000000000..66bfd1c3ec2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TopicSchemaTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "topicDomains")
+    public Object[][] topicDomains() {
+        return new Object[][]{
+                {TopicDomain.non_persistent},
+                {TopicDomain.persistent}
+        };
+    }
+
+    @Test(dataProvider = "topicDomains")
+    public void testDeleteNonPartitionedTopicWithSchema(TopicDomain 
topicDomain) throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() 
+ "://public/default/tp");
+        final String schemaId = 
TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName();
+        admin.topics().createNonPartitionedTopic(topic);
+
+        // Add schema.
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic)
+                .enableBatching(false).create();
+        producer.close();
+        List<SchemaAndMetadata> schemaList1 = 
pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream().map(s -> 
s.join()).filter(Objects::nonNull).collect(Collectors.toList());
+        assertTrue(schemaList1 != null && schemaList1.size() > 0);
+
+        // Verify the schema has been deleted with topic.
+        admin.topics().delete(topic, false);
+        List<SchemaAndMetadata> schemaList2 = 
pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream().map(s -> 
s.join()).filter(Objects::nonNull).collect(Collectors.toList());
+        assertTrue(schemaList2 == null || schemaList2.isEmpty());
+    }
+
+    @Test
+    public void testDeletePartitionedTopicWithoutSchema() throws Exception {
+        // Non-persistent topic does not support partitioned topic now, so 
only write a test case for persistent topic.
+        TopicDomain topicDomain = TopicDomain.persistent;
+        final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() 
+ "://public/default/tp");
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String schemaId = 
TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName();
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        // Add schema.
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic)
+                .enableBatching(false).create();
+        producer.close();
+        List<SchemaAndMetadata> schemaList1 = 
pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream().map(s -> 
s.join()).filter(Objects::nonNull).collect(Collectors.toList());
+        assertTrue(schemaList1 != null && schemaList1.size() > 0);
+
+        // Verify the schema will not been deleted with partition-0.
+        admin.topics().delete(partition0, false);
+        List<SchemaAndMetadata> schemaList2 = 
pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream().map(s -> 
s.join()).filter(Objects::nonNull).collect(Collectors.toList());
+        assertTrue(schemaList2 != null && schemaList2.size() > 0);
+
+        // Verify the schema will not been deleted with partition-0 & 
partition-1.
+        admin.topics().delete(partition1, false);
+        List<SchemaAndMetadata> schemaList3 = 
pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream().map(s -> 
s.join()).filter(Objects::nonNull).collect(Collectors.toList());
+        assertTrue(schemaList3 != null && schemaList3.size() > 0);
+
+        // Verify the schema will be deleted with partitioned metadata.
+        admin.topics().deletePartitionedTopic(topic, false);
+        List<SchemaAndMetadata> schemaList4 = 
pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream().map(s -> 
s.join()).filter(Objects::nonNull).collect(Collectors.toList());
+        assertTrue(schemaList4 == null || schemaList4.isEmpty());
+    }
+}

Reply via email to