congbobo184 commented on code in PR #18293:
URL: https://github.com/apache/pulsar/pull/18293#discussion_r1015148231
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -289,22 +287,8 @@ private CompletableFuture<Long> putSchema(String schemaId,
byte[] data, byte[] h
updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
- if (ex.getCause() instanceof
BadVersionException) {
- // There was a race
condition on the schema creation.
- // Since it has now been
created,
- // retry the whole
operation so that we have a chance to
- // recover without
bubbling error
- putSchema(schemaId, data,
hash)
-
.thenAccept(future::complete)
- .exceptionally(ex2
-> {
-
future.completeExceptionally(ex2);
- return null;
- });
- } else {
- // For other errors, just fail the
operation
- future.completeExceptionally(ex);
- }
- return null;
+
future.completeExceptionally(ex);
+ return null;
Review Comment:
return
readSchemaEntry(locator.getIndexList().get(0).getPosition())
.thenCompose(schemaEntry ->
addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data)
.thenCompose(position ->
updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash))
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -315,21 +299,7 @@ private CompletableFuture<Long> putSchema(String schemaId,
byte[] data, byte[] h
createNewSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
- if (ex.getCause() instanceof AlreadyExistsException
- || ex.getCause() instanceof
BadVersionException) {
- // There was a race condition on the schema
creation. Since it has now been created,
- // retry the whole operation so that we have a
chance to recover without bubbling error
- // back to producer/consumer
- putSchema(schemaId, data, hash)
- .thenAccept(future::complete)
- .exceptionally(ex2 -> {
- future.completeExceptionally(ex2);
- return null;
- });
- } else {
- // For other errors, just fail the operation
- future.completeExceptionally(ex);
- }
+ future.completeExceptionally(ex);
Review Comment:
return createNewSchema(schemaId, data, hash);
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]