This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a53c32ac700 [fix][client] Unwrap completion exception for Lookup Services (#17717)
a53c32ac700 is described below
commit a53c32ac700ce47bae4aeacb178dd0e891b816f3
Author: Penghui Li <[email protected]>
AuthorDate: Tue Sep 20 15:54:31 2022 +0800
[fix][client] Unwrap completion exception for Lookup Services (#17717)
(cherry picked from commit 320300cd2e0c58ee96981996d254884a683b7dd4)
---
.../client/impl/BinaryProtoLookupService.java | 33 ++++++++++++----------
.../pulsar/client/impl/HttpLookupService.java | 12 ++++----
2 files changed, 25 insertions(+), 20 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 190599dfffd..5dfa80a0dcd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,19 +141,21 @@ public class BinaryProtoLookupService implements LookupService {
// (2) redirect to given address if response is:
redirect
if (r.redirect) {
findBroker(responseBrokerAddress, r.authoritative,
topicName, redirectCount + 1)
-
.thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
- // lookup failed
- if (redirectCount > 0) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] lookup redirection
failed ({}) : {}", topicName.toString(),
- redirectCount,
lookupException.getMessage());
+ .thenAccept(addressFuture::complete)
+ .exceptionally((lookupException) -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(lookupException);
+ // lookup failed
+ if (redirectCount > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] lookup redirection
failed ({}) : {}", topicName,
+ redirectCount,
cause.getMessage());
+ }
+ } else {
+ log.warn("[{}] lookup failed : {}",
topicName,
+ cause.getMessage(), cause);
}
- } else {
- log.warn("[{}] lookup failed : {}",
topicName.toString(),
- lookupException.getMessage(),
lookupException);
- }
-
addressFuture.completeExceptionally(lookupException);
- return null;
+ addressFuture.completeExceptionally(cause);
+ return null;
});
} else {
// (3) received correct broker to connect
@@ -175,7 +178,7 @@ public class BinaryProtoLookupService implements LookupService {
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
- addressFuture.completeExceptionally(connectionException);
+
addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
return addressFuture;
@@ -208,7 +211,7 @@ public class BinaryProtoLookupService implements LookupService {
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(connectionException -> {
- partitionFuture.completeExceptionally(connectionException);
+
partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
return null;
});
@@ -244,7 +247,7 @@ public class BinaryProtoLookupService implements LookupService {
client.getCnxPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
- schemaFuture.completeExceptionally(ex);
+
schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index ced0aba9bba..ffa8826066f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -141,8 +141,9 @@ public class HttpLookupService implements LookupService {
});
future.complete(result);
}).exceptionally(ex -> {
- log.warn("Failed to getTopicsUnderNamespace namespace {} {}.",
namespace, ex.getMessage());
- future.completeExceptionally(ex);
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.warn("Failed to getTopicsUnderNamespace namespace {} {}.",
namespace, cause.getMessage());
+ future.completeExceptionally(cause);
return null;
});
return future;
@@ -186,14 +187,15 @@ public class HttpLookupService implements LookupService {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName,
response)));
}
}).exceptionally(ex -> {
- if (ex.getCause() instanceof NotFoundException) {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof NotFoundException) {
future.complete(Optional.empty());
} else {
log.warn("Failed to get schema for topic {} version {}",
topicName,
version != null ?
Base64.getEncoder().encodeToString(version) : null,
- ex.getCause());
- future.completeExceptionally(ex);
+ cause);
+ future.completeExceptionally(cause);
}
return null;
});