nodece commented on code in PR #15694:
URL: https://github.com/apache/pulsar/pull/15694#discussion_r878653299
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -3880,46 +3879,53 @@ public static
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new
CompletableFuture<>();
- try {
- // (1) authorize client
- try {
- checkAuthorization(pulsar, topicName, clientAppId,
authenticationData);
- } catch (RestException e) {
- try {
- validateAdminAccessForTenant(pulsar,
- clientAppId, originalPrincipal,
topicName.getTenant(), authenticationData,
-
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
- } catch (RestException authException) {
- log.warn("Failed to authorize {} on topic {}",
clientAppId, topicName);
- throw new
PulsarClientException(String.format("Authorization failed %s on topic %s with
error %s",
- clientAppId, topicName,
authException.getMessage()));
- }
- } catch (Exception ex) {
- // throw without wrapping to PulsarClientException that
considers: unknown error marked as internal
- // server error
- log.warn("Failed to authorize {} on topic {}", clientAppId,
topicName, ex);
- throw ex;
- }
+ CompletableFuture<Void> authorizationFuture = new
CompletableFuture<>();
+ checkAuthorizationAsync(pulsar, topicName, clientAppId,
authenticationData)
+ .thenRun(() -> authorizationFuture.complete(null))
+ .exceptionally(e -> {
+ if (e.getCause() instanceof RestException) {
+ validateAdminAccessForTenantAsync(pulsar,
+ clientAppId, originalPrincipal,
topicName.getTenant(), authenticationData)
+ .thenRun(() -> {
+ authorizationFuture.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof
RestException) {
+ log.warn("Failed to authorize {} on
topic {}", clientAppId, topicName);
+
authorizationFuture.completeExceptionally(new PulsarClientException(
+ String.format("Authorization
failed %s on topic %s with error %s",
+ clientAppId, topicName,
ex.getCause().getMessage())));
+ } else {
+
authorizationFuture.completeExceptionally(e);
+ }
+ return null;
+ });
+ } else {
+ // throw without wrapping to PulsarClientException
that considers: unknown error marked as
+ // internal server error
+ log.warn("Failed to authorize {} on topic {}",
clientAppId, topicName, e);
+ authorizationFuture.completeExceptionally(e);
+ }
+ return null;
+ });
- // validates global-namespace contains local/peer cluster: if
peer/local cluster present then lookup can
- // serve/redirect request else fail partitioned-metadata-request
so, client fails while creating
- // producer/consumer
- checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject())
- .thenCompose(res -> pulsar.getBrokerService()
-
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
- .thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of partitions for
topic {} is {}", clientAppId, topicName,
- metadata.partitions);
- }
- metadataFuture.complete(metadata);
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex.getCause());
- return null;
- });
- } catch (Exception ex) {
- metadataFuture.completeExceptionally(ex);
- }
+ // validates global-namespace contains local/peer cluster: if
peer/local cluster present then lookup can
+ // serve/redirect request else fail partitioned-metadata-request so,
client fails while creating
+ // producer/consumer
+ authorizationFuture.thenCompose(__ ->
+ checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject()))
+ .thenCompose(res ->
+
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+ .thenAccept(metadata -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Total number of partitions for topic
{} is {}", clientAppId, topicName,
+ metadata.partitions);
+ }
+ metadataFuture.complete(metadata);
+ })
+ .exceptionally(e -> {
+ metadataFuture.completeExceptionally(e.getCause());
Review Comment:
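`e.getCause()` returns null when the throwable reaching this handler is not wrapped in a `CompletionException`, which would complete `metadataFuture` with a null cause. Unwrapping only when a wrapper is actually present avoids that:
```suggestion
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
```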
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]