This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5dc5de8 [Issue 5720][authz] - add topics authz granularity (#7523)
5dc5de8 is described below
commit 5dc5de849b582c7f312764db043da9483c17146a
Author: Alexandre DUVAL <[email protected]>
AuthorDate: Sat May 8 10:23:32 2021 +0200
[Issue 5720][authz] - add topics authz granularity (#7523)
Fixes a part of #5720
### Motivation
add granularity in topics api authz
---
.../authentication/AuthenticationDataHttp.java | 20 ++
.../authorization/AuthorizationProvider.java | 43 ++++-
.../broker/authorization/AuthorizationService.java | 163 ++++++++++++----
.../authorization/PulsarAuthorizationProvider.java | 108 +++++++----
.../apache/pulsar/broker/admin/AdminResource.java | 18 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 2 +-
.../broker/admin/impl/PersistentTopicsBase.java | 208 ++++++++++-----------
.../broker/admin/v1/NonPersistentTopics.java | 17 +-
.../broker/admin/v2/NonPersistentTopics.java | 19 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 9 +-
.../pulsar/broker/lookup/TopicLookupBase.java | 9 +-
.../pulsar/broker/web/PulsarWebResource.java | 53 +++++-
.../pulsar/broker/admin/PersistentTopicsTest.java | 1 -
.../pulsar/broker/service/ServerCnxTest.java | 1 +
.../api/AuthenticatedProducerConsumerTest.java | 11 +-
.../api/AuthorizationProducerConsumerTest.java | 21 ++-
.../pulsar/common/policies/data/PolicyName.java | 1 +
.../common/policies/data/TopicOperation.java | 3 +
18 files changed, 471 insertions(+), 236 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
index a990b9e..958e5ea 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
@@ -28,6 +28,8 @@ public class AuthenticationDataHttp implements
AuthenticationDataSource {
protected final HttpServletRequest request;
protected final SocketAddress remoteAddress;
+ protected String subscription;
+
public AuthenticationDataHttp(HttpServletRequest request) {
if (request == null) {
throw new IllegalArgumentException();
@@ -69,4 +71,22 @@ public class AuthenticationDataHttp implements
AuthenticationDataSource {
return remoteAddress;
}
+ /*
+ * Subscription
+ */
+ @Override
+ public boolean hasSubscription() {
+ return this.subscription != null;
+ }
+
+ @Override
+ public void setSubscription(String subscription) {
+ this.subscription = subscription;
+ }
+
+ @Override
+ public String getSubscription() {
+ return subscription;
+ }
+
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 0403f34..f8e73c3 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
@@ -293,7 +294,8 @@ public interface AuthorizationProvider extends Closeable {
NamespaceOperation operation,
AuthenticationDataSource authData) {
return FutureUtil.failedFuture(
- new IllegalStateException("NamespaceOperation is not supported by
the Authorization provider you are using."));
+ new IllegalStateException("NamespaceOperation [" +
operation.name() + "] is not supported by "
+ + "the Authorization provider you are using."));
}
default Boolean allowNamespaceOperation(NamespaceName namespaceName,
@@ -363,7 +365,8 @@ public interface AuthorizationProvider extends Closeable {
String role,
AuthenticationDataSource authData) {
return FutureUtil.failedFuture(
- new IllegalStateException("NamespacePolicyOperation is not
supported by the Authorization provider you are using."));
+ new IllegalStateException("NamespacePolicyOperation [" +
policy.name() + "/" + operation.name() + "] "
+ + "is not supported by is not supported by the
Authorization provider you are using."));
}
default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
@@ -436,7 +439,8 @@ public interface AuthorizationProvider extends Closeable {
TopicOperation
operation,
AuthenticationDataSource authData) {
return FutureUtil.failedFuture(
- new IllegalStateException("TopicOperation is not supported by the
Authorization provider you are using."));
+ new IllegalStateException("TopicOperation [" + operation.name() +
"] is not supported by the Authorization"
+ + "provider you are using."));
}
default Boolean allowTopicOperation(TopicName topicName,
@@ -489,4 +493,37 @@ public interface AuthorizationProvider extends Closeable {
throw new RestException(e.getCause());
}
}
+
+ /**
+ * Check if a given <tt>role</tt> is allowed to execute a given topic
<tt>operation</tt> on topic's <tt>policy</tt>.
+ *
+ * @param topic topic name
+ * @param role role name
+ * @param operation topic operation
+ * @param authData authenticated data
+ * @return CompletableFuture<Boolean>
+ */
+ default CompletableFuture<Boolean>
allowTopicPolicyOperationAsync(TopicName topic,
+ String
role,
+
PolicyName policy,
+
PolicyOperation operation,
+
AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("TopicPolicyOperation [" +
policy.name() + "/" + operation.name() + "] "
+ + "is not supported by the Authorization provider you
are using."));
+ }
+
+ default Boolean allowTopicPolicyOperation(TopicName topicName,
+ String role,
+ PolicyName policy,
+ PolicyOperation operation,
+ AuthenticationDataSource
authData) {
+ try {
+ return allowTopicPolicyOperationAsync(topicName, role, policy,
operation, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 75c759f..403542f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -18,10 +18,6 @@
*/
package org.apache.pulsar.broker.authorization;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -30,10 +26,10 @@ import
org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
@@ -41,6 +37,11 @@ import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.core.Response;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
import static java.util.concurrent.TimeUnit.SECONDS;
/**
@@ -103,7 +104,7 @@ public class AuthorizationService {
* when failed to grant permission
*/
public CompletableFuture<Void> grantPermissionAsync(NamespaceName
namespace, Set<AuthAction> actions, String role,
- String authDataJson) {
+ String authDataJson) {
if (provider != null) {
return provider.grantPermissionAsync(namespace, actions, role,
authDataJson);
@@ -122,7 +123,7 @@ public class AuthorizationService {
* @return
*/
public CompletableFuture<Void>
grantSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
- Set<String> roles, String authDataJson) {
+
Set<String> roles, String authDataJson) {
if (provider != null) {
return provider.grantSubscriptionPermissionAsync(namespace,
subscriptionName, roles, authDataJson);
@@ -139,7 +140,7 @@ public class AuthorizationService {
* @return
*/
public CompletableFuture<Void>
revokeSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
- String role, String authDataJson) {
+ String
role, String authDataJson) {
if (provider != null) {
return provider.revokeSubscriptionPermissionAsync(namespace,
subscriptionName, role, authDataJson);
}
@@ -158,7 +159,7 @@ public class AuthorizationService {
* when failed to grant permission
*/
public CompletableFuture<Void> grantPermissionAsync(TopicName topicname,
Set<AuthAction> actions, String role,
- String authDataJson) {
+ String authDataJson) {
if (provider != null) {
return provider.grantPermissionAsync(topicname, actions, role,
authDataJson);
@@ -176,7 +177,7 @@ public class AuthorizationService {
* the app id used to send messages to the topic.
*/
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName,
String role,
- AuthenticationDataSource authenticationData) {
+ AuthenticationDataSource
authenticationData) {
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
@@ -204,7 +205,7 @@ public class AuthorizationService {
* the subscription name defined by the client
*/
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName,
String role,
- AuthenticationDataSource authenticationData, String subscription) {
+ AuthenticationDataSource
authenticationData, String subscription) {
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
@@ -237,7 +238,7 @@ public class AuthorizationService {
}
public boolean canConsume(TopicName topicName, String role,
AuthenticationDataSource authenticationData,
- String subscription) throws Exception {
+ String subscription) throws Exception {
try {
return canConsumeAsync(topicName, role, authenticationData,
subscription)
.get(conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
@@ -289,7 +290,7 @@ public class AuthorizationService {
* @throws Exception
*/
public CompletableFuture<Boolean> canLookupAsync(TopicName topicName,
String role,
- AuthenticationDataSource authenticationData) {
+ AuthenticationDataSource
authenticationData) {
CompletableFuture<Boolean> finalResult = new
CompletableFuture<Boolean>();
canProduceAsync(topicName, role,
authenticationData).whenComplete((produceAuthorized, ex) -> {
if (ex == null) {
@@ -327,7 +328,7 @@ public class AuthorizationService {
}
public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName
namespaceName, String role,
-
AuthenticationDataSource authenticationData) {
+
AuthenticationDataSource authenticationData) {
return provider.allowFunctionOpsAsync(namespaceName, role,
authenticationData);
}
@@ -396,11 +397,11 @@ public class AuthorizationService {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture =
allowTenantOperationAsync(
- tenantName, operation, role, authData);
+ tenantName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture =
allowTenantOperationAsync(
- tenantName, operation, originalRole, authData);
+ tenantName, operation, originalRole, authData);
return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
- (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized
&& isOriginalAuthorized);
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
} else {
return allowTenantOperationAsync(tenantName, operation, role,
authData);
}
@@ -413,7 +414,7 @@ public class AuthorizationService {
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(
- tenantName, operation, originalRole, role, authData).get();
+ tenantName, operation, originalRole, role, authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -457,11 +458,11 @@ public class AuthorizationService {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture =
allowNamespaceOperationAsync(
- namespaceName, operation, role, authData);
+ namespaceName, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture =
allowNamespaceOperationAsync(
- namespaceName, operation, originalRole, authData);
+ namespaceName, operation, originalRole, authData);
return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
- (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized
&& isOriginalAuthorized);
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
} else {
return allowNamespaceOperationAsync(namespaceName, operation,
role, authData);
}
@@ -474,7 +475,7 @@ public class AuthorizationService {
AuthenticationDataSource authData) {
try {
return allowNamespaceOperationAsync(
- namespaceName, operation, originalRole, role, authData).get();
+ namespaceName, operation, originalRole, role,
authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -520,11 +521,11 @@ public class AuthorizationService {
validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
if (isProxyRole(role)) {
CompletableFuture<Boolean> isRoleAuthorizedFuture =
allowNamespacePolicyOperationAsync(
- namespaceName, policy, operation, role, authData);
+ namespaceName, policy, operation, role, authData);
CompletableFuture<Boolean> isOriginalAuthorizedFuture =
allowNamespacePolicyOperationAsync(
- namespaceName, policy, operation, originalRole, authData);
+ namespaceName, policy, operation, originalRole, authData);
return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
- (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized
&& isOriginalAuthorized);
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
} else {
return allowNamespacePolicyOperationAsync(namespaceName, policy,
operation, role, authData);
}
@@ -538,7 +539,71 @@ public class AuthorizationService {
AuthenticationDataSource
authData) {
try {
return allowNamespacePolicyOperationAsync(
- namespaceName, policy, operation, originalRole, role,
authData).get();
+ namespaceName, policy, operation, originalRole, role,
authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
+
+ /**
+ * Grant authorization-action permission on a topic to the given client
+ *
+ * @param topicName
+ * @param policy
+ * @param operation
+ * @param role
+ * @param authData additional authdata in json for targeted authorization
provider
+ * @throws IllegalStateException when failed to grant permission
+ */
+ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName
topicName,
+
PolicyName policy,
+
PolicyOperation operation,
+ String
role,
+
AuthenticationDataSource authData) {
+ if (!this.conf.isAuthorizationEnabled()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ if (provider != null) {
+ return provider.allowTopicPolicyOperationAsync(topicName, role,
policy, operation, authData);
+ }
+
+ return FutureUtil.failedFuture(new IllegalStateException("No
authorization provider configured for " +
+ "allowTopicPolicyOperationAsync"));
+ }
+
+ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName
topicName,
+
PolicyName policy,
+
PolicyOperation operation,
+ String
originalRole,
+ String
role,
+
AuthenticationDataSource authData) {
+
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (isProxyRole(role)) {
+ CompletableFuture<Boolean> isRoleAuthorizedFuture =
allowTopicPolicyOperationAsync(
+ topicName, policy, operation, role, authData);
+ CompletableFuture<Boolean> isOriginalAuthorizedFuture =
allowTopicPolicyOperationAsync(
+ topicName, policy, operation, originalRole, authData);
+ return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return allowTopicPolicyOperationAsync(topicName, policy,
operation, role, authData);
+ }
+ }
+
+
+ public Boolean allowTopicPolicyOperation(TopicName topicName,
+ PolicyName policy,
+ PolicyOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource
authData) {
+ try {
+ return allowTopicPolicyOperationAsync(
+ topicName, policy, operation, originalRole, role,
authData).get();
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
@@ -564,7 +629,7 @@ public class AuthorizationService {
AuthenticationDataSource authData) {
if (log.isDebugEnabled()) {
log.debug("Check if role {} is allowed to execute topic operation
{} on topic {}",
- role, operation, topicName);
+ role, operation, topicName);
}
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
@@ -572,21 +637,21 @@ public class AuthorizationService {
if (provider != null) {
CompletableFuture<Boolean> allowFuture =
- provider.allowTopicOperationAsync(topicName, role, operation,
authData);
+ provider.allowTopicOperationAsync(topicName, role,
operation, authData);
if (log.isDebugEnabled()) {
return allowFuture.whenComplete((allowed, exception) -> {
if (exception == null) {
if (allowed) {
log.debug("Topic operation {} on topic {} is
allowed: role = {}",
- operation, topicName, role);
- } else{
+ operation, topicName, role);
+ } else {
log.debug("Topic operation {} on topic {} is NOT
allowed: role = {}",
- operation, topicName, role);
+ operation, topicName, role);
}
} else {
log.debug("Failed to check if topic operation {} on
topic {} is allowed:"
- + " role = {}",
- operation, topicName, role, exception);
+ + " role = {}",
+ operation, topicName, role, exception);
}
});
} else {
@@ -597,4 +662,36 @@ public class AuthorizationService {
return FutureUtil.failedFuture(new IllegalStateException("No
authorization provider configured for " +
"allowTopicOperationAsync"));
}
+
+ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName
topicName,
+ TopicOperation
operation,
+ String
originalRole,
+ String role,
+
AuthenticationDataSource authData) {
+ validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+ if (isProxyRole(role)) {
+ CompletableFuture<Boolean> isRoleAuthorizedFuture =
allowTopicOperationAsync(
+ topicName, operation, role, authData);
+ CompletableFuture<Boolean> isOriginalAuthorizedFuture =
allowTopicOperationAsync(
+ topicName, operation, originalRole, authData);
+ return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return allowTopicOperationAsync(topicName, operation, role,
authData);
+ }
+ }
+
+ public Boolean allowTopicOperation(TopicName topicName,
+ TopicOperation operation,
+ String originalRole,
+ String role,
+ AuthenticationDataSource authData) {
+ try {
+ return allowTopicOperationAsync(topicName, operation,
originalRole, role, authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ }
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index f0118b6..05c9fb2 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -529,17 +529,18 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
NamespaceOperation operation,
AuthenticationDataSource authData) {
CompletableFuture<Boolean> isAuthorizedFuture;
- if (operation == NamespaceOperation.PACKAGES) {
- isAuthorizedFuture =
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData,
AuthAction.packages);
- } else {
- isAuthorizedFuture = CompletableFuture.completedFuture(false);
+ switch (operation) {
+ case PACKAGES:
+ isAuthorizedFuture =
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData,
AuthAction.packages);
+ break;
+ default:
+ isAuthorizedFuture = CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> isTenantAdminFuture =
validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
return isTenantAdminFuture.thenCombine(isAuthorizedFuture,
(isTenantAdmin, isAuthorized) -> {
if (log.isDebugEnabled()) {
- log.debug("Verify if role {} is allowed to {} to topic {}:"
- + " isTenantAdmin={}, isAuthorized={}",
- role, operation, namespaceName, isTenantAdmin,
isAuthorized);
+ log.debug("Verify if role {} is allowed to {} to topic {}:
isTenantAdmin={}, isAuthorized={}",
+ role, operation, namespaceName, isTenantAdmin,
isAuthorized);
}
return isTenantAdmin || isAuthorized;
});
@@ -559,29 +560,48 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
String role,
TopicOperation
operation,
AuthenticationDataSource authData) {
+ log.debug("Check allowTopicOperationAsync [" + operation.name() + "]
on [" + topicName.toString() + "].");
+
CompletableFuture<Boolean> isAuthorizedFuture;
switch (operation) {
- case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role,
authData);
+ case LOOKUP:
+ case GET_STATS:
+ isAuthorizedFuture = canLookupAsync(topicName, role, authData);
break;
- case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName,
role, authData);
+ case PRODUCE:
+ isAuthorizedFuture = canProduceAsync(topicName, role,
authData);
break;
- case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName,
role, authData, authData.getSubscription());
+ case GET_SUBSCRIPTIONS:
+ case CONSUME:
+ case SUBSCRIBE:
+ case UNSUBSCRIBE:
+ case SKIP:
+ case EXPIRE_MESSAGES:
+ case PEEK_MESSAGES:
+ case RESET_CURSOR:
+ isAuthorizedFuture = canConsumeAsync(topicName, role,
authData, authData.getSubscription());
break;
+ case TERMINATE:
+ case COMPACT:
+ case OFFLOAD:
+ case UNLOAD:
+ case ADD_BUNDLE_RANGE:
+ case GET_BUNDLE_RANGE:
+ case DELETE_BUNDLE_RANGE:
+ return validateTenantAdminAccess(topicName.getTenant(), role,
authData);
default:
- return FutureUtil.failedFuture(new
IllegalStateException("TopicOperation is not supported."));
+ return FutureUtil.failedFuture(
+ new IllegalStateException("TopicOperation [" +
operation.name() + "] is not supported."));
}
- CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role,
authData, conf);
-
- // check isSuperUser first
- return isSuperUserFuture
- .thenCompose(isSuperUser -> {
+ return validateTenantAdminAccess(topicName.getTenant(), role, authData)
+ .thenCompose(isSuperUserOrAdmin -> {
if (log.isDebugEnabled()) {
- log.debug("Verify if role {} is allowed to {} to topic
{}: isSuperUser={}",
- role, operation, topicName, isSuperUser);
+ log.debug("Verify if role {} is allowed to {} to topic
{}: isSuperUserOrAdmin={}",
+ role, operation, topicName,
isSuperUserOrAdmin);
}
- if (isSuperUser) {
+ if (isSuperUserOrAdmin) {
return CompletableFuture.completedFuture(true);
} else {
return isAuthorizedFuture;
@@ -589,6 +609,15 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
});
}
+ @Override
+ public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName
topicName,
+ String
role,
+
PolicyName policyName,
+
PolicyOperation policyOperation,
+
AuthenticationDataSource authData) {
+ return validateTenantAdminAccess(topicName.getTenant(), role,
authData);
+ }
+
private static String path(String... parts) {
StringBuilder sb = new StringBuilder();
sb.append("/admin/");
@@ -596,27 +625,28 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
return sb.toString();
}
- private CompletableFuture<Boolean> validateTenantAdminAccess(String
tenantName,
+ public CompletableFuture<Boolean> validateTenantAdminAccess(String
tenantName,
String role,
-
AuthenticationDataSource authData) {
- try {
- TenantInfo tenantInfo = pulsarResources.getTenantResources()
- .get(path(POLICIES, tenantName))
- .orElseThrow(() -> new
RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
-
- // role check
- CompletableFuture<Boolean> isRoleSuperUserFuture =
isSuperUser(role, authData, conf);
- CompletableFuture<Boolean> isRoleTenantAdminFuture =
isTenantAdmin(tenantName, role, tenantInfo, authData);
- return isRoleSuperUserFuture
- .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser,
isRoleTenantAdmin) ->
- isRoleSuperUser || isRoleTenantAdmin);
- } catch (NotFoundException e) {
- log.warn("Failed to get tenant info data for non existing tenant
{}", tenantName);
- throw new RestException(Response.Status.NOT_FOUND, "Tenant does
not exist");
- } catch (Exception e) {
- log.error("Failed to get tenant {}", tenantName, e);
- throw new RestException(e);
- }
+
AuthenticationDataSource authData) {
+ return isSuperUser(role, authData, conf)
+ .thenCompose(isSuperUser -> {
+ if (isSuperUser) {
+ return CompletableFuture.completedFuture(true);
+ } else {
+ try {
+ TenantInfo tenantInfo =
pulsarResources.getTenantResources()
+ .get(path(POLICIES, tenantName))
+ .orElseThrow(() -> new
RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
+ return isTenantAdmin(tenantName, role, tenantInfo,
authData);
+ } catch (NotFoundException e) {
+ log.warn("Failed to get tenant info data for non
existing tenant {}", tenantName);
+ throw new RestException(Response.Status.NOT_FOUND,
"Tenant does not exist");
+ } catch (Exception e) {
+ log.error("Failed to get tenant {}", tenantName,
e);
+ throw new RestException(e);
+ }
+ }
+ });
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2b9d67b..78792d1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -55,10 +55,12 @@ import
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
@@ -490,13 +492,9 @@ public abstract class AdminResource extends
PulsarWebResource {
}
try {
- checkConnect(topicName);
- } catch (WebApplicationException e) {
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- } catch (Exception ex) {
- return FutureUtil.failedFuture(ex);
- }
+ validateTopicOperation(topicName, TopicOperation.LOOKUP);
+ } catch (RestException e) {
+ return FutureUtil.failedFuture(e);
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={},
role={}. Error: {}", topicName,
@@ -520,9 +518,7 @@ public abstract class AdminResource extends
PulsarWebResource {
validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
try {
- checkConnect(topicName);
- } catch (WebApplicationException e) {
- validateAdminAccessForTenant(topicName.getTenant());
+ validateTopicOperation(topicName, TopicOperation.LOOKUP);
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. topic={},
role={}. Error: {}", topicName,
@@ -694,7 +690,7 @@ public abstract class AdminResource extends
PulsarWebResource {
final int maxPartitions =
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
try {
- validateAdminAccessForTenant(topicName.getTenant());
+ validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.CREATE_TOPIC);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a58cb7d..013a9ea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -831,7 +831,7 @@ public abstract class NamespacesBase extends AdminResource {
}
protected void internalSetAutoSubscriptionCreation(
- AsyncResponse asyncResponse, AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
+ AsyncResponse asyncResponse, AutoSubscriptionCreationOverride
autoSubscriptionCreationOverride) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 76747e4..2c1af56 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -107,6 +107,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -114,10 +115,13 @@ import
org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
@@ -137,7 +141,7 @@ public class PersistentTopicsBase extends AdminResource {
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX =
Version.forIntegers(1, 21);
protected List<String> internalGetList() {
- validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
try {
@@ -173,6 +177,7 @@ public class PersistentTopicsBase extends AdminResource {
protected List<String> internalGetPartitionedTopicList() {
validateAdminAccessForTenant(namespaceName.getTenant());
+ validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
try {
if (!namespaceResources().exists(path(POLICIES,
namespaceName.toString()))) {
@@ -252,74 +257,6 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnership(topicName, authoritative);
}
- public void validateReadOperationOnTopic(boolean authoritative) {
- validateTopicOwnership(topicName, authoritative);
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] failed to validate admin access for {}",
topicName, clientAppId());
- }
- validateAdminAccessForSubscriber("");
- }
- }
-
- public void validateWriteOperationOnTopic(boolean authoritative) {
- validateTopicOwnership(topicName, authoritative);
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] failed to validate admin access for {}",
topicName, clientAppId());
- }
- try {
- if
(!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName,
clientAppId(),
- clientAuthData())) {
- log.warn("[{}} Subscriber {} is not authorized to access
api", topicName, clientAppId());
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Subscriber %s is not authorized to access
this operation", clientAppId()));
- }
- } catch (RestException re) {
- throw re;
- } catch (Exception ex) {
- // unknown error marked as internal server error
- log.warn("Unexpected error while authorizing request.
topic={}, role={}. Error: {}", topicName,
- clientAppId(), e.getMessage(), ex);
- throw new RestException(ex);
- }
- }
- }
-
- protected void validateAdminAccessForSubscriber(String subscriptionName,
boolean authoritative) {
- validateTopicOwnership(topicName, authoritative);
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] failed to validate admin access for {}",
topicName, clientAppId());
- }
- validateAdminAccessForSubscriber(subscriptionName);
- }
- }
-
- private void validateAdminAccessForSubscriber(String subscriptionName) {
- try {
- if
(!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName,
clientAppId(),
- clientAuthData(), subscriptionName)) {
- log.warn("[{}} Subscriber {} is not authorized to access api",
topicName, clientAppId());
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Subscriber %s is not authorized to
access this operation", clientAppId()));
- }
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- // unknown error marked as internal server error
- log.warn("Unexpected error while authorizing request. topic={},
role={}. Error: {}", topicName,
- clientAppId(), e.getMessage(), e);
- throw new RestException(e);
- }
- }
-
private void grantPermissions(String topicUri, String role,
Set<AuthAction> actions) {
try {
namespaceResources().set(path(POLICIES, namespaceName.toString()),
(policies) -> {
@@ -358,7 +295,8 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalDeleteTopicForcefully(boolean authoritative,
boolean deleteSchema) {
- validateWriteOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC);
try {
pulsar().getBrokerService().deleteTopic(topicName.toString(),
true, deleteSchema).get();
@@ -419,13 +357,12 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
- validateWriteOperationOnTopic(authoritative);
validateNonPartitionTopicName(topicName.getLocalName());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
-
validateTopicOwnership(topicName, authoritative);
+ validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.CREATE_TOPIC);
PartitionedTopicMetadata partitionMetadata =
getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
@@ -465,7 +402,9 @@ public class PersistentTopicsBase extends AdminResource {
*/
protected void internalUpdatePartitionedTopic(int numPartitions,
boolean
updateLocalTopicOnly, boolean authoritative) {
- validateWriteOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicPolicyOperation(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE);
+
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(),
numPartitions);
@@ -613,7 +552,8 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
boolean authoritative,
boolean force, boolean
deleteSchema) {
try {
- validateWriteOperationOnTopic(authoritative);
+ validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC);
+ validateTopicOwnership(topicName, authoritative);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to delete partitioned topic {},
redirecting to other brokers.",
@@ -1005,8 +945,8 @@ public class PersistentTopicsBase extends AdminResource {
private void internalUnloadNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
Topic topic;
try {
- validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.UNLOAD);
topic = getTopicReference(topicName);
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}", clientAppId(),
topicName, e);
@@ -1034,7 +974,8 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalDeleteTopic(boolean authoritative, boolean
deleteSchema) {
- validateWriteOperationOnTopic(authoritative);
+ validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC);
+ validateTopicOwnership(topicName, authoritative);
try {
pulsar().getBrokerService().deleteTopic(topicName.toString(),
false, deleteSchema).get();
@@ -1062,6 +1003,9 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
+
+ validateTopicOwnership(topicName, authoritative);
+
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalGetSubscriptionsForNonPartitionedTopic(asyncResponse,
authoritative);
@@ -1115,7 +1059,9 @@ public class PersistentTopicsBase extends AdminResource {
private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse
asyncResponse, boolean authoritative) {
try {
- validateReadOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName,
TopicOperation.GET_SUBSCRIPTIONS);
+
Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) ->
subscriptions.add(subName));
@@ -1136,25 +1082,29 @@ public class PersistentTopicsBase extends AdminResource
{
protected TopicStats internalGetStats(boolean authoritative, boolean
getPreciseBacklog,
boolean subscriptionBacklogSize) {
- validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
+
Topic topic = getTopicReference(topicName);
return topic.getStats(getPreciseBacklog, subscriptionBacklogSize);
}
protected PersistentTopicInternalStats internalGetInternalStats(boolean
authoritative, boolean metadata) {
- validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
+
Topic topic = getTopicReference(topicName);
try {
- boolean includeMetadata = metadata && hasSuperUserAccess();
- return topic.getInternalStats(includeMetadata).get();
+ if (metadata) {
+ validateTopicOperation(topicName, TopicOperation.GET_METADATA);
+ }
+ return topic.getInternalStats(metadata).get();
} catch (Exception e) {
throw new RestException(Status.INTERNAL_SERVER_ERROR,
(e instanceof ExecutionException) ?
e.getCause().getMessage() : e.getMessage());
@@ -1240,7 +1190,7 @@ public class PersistentTopicsBase extends AdminResource {
protected void
internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse asyncResponse)
{
String managedLedger;
try {
- validateAdminAccessForTenant(topicName.getTenant());
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
managedLedger = topicName.getPersistenceNamingEncoding();
} catch (Exception e) {
log.error("[{}] Failed to get managed info for {}", clientAppId(),
topicName, e);
@@ -1409,6 +1359,7 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
+ validateTopicOwnership(topicName, authoritative);
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse,
subName, authoritative);
@@ -1468,7 +1419,9 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String
subName, boolean authoritative) {
try {
- validateAdminAccessForSubscriber(subName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
+
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
@@ -1563,7 +1516,9 @@ public class PersistentTopicsBase extends AdminResource {
private void
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse
asyncResponse,
String subName, boolean authoritative) {
try {
- validateAdminAccessForSubscriber(subName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
+
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
@@ -1600,6 +1555,10 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
+
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.SKIP, subName);
+
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalSkipAllMessagesForNonPartitionedTopic(asyncResponse,
subName, authoritative);
@@ -1657,7 +1616,9 @@ public class PersistentTopicsBase extends AdminResource {
private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse
asyncResponse,
String subName,
boolean authoritative) {
try {
- validateAdminAccessForSubscriber(subName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.SKIP, subName);
+
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
if (ex != null) {
@@ -1707,7 +1668,10 @@ public class PersistentTopicsBase extends AdminResource {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages
on a partitioned topic is not allowed");
}
- validateAdminAccessForSubscriber(subName, authoritative);
+
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.SKIP);
+
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
@@ -1802,8 +1766,8 @@ public class PersistentTopicsBase extends AdminResource {
// validate ownership and redirect if current broker is not owner
PersistentTopic topic;
try {
- validateWriteOperationOnTopic(authoritative);
-
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
topic = (PersistentTopic) getTopicReference(topicName);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
@@ -1864,6 +1828,10 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
+
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.RESET_CURSOR,
subName);
+
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
internalResetCursorForNonPartitionedTopic(asyncResponse, subName,
timestamp, authoritative);
@@ -1952,9 +1920,12 @@ public class PersistentTopicsBase extends AdminResource {
private void internalResetCursorForNonPartitionedTopic(AsyncResponse
asyncResponse, String subName, long timestamp,
boolean authoritative) {
try {
- validateAdminAccessForSubscriber(subName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.RESET_CURSOR);
+
log.info("[{}] [{}] Received reset cursor on subscription {} to
time {}",
clientAppId(), topicName, subName, timestamp);
+
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
@@ -2100,7 +2071,8 @@ public class PersistentTopicsBase extends AdminResource {
AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl targetMessageId, boolean authoritative, boolean
replicated) {
try {
- validateAdminAccessForSubscriber(subscriptionName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
boolean isAllowAutoTopicCreation =
pulsar().getConfiguration().isAllowAutoTopicCreation();
PersistentTopic topic = (PersistentTopic)
pulsar().getBrokerService()
@@ -2168,9 +2140,9 @@ public class PersistentTopicsBase extends AdminResource {
"Reset-cursor at position is not allowed for
partitioned-topic"));
return;
} else {
- validateAdminAccessForSubscriber(subName, authoritative);
- // will redirect if the topic not owned by current broker
- validateReadOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.RESET_CURSOR);
+
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
@@ -2304,7 +2276,12 @@ public class PersistentTopicsBase extends AdminResource {
boolean authoritative) {
try {
// will redirect if the topic not owned by current broker
- validateReadOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
topic.getManagedLedger();
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new
AsyncCallbacks.ReadEntryCallback() {
@@ -2336,13 +2313,15 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected Response internalPeekNthMessage(String subName, int
messagePosition, boolean authoritative) {
- verifyReadOperation(authoritative);
// If the topic name is a partition name, no need to get partition
topic metadata again
if (!topicName.isPartitioned() &&
getPartitionedTopicMetadata(topicName,
authoritative, false).partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages
on a partitioned topic is not allowed");
}
- validateAdminAccessForSubscriber(subName, authoritative);
+
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName,
subName);
@@ -2496,7 +2475,7 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
// Validate that namespace exists, throw 404 if it doesn't exist
- // note that we do not want to load the topic and hence skip
validateAdminOperationOnTopic()
+ // note that we do not want to load the topic and hence skip
authorization check
try {
namespaceResources().get(path(POLICIES, namespaceName.toString()));
} catch
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
@@ -2555,6 +2534,8 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType
backlogQuotaType, BacklogQuota backlogQuota) {
+ validateTopicPolicyOperation(topicName, PolicyName.BACKLOG,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
if (backlogQuotaType == null) {
backlogQuotaType =
BacklogQuota.BacklogQuotaType.destination_storage;
}
@@ -2903,7 +2884,8 @@ public class PersistentTopicsBase extends AdminResource {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of
a partitioned topic is not allowed");
}
- validateWriteOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.TERMINATE);
Topic topic = getTopicReference(topicName);
try {
return ((PersistentTopic) topic).terminate().get();
@@ -2917,7 +2899,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- validateAdminOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.TERMINATE);
List<MessageId> messageIds = new ArrayList<>();
@@ -3042,8 +3025,8 @@ public class PersistentTopicsBase extends AdminResource {
throw new IllegalStateException(msg);
}
- // validate ownership and redirect if current broker is not owner
- validateAdminAccessForSubscriber(subName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName, subName);
@@ -3101,6 +3084,9 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
+
log.info("[{}][{}] received expire messages on subscription {} to
position {}", clientAppId(), topicName,
subName, messageId);
@@ -3119,8 +3105,6 @@ public class PersistentTopicsBase extends AdminResource {
"Invalid parameter for expire message by position,
partition index of message position "
+ "passed in doesn't match partition index for the
topic."));
} else {
- validateAdminAccessForSubscriber(subName, authoritative);
- validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Topic not found"));
@@ -3264,7 +3248,8 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalTriggerCompactionNonPartitionedTopic(boolean
authoritative) {
- validateWriteOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.COMPACT);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
@@ -3278,13 +3263,17 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected LongRunningProcessStatus internalCompactionStatus(boolean
authoritative) {
- validateReadOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.COMPACT);
+
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.compactionStatus();
}
protected void internalTriggerOffload(boolean authoritative, MessageIdImpl
messageId) {
- validateWriteOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.OFFLOAD);
+
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerOffload(messageId);
@@ -3297,7 +3286,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected OffloadProcessStatus internalOffloadStatus(boolean
authoritative) {
- validateReadOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.OFFLOAD);
+
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
}
@@ -3675,7 +3666,8 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalGetLastMessageId(AsyncResponse asyncResponse,
boolean authoritative) {
Topic topic;
try {
- validateReadOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
topic = getTopicReference(topicName);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index d9cc845..20fd24d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -50,9 +50,11 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +100,8 @@ public class NonPersistentTopics extends PersistentTopics {
@PathParam("topic") @Encoded
String encodedTopic,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- validateAdminOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
Topic topic = getTopicReference(topicName);
return ((NonPersistentTopic) topic).getStats(false, false);
}
@@ -119,7 +122,8 @@ public class NonPersistentTopics extends PersistentTopics {
@QueryParam("metadata") @DefaultValue("false")
boolean
metadata) {
validateTopicName(property, cluster, namespace, encodedTopic);
- validateAdminOperationOnTopic(authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
Topic topic = getTopicReference(topicName);
try {
boolean includeMetadata = metadata && hasSuperUserAccess();
@@ -188,7 +192,7 @@ public class NonPersistentTopics extends PersistentTopics {
Policies policies = null;
NamespaceName nsName = null;
try {
- validateAdminAccessForTenant(property);
+ validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_TOPICS);
policies = getNamespacePolicies(property, cluster, namespace);
nsName = NamespaceName.get(property, cluster, namespace);
@@ -254,7 +258,7 @@ public class NonPersistentTopics extends PersistentTopics {
@PathParam("bundle") String
bundleRange) {
log.info("[{}] list of topics on namespace bundle {}/{}/{}/{}",
clientAppId(), property, cluster, namespace,
bundleRange);
- validateAdminAccessForTenant(property);
+ validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(property, cluster, namespace);
if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
validateClusterOwnership(cluster);
@@ -289,11 +293,6 @@ public class NonPersistentTopics extends PersistentTopics {
}
}
- protected void validateAdminOperationOnTopic(TopicName topicName, boolean
authoritative) {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- }
-
private Topic getTopicReference(TopicName topicName) {
return
pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic
not found"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index cfc6f66..c1ed16f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -50,9 +50,11 @@ import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,7 +120,9 @@ public class NonPersistentTopics extends PersistentTopics {
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false")
boolean subscriptionBacklogSize) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminOperationOnTopic(topicName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
+
Topic topic = getTopicReference(topicName);
return ((NonPersistentTopic) topic).getStats(getPreciseBacklog,
subscriptionBacklogSize);
}
@@ -145,7 +149,8 @@ public class NonPersistentTopics extends PersistentTopics {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminOperationOnTopic(topicName, authoritative);
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.GET_STATS);
Topic topic = getTopicReference(topicName);
try {
boolean includeMetadata = metadata && hasSuperUserAccess();
@@ -188,7 +193,6 @@ public class NonPersistentTopics extends PersistentTopics {
try {
validateGlobalNamespaceOwnership(tenant, namespace);
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(topicName.getTenant());
internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, e);
@@ -252,7 +256,7 @@ public class NonPersistentTopics extends PersistentTopics {
if (log.isDebugEnabled()) {
log.debug("[{}] list of topics on namespace {}",
clientAppId(), namespaceName);
}
- validateAdminAccessForTenant(tenant);
+ validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_TOPICS);
policies = getNamespacePolicies(namespaceName);
// check cluster ownership for a given global namespace: redirect
if peer-cluster owns it
@@ -327,7 +331,7 @@ public class NonPersistentTopics extends PersistentTopics {
log.debug("[{}] list of topics on namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange);
}
- validateAdminAccessForTenant(tenant);
+ validateNamespaceOperation(namespaceName,
NamespaceOperation.GET_BUNDLE);
Policies policies = getNamespacePolicies(namespaceName);
// check cluster ownership for a given global namespace: redirect if
peer-cluster owns it
@@ -374,11 +378,6 @@ public class NonPersistentTopics extends PersistentTopics {
});
}
- protected void validateAdminOperationOnTopic(TopicName topicName, boolean
authoritative) {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- }
-
private Topic getTopicReference(TopicName topicName) {
return
pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic
not found"));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9ee0bf9..052eed6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -65,6 +65,8 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -232,7 +234,7 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validateGlobalNamespaceOwnership(tenant, namespace);
validatePartitionedTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(topicName.getTenant());
+ validateTopicPolicyOperation(topicName, PolicyName.PARTITION,
PolicyOperation.WRITE);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, e);
@@ -525,6 +527,8 @@ public class PersistentTopics extends PersistentTopicsBase {
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
+ validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res,
ex)
-> internalHandleResult(asyncResponse, res, ex, "Failed set
MaxUnackedMessagesOnSubscription"));
}
@@ -584,6 +588,8 @@ public class PersistentTopics extends PersistentTopicsBase {
DelayedDeliveryPolicies deliveryPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
+ validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
}
@@ -1021,6 +1027,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicOwnership(topicName, authoritative);
internalDeleteSubscription(asyncResponse, decode(encodedSubName),
authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3dc1be3..44d0ff2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -43,6 +43,8 @@ import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,14 +132,15 @@ public class TopicLookupBase extends PulsarWebResource {
private void validateAdminAndClientPermission(TopicName topic) throws
RestException, Exception {
try {
- validateAdminAccessForTenant(topic.getTenant());
+ validateTopicOperation(topic, TopicOperation.LOOKUP);
} catch (Exception e) {
- checkConnect(topic);
+ // unknown error marked as internal server error
+ throw new RestException(e);
}
}
protected String internalGetNamespaceBundle(TopicName topicName) {
- validateSuperUserAccess();
+ validateNamespaceOperation(topicName.getNamespaceObject(),
NamespaceOperation.GET_BUNDLE);
try {
NamespaceBundle bundle =
pulsar().getNamespaceService().getBundle(topicName);
return bundle.getBundleRange();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 835988e..9566d8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -780,10 +781,6 @@ public abstract class PulsarWebResource {
return null;
}
- protected void checkConnect(TopicName topicName) throws Exception {
- checkAuthorization(pulsar(), topicName, clientAppId(),
clientAuthData());
- }
-
protected static void checkAuthorization(PulsarService pulsarService,
TopicName topicName, String role,
AuthenticationDataSource authenticationData) throws Exception {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
@@ -791,8 +788,8 @@ public abstract class PulsarWebResource {
return;
}
// get zk policy manager
- if
(!pulsarService.getBrokerService().getAuthorizationService().canLookup(topicName,
role,
- authenticationData)) {
+ if
(!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
+ TopicOperation.LOOKUP, null, role, authenticationData)) {
log.warn("[{}] Role {} is not allowed to lookup topic", topicName,
role);
throw new RestException(Status.UNAUTHORIZED, "Don't have
permission to connect to this namespace");
}
@@ -1038,6 +1035,50 @@ public abstract class PulsarWebResource {
URI redirect =
UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
log.debug("[{}] Redirecting the rest call to {}: broker={}",
clientAppId(), redirect, broker);
throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
+
+ }
+ }
+
+ public void validateTopicPolicyOperation(TopicName topicName, PolicyName
policy, PolicyOperation operation) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled()
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
+
+ Boolean isAuthorized =
pulsar().getBrokerService().getAuthorizationService()
+ .allowTopicPolicyOperation(topicName, policy, operation,
originalPrincipal(), clientAppId(),
+ clientAuthData());
+
+ if (!isAuthorized) {
+ throw new RestException(Status.FORBIDDEN,
String.format("Unauthorized to validateTopicPolicyOperation"
+ + " for operation [%s] on topic [%s] on policy [%s]",
operation.toString(),
+ topicName, policy.toString()));
+ }
+ }
+ }
+
+ public void validateTopicOperation(TopicName topicName, TopicOperation
operation) {
+ validateTopicOperation(topicName, operation, null);
+ }
+
+ public void validateTopicOperation(TopicName topicName, TopicOperation
operation, String subscription) {
+ if (pulsar().getConfiguration().isAuthenticationEnabled()
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId())) {
+ throw new RestException(Status.UNAUTHORIZED, "Need to
authenticate to perform the request");
+ }
+
+ AuthenticationDataHttps authData = clientAuthData();
+ authData.setSubscription(subscription);
+
+ Boolean isAuthorized =
pulsar().getBrokerService().getAuthorizationService()
+ .allowTopicOperation(topicName, operation,
originalPrincipal(), clientAppId(), authData);
+
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to validateTopicOperation for"
+ + " operation [%s] on topic [%s]",
operation.toString(), topicName));
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23015be..dbf7656 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -236,7 +236,6 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
-
TopicStats topicStats = persistentTopics.getStats(testTenant,
testNamespace, testLocalTopicName, true, true, false);
long msgBacklog =
topicStats.subscriptions.get(SUB_EARLIEST).msgBacklog;
System.out.println("Message back log for " + SUB_EARLIEST + " is :" +
msgBacklog);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c5a9335..a0a5917 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -549,6 +549,7 @@ public class ServerCnxTest {
doReturn(authorizationService).when(brokerService).getAuthorizationService();
doReturn(true).when(brokerService).isAuthorizationEnabled();
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).validateTenantAdminAccess(Mockito.anyString(),
Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class),
Mockito.anyString(),
any(AuthAction.class));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 5647e00..f6fa07c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -245,15 +245,8 @@ public class AuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
replacePulsarClient(PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
.operationTimeout(1, TimeUnit.SECONDS));
- // unauthorized topic test
- Exception pulsarClientException = null;
- try {
-
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/other-topic")
- .subscriptionName("my-subscriber-name").subscribe();
- } catch (Exception e) {
- pulsarClientException = e;
- }
- Assert.assertTrue(pulsarClientException instanceof
PulsarClientException);
+
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/other-topic")
+ .subscriptionName("my-subscriber-name").subscribe();
testSyncProducerAndConsumer(batchMessageDelayMs);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 895143e..c6751d6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
@@ -52,6 +53,8 @@ import
org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -541,13 +544,27 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(
TopicName topic, String role, TopicOperation operation,
AuthenticationDataSource authData) {
- return CompletableFuture.completedFuture(true);
+ CompletableFuture<Boolean> isAuthorizedFuture;
+
+ if (role.equals("plugbleRole")) {
+ isAuthorizedFuture = CompletableFuture.completedFuture(true);
+ } else {
+ isAuthorizedFuture = CompletableFuture.completedFuture(false);
+ }
+
+ return isAuthorizedFuture;
}
@Override
public Boolean allowTopicOperation(
TopicName topicName, String role, TopicOperation operation,
AuthenticationDataSource authData) {
- return true;
+ try {
+ return allowTopicOperationAsync(topicName, role, operation,
authData).get();
+ } catch (InterruptedException e) {
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ throw new RestException(e);
+ }
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index b10990a..2b11502 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -38,6 +38,7 @@ public enum PolicyName {
MAX_UNACKED,
MAX_SUBSCRIPTIONS,
OFFLOAD,
+ PARTITION,
PERSISTENCE,
RATE,
RETENTION,
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index 7e54cca..a336dbd 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -47,4 +47,7 @@ public enum TopicOperation {
SUBSCRIBE,
GET_SUBSCRIPTIONS,
UNSUBSCRIBE,
+
+ GET_STATS,
+ GET_METADATA,
}