This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1405ea006f [fix][broker] Fix deleting topic not delete the related 
topic policy and schema. (#21093)
a1405ea006f is described below

commit a1405ea006f175b1bd0b9d28b9444d592fb4a010
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:07:46 2023 +0800

    [fix][broker] Fix deleting topic not delete the related topic policy and 
schema. (#21093)
    
    Fixes #21075
    
    ### Motivation
    
    When the topic is loaded, it will delete the topic-level policy if it is 
enabled. But if the topic is not loaded, it will directly delete through 
managed ledger factory. But then we will leave the topic policy there. When the 
topic is created next time, it will use the old topic policy
    
    ### Modifications
    
    When deleting the topic, delete the schema and topic policies even if the 
topic is not loaded.
---
 .../pulsar/broker/service/AbstractTopic.java       | 17 +------
 .../pulsar/broker/service/BrokerService.java       | 55 +++++++++++++++++-----
 .../broker/service/BrokerBkEnsemblesTests.java     | 32 ++++++++++---
 .../systopic/PartitionedSystemTopicTest.java       | 25 ++++++++++
 4 files changed, 94 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b15f8cbf0b8..cef2dd2080c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,7 +58,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedExc
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -674,21 +673,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     @Override
     public CompletableFuture<SchemaVersion> deleteSchema() {
-        String id = getSchemaId();
-        SchemaRegistryService schemaRegistryService = 
brokerService.pulsar().getSchemaRegistryService();
-        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
-                .thenCompose(schema -> {
-                    if (schema != null) {
-                        // It's different from `SchemasResource.deleteSchema`
-                        // because when we delete a topic, the schema
-                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
-                        // registered in the future.
-                        log.info("Delete schema storage of id: {}", id);
-                        return schemaRegistryService.deleteSchemaStorage(id);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        return brokerService.deleteSchema(TopicName.get(getName()));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 199901b0902..391affef1da 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -119,6 +119,8 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -159,6 +161,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1156,26 +1159,33 @@ public class BrokerService implements Closeable {
         CompletableFuture<Void> future = new CompletableFuture<>();
         CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
         deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-
-        deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+        deleteTopicAuthenticationFuture
+        .thenCompose(__ -> deleteSchema(tn))
+        .thenCompose(__ -> {
+            if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+                    && getPulsar().getConfiguration().isSystemTopicEnabled()) {
+                return deleteTopicPolicies(tn);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
             }
             CompletableFuture<ManagedLedgerConfig> mlConfigFuture = 
getManagedLedgerConfig(topicName);
             managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
-                    mlConfigFuture, new DeleteLedgerCallback() {
-                        @Override
-                        public void deleteLedgerComplete(Object ctx) {
-                            future.complete(null);
-                        }
+                mlConfigFuture, new DeleteLedgerCallback() {
+                    @Override
+                    public void deleteLedgerComplete(Object ctx) {
+                        future.complete(null);
+                    }
 
-                        @Override
-                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
-                            future.completeExceptionally(exception);
-                        }
-                    }, null);
-        });
+                    @Override
+                    public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                        future.completeExceptionally(exception);
+                    }
+                }, null);
+         });
 
         return future;
     }
@@ -3451,6 +3461,25 @@ public class BrokerService implements Closeable {
                 
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
+    CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+        String base = topicName.getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        SchemaRegistryService schemaRegistryService = 
getPulsar().getSchemaRegistryService();
+        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
+                .thenCompose(schema -> {
+                    if (schema != null) {
+                        // It's different from `SchemasResource.deleteSchema`
+                        // because when we delete a topic, the schema
+                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
+                        // registered in the future.
+                        log.info("Delete schema storage of id: {}", id);
+                        return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
         return pulsar.getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(topicName.getNamespaceObject())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 9f19bda3647..40649a41640 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
-
 import java.lang.reflect.Field;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
@@ -31,10 +31,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +45,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -497,10 +496,31 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
             // Expected
         }
 
-        // Deletion must succeed
-        admin.topics().delete(topic);
+        assertThrows(PulsarAdminException.ServerSideErrorException.class, () 
-> admin.topics().delete(topic));
+    }
+
+    @Test
+    public void testDeleteTopicWithoutTopicLoaded() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+        admin.namespaces().createNamespace(namespace);
+
+        String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
 
-        // Topic will not be there after
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.close();
+        admin.topics().unload(topic);
+
+        admin.topics().delete(topic);
         assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), 
Optional.empty());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 008c2143a35..34e7dfe92e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import com.google.common.collect.Sets;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
@@ -299,4 +303,25 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         writer1.get().close();
         writer2.get().close();
     }
+
+    @Test
+    public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws 
Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
+        admin.topics().createNonPartitionedTopic(topicName);
+        
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+        admin.topicPolicies().setMaxConsumers(topicName, 2);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
+        CompletableFuture<Optional<Topic>> topic = 
pulsar.getBrokerService().getTopic(topicName, false);
+        PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
+        persistentTopic.close();
+        admin.topics().delete(topicName);
+        TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
+        assertNull(topicPolicies);
+        String base = TopicName.get(topicName).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema = 
pulsar.getSchemaRegistryService().getSchema(id);
+        assertNull(schema.join());
+    }
 }

Reply via email to