This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 29c4122d9e7a919fee701ec692e5aa277c8e8ca9 Author: zhouxiang <[email protected]> AuthorDate: Thu Nov 17 17:07:45 2022 +0800 [ISSUE #5486] Add AuthenticationPipeline --- .../proxy/remoting/RemotingProtocolServer.java | 10 ++++- .../remoting/pipeline/AuthenticationPipeline.java | 46 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java index a173a79b6..adbc21169 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java @@ -19,12 +19,16 @@ package org.apache.rocketmq.proxy.remoting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.Channel; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.acl.plain.PlainAccessValidator; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.future.FutureTaskExt; @@ -46,6 +50,7 @@ import org.apache.rocketmq.proxy.remoting.activity.PullMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.SendMessageActivity; import org.apache.rocketmq.proxy.remoting.activity.TransactionActivity; import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager; +import org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline; import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RemotingServer; @@ -252,9 +257,12 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu RequestPipeline pipeline = (ctx, request, context) -> { }; + List<AccessValidator> accessValidatorList = new ArrayList<>(); + accessValidatorList.add(new PlainAccessValidator()); + // add pipeline // the last pipe add will execute at the first - return pipeline; + return pipeline.pipe(new AuthenticationPipeline(accessValidatorList)); } protected class ThreadPoolHeadSlowTimeMillsMonitor implements ThreadPoolStatusMonitor { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/AuthenticationPipeline.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/AuthenticationPipeline.java new file mode 100644 index 000000000..4bcc1479d --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/pipeline/AuthenticationPipeline.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.pipeline; + +import io.netty.channel.ChannelHandlerContext; +import java.util.List; +import org.apache.rocketmq.acl.AccessResource; +import org.apache.rocketmq.acl.AccessValidator; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.config.ConfigurationManager; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class AuthenticationPipeline implements RequestPipeline { + private final List<AccessValidator> accessValidatorList; + + public AuthenticationPipeline(List<AccessValidator> accessValidatorList) { + this.accessValidatorList = accessValidatorList; + } + + @Override + public void execute(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { + ProxyConfig config = ConfigurationManager.getProxyConfig(); + if (config.isEnableACL()) { + for (AccessValidator accessValidator : accessValidatorList) { + AccessResource accessResource = accessValidator.parse(request, context.getRemoteAddress()); + accessValidator.validate(accessResource); + } + } + } +}
