This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11073fd4dcb [fix][ml]  Fix potential NPE cause future never complete. (#19415)
11073fd4dcb is described below

commit 11073fd4dcb92c61fa2a2641f07c800e940cb319
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Feb 3 21:57:09 2023 +0800

    [fix][ml]  Fix potential NPE cause future never complete. (#19415)
---
 .../pulsar/broker/service/schema/SchemaRegistryServiceImpl.java  | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index bc50c55b2ba..4eb87564d0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -552,12 +552,11 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, 
ex) -> {
             List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : 
schemaList;
             if (ex != null) {
-                boolean recoverable = ex.getCause() != null && (ex.getCause() 
instanceof SchemaException)
-                        ? ((SchemaException) ex.getCause()).isRecoverable()
-                        : true;
+                final Throwable rc = FutureUtil.unwrapCompletionException(ex);
+                boolean recoverable = !(rc instanceof SchemaException) || 
((SchemaException) rc).isRecoverable();
                 // if error is recoverable then fail the request.
                 if (recoverable) {
-                    schemaResult.completeExceptionally(ex.getCause());
+                    schemaResult.completeExceptionally(rc);
                     return null;
                 }
                 // clean the schema list for recoverable and delete the schema 
from zk
@@ -570,7 +569,7 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                 trimDeletedSchemaAndGetList(list);
                 // clean up the broken schema from zk
                 deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
-                    log.info("Clean up non-recoverable schema {}. Deletion of 
schema {} {}", ex.getCause().getMessage(),
+                    log.info("Clean up non-recoverable schema {}. Deletion of 
schema {} {}", rc.getMessage(),
                             schemaId, (th == null ? "successful" : "failed, " 
+ th.getCause().getMessage()));
                     schemaResult.complete(list);
                     return null;

Reply via email to