nodece commented on a change in pull request #13970:
URL: https://github.com/apache/pulsar/pull/13970#discussion_r794241218



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -906,24 +907,42 @@ public void validateNamespaceOperation(NamespaceName 
namespaceName, NamespaceOpe
         return CompletableFuture.completedFuture(null);
     }
 
-    public void validateNamespacePolicyOperation(NamespaceName namespaceName,
-                                                 PolicyName policy,
-                                                 PolicyOperation operation) {
+    public CompletableFuture<Void> 
validateNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                      PolicyName policy,
+                                                      PolicyOperation 
operation) {
         if (pulsar().getConfiguration().isAuthenticationEnabled()
-            && pulsar().getBrokerService().isAuthorizationEnabled()) {
+                && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
-                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+                return FutureUtil.failedFuture(
+                    new RestException(Status.FORBIDDEN, "Need to authenticate 
to perform the request"));
             }
+            return pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespacePolicyOperationAsync(namespaceName, policy, 
operation,
+                            originalPrincipal(), clientAppId(), 
clientAuthData())
+                    .thenAccept(isAuthorized -> {
+                        if (!isAuthorized) {
+                            throw new RestException(Status.FORBIDDEN,
+                                    String.format("Unauthorized to 
validateNamespacePolicyOperation for"
+                                                    + " operation [%s] on 
namespace [%s] on policy [%s]",
+                                            operation.toString(), 
namespaceName, policy.toString()));
+                        }
+                    });
+        }
+        return CompletableFuture.completedFuture(null);
+    }
 
-            boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .allowNamespacePolicyOperation(namespaceName, policy, 
operation,
-                        originalPrincipal(), clientAppId(), clientAuthData());
-
-            if (!isAuthorized) {
-                throw new RestException(Status.FORBIDDEN,
-                        String.format("Unauthorized to 
validateNamespacePolicyOperation for"
-                                        + " operation [%s] on namespace [%s] 
on policy [%s]",
-                                operation.toString(), namespaceName, 
policy.toString()));
+    public void validateNamespacePolicyOperation(NamespaceName namespaceName,
+                                                 PolicyName policy,
+                                                 PolicyOperation operation) {
+        try {
+            validateNamespacePolicyOperationAsync(namespaceName, policy, 
operation)
+                    .get(namespaceResources().getOperationTimeoutSec(), 
SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
ex) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;

Review comment:
       ```suggestion
                   throw realCause;
   ```
   
   Looks enough.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -972,15 +991,40 @@ public static ObjectMapper jsonMapper() {
         return ObjectMapperFactory.getThreadLocal();
     }
 
+    public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
+        return pulsar().getPulsarResources()
+                .getNamespaceResources().getPoliciesReadOnlyAsync()
+                .thenAccept(arePoliciesReadOnly -> {
+                    if (arePoliciesReadOnly) {
+                        log.warn("Policies are read-only. Broker cannot do 
read-write operations");
+                        throw new RestException(Status.FORBIDDEN, "Broker is 
forbidden to do read-write operations");
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            // Do nothing, just log the message.
+                            log.debug("Broker is allowed to make read-write 
operations");
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Checks whether the broker is allowed to do read-write operations based 
on the existence of a node in
+     * configuration metadata-store.
+     *
+     * @throws WebApplicationException
+     *             if broker has a read only access if broker is not connected 
to the configuration metadata-store
+     */
     public void validatePoliciesReadOnlyAccess() {
         try {
-            if (namespaceResources().getPoliciesReadOnly()) {
-                log.debug("Policies are read-only. Broker cannot do read-write 
operations");
-                throw new RestException(Status.FORBIDDEN, "Broker is forbidden 
to do read-write operations");
+            validatePoliciesReadOnlyAccessAsync()
+                    .get(config().getZooKeeperOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
ex) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;

Review comment:
       ```suggestion
                   throw realCause;
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -2720,25 +2746,55 @@ private void updatePolicies(NamespaceName ns, 
Function<Policies, Policies> updat
        }
    }
 
-    protected void internalSetNamespaceResourceGroup(String rgName) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
-        if (rgName != null) {
-            // check resourcegroup exists.
-            try {
-                if (!resourceGroupResources().resourceGroupExists(rgName)) {
-                    throw new RestException(Status.PRECONDITION_FAILED, 
"ResourceGroup does not exist");
-                }
-            } catch (Exception e) {
-                log.error("[{}] Invalid ResourceGroup {}: {}", clientAppId(), 
rgName, e);
-                throw new RestException(e);
-            }
-        }
 
-        internalSetPolicies("resource_group_name", rgName);
+    protected void internalGetNamespaceResourceGroup(AsyncResponse 
asyncResponse, String tenant, String namespace) {
+        validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, 
namespace),
+                PolicyName.RESOURCEGROUP, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)
+                        .thenAccept(policies -> 
asyncResponse.resume(policies.resource_group_name)))

Review comment:
       ```suggestion
                           .whenComplete((policies,ex) -> {
                             if (ex == null) {
                               log.info("[{}] Successfully to get namespace 
resource group {}/{}", clientAppId(), tenant, namespace);
                               
asyncResponse.resume(policies.resource_group_name)
                               return;
                             }
                             Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
                             log.error("[{}] Fail to get namespace resource 
group {}/{}: {}", clientAppId(), tenant, namespace, realCause);
                             resumeAsyncResponseExceptionally(asyncResponse, 
realCause);
                             return;
                           ))
   ```

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
##########
@@ -1748,4 +1738,26 @@ private void 
assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies policies,
             assertTrue(e.getMessage().startsWith("Invalid retention policy"));
         }
     }
+
+    @Test
+    public void testNamespaceResourceGroup() {
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
+        ResourceGroup testResourceGroup = new ResourceGroup();
+        testResourceGroup.setDispatchRateInBytes(10000L);
+        testResourceGroup.setDispatchRateInMsgs(100);
+        testResourceGroup.setPublishRateInMsgs(100);
+        testResourceGroup.setPublishRateInBytes(10000L);
+        final String resourceGroupName = "test-resource-group";
+        try {

Review comment:
       Remove `try...catch`

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1782,24 +1784,48 @@ protected void 
internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolic
         internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
     }
 
+    protected CompletableFuture<Void> internalSetPoliciesAsync(String 
fieldName, Object value) {
+        return namespaceResources().getPoliciesAsync(namespaceName)
+                .thenCompose(policiesOptional -> {
+                    Policies policies = policiesOptional.orElseThrow(() -> new 
RestException(Status.NOT_FOUND,
+                            "Namespace policies does not exist"));
+                    try {
+                        Field field = 
Policies.class.getDeclaredField(fieldName);
+                        field.setAccessible(true);
+                        field.set(policies, value);
+                        return namespaceResources()
+                                .setPoliciesAsync(namespaceName, p -> policies)
+                                .thenAccept(__ -> {
+                                    try {
+                                        log.info("[{}] Successfully updated {} 
configuration: namespace={}, value={}",
+                                                clientAppId(), fieldName,
+                                                namespaceName, 
jsonMapper().writeValueAsString(value));
+                                    } catch (JsonProcessingException ex) {
+                                        log.error("[{}] Failed to serialize 
value while update {} configuration"
+                                                + " for namespace {}", 
clientAppId(), fieldName , namespaceName, ex);
+                                        throw new RestException(ex);

Review comment:
       I think we should ignore this exception.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1782,24 +1784,48 @@ protected void 
internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolic
         internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
     }
 
+    protected CompletableFuture<Void> internalSetPoliciesAsync(String 
fieldName, Object value) {
+        return namespaceResources().getPoliciesAsync(namespaceName)
+                .thenCompose(policiesOptional -> {
+                    Policies policies = policiesOptional.orElseThrow(() -> new 
RestException(Status.NOT_FOUND,
+                            "Namespace policies does not exist"));
+                    try {
+                        Field field = 
Policies.class.getDeclaredField(fieldName);
+                        field.setAccessible(true);
+                        field.set(policies, value);
+                        return namespaceResources()
+                                .setPoliciesAsync(namespaceName, p -> policies)
+                                .thenAccept(__ -> {
+                                    try {
+                                        log.info("[{}] Successfully updated {} 
configuration: namespace={}, value={}",
+                                                clientAppId(), fieldName,
+                                                namespaceName, 
jsonMapper().writeValueAsString(value));
+                                    } catch (JsonProcessingException ex) {
+                                        log.error("[{}] Failed to serialize 
value while update {} configuration"
+                                                + " for namespace {}", 
clientAppId(), fieldName , namespaceName, ex);
+                                        throw new RestException(ex);
+                                    }
+                                });
+                    } catch (Exception ex) {
+                        log.error("[{}] Failed to reflect invoke field set 
while update {} configuration for "
+                                        + "namespace {}", clientAppId(), 
fieldName
+                                , namespaceName, ex);
+                        throw new RestException(ex);
+                    }
+                });
+    }
+
     protected void internalSetPolicies(String fieldName, Object value) {
         try {
-            Policies policies = namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                    "Namespace policies does not exist"));
-            Field field = Policies.class.getDeclaredField(fieldName);
-            field.setAccessible(true);
-            field.set(policies, value);
-            namespaceResources().setPolicies(namespaceName, p -> policies);
-            log.info("[{}] Successfully updated {} configuration: 
namespace={}, value={}", clientAppId(), fieldName,
-                    namespaceName, jsonMapper().writeValueAsString(value));
-
-        } catch (RestException pfe) {
-            throw pfe;
-        } catch (Exception e) {
-            log.error("[{}] Failed to update {} configuration for namespace 
{}", clientAppId(), fieldName
-                    , namespaceName, e);
-            throw new RestException(e);
+            internalSetPoliciesAsync(fieldName, value)
+                    .get(config().getZooKeeperOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
ex) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;

Review comment:
       ```suggestion
                   throw realCause;
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -2720,25 +2746,55 @@ private void updatePolicies(NamespaceName ns, 
Function<Policies, Policies> updat
        }
    }
 
-    protected void internalSetNamespaceResourceGroup(String rgName) {
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
-        validatePoliciesReadOnlyAccess();
-
-        if (rgName != null) {
-            // check resourcegroup exists.
-            try {
-                if (!resourceGroupResources().resourceGroupExists(rgName)) {
-                    throw new RestException(Status.PRECONDITION_FAILED, 
"ResourceGroup does not exist");
-                }
-            } catch (Exception e) {
-                log.error("[{}] Invalid ResourceGroup {}: {}", clientAppId(), 
rgName, e);
-                throw new RestException(e);
-            }
-        }
 
-        internalSetPolicies("resource_group_name", rgName);
+    protected void internalGetNamespaceResourceGroup(AsyncResponse 
asyncResponse, String tenant, String namespace) {
+        validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, 
namespace),
+                PolicyName.RESOURCEGROUP, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)
+                        .thenAccept(policies -> 
asyncResponse.resume(policies.resource_group_name)))
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof WebApplicationException) {
+                        log.info("[{}] Successfully to get namespace resource 
group {}/{}",
+                                clientAppId(), tenant, namespace);
+                        asyncResponse.resume(realCause);
+                    } else {
+                        log.error("[{}] Fail to get namespace resource group 
{}/{}", clientAppId(), tenant, namespace);
+                        asyncResponse.resume(new RestException(realCause));
+                    }
+                    return null;
+                });
     }
 
-
-    private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
+    protected void internalSetNamespaceResourceGroup(AsyncResponse 
asyncResponse, String rgName) {
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.RESOURCEGROUP, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> {
+                    if (rgName != null) {
+                        return 
resourceGroupResources().getResourceGroupAsync(rgName)
+                                .thenAccept(resourceGroup -> {
+                                    // check resource group exists.
+                                    if (!resourceGroup.isPresent()) {
+                                        throw new 
RestException(Status.PRECONDITION_FAILED,
+                                                "ResourceGroup does not 
exist");
+                                    }
+                                });
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }).thenCompose(__ -> 
internalSetPoliciesAsync("resource_group_name", rgName))
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully to set namespace resource 
group {}", clientAppId(), rgName);
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof WebApplicationException) {

Review comment:
       ```suggestion
   log.error("[{}] Fail to set namespace resource group {}: {}", clientAppId(), 
rgName, realCause);
   resumeAsyncResponseExceptionally(asyncResponse, realCause);
   return null;
   ```

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
##########
@@ -1748,4 +1738,26 @@ private void 
assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies policies,
             assertTrue(e.getMessage().startsWith("Invalid retention policy"));
         }
     }
+
+    @Test
+    public void testNamespaceResourceGroup() {
+        String namespace = BrokerTestUtil.newUniqueName(this.testTenant + 
"/namespace");
+        ResourceGroup testResourceGroup = new ResourceGroup();
+        testResourceGroup.setDispatchRateInBytes(10000L);
+        testResourceGroup.setDispatchRateInMsgs(100);
+        testResourceGroup.setPublishRateInMsgs(100);
+        testResourceGroup.setPublishRateInBytes(10000L);
+        final String resourceGroupName = "test-resource-group";
+        try {
+            admin.resourcegroups().createResourceGroup(resourceGroupName, 
testResourceGroup);
+            admin.namespaces().createNamespace(namespace);
+            admin.namespaces().setNamespaceResourceGroup(namespace, 
resourceGroupName);
+            String namespaceResourceGroup = 
admin.namespaces().getNamespaceResourceGroup(namespace);
+            assertEquals(namespaceResourceGroup, resourceGroupName);
+            admin.namespaces().removeNamespaceResourceGroup(namespace);
+            
assertTrue(StringUtils.isBlank(admin.namespaces().getNamespaceResourceGroup(namespace)));
+        } catch (PulsarAdminException e) {

Review comment:
       Could you test the get/set/delete non-exists resource group?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to