This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b4127a6b39 [fix][broker] Fix schema deletion error when deleting a
partitioned topic with many partitions and schema (#21977)
1b4127a6b39 is described below
commit 1b4127a6b39cb57c3a9cceb4e05ee5cdb6104c78
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Jan 29 20:16:05 2024 -0800
[fix][broker] Fix schema deletion error when deleting a partitioned topic
with many partitions and schema (#21977)
---
.../pulsar/broker/service/BrokerService.java | 29 ++++++++++------------
.../service/schema/BookkeeperSchemaStorage.java | 6 +++--
.../tests/integration/schema/SchemaTest.java | 11 ++++++++
3 files changed, 28 insertions(+), 18 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8654a830050..0383c63b1f3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -125,8 +125,6 @@ import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -3475,22 +3473,21 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
+ // delete schema at the upper level when deleting the partitioned topic.
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ }
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
- SchemaRegistryService schemaRegistryService =
getPulsar().getSchemaRegistryService();
- return
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
- .thenCompose(schema -> {
- if (schema != null) {
- // It's different from `SchemasResource.deleteSchema`
- // because when we delete a topic, the schema
- // history is meaningless. But when we delete a schema of a topic, a new schema could be
- // registered in the future.
- log.info("Delete schema storage of id: {}", id);
- return
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- });
+ return
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id).whenComplete((vid,
ex) -> {
+ if (vid != null && ex == null) {
+ // It's different from `SchemasResource.deleteSchema`
+ // because when we delete a topic, the schema
+ // history is meaningless. But when we delete a schema of a topic, a new schema could be
+ // registered in the future.
+ log.info("Deleted schema storage of id: {}", id);
+ }
+ });
}
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName
topicName, int numPartitions) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 78e30f6fff8..c509764bf67 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -707,7 +707,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
message += " - entry=" + entryId;
}
boolean recoverable = rc !=
BKException.Code.NoSuchLedgerExistsException
- && rc != BKException.Code.NoSuchEntryException;
+ && rc != BKException.Code.NoSuchEntryException
+ && rc !=
BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
return new SchemaException(recoverable, message);
}
@@ -716,7 +717,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
if (t.getCause() != null
&& (t.getCause() instanceof SchemaException)
&& !((SchemaException) t.getCause()).isRecoverable()) {
- // Meeting NoSuchLedgerExistsException or NoSuchEntryException when reading schemas in
+ // Meeting NoSuchLedgerExistsException, NoSuchEntryException or
+ // NoSuchLedgerExistsOnMetadataServerException when reading schemas in
// bookkeeper. This also means that the data has already been deleted by other operations
// in deleting schema.
if (log.isDebugEnabled()) {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 8bb6de74c66..d0421063b2d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.tests.integration.schema.Schemas.Person;
import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
import org.apache.pulsar.tests.integration.schema.Schemas.Student;
@@ -316,5 +318,14 @@ public class SchemaTest extends PulsarTestSuite {
}
+ @Test
+ public void testDeletePartitionedTopicWhenTopicReferenceIsNotReady()
throws Exception {
+ final String topic = "persistent://public/default/tp-ref";
+ admin.topics().createPartitionedTopic(topic, 20);
+ admin.schemas().createSchema(topic,
+ SchemaInfo.builder().type(SchemaType.STRING).schema(new
byte[0]).build());
+ admin.topics().deletePartitionedTopic(topic, false);
+ }
+
}