nodece commented on a change in pull request #13970:
URL: https://github.com/apache/pulsar/pull/13970#discussion_r794241218
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -906,24 +907,42 @@ public void validateNamespaceOperation(NamespaceName
namespaceName, NamespaceOpe
return CompletableFuture.completedFuture(null);
}
- public void validateNamespacePolicyOperation(NamespaceName namespaceName,
- PolicyName policy,
- PolicyOperation operation) {
+ public CompletableFuture<Void>
validateNamespacePolicyOperationAsync(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation
operation) {
if (pulsar().getConfiguration().isAuthenticationEnabled()
- && pulsar().getBrokerService().isAuthorizationEnabled()) {
+ && pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
- throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ return FutureUtil.failedFuture(
+ new RestException(Status.FORBIDDEN, "Need to authenticate
to perform the request"));
}
+ return pulsar().getBrokerService().getAuthorizationService()
+ .allowNamespacePolicyOperationAsync(namespaceName, policy,
operation,
+ originalPrincipal(), clientAppId(),
clientAuthData())
+ .thenAccept(isAuthorized -> {
+ if (!isAuthorized) {
+ throw new RestException(Status.FORBIDDEN,
+ String.format("Unauthorized to
validateNamespacePolicyOperation for"
+ + " operation [%s] on
namespace [%s] on policy [%s]",
+ operation.toString(),
namespaceName, policy.toString()));
+ }
+ });
+ }
+ return CompletableFuture.completedFuture(null);
+ }
- boolean isAuthorized =
pulsar().getBrokerService().getAuthorizationService()
- .allowNamespacePolicyOperation(namespaceName, policy,
operation,
- originalPrincipal(), clientAppId(), clientAuthData());
-
- if (!isAuthorized) {
- throw new RestException(Status.FORBIDDEN,
- String.format("Unauthorized to
validateNamespacePolicyOperation for"
- + " operation [%s] on namespace [%s]
on policy [%s]",
- operation.toString(), namespaceName,
policy.toString()));
+ public void validateNamespacePolicyOperation(NamespaceName namespaceName,
+ PolicyName policy,
+ PolicyOperation operation) {
+ try {
+ validateNamespacePolicyOperationAsync(namespaceName, policy,
operation)
+ .get(namespaceResources().getOperationTimeoutSec(),
SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ throw (WebApplicationException) realCause;
Review comment:
```suggestion
throw realCause;
```
This looks sufficient.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -972,15 +991,40 @@ public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}
+ public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
+ return pulsar().getPulsarResources()
+ .getNamespaceResources().getPoliciesReadOnlyAsync()
+ .thenAccept(arePoliciesReadOnly -> {
+ if (arePoliciesReadOnly) {
+ log.warn("Policies are read-only. Broker cannot do
read-write operations");
+ throw new RestException(Status.FORBIDDEN, "Broker is
forbidden to do read-write operations");
+ } else {
+ if (log.isDebugEnabled()) {
+ // Do nothing, just log the message.
+ log.debug("Broker is allowed to make read-write
operations");
+ }
+ }
+ });
+ }
+
+ /**
+ * Checks whether the broker is allowed to do read-write operations based
on the existence of a node in
+ * configuration metadata-store.
+ *
+ * @throws WebApplicationException
+ * if broker has a read only access if broker is not connected
to the configuration metadata-store
+ */
public void validatePoliciesReadOnlyAccess() {
try {
- if (namespaceResources().getPoliciesReadOnly()) {
- log.debug("Policies are read-only. Broker cannot do read-write
operations");
- throw new RestException(Status.FORBIDDEN, "Broker is forbidden
to do read-write operations");
+ validatePoliciesReadOnlyAccessAsync()
+ .get(config().getZooKeeperOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ throw (WebApplicationException) realCause;
Review comment:
```suggestion
throw realCause;
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -2720,25 +2746,55 @@ private void updatePolicies(NamespaceName ns,
Function<Policies, Policies> updat
}
}
- protected void internalSetNamespaceResourceGroup(String rgName) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
-
- if (rgName != null) {
- // check resourcegroup exists.
- try {
- if (!resourceGroupResources().resourceGroupExists(rgName)) {
- throw new RestException(Status.PRECONDITION_FAILED,
"ResourceGroup does not exist");
- }
- } catch (Exception e) {
- log.error("[{}] Invalid ResourceGroup {}: {}", clientAppId(),
rgName, e);
- throw new RestException(e);
- }
- }
- internalSetPolicies("resource_group_name", rgName);
+ protected void internalGetNamespaceResourceGroup(AsyncResponse
asyncResponse, String tenant, String namespace) {
+ validateNamespacePolicyOperationAsync(NamespaceName.get(tenant,
namespace),
+ PolicyName.RESOURCEGROUP, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)
+ .thenAccept(policies ->
asyncResponse.resume(policies.resource_group_name)))
Review comment:
```suggestion
.whenComplete((policies,ex) -> {
if (ex == null) {
log.info("[{}] Successfully to get namespace
resource group {}/{}", clientAppId(), tenant, namespace);
asyncResponse.resume(policies.resource_group_name);
return;
}
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Fail to get namespace resource
group {}/{}: {}", clientAppId(), tenant, namespace, realCause);
resumeAsyncResponseExceptionally(asyncResponse,
realCause);
return;
}))
```
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
##########
@@ -1748,4 +1738,26 @@ private void
assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies policies,
assertTrue(e.getMessage().startsWith("Invalid retention policy"));
}
}
+
+ @Test
+ public void testNamespaceResourceGroup() {
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
+ ResourceGroup testResourceGroup = new ResourceGroup();
+ testResourceGroup.setDispatchRateInBytes(10000L);
+ testResourceGroup.setDispatchRateInMsgs(100);
+ testResourceGroup.setPublishRateInMsgs(100);
+ testResourceGroup.setPublishRateInBytes(10000L);
+ final String resourceGroupName = "test-resource-group";
+ try {
Review comment:
Please remove the `try...catch` block.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1782,24 +1784,48 @@ protected void
internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolic
internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
}
+ protected CompletableFuture<Void> internalSetPoliciesAsync(String
fieldName, Object value) {
+ return namespaceResources().getPoliciesAsync(namespaceName)
+ .thenCompose(policiesOptional -> {
+ Policies policies = policiesOptional.orElseThrow(() -> new
RestException(Status.NOT_FOUND,
+ "Namespace policies does not exist"));
+ try {
+ Field field =
Policies.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(policies, value);
+ return namespaceResources()
+ .setPoliciesAsync(namespaceName, p -> policies)
+ .thenAccept(__ -> {
+ try {
+ log.info("[{}] Successfully updated {}
configuration: namespace={}, value={}",
+ clientAppId(), fieldName,
+ namespaceName,
jsonMapper().writeValueAsString(value));
+ } catch (JsonProcessingException ex) {
+ log.error("[{}] Failed to serialize
value while update {} configuration"
+ + " for namespace {}",
clientAppId(), fieldName , namespaceName, ex);
+ throw new RestException(ex);
Review comment:
I think we should ignore this exception.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1782,24 +1784,48 @@ protected void
internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolic
internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
}
+ protected CompletableFuture<Void> internalSetPoliciesAsync(String
fieldName, Object value) {
+ return namespaceResources().getPoliciesAsync(namespaceName)
+ .thenCompose(policiesOptional -> {
+ Policies policies = policiesOptional.orElseThrow(() -> new
RestException(Status.NOT_FOUND,
+ "Namespace policies does not exist"));
+ try {
+ Field field =
Policies.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(policies, value);
+ return namespaceResources()
+ .setPoliciesAsync(namespaceName, p -> policies)
+ .thenAccept(__ -> {
+ try {
+ log.info("[{}] Successfully updated {}
configuration: namespace={}, value={}",
+ clientAppId(), fieldName,
+ namespaceName,
jsonMapper().writeValueAsString(value));
+ } catch (JsonProcessingException ex) {
+ log.error("[{}] Failed to serialize
value while update {} configuration"
+ + " for namespace {}",
clientAppId(), fieldName , namespaceName, ex);
+ throw new RestException(ex);
+ }
+ });
+ } catch (Exception ex) {
+ log.error("[{}] Failed to reflect invoke field set
while update {} configuration for "
+ + "namespace {}", clientAppId(),
fieldName
+ , namespaceName, ex);
+ throw new RestException(ex);
+ }
+ });
+ }
+
protected void internalSetPolicies(String fieldName, Object value) {
try {
- Policies policies = namespaceResources().getPolicies(namespaceName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "Namespace policies does not exist"));
- Field field = Policies.class.getDeclaredField(fieldName);
- field.setAccessible(true);
- field.set(policies, value);
- namespaceResources().setPolicies(namespaceName, p -> policies);
- log.info("[{}] Successfully updated {} configuration:
namespace={}, value={}", clientAppId(), fieldName,
- namespaceName, jsonMapper().writeValueAsString(value));
-
- } catch (RestException pfe) {
- throw pfe;
- } catch (Exception e) {
- log.error("[{}] Failed to update {} configuration for namespace
{}", clientAppId(), fieldName
- , namespaceName, e);
- throw new RestException(e);
+ internalSetPoliciesAsync(fieldName, value)
+ .get(config().getZooKeeperOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ throw (WebApplicationException) realCause;
Review comment:
```suggestion
throw realCause;
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -2720,25 +2746,55 @@ private void updatePolicies(NamespaceName ns,
Function<Policies, Policies> updat
}
}
- protected void internalSetNamespaceResourceGroup(String rgName) {
- validateNamespacePolicyOperation(namespaceName,
PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
- validatePoliciesReadOnlyAccess();
-
- if (rgName != null) {
- // check resourcegroup exists.
- try {
- if (!resourceGroupResources().resourceGroupExists(rgName)) {
- throw new RestException(Status.PRECONDITION_FAILED,
"ResourceGroup does not exist");
- }
- } catch (Exception e) {
- log.error("[{}] Invalid ResourceGroup {}: {}", clientAppId(),
rgName, e);
- throw new RestException(e);
- }
- }
- internalSetPolicies("resource_group_name", rgName);
+ protected void internalGetNamespaceResourceGroup(AsyncResponse
asyncResponse, String tenant, String namespace) {
+ validateNamespacePolicyOperationAsync(NamespaceName.get(tenant,
namespace),
+ PolicyName.RESOURCEGROUP, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)
+ .thenAccept(policies ->
asyncResponse.resume(policies.resource_group_name)))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
+ log.info("[{}] Successfully to get namespace resource
group {}/{}",
+ clientAppId(), tenant, namespace);
+ asyncResponse.resume(realCause);
+ } else {
+ log.error("[{}] Fail to get namespace resource group
{}/{}", clientAppId(), tenant, namespace);
+ asyncResponse.resume(new RestException(realCause));
+ }
+ return null;
+ });
}
-
- private static final Logger log =
LoggerFactory.getLogger(NamespacesBase.class);
+ protected void internalSetNamespaceResourceGroup(AsyncResponse
asyncResponse, String rgName) {
+ validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.RESOURCEGROUP, PolicyOperation.WRITE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> {
+ if (rgName != null) {
+ return
resourceGroupResources().getResourceGroupAsync(rgName)
+ .thenAccept(resourceGroup -> {
+ // check resource group exists.
+ if (!resourceGroup.isPresent()) {
+ throw new
RestException(Status.PRECONDITION_FAILED,
+ "ResourceGroup does not
exist");
+ }
+ });
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).thenCompose(__ ->
internalSetPoliciesAsync("resource_group_name", rgName))
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully to set namespace resource
group {}", clientAppId(), rgName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof WebApplicationException) {
Review comment:
```suggestion
log.error("[{}] Fail to set namespace resource group {}: {}", clientAppId(),
rgName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
```
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
##########
@@ -1748,4 +1738,26 @@ private void
assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies policies,
assertTrue(e.getMessage().startsWith("Invalid retention policy"));
}
}
+
+ @Test
+ public void testNamespaceResourceGroup() {
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant +
"/namespace");
+ ResourceGroup testResourceGroup = new ResourceGroup();
+ testResourceGroup.setDispatchRateInBytes(10000L);
+ testResourceGroup.setDispatchRateInMsgs(100);
+ testResourceGroup.setPublishRateInMsgs(100);
+ testResourceGroup.setPublishRateInBytes(10000L);
+ final String resourceGroupName = "test-resource-group";
+ try {
+ admin.resourcegroups().createResourceGroup(resourceGroupName,
testResourceGroup);
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setNamespaceResourceGroup(namespace,
resourceGroupName);
+ String namespaceResourceGroup =
admin.namespaces().getNamespaceResourceGroup(namespace);
+ assertEquals(namespaceResourceGroup, resourceGroupName);
+ admin.namespaces().removeNamespaceResourceGroup(namespace);
+
assertTrue(StringUtils.isBlank(admin.namespaces().getNamespaceResourceGroup(namespace)));
+ } catch (PulsarAdminException e) {
Review comment:
Could you also add tests for get/set/delete with a non-existent resource group?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]