This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9eca22fbb1551a057007d808f61961f4cced16b6
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sat Feb 6 20:58:32 2021 -0800

    [pulsar-broker] Fix: handle topic loading failure due to broken schema 
ledger (#9212)
    
    add more error log
    
    fix list assignment
    
    (cherry picked from commit 3d5d6f6a1681cf842a2e3334d412fa449c553fd0)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 53 +++++++++++++---
 .../schema/DefaultSchemaRegistryService.java       |  5 ++
 .../broker/service/schema/SchemaRegistry.java      |  2 +
 .../service/schema/SchemaRegistryServiceImpl.java  | 73 +++++++++++++++++-----
 .../service/schema/exceptions/SchemaException.java | 10 +++
 ...hemaRegistryServiceWithSchemaDataValidator.java |  7 ++-
 .../broker/service/schema/ClientGetSchemaTest.java | 67 ++++++++++++++++++++
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 16 +++--
 .../common/protocol/schema/SchemaStorage.java      |  2 +
 9 files changed, 207 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 43ac9bf..db948c5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -39,8 +39,9 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 
 import org.apache.bookkeeper.client.BKException;
@@ -51,6 +52,7 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
@@ -132,7 +134,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     @Override
     public CompletableFuture<List<CompletableFuture<StoredSchema>>> 
getAll(String key) {
         CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new 
CompletableFuture<>();
-        getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> {
+        getLocator(key).thenAccept(locator -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Get all schemas - locator: {}", key, locator);
             }
@@ -157,9 +159,42 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         return result;
     }
 
+    private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
+        return getSchemaLocator(getSchemaPath(key));
+    }
+
+    public void clearLocatorCache(String key) {
+        localZkCache.invalidate(getSchemaPath(key));
+    }
+
+    @VisibleForTesting
+    List<Long> getSchemaLedgerList(String key) throws IOException {
+        Optional<LocatorEntry> locatorEntry = null;
+        try {
+            locatorEntry = getLocator(key).get();
+        } catch (Exception e) {
+            log.warn("Failed to get list of schema-storage ledger for {}", key,
+                    (e instanceof ExecutionException ? e.getCause() : e));
+            throw new IOException("Failed to get schema ledger for" + key);
+        }
+        LocatorEntry entry = locatorEntry.orElse(null);
+        return entry != null ? entry.locator.getIndexList().stream().map(i -> 
i.getPosition().getLedgerId())
+                .collect(Collectors.toList()) : null;
+    }
+
+    @VisibleForTesting
+    BookKeeper getBookKeeper() {
+        return bookKeeper;
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> delete(String key, boolean 
forcefully) {
+        return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new);
+    }
+
     @Override
     public CompletableFuture<SchemaVersion> delete(String key) {
-        return deleteSchema(key).thenApply(LongSchemaVersion::new);
+        return delete(key, false);
     }
 
     @NotNull
@@ -350,9 +385,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     @NotNull
-    private CompletableFuture<Long> deleteSchema(String schemaId) {
-        return getSchema(schemaId).thenCompose(schemaAndVersion -> {
-            if (isNull(schemaAndVersion)) {
+    private CompletableFuture<Long> deleteSchema(String schemaId, boolean 
forceFully) {
+        return (forceFully ? CompletableFuture.completedFuture(null) : 
getSchema(schemaId))
+                .thenCompose(schemaAndVersion -> {
+            if (!forceFully && isNull(schemaAndVersion)) {
                 return completedFuture(null);
             } else {
                 // The version is only for the compatibility of the current 
interface
@@ -373,6 +409,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                                 } catch (InterruptedException | 
KeeperException e) {
                                     future.completeExceptionally(e);
                                 }
+                                clearLocatorCache(getSchemaPath(schemaId));
                                 future.complete(version);
                             }
                         }, null);
@@ -656,6 +693,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         if (entryId != -1) {
             message += " - entry=" + entryId;
         }
-        return new IOException(message);
+        boolean recoverable = rc != 
BKException.Code.NoSuchLedgerExistsException
+                && rc != BKException.Code.NoSuchEntryException;
+        return new SchemaException(recoverable, message);
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
index 807fb2de..7069e24 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -83,6 +83,11 @@ public class DefaultSchemaRegistryService implements 
SchemaRegistryService {
     }
 
     @Override
+    public CompletableFuture<SchemaVersion> deleteSchemaStorage(String 
schemaId, boolean forcefully) {
+        return completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData 
schema,
                                                    SchemaCompatibilityStrategy 
strategy) {
         return completedFuture(false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
index 3f44ed0..4108298 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -43,6 +43,8 @@ public interface SchemaRegistry extends AutoCloseable {
 
     CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId);
 
+    CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, 
boolean forcefully);
+
     CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
                                             SchemaCompatibilityStrategy 
strategy);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index c06775b..d40c524 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -20,11 +20,11 @@ package org.apache.pulsar.broker.service.schema;
 
 import static java.util.Objects.isNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
 import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
 import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
 import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toMap;
-import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.HashFunction;
@@ -42,10 +42,11 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.collections.CollectionUtils;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -57,6 +58,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     private static HashFunction hashFunction = Hashing.sha256();
     private final Map<SchemaType, SchemaCompatibilityCheck> 
compatibilityChecks;
@@ -177,7 +179,12 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
 
     @Override
     public CompletableFuture<SchemaVersion> deleteSchemaStorage(String 
schemaId) {
-        return schemaStorage.delete(schemaId);
+        return deleteSchemaStorage(schemaId, false);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchemaStorage(String 
schemaId, boolean forcefully) {
+        return schemaStorage.delete(schemaId, forcefully);
     }
 
     @Override
@@ -342,20 +349,58 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     }
 
     public CompletableFuture<List<SchemaAndMetadata>> 
trimDeletedSchemaAndGetList(String schemaId) {
-        return 
getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
-            // Trim the prefix of schemas before the latest delete.
-            int lastIndex = list.size() - 1;
-            for (int i = lastIndex; i >= 0; i--) {
-                if (list.get(i).schema.isDeleted()) {
-                    if (i == lastIndex) { // if the latest schema is a delete, 
there's no schemas to compare
-                        return Collections.emptyList();
-                    } else {
-                        return list.subList(i + 1, list.size());
-                    }
+
+        CompletableFuture<List<SchemaAndMetadata>> schemaResult = new 
CompletableFuture<>();
+        CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> 
schemaFutureList = getAllSchemas(schemaId);
+        schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, 
ex) -> {
+            List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : 
schemaList;
+            if (ex != null) {
+                boolean recoverable = ex.getCause() != null && (ex.getCause() 
instanceof SchemaException)
+                        ? ((SchemaException) ex.getCause()).isRecoverable()
+                        : true;
+                // if error is recoverable then fail the request.
+                if (recoverable) {
+                    schemaResult.completeExceptionally(ex.getCause());
+                    return null;
                 }
+                // clean the schema list for recoverable and delete the schema 
from zk
+                
schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+                    if (!schemaFuture.isCompletedExceptionally()) {
+                        list.add(schemaFuture.getNow(null));
+                        return;
+                    }
+                });
+                trimDeletedSchemaAndGetList(list);
+                // clean up the broken schema from zk
+                deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+                    log.info("Clean up non-recoverable schema {}. Deletion of 
schema {} {}", ex.getCause().getMessage(),
+                            schemaId, (th == null ? "successful" : "failed, " 
+ th.getCause().getMessage()));
+                    schemaResult.complete(list);
+                    return null;
+                });
+                return null;
             }
-            return list;
+            // trim the deleted schema and return the result if schema is 
retrieved successfully
+            List<SchemaAndMetadata> trimmed = 
trimDeletedSchemaAndGetList(list);
+            schemaResult.complete(trimmed);
+            return null;
         });
+        return schemaResult;
+    }
+
+    private List<SchemaAndMetadata> 
trimDeletedSchemaAndGetList(List<SchemaAndMetadata> list) {
+        // Trim the prefix of schemas before the latest delete.
+        int lastIndex = list.size() - 1;
+        for (int i = lastIndex; i >= 0; i--) {
+            if (list.get(i).schema.isDeleted()) {
+                if (i == lastIndex) { // if the latest schema is a delete, 
there's no schemas to compare
+                    return Collections.emptyList();
+                } else {
+                    return list.subList(i + 1, list.size());
+                }
+            }
+        }
+        return list;
     }
 
     interface Functions {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
index 8d85b39..e9267d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
@@ -26,6 +26,12 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException;
 public class SchemaException extends BrokerServiceException {
 
     private static final long serialVersionUID = -6587520779026691815L;
+    private boolean recoverable;
+
+    public SchemaException(boolean recoverable, String message) {
+        super(message);
+        this.recoverable = recoverable;
+    }
 
     public SchemaException(String message) {
         super(message);
@@ -38,4 +44,8 @@ public class SchemaException extends BrokerServiceException {
     public SchemaException(String message, Throwable cause) {
         super(message, cause);
     }
+
+    public boolean isRecoverable() {
+        return recoverable;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
index 570b5bf..a317942 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -103,7 +103,12 @@ public class SchemaRegistryServiceWithSchemaDataValidator 
implements SchemaRegis
 
     @Override
     public CompletableFuture<SchemaVersion> deleteSchemaStorage(String 
schemaId) {
-        return service.deleteSchemaStorage(schemaId);
+        return deleteSchemaStorage(schemaId, false);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchemaStorage(String 
schemaId, boolean forcefully) {
+        return service.deleteSchemaStorage(schemaId, forcefully);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index dc35286..8ce8552 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static 
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertEquals;
 
 import java.util.ArrayList;
@@ -26,6 +29,7 @@ import java.util.Optional;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -33,11 +37,16 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.schema.Schemas;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Sets;
+
 public class ClientGetSchemaTest extends ProducerConsumerBase {
 
     private static final String topicBytes = "my-property/my-ns/topic-bytes";
@@ -104,4 +113,62 @@ public class ClientGetSchemaTest extends 
ProducerConsumerBase {
         assertEquals(client.getSchema(topicAvro).join(), 
Optional.of(Schema.AVRO(MyClass.class).getSchemaInfo()));
     }
 
+    /**
+     * It validates if schema ledger is deleted or non recoverable then it 
will clean up schema storage for the topic
+     * and make the topic available.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testSchemaFailure() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "test-broken-schema-storage";
+        final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), 
tenant, namespace, topicOne).toString();
+
+        admin.namespaces().createNamespace(tenant + "/" + namespace, 
Sets.newHashSet("test"));
+
+        // (1) create topic with schema
+        Producer<Schemas.PersonTwo> producer = pulsarClient
+                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                        
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+                .topic(fqtnOne).create();
+
+        producer.close();
+
+        String key = TopicName.get(fqtnOne).getSchemaName();
+        BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) 
pulsar.getSchemaStorage();
+        long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
+
+        // (2) break schema locator by deleting schema-ledger 
+        schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
+
+        admin.topics().unload(fqtnOne);
+
+        // (3) create topic again: broker should handle broken schema and load 
the topic successfully
+        producer = pulsarClient
+                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                        
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+                .topic(fqtnOne).create();
+
+        assertNotEquals(schemaLedgerId, 
schemaStrogate.getSchemaLedgerList(key).get(0).longValue());
+
+        Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
+        personTwo.setId(1);
+        personTwo.setName("Tom");
+
+        Consumer<Schemas.PersonTwo> consumer = pulsarClient
+                .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                        
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+                .subscriptionName("test").topic(fqtnOne).subscribe();
+
+        producer.send(personTwo);
+
+        Schemas.PersonTwo personConsume = consumer.receive().getValue();
+        assertEquals("Tom", personConsume.getName());
+        assertEquals(1, personConsume.getId());
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 2da8401..b8580fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -18,9 +18,15 @@
  */
 package org.apache.pulsar.schema;
 
-import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static 
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.Collections;
+
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -35,11 +41,9 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.Collections;
+import com.google.common.collect.Sets;
 
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static 
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
-import static org.junit.Assert.assertEquals;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class SchemaTest extends MockedPulsarServiceBaseTest {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
index 9f007aa..c34154b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
@@ -32,6 +32,8 @@ public interface SchemaStorage {
 
     CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String 
key);
 
+    CompletableFuture<SchemaVersion> delete(String key, boolean forcefully);
+
     CompletableFuture<SchemaVersion> delete(String key);
 
     SchemaVersion versionFromBytes(byte[] version);

Reply via email to