rdhabalia commented on a change in pull request #9212:
URL: https://github.com/apache/pulsar/pull/9212#discussion_r559935224



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
##########
@@ -345,20 +352,58 @@ private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSch
     }
 
     public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
-        return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
-            // Trim the prefix of schemas before the latest delete.
-            int lastIndex = list.size() - 1;
-            for (int i = lastIndex; i >= 0; i--) {
-                if (list.get(i).schema.isDeleted()) {
-                    if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
-                        return Collections.emptyList();
-                    } else {
-                        return list.subList(i + 1, list.size());
-                    }
+
+        CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
+        CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
+        schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
+            List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
+            if (ex != null) {
+                boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException)
+                        ? ((SchemaException) ex.getCause()).isRecoverable()
+                        : true;
+                // if error is recoverable then fail the request.
+                if (recoverable) {
+                    schemaResult.completeExceptionally(ex.getCause());
+                    return null;
                 }
+                // clean the schema list for recoverable and delete the schema from zk
+                schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+                    if (!schemaFuture.isCompletedExceptionally()) {
+                        list.add(schemaFuture.getNow(null));
+                        return;
+                    }
+                });
+                trimDeletedSchemaAndGetList(list);
+                // clean up the broken schema from zk
+                deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+                    log.info("Deletion of {} {}", schemaId,

Review comment:
       sure, added.
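
For anyone reading the hunk without the full file, here is a minimal standalone sketch of the trimming step that the removed lines performed: drop every schema up to and including the latest delete marker. `SchemaTrimSketch` and `Entry` are hypothetical stand-ins for illustration, not Pulsar classes, and returning the full list when no delete marker exists is an assumption, since the hunk does not show that line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not part of the PR.
final class SchemaTrimSketch {

    // Hypothetical stand-in for SchemaAndMetadata: a version plus a deleted flag.
    static final class Entry {
        final long version;
        final boolean deleted;

        Entry(long version, boolean deleted) {
            this.version = version;
            this.deleted = deleted;
        }
    }

    // Keep only the entries that come after the latest delete marker.
    static List<Entry> trimDeleted(List<Entry> list) {
        int lastIndex = list.size() - 1;
        for (int i = lastIndex; i >= 0; i--) {
            if (list.get(i).deleted) {
                if (i == lastIndex) {
                    // The latest schema is a delete: nothing left to compare against.
                    return Collections.emptyList();
                }
                // Copy so the result does not depend on the backing list.
                return new ArrayList<>(list.subList(i + 1, list.size()));
            }
        }
        // Assumption: with no delete marker, the whole history is kept.
        return list;
    }

    public static void main(String[] args) {
        List<Entry> history = Arrays.asList(
                new Entry(0, false), new Entry(1, true), new Entry(2, false));
        for (Entry e : trimDeleted(history)) {
            System.out.println("kept version " + e.version);
        }
    }
}
```

The sketch scans from the newest entry backwards, so it stops at the first delete marker it meets and never inspects older history.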





