This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b4127a6b39 [fix][broker] Fix schema deletion error when deleting a 
partitioned topic with many partitions and schema (#21977)
1b4127a6b39 is described below

commit 1b4127a6b39cb57c3a9cceb4e05ee5cdb6104c78
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Jan 29 20:16:05 2024 -0800

    [fix][broker] Fix schema deletion error when deleting a partitioned topic 
with many partitions and schema (#21977)
---
 .../pulsar/broker/service/BrokerService.java       | 29 ++++++++++------------
 .../service/schema/BookkeeperSchemaStorage.java    |  6 +++--
 .../tests/integration/schema/SchemaTest.java       | 11 ++++++++
 3 files changed, 28 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8654a830050..0383c63b1f3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -125,8 +125,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -3475,22 +3473,21 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+        // delete schema at the upper level when deleting the partitioned 
topic.
+        if (topicName.isPartitioned()) {
+            return CompletableFuture.completedFuture(null);
+        }
         String base = topicName.getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
-        SchemaRegistryService schemaRegistryService = 
getPulsar().getSchemaRegistryService();
-        return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
-                .thenCompose(schema -> {
-                    if (schema != null) {
-                        // It's different from `SchemasResource.deleteSchema`
-                        // because when we delete a topic, the schema
-                        // history is meaningless. But when we delete a schema 
of a topic, a new schema could be
-                        // registered in the future.
-                        log.info("Delete schema storage of id: {}", id);
-                        return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
-                    } else {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
+        return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id).whenComplete((vid,
 ex) -> {
+            if (vid != null && ex == null) {
+                // It's different from `SchemasResource.deleteSchema`
+                // because when we delete a topic, the schema
+                // history is meaningless. But when we delete a schema of a 
topic, a new schema could be
+                // registered in the future.
+                log.info("Deleted schema storage of id: {}", id);
+            }
+        });
     }
 
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 78e30f6fff8..c509764bf67 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -707,7 +707,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             message += " - entry=" + entryId;
         }
         boolean recoverable = rc != 
BKException.Code.NoSuchLedgerExistsException
-                && rc != BKException.Code.NoSuchEntryException;
+                && rc != BKException.Code.NoSuchEntryException
+                && rc != 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
         return new SchemaException(recoverable, message);
     }
 
@@ -716,7 +717,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             if (t.getCause() != null
                     && (t.getCause() instanceof SchemaException)
                     && !((SchemaException) t.getCause()).isRecoverable()) {
-                // Meeting NoSuchLedgerExistsException or NoSuchEntryException 
when reading schemas in
+                // Meeting NoSuchLedgerExistsException, NoSuchEntryException or
+                // NoSuchLedgerExistsOnMetadataServerException when reading 
schemas in
                 // bookkeeper. This also means that the data has already been 
deleted by other operations
                 // in deleting schema.
                 if (log.isDebugEnabled()) {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 8bb6de74c66..d0421063b2d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.tests.integration.schema.Schemas.Person;
 import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
 import org.apache.pulsar.tests.integration.schema.Schemas.Student;
@@ -316,5 +318,14 @@ public class SchemaTest extends PulsarTestSuite {
 
     }
 
+    @Test
+    public void testDeletePartitionedTopicWhenTopicReferenceIsNotReady() 
throws Exception {
+        final String topic = "persistent://public/default/tp-ref";
+        admin.topics().createPartitionedTopic(topic, 20);
+        admin.schemas().createSchema(topic,
+                SchemaInfo.builder().type(SchemaType.STRING).schema(new 
byte[0]).build());
+        admin.topics().deletePartitionedTopic(topic, false);
+    }
+
 }
 

Reply via email to