This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 115f7137630 [fix][broker] Fix deleting topic not delete the related 
topic policy and schema. (#21093)
115f7137630 is described below

commit 115f7137630de9fd9e0d490487633fb734beec49
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:07:46 2023 +0800

    [fix][broker] Fix deleting topic not delete the related topic policy and 
schema. (#21093)
    
    Fixes #21075
    
    ### Motivation
    
    When the topic is loaded, it will delete the topic-level policy if it is 
enabled. But if the topic is not loaded, it will directly delete through 
managed ledger factory. But then we will leave the topic policy there. When the 
topic is created next time, it will use the old topic policy
    
    ### Modifications
    
    When deleting the topic, delete the schema and topic policies even if the 
topic is not loaded.
---
 .../pulsar/broker/service/AbstractTopic.java       | 17 +------
 .../pulsar/broker/service/BrokerService.java       | 55 +++++++++++++++++-----
 .../broker/service/BrokerBkEnsemblesTests.java     | 32 ++++++++++---
 .../systopic/PartitionedSystemTopicTest.java       | 25 ++++++++++
 4 files changed, 94 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 8e17a022762..97966828fb1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,7 +58,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedExc
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -674,21 +673,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     @Override
     public CompletableFuture<SchemaVersion> deleteSchema() {
-        String id = getSchemaId();
-        SchemaRegistryService schemaRegistryService = 
brokerService.pulsar().getSchemaRegistryService();
-        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
-                .thenCompose(schema -> {
-                    if (schema != null) {
-                        // It's different from `SchemasResource.deleteSchema`
-                        // because when we delete a topic, the schema
-                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
-                        // registered in the future.
-                        log.info("Delete schema storage of id: {}", id);
-                        return schemaRegistryService.deleteSchemaStorage(id);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        return brokerService.deleteSchema(TopicName.get(getName()));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2da3e73d8b6..d6b17f4faa4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -119,6 +119,8 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -159,6 +161,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1156,26 +1159,33 @@ public class BrokerService implements Closeable {
         CompletableFuture<Void> future = new CompletableFuture<>();
         CompletableFuture<Void> deleteTopicAuthenticationFuture = new 
CompletableFuture<>();
         deleteTopicAuthenticationWithRetry(topic, 
deleteTopicAuthenticationFuture, 5);
-
-        deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
+        deleteTopicAuthenticationFuture
+        .thenCompose(__ -> deleteSchema(tn))
+        .thenCompose(__ -> {
+            if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
+                    && getPulsar().getConfiguration().isSystemTopicEnabled()) {
+                return deleteTopicPolicies(tn);
+            }
+            return CompletableFuture.completedFuture(null);
+        }).whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
             }
             CompletableFuture<ManagedLedgerConfig> mlConfigFuture = 
getManagedLedgerConfig(topicName);
             managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
-                    mlConfigFuture, new DeleteLedgerCallback() {
-                        @Override
-                        public void deleteLedgerComplete(Object ctx) {
-                            future.complete(null);
-                        }
+                mlConfigFuture, new DeleteLedgerCallback() {
+                    @Override
+                    public void deleteLedgerComplete(Object ctx) {
+                        future.complete(null);
+                    }
 
-                        @Override
-                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
-                            future.completeExceptionally(exception);
-                        }
-                    }, null);
-        });
+                    @Override
+                    public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                        future.completeExceptionally(exception);
+                    }
+                }, null);
+         });
 
         return future;
     }
@@ -3457,6 +3467,25 @@ public class BrokerService implements Closeable {
                 
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
+    CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+        String base = topicName.getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        SchemaRegistryService schemaRegistryService = 
getPulsar().getSchemaRegistryService();
+        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
+                .thenCompose(schema -> {
+                    if (schema != null) {
+                        // It's different from `SchemasResource.deleteSchema`
+                        // because when we delete a topic, the schema
+                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
+                        // registered in the future.
+                        log.info("Delete schema storage of id: {}", id);
+                        return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
         return pulsar.getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(topicName.getNamespaceObject())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 9f19bda3647..40649a41640 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -21,8 +21,8 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
-
 import java.lang.reflect.Field;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
@@ -31,10 +31,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +45,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -497,10 +496,31 @@ public class BrokerBkEnsemblesTests extends 
BkEnsemblesTestBase {
             // Expected
         }
 
-        // Deletion must succeed
-        admin.topics().delete(topic);
+        assertThrows(PulsarAdminException.ServerSideErrorException.class, () 
-> admin.topics().delete(topic));
+    }
+
+    @Test
+    public void testDeleteTopicWithoutTopicLoaded() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("prop/usc");
+        admin.namespaces().createNamespace(namespace);
+
+        String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
 
-        // Topic will not be there after
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.close();
+        admin.topics().unload(topic);
+
+        admin.topics().delete(topic);
         assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), 
Optional.empty());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 008c2143a35..34e7dfe92e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import com.google.common.collect.Sets;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -37,6 +39,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
@@ -299,4 +303,25 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         writer1.get().close();
         writer2.get().close();
     }
+
+    @Test
+    public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws 
Exception {
+        final String ns = "prop/ns-test";
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
+        admin.topics().createNonPartitionedTopic(topicName);
+        
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
+        admin.topicPolicies().setMaxConsumers(topicName, 2);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
+        CompletableFuture<Optional<Topic>> topic = 
pulsar.getBrokerService().getTopic(topicName, false);
+        PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
+        persistentTopic.close();
+        admin.topics().delete(topicName);
+        TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
+        assertNull(topicPolicies);
+        String base = TopicName.get(topicName).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema = 
pulsar.getSchemaRegistryService().getSchema(id);
+        assertNull(schema.join());
+    }
 }

Reply via email to