This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8c4568569 Add rpc validation for gRPC in PlainAccessResource (#6460)
8c4568569 is described below
commit 8c4568569cf073caed4ba4c7667bdc0e4d53d6c6
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Mar 24 10:16:52 2023 +0800
Add rpc validation for gRPC in PlainAccessResource (#6460)
* including NotifyClientTerminationRequest, QueryRouteRequest,
QueryAssignmentRequest, ChangeInvisibleDurationRequest
---
.../rocketmq/acl/plain/PlainAccessResource.java | 31 ++++++++++++++++++++--
1 file changed, 29 insertions(+), 2 deletions(-)
diff --git
a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 046a7d954..cdbd9ea9b 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -17,10 +17,15 @@
package org.apache.rocketmq.acl.plain;
import apache.rocketmq.v2.AckMessageRequest;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
+import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.Message;
+import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.QueryAssignmentRequest;
+import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.SendMessageRequest;
@@ -213,8 +218,13 @@ public class PlainAccessResource implements AccessResource
{
String rpcFullName =
messageV3.getDescriptorForType().getFullName();
if
(HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) {
HeartbeatRequest request = (HeartbeatRequest) messageV3;
- if (request.hasGroup()) {
- accessResource.addResourceAndPerm(request.getGroup(),
Permission.SUB);
+ if (ClientType.PUSH_CONSUMER.equals(request.getClientType())
+ ||
ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) {
+ if (!request.hasGroup()) {
+ throw new AclException("Consumer heartbeat doesn't
have group");
+ } else {
+ accessResource.addResourceAndPerm(request.getGroup(),
Permission.SUB);
+ }
}
} else if
(SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
SendMessageRequest request = (SendMessageRequest) messageV3;
@@ -259,7 +269,24 @@ public class PlainAccessResource implements AccessResource
{
accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
}
}
+ if (!command.getSettings().hasPublishing() &&
!command.getSettings().hasSubscription()) {
+ throw new AclException("settings command doesn't have
publishing or subscription");
+ }
}
+ } else if
(NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName))
{
+ NotifyClientTerminationRequest request =
(NotifyClientTerminationRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getGroup(),
Permission.SUB);
+ } else if
(QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ QueryRouteRequest request = (QueryRouteRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getTopic(),
Permission.ANY);
+ } else if
(QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+ QueryAssignmentRequest request = (QueryAssignmentRequest)
messageV3;
+ accessResource.addResourceAndPerm(request.getGroup(),
Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(),
Permission.SUB);
+ } else if
(ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName))
{
+ ChangeInvisibleDurationRequest request =
(ChangeInvisibleDurationRequest) messageV3;
+ accessResource.addResourceAndPerm(request.getGroup(),
Permission.SUB);
+ accessResource.addResourceAndPerm(request.getTopic(),
Permission.SUB);
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);