This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ee30b137d0a8ec921cbab17c651e53cfb941eb5f Author: Penghui Li <[email protected]> AuthorDate: Tue Sep 20 15:54:31 2022 +0800 [fix][client] Unwrap completion exception for Lookup Services (#17717) --- .../client/impl/BinaryProtoLookupService.java | 33 ++++++++++++---------- .../pulsar/client/impl/HttpLookupService.java | 12 ++++---- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 18bffba8dc6..e0b85d7bb34 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -43,6 +43,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,19 +142,21 @@ public class BinaryProtoLookupService implements LookupService { // (2) redirect to given address if response is: redirect if (r.redirect) { findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1) - .thenAccept(addressFuture::complete).exceptionally((lookupException) -> { - // lookup failed - if (redirectCount > 0) { - if (log.isDebugEnabled()) { - log.debug("[{}] lookup redirection failed ({}) : {}", topicName, - redirectCount, lookupException.getMessage()); + .thenAccept(addressFuture::complete) + .exceptionally((lookupException) -> { + Throwable cause = FutureUtil.unwrapCompletionException(lookupException); + // lookup failed + if (redirectCount > 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] lookup redirection failed ({}) : {}", topicName, + redirectCount, 
cause.getMessage()); + } + } else { + log.warn("[{}] lookup failed : {}", topicName, + cause.getMessage(), cause); } - } else { - log.warn("[{}] lookup failed : {}", topicName, - lookupException.getMessage(), lookupException); - } - addressFuture.completeExceptionally(lookupException); - return null; + addressFuture.completeExceptionally(cause); + return null; }); } else { // (3) received correct broker to connect @@ -176,7 +179,7 @@ public class BinaryProtoLookupService implements LookupService { client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally(connectionException -> { - addressFuture.completeExceptionally(connectionException); + addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); return addressFuture; @@ -209,7 +212,7 @@ public class BinaryProtoLookupService implements LookupService { client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally(connectionException -> { - partitionFuture.completeExceptionally(connectionException); + partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -245,7 +248,7 @@ public class BinaryProtoLookupService implements LookupService { client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally(ex -> { - schemaFuture.completeExceptionally(ex); + schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index def19c45aff..d42bde828bc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -148,8 +148,9 @@ public class HttpLookupService implements LookupService { }); future.complete(new GetTopicsResult(result, topicsHash, false, 
true)); }).exceptionally(ex -> { - log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, ex.getMessage()); - future.completeExceptionally(ex); + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage()); + future.completeExceptionally(cause); return null; }); return future; @@ -193,14 +194,15 @@ public class HttpLookupService implements LookupService { future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response))); } }).exceptionally(ex -> { - if (ex.getCause() instanceof NotFoundException) { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof NotFoundException) { future.complete(Optional.empty()); } else { log.warn("Failed to get schema for topic {} version {}", topicName, version != null ? Base64.getEncoder().encodeToString(version) : null, - ex.getCause()); - future.completeExceptionally(ex); + cause); + future.completeExceptionally(cause); } return null; });
