sijie commented on a change in pull request #9212:
URL: https://github.com/apache/pulsar/pull/9212#discussion_r559322040
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
##########
@@ -41,6 +41,8 @@
CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId);
+ CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId,
boolean forcefully);
Review comment:
mark this a default method?
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
##########
@@ -32,6 +32,8 @@
CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String
key);
+ CompletableFuture<SchemaVersion> delete(String key, boolean forcefully);
Review comment:
make this a default method?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
##########
@@ -345,20 +352,58 @@ private void checkCompatible(SchemaAndMetadata
existingSchema, SchemaData newSch
}
public CompletableFuture<List<SchemaAndMetadata>>
trimDeletedSchemaAndGetList(String schemaId) {
- return
getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
- // Trim the prefix of schemas before the latest delete.
- int lastIndex = list.size() - 1;
- for (int i = lastIndex; i >= 0; i--) {
- if (list.get(i).schema.isDeleted()) {
- if (i == lastIndex) { // if the latest schema is a delete,
there's no schemas to compare
- return Collections.emptyList();
- } else {
- return list.subList(i + 1, list.size());
- }
+
+ CompletableFuture<List<SchemaAndMetadata>> schemaResult = new
CompletableFuture<>();
+ CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>>
schemaFutureList = getAllSchemas(schemaId);
+ schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList,
ex) -> {
+ List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() :
schemaList;
+ if (ex != null) {
+ boolean recoverable = ex.getCause() != null && (ex.getCause()
instanceof SchemaException)
+ ? ((SchemaException) ex.getCause()).isRecoverable()
+ : true;
+ // if error is recoverable then fail the request.
+ if (recoverable) {
+ schemaResult.completeExceptionally(ex.getCause());
+ return null;
}
+ // clean the schema list for recoverable and delete the schema
from zk
+
schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+ if (!schemaFuture.isCompletedExceptionally()) {
+ list.add(schemaFuture.getNow(null));
+ return;
+ }
+ });
+ trimDeletedSchemaAndGetList(list);
+ // clean up the broken schema from zk
+ deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+ log.info("Deletion of {} {}", schemaId,
Review comment:
What a user needs to do when he/she reads this message? Especially if it
is "Deletion of ... failed", it might be worth adding more details for the
"failed" case.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]