This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 11073fd4dcb [fix][ml] Fix potential NPE cause future never complete.
(#19415)
11073fd4dcb is described below
commit 11073fd4dcb92c61fa2a2641f07c800e940cb319
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Feb 3 21:57:09 2023 +0800
[fix][ml] Fix potential NPE cause future never complete. (#19415)
---
.../pulsar/broker/service/schema/SchemaRegistryServiceImpl.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index bc50c55b2ba..4eb87564d0f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -552,12 +552,11 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList,
ex) -> {
List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() :
schemaList;
if (ex != null) {
- boolean recoverable = ex.getCause() != null && (ex.getCause()
instanceof SchemaException)
- ? ((SchemaException) ex.getCause()).isRecoverable()
- : true;
+ final Throwable rc = FutureUtil.unwrapCompletionException(ex);
+ boolean recoverable = !(rc instanceof SchemaException) ||
((SchemaException) rc).isRecoverable();
// if error is recoverable then fail the request.
if (recoverable) {
- schemaResult.completeExceptionally(ex.getCause());
+ schemaResult.completeExceptionally(rc);
return null;
}
// clean the schema list for recoverable and delete the schema
from zk
@@ -570,7 +569,7 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
trimDeletedSchemaAndGetList(list);
// clean up the broken schema from zk
deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
- log.info("Clean up non-recoverable schema {}. Deletion of
schema {} {}", ex.getCause().getMessage(),
+ log.info("Clean up non-recoverable schema {}. Deletion of
schema {} {}", rc.getMessage(),
schemaId, (th == null ? "successful" : "failed, "
+ th.getCause().getMessage()));
schemaResult.complete(list);
return null;