sijie commented on a change in pull request #9212:
URL: https://github.com/apache/pulsar/pull/9212#discussion_r559322040



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
##########
@@ -41,6 +41,8 @@
 
     CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId);
 
+    CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully);

Review comment:
       Mark this as a default method?
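
One way to read this suggestion, as a minimal sketch: give the new two-argument overload a default body that delegates to the existing one-argument method, so existing implementations of the interface keep compiling. `SchemaRegistrySketch` and `LegacyRegistry` are hypothetical stand-ins, `String` replaces `SchemaVersion` to keep the example self-contained, and ignoring the `forcefully` flag in the default is an assumption rather than what the PR necessarily intends.

```java
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical stand-in for the SchemaRegistry interface.
interface SchemaRegistrySketch {
    CompletableFuture<String> deleteSchemaStorage(String schemaId);

    // Default overload: implementations that do not support forced deletion
    // inherit this body and fall back to the regular delete (assumption).
    default CompletableFuture<String> deleteSchemaStorage(String schemaId, boolean forcefully) {
        return deleteSchemaStorage(schemaId);
    }
}

// An existing implementation that only provides the one-argument method.
class LegacyRegistry implements SchemaRegistrySketch {
    @Override
    public CompletableFuture<String> deleteSchemaStorage(String schemaId) {
        return CompletableFuture.completedFuture("deleted:" + schemaId);
    }
}
```

With this shape, only implementations that actually support forced deletion need to override the new overload.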

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
##########
@@ -32,6 +32,8 @@
 
     CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key);
 
+    CompletableFuture<SchemaVersion> delete(String key, boolean forcefully);

Review comment:
       Make this a default method?
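
For a storage interface, an alternative default is to reject forced deletes unless a backend opts in. The following is only a sketch of that option: `SchemaStorageSketch` and `InMemoryStorage` are hypothetical, the one-argument `delete(String key)` is assumed to exist on the interface, and `String` stands in for `SchemaVersion`.

```java
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical stand-in for the SchemaStorage interface.
interface SchemaStorageSketch {
    // Assumed existing one-argument delete.
    CompletableFuture<String> delete(String key);

    // Default: a non-forced delete delegates; a forced delete fails
    // unless a storage backend overrides this method.
    default CompletableFuture<String> delete(String key, boolean forcefully) {
        if (!forcefully) {
            return delete(key);
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new UnsupportedOperationException("forced delete not supported for " + key));
        return failed;
    }
}

// A backend that predates the new overload.
class InMemoryStorage implements SchemaStorageSketch {
    @Override
    public CompletableFuture<String> delete(String key) {
        return CompletableFuture.completedFuture("v1");
    }
}
```

Failing explicitly makes an unsupported forced delete visible to the caller, whereas silently delegating hides it; which behavior fits better is a design choice for the PR author.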

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
##########
@@ -345,20 +352,58 @@ private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSch
     }
 
     public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
-        return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
-            // Trim the prefix of schemas before the latest delete.
-            int lastIndex = list.size() - 1;
-            for (int i = lastIndex; i >= 0; i--) {
-                if (list.get(i).schema.isDeleted()) {
-                    if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
-                        return Collections.emptyList();
-                    } else {
-                        return list.subList(i + 1, list.size());
-                    }
+
+        CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
+        CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
+        schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
+            List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
+            if (ex != null) {
+                boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException)
+                        ? ((SchemaException) ex.getCause()).isRecoverable()
+                        : true;
+                // if error is recoverable then fail the request.
+                if (recoverable) {
+                    schemaResult.completeExceptionally(ex.getCause());
+                    return null;
                 }
+                // clean the schema list for recoverable and delete the schema from zk
+                schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+                    if (!schemaFuture.isCompletedExceptionally()) {
+                        list.add(schemaFuture.getNow(null));
+                        return;
+                    }
+                });
+                trimDeletedSchemaAndGetList(list);
+                // clean up the broken schema from zk
+                deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+                    log.info("Deletion of {} {}", schemaId,

Review comment:
       What should a user do when they read this message? In particular, if it
reads "Deletion of ... failed", it would be worth adding more detail for the
"failed" case.
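
As a rough illustration of this point, the success and failure cases could produce different messages, with the failure message carrying the cause and a next step. `SchemaCleanupLog` and `describe` are hypothetical names, and the wording of both messages (including the suggested manual cleanup) is illustrative only, not taken from the PR.

```java
// Hypothetical helper that builds the text to log after the cleanup attempt.
final class SchemaCleanupLog {
    private SchemaCleanupLog() {
    }

    // error == null means the deletion succeeded.
    static String describe(String schemaId, Throwable error) {
        if (error == null) {
            return "Deleted broken schema " + schemaId + " from schema storage";
        }
        // On failure, include the cause and tell the reader what to do next
        // (the advice here is placeholder wording).
        return "Failed to delete broken schema " + schemaId
                + " from schema storage: " + error.getMessage()
                + "; the schema may need to be removed manually";
    }
}
```

In the `handle((sv, th) -> ...)` callback quoted above, the result of something like `describe(schemaId, th)` could be logged at info level on success and at warn or error level on failure.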



