nodece commented on code in PR #15694:
URL: https://github.com/apache/pulsar/pull/15694#discussion_r878653346
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -220,49 +220,52 @@ public static CompletableFuture<ByteBuf>
lookupTopicAsync(PulsarService pulsarSe
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
- differentClusterData.getBrokerServiceUrlTls(), true,
LookupType.Redirect, requestId, false));
+ differentClusterData.getBrokerServiceUrlTls(), true,
LookupType.Redirect,
+ requestId, false));
} else {
// (2) authorize client
- try {
- checkAuthorization(pulsarService, topicName, clientAppId,
authenticationData);
- } catch (RestException authException) {
- log.warn("Failed to authorized {} on cluster {}",
clientAppId, topicName.toString());
-
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
- authException.getMessage(), requestId));
- return;
- } catch (Exception e) {
- log.warn("Unknown error while authorizing {} on cluster
{}", clientAppId, topicName.toString());
- validationFuture.completeExceptionally(e);
- return;
- }
- // (3) validate global namespace
- checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject())
- .thenAccept(peerClusterData -> {
- if (peerClusterData == null) {
- // (4) all validation passed: initiate lookup
- validationFuture.complete(null);
- return;
- }
- // if peer-cluster-data is present it means
namespace is owned by that peer-cluster and
- // request should be redirect to the peer-cluster
- if
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
- &&
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
-
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
- "Redirected cluster's brokerService
url is not configured", requestId));
- return;
+ checkAuthorizationAsync(pulsarService, topicName, clientAppId,
authenticationData).thenRun(() ->
+ validationFuture.complete(null))
+ .exceptionally(e -> {
+ if (e.getCause() instanceof RestException) {
+ log.warn("Failed to authorized {} on cluster
{}", clientAppId, topicName);
+
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+ e.getCause().getMessage(), requestId));
+ } else {
+ log.warn("Unknown error while authorizing {}
on cluster {}", clientAppId, topicName);
+ validationFuture.completeExceptionally(e);
}
-
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
- peerClusterData.getBrokerServiceUrlTls(),
true, LookupType.Redirect, requestId,
- false));
+ return null;
+ });
- }).exceptionally(ex -> {
+ // (3) validate global namespace
+ validationFuture.thenCompose(__ ->
+ checkLocalOrGetPeerReplicationCluster(pulsarService,
+
topicName.getNamespaceObject())).thenAccept(peerClusterData -> {
+ if (peerClusterData == null) {
+ // (4) all validation passed: initiate lookup
+ validationFuture.complete(null);
+ return;
+ }
+ // if peer-cluster-data is present it means namespace is
owned by that peer-cluster and
+ // request should be redirect to the peer-cluster
+ if
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
+ &&
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
+
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
+ "Redirected cluster's brokerService url is not
configured", requestId));
+ return;
+ }
+
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
+ peerClusterData.getBrokerServiceUrlTls(), true,
LookupType.Redirect, requestId,
+ false));
+ }).exceptionally(ex -> {
validationFuture.complete(
- newLookupErrorResponse(ServerError.MetadataError,
ex.getMessage(), requestId));
+ newLookupErrorResponse(ServerError.MetadataError,
ex.getCause().getMessage(), requestId));
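Review Comment:
`ex` may not always be wrapped in a `CompletionException`; when it is not, `ex.getCause()` can be null and `ex.getCause().getMessage()` would throw a `NullPointerException`. `FutureUtil.unwrapCompletionException(ex)` returns the cause when `ex` is a wrapper and `ex` itself otherwise: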
```suggestion
newLookupErrorResponse(ServerError.MetadataError,
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org