This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit acdb3e141baefe398228929d8ee4f07835a382a3 Author: Penghui Li <[email protected]> AuthorDate: Tue Sep 20 15:54:31 2022 +0800 [fix][client] Unwrap completion exception for Lookup Services (#17717) (cherry picked from commit 320300cd2e0c58ee96981996d254884a683b7dd4) --- .../client/impl/BinaryProtoLookupService.java | 36 ++++++++++++---------- .../pulsar/client/impl/HttpLookupService.java | 16 +++++----- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index ba3281c64cc..6b3912e24be 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -46,6 +46,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,19 +137,22 @@ public class BinaryProtoLookupService implements LookupService { // (2) redirect to given address if response is: redirect if (r.redirect) { findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1) - .thenAccept(addressFuture::complete).exceptionally((lookupException) -> { - // lookup failed - if (redirectCount > 0) { - if (log.isDebugEnabled()) { - log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(), - redirectCount, lookupException.getMessage()); - } - } else { - log.warn("[{}] lookup failed : {}", topicName.toString(), - lookupException.getMessage(), lookupException); - } - addressFuture.completeExceptionally(lookupException); - return null; + .thenAccept(addressFuture::complete) + .exceptionally((lookupException) -> { + Throwable cause = FutureUtil.unwrapCompletionException(lookupException); + // lookup failed + if (redirectCount > 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] lookup redirection failed ({}) : {}", topicName, + redirectCount, cause.getMessage()); + } + } else { + log.warn("[{}] lookup failed : {}", topicName, + cause.getMessage(), cause); + } + addressFuture.completeExceptionally(cause); + return null; + }); } else { // (3) received correct broker to connect @@ -171,7 +175,7 @@ public class BinaryProtoLookupService implements LookupService { client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally(connectionException -> { - addressFuture.completeExceptionally(connectionException); + addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); return addressFuture; @@ -203,7 +207,7 @@ public class BinaryProtoLookupService implements LookupService { client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally(connectionException -> { - partitionFuture.completeExceptionally(connectionException); + partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -239,7 +243,7 @@ public class BinaryProtoLookupService implements LookupService { client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally(ex -> { - schemaFuture.completeExceptionally(ex); + schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 72326c3db1f..cde8ed30604 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -142,10 +142,11 @@ public class HttpLookupService implements LookupService { result.add(filtered); } }); - future.complete(result);}) - .exceptionally(ex -> { - log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, ex.getMessage()); - future.completeExceptionally(ex); + future.complete(result); + }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage()); + future.completeExceptionally(cause); return null; }); return future; @@ -188,14 +189,15 @@ public class HttpLookupService implements LookupService { future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response))); } }).exceptionally(ex -> { - if (ex.getCause() instanceof NotFoundException) { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof NotFoundException) { future.complete(Optional.empty()); } else { log.warn("Failed to get schema for topic {} version {}", topicName, version != null ? Base64.getEncoder().encodeToString(version) : null, - ex.getCause()); - future.completeExceptionally(ex); + cause); + future.completeExceptionally(cause); } return null; });
