This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 29c4122d9e7a919fee701ec692e5aa277c8e8ca9
Author: zhouxiang <[email protected]>
AuthorDate: Thu Nov 17 17:07:45 2022 +0800

    [ISSUE #5486] Add AuthenticationPipeline
---
 .../proxy/remoting/RemotingProtocolServer.java     | 10 ++++-
 .../remoting/pipeline/AuthenticationPipeline.java  | 46 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index a173a79b6..adbc21169 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -19,12 +19,16 @@ package org.apache.rocketmq.proxy.remoting;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -46,6 +50,7 @@ import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity;
 import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity;
 import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity;
 import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
+import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline;
 import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.RemotingServer;
@@ -252,9 +257,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
         RequestPipeline pipeline = (ctx, request, context) -> {
         };
 
+        List<AccessValidator> accessValidatorList = new ArrayList<>();
+        accessValidatorList.add(new PlainAccessValidator());
+
         // add pipeline
         // the last pipe add will execute at the first
-        return pipeline;
+        return pipeline.pipe(new AuthenticationPipeline(accessValidatorList));
     }
 
     protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/AuthenticationPipeline.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/AuthenticationPipeline.java
new file mode 100644
index 000000000..4bcc1479d
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/AuthenticationPipeline.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.remoting.pipeline;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class AuthenticationPipeline implements RequestPipeline {
+    private final List<AccessValidator> accessValidatorList;
+
+    public AuthenticationPipeline(List<AccessValidator> accessValidatorList) {
+        this.accessValidatorList = accessValidatorList;
+    }
+
+    @Override
+    public void execute(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
+        if (config.isEnableACL()) {
+            for (AccessValidator accessValidator : accessValidatorList) {
+                AccessResource accessResource = accessValidator.parse(request, context.getRemoteAddress());
+                accessValidator.validate(accessResource);
+            }
+        }
+    }
+}

Reply via email to