rdhabalia commented on a change in pull request #9212:
URL: https://github.com/apache/pulsar/pull/9212#discussion_r559935224
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
##########
@@ -345,20 +352,58 @@ private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSch
}
public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
- return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
- // Trim the prefix of schemas before the latest delete.
- int lastIndex = list.size() - 1;
- for (int i = lastIndex; i >= 0; i--) {
- if (list.get(i).schema.isDeleted()) {
- if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
- return Collections.emptyList();
- } else {
- return list.subList(i + 1, list.size());
- }
+
+ CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
+ CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
+ schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
+ List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
+ if (ex != null) {
+ boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException)
+ ? ((SchemaException) ex.getCause()).isRecoverable()
+ : true;
+ // if error is recoverable then fail the request.
+ if (recoverable) {
+ schemaResult.completeExceptionally(ex.getCause());
+ return null;
}
+ // clean the schema list for recoverable and delete the schema from zk
+ schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+ if (!schemaFuture.isCompletedExceptionally()) {
+ list.add(schemaFuture.getNow(null));
+ return;
+ }
+ });
+ trimDeletedSchemaAndGetList(list);
+ // clean up the broken schema from zk
+ deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+ log.info("Deletion of {} {}", schemaId,
Review comment:
sure, added.