This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch bewaremypower/2.8-pick-15694 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3a04eb5798a6f73c5b425062f2677a5943f33746 Author: lipenghui <[email protected]> AuthorDate: Thu Jun 9 09:06:28 2022 +0800 [branch-2.8][fix][security] Add timeout of sync methods and avoid call sync method for AuthorizationService (#15694) (cherry picked from commit 6af365e36aed74e95ca6e088f453d9513094bb36) Besides resolving the basic conflicts, this PR: - migrates `validateAdminAccessForTenantAsync` from #14149 - migrates `TenantResources#getTenantAsync` from #11693 --- .../broker/authorization/AuthorizationService.java | 28 ++++-- .../pulsar/broker/resources/BaseResources.java | 11 ++- .../pulsar/broker/resources/TenantResources.java | 7 +- .../broker/admin/impl/PersistentTopicsBase.java | 104 ++++++++++----------- .../pulsar/broker/lookup/TopicLookupBase.java | 54 ++++++----- .../pulsar/broker/web/PulsarWebResource.java | 97 +++++++++++++++++-- 6 files changed, 205 insertions(+), 96 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 26d04776e5d..90b49603fe8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -30,7 +30,6 @@ import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.FutureUtil; @@ -42,6 +41,7 @@ import javax.ws.rs.core.Response; import java.util.Set; import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import static java.util.concurrent.TimeUnit.SECONDS; @@ -394,11 +394,15 @@ public class AuthorizationService { AuthenticationDataSource authData) { try { return allowTenantOperationAsync( - tenantName, operation, originalRole, role, authData).get(); + tenantName, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } @@ -519,11 +523,15 @@ public class AuthorizationService { AuthenticationDataSource authData) { try { return allowNamespacePolicyOperationAsync( - namespaceName, policy, operation, originalRole, role, authData).get(); + namespaceName, policy, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } @@ -583,11 +591,15 @@ public class AuthorizationService { AuthenticationDataSource authData) { try { return allowTopicPolicyOperationAsync( - topicName, policy, operation, originalRole, role, authData).get(); + topicName, policy, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } @@ -665,13 +677,17 @@ public class AuthorizationService { TopicOperation operation, String originalRole, String role, - 
AuthenticationDataSource authData) { + AuthenticationDataSource authData) throws Exception { try { - return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(); + return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get( + conf.getZooKeeperOperationTimeoutSeconds(), SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RestException(e); } catch (ExecutionException e) { throw new RestException(e.getCause()); + } catch (TimeoutException e) { + throw new RestException(e); } } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 8016bcef314..ce772c2bf04 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.resources; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Joiner; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -38,6 +39,8 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; */ public class BaseResources<T> { + protected static final String BASE_POLICIES_PATH = "/admin/policies"; + @Getter private final MetadataStoreExtended store; @Getter @@ -164,4 +167,10 @@ public class BaseResources<T> { public CompletableFuture<Boolean> existsAsync(String path) { return cache.exists(path); } -} \ No newline at end of file + + protected static String joinPath(String... 
parts) { + StringBuilder sb = new StringBuilder(); + Joiner.on('/').appendTo(sb, parts); + return sb.toString(); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java index 127332e1332..78d80b9d6f7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java @@ -18,12 +18,17 @@ */ package org.apache.pulsar.broker.resources; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; public class TenantResources extends BaseResources<TenantInfo> { public TenantResources(MetadataStoreExtended store, int operationTimeoutSec) { super(store, TenantInfo.class, operationTimeoutSec); } + + public CompletableFuture<Optional<TenantInfo>> getTenantAsync(String tenantName) { + return getAsync(joinPath(BASE_POLICIES_PATH, tenantName)); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9db3c2e4096..cbcc8cd5a72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -245,23 +245,6 @@ public class PersistentTopicsBase extends AdminResource { } } - protected void validateAdminAndClientPermission() { - try { - validateAdminAccessForTenant(topicName.getTenant()); - } catch (Exception ve) { - try { - checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData()); - } catch (RestException 
re) { - throw re; - } catch (Exception e) { - // unknown error marked as internal server error - log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", - topicName, clientAppId(), e.getMessage(), e); - throw new RestException(e); - } - } - } - public void validateAdminOperationOnTopic(boolean authoritative) { validateAdminAccessForTenant(topicName.getTenant()); validateTopicOwnership(topicName, authoritative); @@ -3446,46 +3429,55 @@ public class PersistentTopicsBase extends AdminResource { PulsarService pulsar, String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) { CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); - try { - // (1) authorize client - try { - checkAuthorization(pulsar, topicName, clientAppId, authenticationData); - } catch (RestException e) { - try { - validateAdminAccessForTenant(pulsar, - clientAppId, originalPrincipal, topicName.getTenant(), authenticationData); - } catch (RestException authException) { - log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString()); - throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s", - clientAppId, topicName.toString(), authException.getMessage())); - } - } catch (Exception ex) { - // throw without wrapping to PulsarClientException that considers: unknown error marked as internal - // server error - log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId, - topicName.toString(), ex.getMessage(), ex); - throw ex; - } + CompletableFuture<Void> authorizationFuture = new CompletableFuture<>(); + checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData) + .thenRun(() -> authorizationFuture.complete(null)) + .exceptionally(e -> { + Throwable throwable = FutureUtil.unwrapCompletionException(e); + if (throwable instanceof RestException) { + 
validateAdminAccessForTenantAsync(pulsar, + clientAppId, originalPrincipal, topicName.getTenant(), authenticationData) + .thenRun(() -> { + authorizationFuture.complete(null); + }).exceptionally(ex -> { + Throwable throwable2 = FutureUtil.unwrapCompletionException(ex); + if (throwable2 instanceof RestException) { + log.warn("Failed to authorize {} on topic {}", clientAppId, topicName); + authorizationFuture.completeExceptionally(new PulsarClientException( + String.format("Authorization failed %s on topic %s with error %s", + clientAppId, topicName, throwable2.getMessage()))); + } else { + authorizationFuture.completeExceptionally(throwable2); + } + return null; + }); + } else { + // throw without wrapping to PulsarClientException that considers: unknown error marked as + // internal server error + log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable); + authorizationFuture.completeExceptionally(throwable); + } + return null; + }); - // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can - // serve/redirect request else fail partitioned-metadata-request so, client fails while creating - // producer/consumer - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) - .thenCompose(res -> pulsar.getBrokerService() - .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) - .thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName, - metadata.partitions); - } - metadataFuture.complete(metadata); - }).exceptionally(ex -> { - metadataFuture.completeExceptionally(ex.getCause()); - return null; - }); - } catch (Exception ex) { - metadataFuture.completeExceptionally(ex); - } + // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can + // serve/redirect request else fail partitioned-metadata-request so, client fails while creating 
+ // producer/consumer + authorizationFuture.thenCompose(__ -> + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())) + .thenCompose(res -> + pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) + .thenAccept(metadata -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName, + metadata.partitions); + } + metadataFuture.complete(metadata); + }) + .exceptionally(e -> { + metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + return null; + }); return metadataFuture; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 44d0ff2ec88..7769390ddcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -46,6 +46,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,24 +218,14 @@ public class TopicLookupBase extends PulsarWebResource { cluster); } validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(), - differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false)); + differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, + requestId, false)); } else { // (2) authorize client - try { - checkAuthorization(pulsarService, topicName, clientAppId, authenticationData); - } catch (RestException authException) { - log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString()); - 
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError, - authException.getMessage(), requestId)); - return; - } catch (Exception e) { - log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString()); - validationFuture.completeExceptionally(e); - return; - } - // (3) validate global namespace - checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject()) - .thenAccept(peerClusterData -> { + checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> { + // (3) validate global namespace + checkLocalOrGetPeerReplicationCluster(pulsarService, + topicName.getNamespaceObject()).thenAccept(peerClusterData -> { if (peerClusterData == null) { // (4) all validation passed: initiate lookup validationFuture.complete(null); @@ -245,21 +236,36 @@ public class TopicLookupBase extends PulsarWebResource { if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl()) && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) { validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError, - "Redirected cluster's brokerService url is not configured", requestId)); + "Redirected cluster's brokerService url is not configured", + requestId)); return; } validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(), - peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, + peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, + requestId, false)); - }).exceptionally(ex -> { - validationFuture.complete( - newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId)); - return null; - }); + validationFuture.complete( + newLookupErrorResponse(ServerError.MetadataError, + FutureUtil.unwrapCompletionException(ex).getMessage(), requestId)); + return null; + }); + }) + .exceptionally(e -> { + Throwable throwable = FutureUtil.unwrapCompletionException(e); + if (throwable instanceof 
RestException) { + log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName); + validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError, + throwable.getMessage(), requestId)); + } else { + log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName); + validationFuture.completeExceptionally(throwable); + } + return null; + }); } }).exceptionally(ex -> { - validationFuture.completeExceptionally(ex); + validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index bc60b4ee326..61fd58ee565 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -261,6 +261,84 @@ public abstract class PulsarWebResource { } } + protected static CompletableFuture<Void> validateAdminAccessForTenantAsync( + PulsarService pulsar, String clientAppId, String originalPrincipal, String tenant, + AuthenticationDataSource authenticationData) { + if (log.isDebugEnabled()) { + log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant, + (isClientAuthenticated(clientAppId)), clientAppId); + } + return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant) + .thenCompose(tenantInfoOptional -> { + if (!tenantInfoOptional.isPresent()) { + throw new RestException(Status.NOT_FOUND, "Tenant does not exist"); + } + TenantInfo tenantInfo = tenantInfoOptional.get(); + if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration() + .isAuthorizationEnabled()) { + if (!isClientAuthenticated(clientAppId)) { + throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"); + } + 
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, + originalPrincipal); + if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) { + AuthorizationService authorizationService = + pulsar.getBrokerService().getAuthorizationService(); + return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, + authenticationData).thenCompose(isTenantAdmin -> { + String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}"; + if (!isTenantAdmin) { + return authorizationService.isSuperUser(clientAppId, authenticationData) + .thenCombine(authorizationService.isSuperUser(originalPrincipal, + authenticationData), + (proxyAuthorized, originalPrincipalAuthorized) -> { + if (!proxyAuthorized || !originalPrincipalAuthorized) { + throw new RestException(Status.UNAUTHORIZED, + String.format( + "Proxy not authorized to access " + + "resource (proxy:%s,original:%s)" + , clientAppId, originalPrincipal)); + } else { + if (log.isDebugEnabled()) { + log.debug(debugMsg, originalPrincipal, + clientAppId, tenant); + } + return null; + } + }); + } else { + if (log.isDebugEnabled()) { + log.debug(debugMsg, originalPrincipal, clientAppId, tenant); + } + return CompletableFuture.completedFuture(null); + } + }); + } else { + return pulsar.getBrokerService() + .getAuthorizationService() + .isSuperUser(clientAppId, authenticationData) + .thenCompose(isSuperUser -> { + if (!isSuperUser) { + return pulsar.getBrokerService().getAuthorizationService() + .isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData); + } else { + return CompletableFuture.completedFuture(true); + } + }).thenAccept(authorized -> { + if (!authorized) { + throw new RestException(Status.UNAUTHORIZED, + "Don't have permission to administrate resources on this tenant"); + } else { + log.debug("Successfully authorized {} on tenant {}", clientAppId, tenant); + } + }); + } + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + protected 
static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId, String originalPrincipal, String tenant, AuthenticationDataSource authenticationData) @@ -806,18 +884,21 @@ public abstract class PulsarWebResource { return null; } - protected static void checkAuthorization(PulsarService pulsarService, TopicName topicName, String role, - AuthenticationDataSource authenticationData) throws Exception { + protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName, + String role, AuthenticationDataSource authenticationData) { if (!pulsarService.getConfiguration().isAuthorizationEnabled()) { // No enforcing of authorization policies - return; + return CompletableFuture.completedFuture(null); } // get zk policy manager - if (!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName, - TopicOperation.LOOKUP, null, role, authenticationData)) { - log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role); - throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace"); - } + return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName, + TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> { + if (!allow) { + log.warn("[{}] Role {} is not allowed to lookup topic", topicName, role); + throw new RestException(Status.UNAUTHORIZED, + "Don't have permission to connect to this namespace"); + } + }); } // Used for unit tests access
