This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a53c32ac700 [fix][client] Unwrap completion exception for Lookup 
Services (#17717)
a53c32ac700 is described below

commit a53c32ac700ce47bae4aeacb178dd0e891b816f3
Author: Penghui Li <[email protected]>
AuthorDate: Tue Sep 20 15:54:31 2022 +0800

    [fix][client] Unwrap completion exception for Lookup Services (#17717)
    
    (cherry picked from commit 320300cd2e0c58ee96981996d254884a683b7dd4)
---
 .../client/impl/BinaryProtoLookupService.java      | 33 ++++++++++++----------
 .../pulsar/client/impl/HttpLookupService.java      | 12 ++++----
 2 files changed, 25 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 190599dfffd..5dfa80a0dcd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -42,6 +42,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,19 +141,21 @@ public class BinaryProtoLookupService implements 
LookupService {
                         // (2) redirect to given address if response is: 
redirect
                         if (r.redirect) {
                             findBroker(responseBrokerAddress, r.authoritative, 
topicName, redirectCount + 1)
-                                
.thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
-                                // lookup failed
-                                if (redirectCount > 0) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] lookup redirection 
failed ({}) : {}", topicName.toString(),
-                                                redirectCount, 
lookupException.getMessage());
+                                .thenAccept(addressFuture::complete)
+                                .exceptionally((lookupException) -> {
+                                    Throwable cause = 
FutureUtil.unwrapCompletionException(lookupException);
+                                    // lookup failed
+                                    if (redirectCount > 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] lookup redirection 
failed ({}) : {}", topicName,
+                                                    redirectCount, 
cause.getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}] lookup failed : {}", 
topicName,
+                                                cause.getMessage(), cause);
                                     }
-                                } else {
-                                    log.warn("[{}] lookup failed : {}", 
topicName.toString(),
-                                            lookupException.getMessage(), 
lookupException);
-                                }
-                                
addressFuture.completeExceptionally(lookupException);
-                                return null;
+                                    addressFuture.completeExceptionally(cause);
+                                    return null;
                             });
                         } else {
                             // (3) received correct broker to connect
@@ -175,7 +178,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            addressFuture.completeExceptionally(connectionException);
+            
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
         return addressFuture;
@@ -208,7 +211,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            partitionFuture.completeExceptionally(connectionException);
+            
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
 
@@ -244,7 +247,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
-            schemaFuture.completeExceptionally(ex);
+            
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index ced0aba9bba..ffa8826066f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -141,8 +141,9 @@ public class HttpLookupService implements LookupService {
                 });
                 future.complete(result);
             }).exceptionally(ex -> {
-                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", 
namespace, ex.getMessage());
-                future.completeExceptionally(ex);
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", 
namespace, cause.getMessage());
+                future.completeExceptionally(cause);
                 return null;
             });
         return future;
@@ -186,14 +187,15 @@ public class HttpLookupService implements LookupService {
                 
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, 
response)));
             }
         }).exceptionally(ex -> {
-            if (ex.getCause() instanceof NotFoundException) {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof NotFoundException) {
                 future.complete(Optional.empty());
             } else {
                 log.warn("Failed to get schema for topic {} version {}",
                         topicName,
                         version != null ? 
Base64.getEncoder().encodeToString(version) : null,
-                        ex.getCause());
-                future.completeExceptionally(ex);
+                        cause);
+                future.completeExceptionally(cause);
             }
             return null;
         });

Reply via email to