This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d1b7d0b7d86 [fix][broker] Correct schema deletion for parititioned
topic (#21574)
d1b7d0b7d86 is described below
commit d1b7d0b7d86a7a8883407d3dc054bb8e21f96d60
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 22 01:32:10 2023 +0800
[fix][broker] Correct schema deletion for parititioned topic (#21574)
### Motivation
Schemas are bound to the partitioned topic as a whole, but the schema was being
deleted whenever a single partition was deleted.
### Modifications
Correct the behaviors of schema deleting:
- Pulsar deletes the schema when a non-partitioned topic is deleted.
- Pulsar deletes the schema when the metadata of a partitioned topic is deleted.
- Pulsar does not delete the schema when only some partitions of a partitioned
topic are deleted.
---
.../broker/admin/impl/PersistentTopicsBase.java | 4 +-
.../pulsar/broker/service/BrokerService.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 10 ++
.../apache/pulsar/broker/service/TopicGCTest.java | 112 +++++++++++++++++++
.../broker/service/schema/TopicSchemaTest.java | 118 +++++++++++++++++++++
5 files changed, 244 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eac4335a26d..0ef487c320d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -722,7 +722,9 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(unused ->
internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when
all its partitions are successfully deleted
- ).thenCompose(__ ->
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+ ).thenCompose(ignore ->
+
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null))
+ .thenCompose(__ ->
getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName, () ->
namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
.thenAccept(__ -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7532bee8c12..43bf60f282e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3569,7 +3569,7 @@ public class BrokerService implements Closeable {
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}
- CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+ public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService =
getPulsar().getSchemaRegistryService();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 13bcf769618..eaa57140c9f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -169,6 +169,7 @@ import
org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
@@ -2380,6 +2381,15 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
+    /**
+     * Deletes the schema of this topic, unless the topic name denotes a partition.
+     *
+     * <p>For a name that {@link TopicName#isPartitioned()} reports as partitioned, nothing is
+     * deleted here; the schema is only removed when the partitioned metadata is being deleted.
+     *
+     * @return the future produced by {@code brokerService.deleteSchema(...)} for other topics,
+     *         or an already-completed future holding {@code null} when the deletion is skipped
+     */
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchema() {
+        final TopicName topicName = TopicName.get(getName());
+        if (!topicName.isPartitioned()) {
+            return brokerService.deleteSchema(topicName);
+        }
+        // Partition of a partitioned topic: keep the schema, other partitions may still use it.
+        return CompletableFuture.completedFuture(null);
+    }
+
@Override
public CompletableFuture<PersistentTopicInternalStats>
getInternalStats(boolean includeLedgerMetadata) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
new file mode 100644
index 00000000000..7790940c132
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests inactive-topic garbage collection on a partitioned topic: after the GC has removed one
+ * partition, a consumer must still be able to subscribe to the partitioned topic as a whole.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class TopicGCTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Extends the base configuration so that inactive topics are deleted once their subscriptions
+     * have caught up, with the inactive-topic check frequency set to 10 seconds.
+     */
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10);
+    }
+
+    /**
+     * Creates a two-partition topic, lets the GC remove partition 0 while partition 1 still has
+     * an unconsumed message, and then verifies that subscribing to the partitioned topic works
+     * and delivers a message.
+     */
+    @Test
+    public void testCreateConsumerAfterOnePartDeleted() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String subscription = "s1";
+        admin.topics().createPartitionedTopic(topic, 2);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+
+        // Create consumers and producers.
+        Producer<String> producer0 = pulsarClient.newProducer(Schema.STRING).topic(partition0)
+                .enableBatching(false).create();
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING).topic(partition1)
+                .enableBatching(false).create();
+        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
+
+        // Consume (skip) all messages of partition 0, but leave the message of partition 1 pending.
+        producer0.send("1");
+        producer1.send("2");
+        admin.topics().skipAllMessages(partition0, subscription);
+
+        // Wait for topic GC.
+        // Partition 0 is expected to be deleted after about 20s; allow 2 minutes to avoid flakiness.
+        producer0.close();
+        consumer1.close();
+        Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
+            CompletableFuture<Optional<Topic>> tp1 = pulsar.getBrokerService().getTopic(partition0, false);
+            CompletableFuture<Optional<Topic>> tp2 = pulsar.getBrokerService().getTopic(partition1, false);
+            assertTrue(tp1 == null || !tp1.get().isPresent());
+            assertTrue(tp2 != null && tp2.get().isPresent());
+        });
+
+        // Verify that a consumer subscribing to the partitioned topic can be created successfully.
+        Consumer<String> consumerAllPartition = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
+        Message<String> msg = consumerAllPartition.receive(2, TimeUnit.SECONDS);
+        // Fail with a clear assertion instead of a NullPointerException when nothing arrives in time.
+        assertTrue(msg != null, "No message received from the partitioned topic within 2 seconds");
+        String receivedMsgValue = msg.getValue();
+        log.info("received msg: {}", receivedMsgValue);
+        consumerAllPartition.acknowledge(msg);
+
+        // Cleanup. producer0 was already closed before waiting for the GC.
+        consumerAllPartition.close();
+        producer1.close();
+        admin.topics().deletePartitionedTopic(topic);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java
new file mode 100644
index 00000000000..66bfd1c3ec2
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests when a topic's schema is removed from the schema registry: together with a
+ * non-partitioned topic, or together with the metadata of a partitioned topic, but not when
+ * individual partitions are deleted.
+ */
+@Test(groups = "broker")
+public class TopicSchemaTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Supplies both topic domains to the parameterized tests.
+     */
+    @DataProvider(name = "topicDomains")
+    public Object[][] topicDomains() {
+        return new Object[][]{
+                {TopicDomain.non_persistent},
+                {TopicDomain.persistent}
+        };
+    }
+
+    /**
+     * Loads every schema entry currently stored in the schema registry under {@code schemaId},
+     * waiting for each entry and dropping {@code null} results.
+     *
+     * @param schemaId the schema name to look up
+     * @return the non-null schema entries; empty when none are stored
+     */
+    private List<SchemaAndMetadata> loadRegisteredSchemas(String schemaId) {
+        return pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
+                .stream()
+                .map(future -> future.join())
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Verifies that deleting a non-partitioned topic also deletes its schema.
+     */
+    @Test(dataProvider = "topicDomains")
+    public void testDeleteNonPartitionedTopicWithSchema(TopicDomain topicDomain) throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() + "://public/default/tp");
+        final String schemaId = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName();
+        admin.topics().createNonPartitionedTopic(topic);
+
+        // Add schema.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic)
+                .enableBatching(false).create();
+        producer.close();
+        assertTrue(!loadRegisteredSchemas(schemaId).isEmpty());
+
+        // Verify the schema has been deleted together with the topic.
+        admin.topics().delete(topic, false);
+        assertTrue(loadRegisteredSchemas(schemaId).isEmpty());
+    }
+
+    /**
+     * Verifies that the schema of a partitioned topic survives the deletion of its partitions
+     * and is only deleted together with the partitioned-topic metadata.
+     *
+     * <p>Note: despite the "WithoutSchema" suffix of the method name, the topic in this test does
+     * have a schema registered; the name is kept unchanged for compatibility.
+     */
+    @Test
+    public void testDeletePartitionedTopicWithoutSchema() throws Exception {
+        // Non-persistent topic does not support partitioned topic now, so only write a test case
+        // for persistent topic.
+        TopicDomain topicDomain = TopicDomain.persistent;
+        final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() + "://public/default/tp");
+        final String partition0 = topic + "-partition-0";
+        final String partition1 = topic + "-partition-1";
+        final String schemaId = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName();
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        // Add schema.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic)
+                .enableBatching(false).create();
+        producer.close();
+        assertTrue(!loadRegisteredSchemas(schemaId).isEmpty());
+
+        // Verify the schema is not deleted together with partition-0.
+        admin.topics().delete(partition0, false);
+        assertTrue(!loadRegisteredSchemas(schemaId).isEmpty());
+
+        // Verify the schema is still not deleted once both partition-0 and partition-1 are gone.
+        admin.topics().delete(partition1, false);
+        assertTrue(!loadRegisteredSchemas(schemaId).isEmpty());
+
+        // Verify the schema is deleted together with the partitioned metadata.
+        admin.topics().deletePartitionedTopic(topic, false);
+        assertTrue(loadRegisteredSchemas(schemaId).isEmpty());
+    }
+}