michaeljmarshall commented on a change in pull request #12339:
URL: https://github.com/apache/pulsar/pull/12339#discussion_r727528015
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -209,18 +196,12 @@ protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers =
getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
Policies policies;
- try {
- // Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
- policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
- TopicName.get(topic).getNamespaceObject())
- .orElseGet(() -> new Policies());
+ // Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
+ policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+ TopicName.get(topic).getNamespaceObject())
+ .orElseGet(() -> new Policies());
- if (policies == null) {
- policies = new Policies();
- }
- } catch (Exception e) {
- log.warn("[{}] Failed to get namespace policies that include
max number of consumers: {}", topic,
- e.getMessage());
+ if (policies == null) {
Review comment:
Can we consider this null check no longer necessary? I recognize that an
`Optional` can be null, but it probably _shouldn't_ be null.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -630,7 +611,7 @@ protected void checkTopicFenced() throws
BrokerServiceException {
}
}
- protected void internalAddProducer(Producer producer) throws
BrokerServiceException {
+ protected void internalAddProducer(Producer producer) throws
BrokerServiceException{
Review comment:
Nit: I think we can leave the space here.
```suggestion
protected void internalAddProducer(Producer producer) throws
BrokerServiceException {
```
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
##########
@@ -322,48 +324,42 @@ public void initialize(ServiceConfiguration conf,
PulsarResources pulsarResource
return updateSubscriptionPermissionAsync(namespace, subscriptionName,
Collections.singleton(role), true);
}
- private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName, Set<String> roles,
- boolean remove) {
- CompletableFuture<Void> result = new CompletableFuture<>();
-
+ private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+
Set<String> roles,
+ boolean
remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
- result.completeExceptionally(e);
+ return FutureUtil.failedFuture(e);
}
- try {
- Policies policies =
pulsarResources.getNamespaceResources().getPolicies(namespace)
- .orElseThrow(() -> new NotFoundException(namespace + " not
found"));
- if (remove) {
- if
(policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName)
!= null) {
-
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
- }else {
- log.info("[{}] Couldn't find role {} while revoking for
sub = {}", namespace, subscriptionName, roles);
- result.completeExceptionally(new
IllegalArgumentException("couldn't find subscription"));
- return result;
- }
- } else {
-
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName,
roles);
- }
- pulsarResources.getNamespaceResources().setPolicies(namespace,
(data)->policies);
+ CompletableFuture<Void> future =
+
pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies ->
{
+ if (remove) {
+ if
(policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName)
!= null) {
+
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName)
+ .removeAll(roles);
Review comment:
Nit: I think we could simplify this with a local variable. As a result,
we'll only retrieve the reference from the map once. What do you think?
```suggestion
Set<String> currentRoles =
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
if (currentRoles != null) {
currentRoles.removeAll(roles);
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
##########
@@ -180,7 +180,7 @@ public static boolean isDispatchRateNeeded(BrokerService
brokerService, Optional
return true;
}
- policies = policies.isPresent() ? policies :
getPolicies(brokerService, topicName);
+ policies = policies.isPresent() ? policies :
getPolicies(brokerService, topicName);
Review comment:
Nit: we can remove the extra space here.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -802,9 +780,19 @@ private void updatePublishDispatcher(Policies policies) {
return;
}
+ Policies policies;
+ try {
+ policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+ TopicName.get(topic).getNamespaceObject())
+ .orElseGet(() -> new Policies());
Review comment:
I see this was added in the most recent commit to help with a mocked
test. This block doesn't throw any checked exceptions. Do we expect
`RuntimeExceptions` from it? If we do, should we wrap other similar code blocks
in this PR with try and catch? If we don't, perhaps we should update the test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]