This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ee30b137d0a8ec921cbab17c651e53cfb941eb5f
Author: Penghui Li <[email protected]>
AuthorDate: Tue Sep 20 15:54:31 2022 +0800

    [fix][client] Unwrap completion exception for Lookup Services (#17717)
---
 .../client/impl/BinaryProtoLookupService.java      | 33 ++++++++++++----------
 .../pulsar/client/impl/HttpLookupService.java      | 12 ++++----
 2 files changed, 25 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 18bffba8dc6..e0b85d7bb34 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -43,6 +43,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,19 +142,21 @@ public class BinaryProtoLookupService implements 
LookupService {
                         // (2) redirect to given address if response is: 
redirect
                         if (r.redirect) {
                             findBroker(responseBrokerAddress, r.authoritative, 
topicName, redirectCount + 1)
-                                
.thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
-                                // lookup failed
-                                if (redirectCount > 0) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] lookup redirection 
failed ({}) : {}", topicName,
-                                                redirectCount, 
lookupException.getMessage());
+                                .thenAccept(addressFuture::complete)
+                                .exceptionally((lookupException) -> {
+                                    Throwable cause = 
FutureUtil.unwrapCompletionException(lookupException);
+                                    // lookup failed
+                                    if (redirectCount > 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] lookup redirection 
failed ({}) : {}", topicName,
+                                                    redirectCount, 
cause.getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}] lookup failed : {}", 
topicName,
+                                                cause.getMessage(), cause);
                                     }
-                                } else {
-                                    log.warn("[{}] lookup failed : {}", 
topicName,
-                                            lookupException.getMessage(), 
lookupException);
-                                }
-                                
addressFuture.completeExceptionally(lookupException);
-                                return null;
+                                    addressFuture.completeExceptionally(cause);
+                                    return null;
                             });
                         } else {
                             // (3) received correct broker to connect
@@ -176,7 +179,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            addressFuture.completeExceptionally(connectionException);
+            
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
         return addressFuture;
@@ -209,7 +212,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            partitionFuture.completeExceptionally(connectionException);
+            
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
 
@@ -245,7 +248,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
-            schemaFuture.completeExceptionally(ex);
+            
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index def19c45aff..d42bde828bc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -148,8 +148,9 @@ public class HttpLookupService implements LookupService {
                 });
                 future.complete(new GetTopicsResult(result, topicsHash, false, 
true));
             }).exceptionally(ex -> {
-                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", 
namespace, ex.getMessage());
-                future.completeExceptionally(ex);
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", 
namespace, cause.getMessage());
+                future.completeExceptionally(cause);
                 return null;
             });
         return future;
@@ -193,14 +194,15 @@ public class HttpLookupService implements LookupService {
                 
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, 
response)));
             }
         }).exceptionally(ex -> {
-            if (ex.getCause() instanceof NotFoundException) {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof NotFoundException) {
                 future.complete(Optional.empty());
             } else {
                 log.warn("Failed to get schema for topic {} version {}",
                         topicName,
                         version != null ? 
Base64.getEncoder().encodeToString(version) : null,
-                        ex.getCause());
-                future.completeExceptionally(ex);
+                        cause);
+                future.completeExceptionally(cause);
             }
             return null;
         });

Reply via email to