This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit acdb3e141baefe398228929d8ee4f07835a382a3
Author: Penghui Li <[email protected]>
AuthorDate: Tue Sep 20 15:54:31 2022 +0800

    [fix][client] Unwrap completion exception for Lookup Services (#17717)
    
    (cherry picked from commit 320300cd2e0c58ee96981996d254884a683b7dd4)
---
 .../client/impl/BinaryProtoLookupService.java      | 36 ++++++++++++----------
 .../pulsar/client/impl/HttpLookupService.java      | 16 +++++-----
 2 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index ba3281c64cc..6b3912e24be 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,19 +137,22 @@ public class BinaryProtoLookupService implements LookupService {
                         // (2) redirect to given address if response is: redirect
                         if (r.redirect) {
                             findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1)
-                                .thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
-                                // lookup failed
-                                if (redirectCount > 0) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(),
-                                                redirectCount, lookupException.getMessage());
-                                    }
-                                } else {
-                                    log.warn("[{}] lookup failed : {}", topicName.toString(),
-                                            lookupException.getMessage(), lookupException);
-                                }
-                                addressFuture.completeExceptionally(lookupException);
-                                return null;
+                                    .thenAccept(addressFuture::complete)
+                                    .exceptionally((lookupException) -> {
+                                        Throwable cause = FutureUtil.unwrapCompletionException(lookupException);
+                                        // lookup failed
+                                        if (redirectCount > 0) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("[{}] lookup redirection failed ({}) : {}", topicName,
+                                                        redirectCount, cause.getMessage());
+                                            }
+                                        } else {
+                                            log.warn("[{}] lookup failed : {}", topicName,
+                                                    cause.getMessage(), cause);
+                                        }
+                                        addressFuture.completeExceptionally(cause);
+                                        return null;
+
                             });
                         } else {
                             // (3) received correct broker to connect
@@ -171,7 +175,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            addressFuture.completeExceptionally(connectionException);
+            addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
         return addressFuture;
@@ -203,7 +207,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            partitionFuture.completeExceptionally(connectionException);
+            partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
 
@@ -239,7 +243,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
-            schemaFuture.completeExceptionally(ex);
+            schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 72326c3db1f..cde8ed30604 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -142,10 +142,11 @@ public class HttpLookupService implements LookupService {
                         result.add(filtered);
                     }
                 });
-                future.complete(result);})
-            .exceptionally(ex -> {
-                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, ex.getMessage());
-                future.completeExceptionally(ex);
+                future.complete(result);
+            }).exceptionally(ex -> {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage());
+                future.completeExceptionally(cause);
                 return null;
             });
         return future;
@@ -188,14 +189,15 @@ public class HttpLookupService implements LookupService {
                 future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
             }
         }).exceptionally(ex -> {
-            if (ex.getCause() instanceof NotFoundException) {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof NotFoundException) {
                 future.complete(Optional.empty());
             } else {
                 log.warn("Failed to get schema for topic {} version {}",
                         topicName,
                         version != null ? Base64.getEncoder().encodeToString(version) : null,
-                        ex.getCause());
-                future.completeExceptionally(ex);
+                        cause);
+                future.completeExceptionally(cause);
             }
             return null;
         });

Reply via email to