This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1405ea006f [fix][broker] Fix deleting topic not delete the related
topic policy and schema. (#21093)
a1405ea006f is described below
commit a1405ea006f175b1bd0b9d28b9444d592fb4a010
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:07:46 2023 +0800
[fix][broker] Fix deleting topic not delete the related topic policy and
schema. (#21093)
Fixes #21075
### Motivation
When the topic is loaded, it will delete the topic-level policy if it is
enabled. But if the topic is not loaded, it will directly delete through
managed ledger factory. But then we will leave the topic policy there. When the
topic is created next time, it will use the old topic policy
### Modifications
When deleting the topic, delete the schema and topic policies even if the
topic is not loaded.
---
.../pulsar/broker/service/AbstractTopic.java | 17 +------
.../pulsar/broker/service/BrokerService.java | 55 +++++++++++++++++-----
.../broker/service/BrokerBkEnsemblesTests.java | 32 ++++++++++---
.../systopic/PartitionedSystemTopicTest.java | 25 ++++++++++
4 files changed, 94 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b15f8cbf0b8..cef2dd2080c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,7 +58,6 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedExc
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -674,21 +673,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
- String id = getSchemaId();
- SchemaRegistryService schemaRegistryService =
brokerService.pulsar().getSchemaRegistryService();
- return
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
- .thenCompose(schema -> {
- if (schema != null) {
- // It's different from `SchemasResource.deleteSchema`
- // because when we delete a topic, the schema
- // history is meaningless. But when we delete a schema
of a topic, a new schema could be
- // registered in the future.
- log.info("Delete schema storage of id: {}", id);
- return schemaRegistryService.deleteSchemaStorage(id);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
+ return brokerService.deleteSchema(TopicName.get(getName()));
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 199901b0902..391affef1da 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -119,6 +119,8 @@ import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -159,6 +161,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
@@ -1156,26 +1159,33 @@ public class BrokerService implements Closeable {
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> deleteTopicAuthenticationFuture = new
CompletableFuture<>();
deleteTopicAuthenticationWithRetry(topic,
deleteTopicAuthenticationFuture, 5);
-
- deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+ deleteTopicAuthenticationFuture
+ .thenCompose(__ -> deleteSchema(tn))
+ .thenCompose(__ -> {
+ if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+ && getPulsar().getConfiguration().isSystemTopicEnabled()) {
+ return deleteTopicPolicies(tn);
+ }
+ return CompletableFuture.completedFuture(null);
+ }).whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
}
CompletableFuture<ManagedLedgerConfig> mlConfigFuture =
getManagedLedgerConfig(topicName);
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
- mlConfigFuture, new DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- future.complete(null);
- }
+ mlConfigFuture, new DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ future.complete(null);
+ }
- @Override
- public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
- future.completeExceptionally(exception);
- }
- }, null);
- });
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ });
return future;
}
@@ -3451,6 +3461,25 @@ public class BrokerService implements Closeable {
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}
+ CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+ String base = topicName.getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ SchemaRegistryService schemaRegistryService =
getPulsar().getSchemaRegistryService();
+ return
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
+ .thenCompose(schema -> {
+ if (schema != null) {
+ // It's different from `SchemasResource.deleteSchema`
+ // because when we delete a topic, the schema
+ // history is meaningless. But when we delete a schema
of a topic, a new schema could be
+ // registered in the future.
+ log.info("Delete schema storage of id: {}", id);
+ return
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName
topicName, int numPartitions) {
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 9f19bda3647..40649a41640 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
-
import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.NavigableMap;
@@ -31,10 +31,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
import com.google.common.collect.Sets;
import lombok.Cleanup;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +45,7 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -497,10 +496,31 @@ public class BrokerBkEnsemblesTests extends
BkEnsemblesTestBase {
// Expected
}
- // Deletion must succeed
- admin.topics().delete(topic);
+ assertThrows(PulsarAdminException.ServerSideErrorException.class, ()
-> admin.topics().delete(topic));
+ }
+
+ @Test
+ public void testDeleteTopicWithoutTopicLoaded() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+ admin.namespaces().createNamespace(namespace);
+
+ String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
- // Topic will not be there after
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.close();
+ admin.topics().unload(topic);
+
+ admin.topics().delete(topic);
assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(),
Optional.empty());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 008c2143a35..34e7dfe92e5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.systopic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
@@ -299,4 +303,25 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
writer1.get().close();
writer2.get().close();
}
+
+ @Test
+ public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws
Exception {
+ final String ns = "prop/ns-test";
+ admin.namespaces().createNamespace(ns, 2);
+ final String topicName =
"persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
+ admin.topics().createNonPartitionedTopic(topicName);
+
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+ admin.topicPolicies().setMaxConsumers(topicName, 2);
+ Awaitility.await().untilAsserted(() ->
assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
+ CompletableFuture<Optional<Topic>> topic =
pulsar.getBrokerService().getTopic(topicName, false);
+ PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
+ persistentTopic.close();
+ admin.topics().delete(topicName);
+ TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
+ assertNull(topicPolicies);
+ String base = TopicName.get(topicName).getPartitionedTopicName();
+ String id = TopicName.get(base).getSchemaName();
+ CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema =
pulsar.getSchemaRegistryService().getSchema(id);
+ assertNull(schema.join());
+ }
}