This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new d81568e2bd6 [branch-2.8][fix][security] Add timeout of sync methods
and avoid call sync method for AuthoriationService (#15694) (#16831)
d81568e2bd6 is described below
commit d81568e2bd606fd652632118a2f9bfe087e68be5
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 2 15:56:56 2022 +0800
[branch-2.8][fix][security] Add timeout of sync methods and avoid call sync
method for AuthoriationService (#15694) (#16831)
(cherry picked from commit 6af365e36aed74e95ca6e088f453d9513094bb36)
Besides resolving the basic conflicts, this PR
- migrate `validateAdminAccessForTenantAsync` from #14149
- migrate `TenantResources#getTenantAsync` from #11693
Co-authored-by: lipenghui <[email protected]>
---
.../broker/authorization/AuthorizationService.java | 28 ++++--
.../pulsar/broker/resources/BaseResources.java | 1 +
.../pulsar/broker/resources/TenantResources.java | 7 +-
.../broker/admin/impl/PersistentTopicsBase.java | 104 ++++++++++-----------
.../pulsar/broker/lookup/TopicLookupBase.java | 54 ++++++-----
.../pulsar/broker/web/PulsarWebResource.java | 97 +++++++++++++++++--
6 files changed, 196 insertions(+), 95 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 26d04776e5d..90b49603fe8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -30,7 +30,6 @@ import
org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
@@ -42,6 +41,7 @@ import javax.ws.rs.core.Response;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -394,11 +394,15 @@ public class AuthorizationService {
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(
- tenantName, operation, originalRole, role, authData).get();
+ tenantName, operation, originalRole, role, authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(),
SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
@@ -519,11 +523,15 @@ public class AuthorizationService {
AuthenticationDataSource
authData) {
try {
return allowNamespacePolicyOperationAsync(
- namespaceName, policy, operation, originalRole, role,
authData).get();
+ namespaceName, policy, operation, originalRole, role,
authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(),
SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
@@ -583,11 +591,15 @@ public class AuthorizationService {
AuthenticationDataSource
authData) {
try {
return allowTopicPolicyOperationAsync(
- topicName, policy, operation, originalRole, role,
authData).get();
+ topicName, policy, operation, originalRole, role,
authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(),
SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
@@ -665,13 +677,17 @@ public class AuthorizationService {
TopicOperation operation,
String originalRole,
String role,
- AuthenticationDataSource authData) {
+ AuthenticationDataSource authData)
throws Exception {
try {
- return allowTopicOperationAsync(topicName, operation,
originalRole, role, authData).get();
+ return allowTopicOperationAsync(topicName, operation,
originalRole, role, authData).get(
+ conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new RestException(e);
}
}
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 5b195d9dcb1..839011cc95f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Joiner;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 127332e1332..78d80b9d6f7 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -18,12 +18,17 @@
*/
package org.apache.pulsar.broker.resources;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
public class TenantResources extends BaseResources<TenantInfo> {
public TenantResources(MetadataStoreExtended store, int
operationTimeoutSec) {
super(store, TenantInfo.class, operationTimeoutSec);
}
+
+ public CompletableFuture<Optional<TenantInfo>> getTenantAsync(String
tenantName) {
+ return getAsync(joinPath(BASE_POLICIES_PATH, tenantName));
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9db3c2e4096..cbcc8cd5a72 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -245,23 +245,6 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected void validateAdminAndClientPermission() {
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- } catch (Exception ve) {
- try {
- checkAuthorization(pulsar(), topicName, clientAppId(),
clientAuthData());
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- // unknown error marked as internal server error
- log.warn("Unexpected error while authorizing request.
topic={}, role={}. Error: {}",
- topicName, clientAppId(), e.getMessage(), e);
- throw new RestException(e);
- }
- }
- }
-
public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
@@ -3446,46 +3429,55 @@ public class PersistentTopicsBase extends AdminResource
{
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new
CompletableFuture<>();
- try {
- // (1) authorize client
- try {
- checkAuthorization(pulsar, topicName, clientAppId,
authenticationData);
- } catch (RestException e) {
- try {
- validateAdminAccessForTenant(pulsar,
- clientAppId, originalPrincipal,
topicName.getTenant(), authenticationData);
- } catch (RestException authException) {
- log.warn("Failed to authorize {} on cluster {}",
clientAppId, topicName.toString());
- throw new
PulsarClientException(String.format("Authorization failed %s on topic %s with
error %s",
- clientAppId, topicName.toString(),
authException.getMessage()));
- }
- } catch (Exception ex) {
- // throw without wrapping to PulsarClientException that
considers: unknown error marked as internal
- // server error
- log.warn("Failed to authorize {} on cluster {} with unexpected
exception {}", clientAppId,
- topicName.toString(), ex.getMessage(), ex);
- throw ex;
- }
+ CompletableFuture<Void> authorizationFuture = new
CompletableFuture<>();
+ checkAuthorizationAsync(pulsar, topicName, clientAppId,
authenticationData)
+ .thenRun(() -> authorizationFuture.complete(null))
+ .exceptionally(e -> {
+ Throwable throwable =
FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ validateAdminAccessForTenantAsync(pulsar,
+ clientAppId, originalPrincipal,
topicName.getTenant(), authenticationData)
+ .thenRun(() -> {
+ authorizationFuture.complete(null);
+ }).exceptionally(ex -> {
+ Throwable throwable2 =
FutureUtil.unwrapCompletionException(ex);
+ if (throwable2 instanceof RestException) {
+ log.warn("Failed to authorize {} on
topic {}", clientAppId, topicName);
+
authorizationFuture.completeExceptionally(new PulsarClientException(
+ String.format("Authorization
failed %s on topic %s with error %s",
+ clientAppId, topicName,
throwable2.getMessage())));
+ } else {
+
authorizationFuture.completeExceptionally(throwable2);
+ }
+ return null;
+ });
+ } else {
+ // throw without wrapping to PulsarClientException
that considers: unknown error marked as
+ // internal server error
+ log.warn("Failed to authorize {} on topic {}",
clientAppId, topicName, throwable);
+ authorizationFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
- // validates global-namespace contains local/peer cluster: if
peer/local cluster present then lookup can
- // serve/redirect request else fail partitioned-metadata-request
so, client fails while creating
- // producer/consumer
- checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject())
- .thenCompose(res -> pulsar.getBrokerService()
-
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
- .thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of partitions for
topic {} is {}", clientAppId, topicName,
- metadata.partitions);
- }
- metadataFuture.complete(metadata);
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex.getCause());
- return null;
- });
- } catch (Exception ex) {
- metadataFuture.completeExceptionally(ex);
- }
+ // validates global-namespace contains local/peer cluster: if
peer/local cluster present then lookup can
+ // serve/redirect request else fail partitioned-metadata-request so,
client fails while creating
+ // producer/consumer
+ authorizationFuture.thenCompose(__ ->
+ checkLocalOrGetPeerReplicationCluster(pulsar,
topicName.getNamespaceObject()))
+ .thenCompose(res ->
+
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+ .thenAccept(metadata -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Total number of partitions for topic
{} is {}", clientAppId, topicName,
+ metadata.partitions);
+ }
+ metadataFuture.complete(metadata);
+ })
+ .exceptionally(e -> {
+
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+ return null;
+ });
return metadataFuture;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 44d0ff2ec88..7769390ddcb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -217,24 +218,14 @@ public class TopicLookupBase extends PulsarWebResource {
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
- differentClusterData.getBrokerServiceUrlTls(), true,
LookupType.Redirect, requestId, false));
+ differentClusterData.getBrokerServiceUrlTls(), true,
LookupType.Redirect,
+ requestId, false));
} else {
// (2) authorize client
- try {
- checkAuthorization(pulsarService, topicName, clientAppId,
authenticationData);
- } catch (RestException authException) {
- log.warn("Failed to authorized {} on cluster {}",
clientAppId, topicName.toString());
-
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
- authException.getMessage(), requestId));
- return;
- } catch (Exception e) {
- log.warn("Unknown error while authorizing {} on cluster
{}", clientAppId, topicName.toString());
- validationFuture.completeExceptionally(e);
- return;
- }
- // (3) validate global namespace
- checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject())
- .thenAccept(peerClusterData -> {
+ checkAuthorizationAsync(pulsarService, topicName, clientAppId,
authenticationData).thenRun(() -> {
+ // (3) validate global namespace
+ checkLocalOrGetPeerReplicationCluster(pulsarService,
+
topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
@@ -245,21 +236,36 @@ public class TopicLookupBase extends PulsarWebResource {
if
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&&
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
- "Redirected cluster's brokerService
url is not configured", requestId));
+ "Redirected cluster's brokerService
url is not configured",
+ requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
- peerClusterData.getBrokerServiceUrlTls(),
true, LookupType.Redirect, requestId,
+ peerClusterData.getBrokerServiceUrlTls(),
true, LookupType.Redirect,
+ requestId,
false));
-
}).exceptionally(ex -> {
- validationFuture.complete(
- newLookupErrorResponse(ServerError.MetadataError,
ex.getMessage(), requestId));
- return null;
- });
+ validationFuture.complete(
+
newLookupErrorResponse(ServerError.MetadataError,
+
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
+ return null;
+ });
+ })
+ .exceptionally(e -> {
+ Throwable throwable =
FutureUtil.unwrapCompletionException(e);
+ if (throwable instanceof RestException) {
+ log.warn("Failed to authorized {} on cluster {}",
clientAppId, topicName);
+
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+ throwable.getMessage(), requestId));
+ } else {
+ log.warn("Unknown error while authorizing {} on
cluster {}", clientAppId, topicName);
+ validationFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
}
}).exceptionally(ex -> {
- validationFuture.completeExceptionally(ex);
+
validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index bc60b4ee326..61fd58ee565 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -261,6 +261,84 @@ public abstract class PulsarWebResource {
}
}
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId, String
originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
+ if (log.isDebugEnabled()) {
+ log.debug("check admin access on tenant: {} - Authenticated: {} --
role: {}", tenant,
+ (isClientAuthenticated(clientAppId)), clientAppId);
+ }
+ return
pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
+ }
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
+
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId,
+ originalPrincipal);
+ if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+
pulsar.getBrokerService().getAuthorizationService();
+ return authorizationService.isTenantAdmin(tenant,
clientAppId, tenantInfo,
+
authenticationData).thenCompose(isTenantAdmin -> {
+ String debugMsg = "Successfully authorized {}
(proxied by {}) on tenant {}";
+ if (!isTenantAdmin) {
+ return
authorizationService.isSuperUser(clientAppId, authenticationData)
+
.thenCombine(authorizationService.isSuperUser(originalPrincipal,
+
authenticationData),
+ (proxyAuthorized,
originalPrincipalAuthorized) -> {
+ if (!proxyAuthorized
|| !originalPrincipalAuthorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+
String.format(
+
"Proxy not authorized to access "
+
+ "resource (proxy:%s,original:%s)"
+ ,
clientAppId, originalPrincipal));
+ } else {
+ if
(log.isDebugEnabled()) {
+
log.debug(debugMsg, originalPrincipal,
+
clientAppId, tenant);
+ }
+ return null;
+ }
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(debugMsg, originalPrincipal,
clientAppId, tenant);
+ }
+ return
CompletableFuture.completedFuture(null);
+ }
+ });
+ } else {
+ return pulsar.getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(clientAppId,
authenticationData)
+ .thenCompose(isSuperUser -> {
+ if (!isSuperUser) {
+ return
pulsar.getBrokerService().getAuthorizationService()
+ .isTenantAdmin(tenant,
clientAppId, tenantInfo, authenticationData);
+ } else {
+ return
CompletableFuture.completedFuture(true);
+ }
+ }).thenAccept(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ "Don't have permission to
administrate resources on this tenant");
+ } else {
+ log.debug("Successfully authorized
{} on tenant {}", clientAppId, tenant);
+ }
+ });
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
String
originalPrincipal, String tenant,
AuthenticationDataSource authenticationData)
@@ -806,18 +884,21 @@ public abstract class PulsarWebResource {
return null;
}
- protected static void checkAuthorization(PulsarService pulsarService,
TopicName topicName, String role,
- AuthenticationDataSource authenticationData) throws Exception {
+ protected static CompletableFuture<Void>
checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
+ String role, AuthenticationDataSource
authenticationData) {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
- return;
+ return CompletableFuture.completedFuture(null);
}
// get zk policy manager
- if
(!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
- TopicOperation.LOOKUP, null, role, authenticationData)) {
- log.warn("[{}] Role {} is not allowed to lookup topic", topicName,
role);
- throw new RestException(Status.UNAUTHORIZED, "Don't have
permission to connect to this namespace");
- }
+ return
pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName,
+ TopicOperation.LOOKUP, null, role,
authenticationData).thenAccept(allow -> {
+ if (!allow) {
+ log.warn("[{}] Role {} is not allowed to lookup
topic", topicName, role);
+ throw new RestException(Status.UNAUTHORIZED,
+ "Don't have permission to connect to this
namespace");
+ }
+ });
}
// Used for unit tests access