This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8c4568569 Add rpc validation for gRPC in PlainAccessResource (#6460)
8c4568569 is described below

commit 8c4568569cf073caed4ba4c7667bdc0e4d53d6c6
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Mar 24 10:16:52 2023 +0800

    Add rpc validation for gRPC in PlainAccessResource (#6460)
    
    * including NotifyClientTerminationRequest, QueryRouteRequest,
      QueryAssignmentRequest, ChangeInvisibleDurationRequest
---
 .../rocketmq/acl/plain/PlainAccessResource.java    | 31 ++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java 
b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 046a7d954..cdbd9ea9b 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -17,10 +17,15 @@
 package org.apache.rocketmq.acl.plain;
 
 import apache.rocketmq.v2.AckMessageRequest;
+import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
+import apache.rocketmq.v2.ClientType;
 import apache.rocketmq.v2.EndTransactionRequest;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
 import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.Message;
+import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.QueryAssignmentRequest;
+import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.Resource;
 import apache.rocketmq.v2.SendMessageRequest;
@@ -213,8 +218,13 @@ public class PlainAccessResource implements AccessResource 
{
             String rpcFullName = 
messageV3.getDescriptorForType().getFullName();
             if 
(HeartbeatRequest.getDescriptor().getFullName().equals(rpcFullName)) {
                 HeartbeatRequest request = (HeartbeatRequest) messageV3;
-                if (request.hasGroup()) {
-                    accessResource.addResourceAndPerm(request.getGroup(), 
Permission.SUB);
+                if (ClientType.PUSH_CONSUMER.equals(request.getClientType())
+                    || 
ClientType.SIMPLE_CONSUMER.equals(request.getClientType())) {
+                    if (!request.hasGroup()) {
+                        throw new AclException("Consumer heartbeat doesn't 
have group");
+                    } else {
+                        accessResource.addResourceAndPerm(request.getGroup(), 
Permission.SUB);
+                    }
                 }
             } else if 
(SendMessageRequest.getDescriptor().getFullName().equals(rpcFullName)) {
                 SendMessageRequest request = (SendMessageRequest) messageV3;
@@ -259,7 +269,24 @@ public class PlainAccessResource implements AccessResource 
{
                             
accessResource.addResourceAndPerm(entry.getTopic(), Permission.SUB);
                         }
                     }
+                    if (!command.getSettings().hasPublishing() && 
!command.getSettings().hasSubscription()) {
+                        throw new AclException("settings command doesn't have 
publishing or subscription");
+                    }
                 }
+            } else if 
(NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName))
 {
+                NotifyClientTerminationRequest request = 
(NotifyClientTerminationRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getGroup(), 
Permission.SUB);
+            } else if 
(QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+                QueryRouteRequest request = (QueryRouteRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getTopic(), 
Permission.ANY);
+            } else if 
(QueryAssignmentRequest.getDescriptor().getFullName().equals(rpcFullName)) {
+                QueryAssignmentRequest request = (QueryAssignmentRequest) 
messageV3;
+                accessResource.addResourceAndPerm(request.getGroup(), 
Permission.SUB);
+                accessResource.addResourceAndPerm(request.getTopic(), 
Permission.SUB);
+            } else if 
(ChangeInvisibleDurationRequest.getDescriptor().getFullName().equals(rpcFullName))
 {
+                ChangeInvisibleDurationRequest request = 
(ChangeInvisibleDurationRequest) messageV3;
+                accessResource.addResourceAndPerm(request.getGroup(), 
Permission.SUB);
+                accessResource.addResourceAndPerm(request.getTopic(), 
Permission.SUB);
             }
         } catch (Throwable t) {
             throw new AclException(t.getMessage(), t);

Reply via email to