This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch feature_acl
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/feature_acl by this push:
     new aeea021  Draft the rpc hook and access validator plugin mechanism
aeea021 is described below

commit aeea021581ac4c49f903561219f2dd631be86d5f
Author: dongeforever <[email protected]>
AuthorDate: Sat Oct 27 14:52:51 2018 +0800

    Draft the rpc hook and access validator plugin mechanism
---
 .../org/apache/rocketmq/acl/AccessResource.java    | 21 ++++++++
 .../org/apache/rocketmq/acl/AccessValidator.java   | 35 +++++++++++++
 .../rocketmq/acl/DefaultAccessValidator.java       | 31 +++++++++++
 .../apache/rocketmq/broker/BrokerController.java   | 61 ++++++++++------------
 .../rocketmq/broker/util/ServiceProvider.java      |  8 +++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 12 ++---
 .../remoting/netty/NettyRemotingAbstract.java      |  4 +-
 .../remoting/netty/NettyRemotingClient.java        |  1 -
 .../remoting/netty/NettyRemotingServer.java        |  6 +--
 9 files changed, 133 insertions(+), 46 deletions(-)

diff --git a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java 
b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java
new file mode 100644
index 0000000..e30febc
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessResource.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl;
+
+public interface AccessResource {
+}
diff --git 
a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java 
b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
new file mode 100644
index 0000000..d573e56
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public interface AccessValidator {
+    /**
+     * Parse to get the AccessResource(user, resource, needed permission)
+     * @param request
+     * @return
+     */
+    AccessResource parse(RemotingCommand request);
+
+    /**
+     * Validate the access resource.
+     * @param accessResource
+     */
+    void validate(AccessResource accessResource) ;
+}
diff --git 
a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java 
b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
new file mode 100644
index 0000000..859cc80
--- /dev/null
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.acl;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class DefaultAccessValidator implements AccessValidator {
+
+    @Override public AccessResource parse(RemotingCommand request) {
+        return null;
+    }
+
+    @Override public void validate(AccessResource accessResource) {
+
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index c30d1f3..7a4c105 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -32,11 +31,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.acl.plug.AclPlugController;
-import org.apache.rocketmq.acl.plug.AclRemotingService;
-import org.apache.rocketmq.acl.plug.entity.AccessControl;
-import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
 import org.apache.rocketmq.broker.client.ClientHousekeepingService;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -476,7 +472,8 @@ public class BrokerController {
                 }
             }
             initialTransaction();
-            initialAclPlug();
+            initialAcl();
+            initialRpcHooks();
         }
         return result;
     }
@@ -496,44 +493,42 @@ public class BrokerController {
         this.transactionalMessageCheckService = new 
TransactionalMessageCheckService(this);
     }
 
-    private void initialAclPlug() {
-        try {
-            if (!this.brokerConfig.isAclPlug()) {
-                log.info("Default does not start acl plug");
-                return;
-            }
-            ControllerParameters controllerParameters = new 
ControllerParameters();
-            controllerParameters.setFileHome(brokerConfig.getRocketmqHome());
-            aclPlugController = new AclPlugController(controllerParameters);
-            if (!aclPlugController.isStartSucceed()) {
-                log.error("start acl plug failure");
-                return;
-            }
-            final AclRemotingService aclRemotingService = 
aclPlugController.getAclRemotingService();
+    private void initialAcl() {
+        if (!this.brokerConfig.isEnableAcl()) {
+            log.info("The broker dose not enable acl");
+            return;
+        }
+
+        List<AccessValidator> accessValidators = 
ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
+        if (accessValidators == null || accessValidators.isEmpty()) {
+            return;
+        }
+
+        for (AccessValidator accessValidator: accessValidators) {
+            final AccessValidator validator = accessValidator;
             this.registerServerRPCHook(new RPCHook() {
 
                 @Override
                 public void doBeforeRequest(String remoteAddr, RemotingCommand 
request) {
-                    HashMap<String, String> extFields = request.getExtFields();
-                    AccessControl accessControl = new AccessControl();
-                    accessControl.setCode(request.getCode());
-                    accessControl.setRecognition(remoteAddr);
-                    if (extFields != null) {
-                        accessControl.setAccount(extFields.get("account"));
-                        accessControl.setPassword(extFields.get("password"));
-                        
accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
-                        accessControl.setTopic(extFields.get("topic"));
-                    }
-                    aclRemotingService.check(accessControl);
+                    validator.validate(validator.parse(request));
                 }
 
                 @Override
                 public void doAfterResponse(String remoteAddr, RemotingCommand 
request, RemotingCommand response) {
                 }
             });
+        }
+    }
 
-        } catch (Exception e) {
-            log.error(e.getMessage(), e);
+
+    private void initialRpcHooks() {
+
+        List<RPCHook> rpcHooks = 
ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
+        if (rpcHooks == null || rpcHooks.isEmpty()) {
+            return;
+        }
+        for (RPCHook rpcHook: rpcHooks) {
+            this.registerServerRPCHook(rpcHook);
         }
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java 
b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 8b9b63e..e679660 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -34,6 +34,14 @@ public class ServiceProvider {
 
     public static final String TRANSACTION_LISTENER_ID = 
"META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
 
+
+    public static final String RPC_HOOK_ID = 
"META-INF/service/org.apache.rocketmq.remoting.RPCHook";
+
+
+    public static final String ACL_VALIDATOR_ID = 
"META-INF/service/org.apache.rocketmq.acl.AccessValidator";
+
+
+
     static {
         thisClassLoader = getClassLoader(ServiceProvider.class);
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 6e11de2..60bd7ce 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -171,7 +171,8 @@ public class BrokerConfig {
     @ImportantField
     private long transactionCheckInterval = 60 * 1000;
 
-    private boolean isAclPlug;
+    private boolean enableAcl;
+
 
     public static String localHostName() {
         try {
@@ -711,12 +712,12 @@ public class BrokerConfig {
         this.transactionCheckInterval = transactionCheckInterval;
     }
 
-    public boolean isAclPlug() {
-        return isAclPlug;
+    public boolean isEnableAcl() {
+        return enableAcl;
     }
 
-    public void setAclPlug(boolean isAclPlug) {
-        this.isAclPlug = isAclPlug;
+    public void setEnableAcl(boolean isAclPlug) {
+        this.enableAcl = isAclPlug;
     }
 
     public int getEndTransactionThreadPoolNums() {
@@ -742,5 +743,4 @@ public class BrokerConfig {
     public void setWaitTimeMillsInTransactionQueue(long 
waitTimeMillsInTransactionQueue) {
         this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
     }
-
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 206b96a..28ae001 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -36,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -46,8 +48,6 @@ import org.apache.rocketmq.remoting.common.ServiceThread;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index e891ad7..90f51ff 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -34,7 +34,6 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import io.netty.util.concurrent.EventExecutorGroup;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.CertificateException;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 90386f3..ec34a4b 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -40,8 +40,6 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -49,6 +47,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -60,8 +60,6 @@ import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyRemotingServer extends NettyRemotingAbstract implements 
RemotingServer {

Reply via email to