dongeforever closed pull request #519: [ISSUE#403]Implement the acl strandard
for rocketmq
URL: https://github.com/apache/rocketmq/pull/519
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/acl-plug/pom.xml b/acl-plug/pom.xml
index 1cdc4a29d..d91d42034 100644
--- a/acl-plug/pom.xml
+++ b/acl-plug/pom.xml
@@ -18,8 +18,8 @@
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
- <artifactId>rocketmq-acl-plug</artifactId>
- <name>rocketmq-acl-plug ${project.version}</name>
+ <artifactId>rocketmq-acl</artifactId>
+ <name>rocketmq-acl ${project.version}</name>
<url>http://maven.apache.org</url>
<properties>
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
index d573e56cf..0b1b0823c 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
@@ -22,14 +22,16 @@
public interface AccessValidator {
/**
* Parse to get the AccessResource(user, resource, needed permission)
+ *
* @param request
* @return
*/
- AccessResource parse(RemotingCommand request);
+ AccessResource parse(RemotingCommand request, String remoteAddr);
/**
* Validate the access resource.
+ *
* @param accessResource
*/
- void validate(AccessResource accessResource) ;
+ void validate(AccessResource accessResource);
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
index 859cc8092..704ace47b 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/DefaultAccessValidator.java
@@ -21,11 +21,13 @@
public class DefaultAccessValidator implements AccessValidator {
- @Override public AccessResource parse(RemotingCommand request) {
+ @Override
+ public AccessResource parse(RemotingCommand request, String remoteAddr) {
return null;
}
- @Override public void validate(AccessResource accessResource) {
+ @Override
+ public void validate(AccessResource accessResource) {
}
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
index 75c907d82..1adf6d432 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/AccessContralAnalysis.java
@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.acl.plug;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.acl.plug.entity.AccessControl;
-import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
public class AccessContralAnalysis {
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
index 901cc409d..ae247e722 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/Authentication.java
@@ -24,7 +24,7 @@
public class Authentication {
public boolean authentication(AuthenticationInfo authenticationInfo,
- AccessControl accessControl, AuthenticationResult
authenticationResult) {
+ AccessControl accessControl,
AuthenticationResult authenticationResult) {
int code = accessControl.getCode();
if (!authenticationInfo.getAuthority().get(code)) {
authenticationResult.setResultString(String.format("code is %d
Authentication failed", code));
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
index b42205abc..0d5f949c9 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/DefaultAclRemotingServiceImpl.java
@@ -16,15 +16,28 @@
*/
package org.apache.rocketmq.acl.plug;
+import java.util.HashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plug.engine.AclPlugEngine;
+import org.apache.rocketmq.acl.plug.engine.PlainAclPlugEngine;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class DefaultAclRemotingServiceImpl implements AclRemotingService {
+public class DefaultAclRemotingServiceImpl implements AclRemotingService,
AccessValidator {
private AclPlugEngine aclPlugEngine;
+ public DefaultAclRemotingServiceImpl() {
+ ControllerParameters controllerParameters = new ControllerParameters();
+ this.aclPlugEngine = new PlainAclPlugEngine(controllerParameters);
+ this.aclPlugEngine.initialize();
+ }
+
public DefaultAclRemotingServiceImpl(AclPlugEngine aclPlugEngine) {
this.aclPlugEngine = aclPlugEngine;
}
@@ -41,4 +54,34 @@ public AuthenticationResult check(AccessControl
accessControl) {
return authenticationResult;
}
+ @Override
+ public AccessResource parse(RemotingCommand request, String remoteAddr) {
+ HashMap<String, String> extFields = request.getExtFields();
+ AccessControl accessControl = new AccessControl();
+ accessControl.setCode(request.getCode());
+ accessControl.setRecognition(remoteAddr);
+ accessControl.setNetaddress(StringUtils.split(remoteAddr, ":")[0]);
+ if (extFields != null) {
+ accessControl.setAccount(extFields.get("account"));
+ accessControl.setPassword(extFields.get("password"));
+ accessControl.setTopic(extFields.get("topic"));
+ }
+ return accessControl;
+ }
+
+ @Override
+ public void validate(AccessResource accessResource) {
+ try {
+ AuthenticationResult authenticationResult =
aclPlugEngine.eachCheckAuthentication((AccessControl) accessResource);
+ if (authenticationResult.getException() != null) {
+ throw new AclPlugRuntimeException(String.format("eachCheck the
inspection appear exception, accessControl data is %s",
accessResource.toString()), authenticationResult.getException());
+ }
+ if (authenticationResult.getAccessControl() == null ||
!authenticationResult.isSucceed()) {
+ throw new AclPlugRuntimeException(String.format("%s
accessControl data is %s", authenticationResult.getResultString(),
accessResource.toString()));
+ }
+ } catch (Exception e) {
+ throw new AclPlugRuntimeException(String.format("validate
exception AccessResource data %s", accessResource.toString()), e);
+ }
+ }
+
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
index badae946c..d1572755e 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AclPlugEngine.java
@@ -31,5 +31,7 @@
public AuthenticationResult eachCheckLoginAndAuthentication(AccessControl
accessControl);
+ public AuthenticationResult eachCheckAuthentication(AccessControl
accessControl);
+
public void initialize();
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
index 6aac6bd43..a6399fc3e 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/AuthenticationInfoManagementAclPlugEngine.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.acl.plug.engine;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.rocketmq.acl.plug.AccessContralAnalysis;
import org.apache.rocketmq.acl.plug.Authentication;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
@@ -37,7 +39,7 @@
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.ACL_PLUG_LOGGER_NAME);
ControllerParameters controllerParameters;
- private Map<String/** account **/, Map<String/** netaddress **/,
AuthenticationInfo>> accessControlMap = new HashMap<>();
+ private Map<String/** account **/, List<AuthenticationInfo>>
accessControlMap = new HashMap<>();
private AuthenticationInfo authenticationInfo;
private NetaddressStrategyFactory netaddressStrategyFactory = new
NetaddressStrategyFactory();
private AccessContralAnalysis accessContralAnalysis = new
AccessContralAnalysis();
@@ -54,16 +56,16 @@ public void setAccessControl(AccessControl accessControl)
throws AclPlugRuntimeE
}
try {
NetaddressStrategy netaddressStrategy =
netaddressStrategyFactory.getNetaddressStrategy(accessControl);
- Map<String, AuthenticationInfo> accessControlAddressMap =
accessControlMap.get(accessControl.getAccount());
- if (accessControlAddressMap == null) {
- accessControlAddressMap = new HashMap<>();
- accessControlMap.put(accessControl.getAccount(),
accessControlAddressMap);
+ List<AuthenticationInfo> accessControlAddressList =
accessControlMap.get(accessControl.getAccount());
+ if (accessControlAddressList == null) {
+ accessControlAddressList = new ArrayList<>();
+ accessControlMap.put(accessControl.getAccount(),
accessControlAddressList);
}
AuthenticationInfo authenticationInfo = new
AuthenticationInfo(accessContralAnalysis.analysis(accessControl),
accessControl, netaddressStrategy);
- accessControlAddressMap.put(accessControl.getNetaddress(),
authenticationInfo);
+ accessControlAddressList.add(authenticationInfo);
log.info("authenticationInfo is {}",
authenticationInfo.toString());
} catch (Exception e) {
- throw new AclPlugRuntimeException(String.format("Exception info %s
%s" ,e.getMessage() , accessControl.toString()), e);
+ throw new AclPlugRuntimeException(String.format("Exception info %s
%s", e.getMessage(), accessControl.toString()), e);
}
}
@@ -84,24 +86,19 @@ public void setNetaddressAccessControl(AccessControl
accessControl) throws AclPl
}
public AuthenticationInfo getAccessControl(AccessControl accessControl) {
- AuthenticationInfo existing = null;
if (accessControl.getAccount() == null && authenticationInfo != null) {
- existing =
authenticationInfo.getNetaddressStrategy().match(accessControl) ?
authenticationInfo : null;
+ return
authenticationInfo.getNetaddressStrategy().match(accessControl) ?
authenticationInfo : null;
} else {
- Map<String, AuthenticationInfo> accessControlAddressMap =
accessControlMap.get(accessControl.getAccount());
- if (accessControlAddressMap != null) {
- existing =
accessControlAddressMap.get(accessControl.getNetaddress());
- if (existing == null)
- return null;
- if
(existing.getAccessControl().getPassword().equals(accessControl.getPassword()))
{
- if (existing.getNetaddressStrategy().match(accessControl))
{
- return existing;
+ List<AuthenticationInfo> accessControlAddressList =
accessControlMap.get(accessControl.getAccount());
+ if (accessControlAddressList != null) {
+ for (AuthenticationInfo ai : accessControlAddressList) {
+ if (ai.getNetaddressStrategy().match(accessControl) &&
ai.getAccessControl().getPassword().equals(accessControl.getPassword())) {
+ return ai;
}
}
- existing = null;
}
}
- return existing;
+ return null;
}
@Override
@@ -112,6 +109,7 @@ public AuthenticationResult
eachCheckLoginAndAuthentication(AccessControl access
if (authenticationInfo != null) {
boolean boo =
authentication.authentication(authenticationInfo, accessControl,
authenticationResult);
authenticationResult.setSucceed(boo);
+
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
}
} catch (Exception e) {
authenticationResult.setException(e);
@@ -119,6 +117,21 @@ public AuthenticationResult
eachCheckLoginAndAuthentication(AccessControl access
return authenticationResult;
}
+ public AuthenticationResult eachCheckAuthentication(AccessControl
accessControl) {
+ AuthenticationResult authenticationResult = new AuthenticationResult();
+ AuthenticationInfo authenticationInfo =
getAccessControl(accessControl);
+ if (authenticationInfo != null) {
+ boolean boo = authentication.authentication(authenticationInfo,
accessControl, authenticationResult);
+ authenticationResult.setSucceed(boo);
+
authenticationResult.setAccessControl(authenticationInfo.getAccessControl());
+ } else {
+ authenticationResult.setResultString("accessControl is null,
Please check login, password, IP\"");
+ }
+
+
+ return authenticationResult;
+ }
+
void setBorkerAccessControlTransport(BorkerAccessControlTransport
transport) {
if (transport.getOnlyNetAddress() == null && (transport.getList() ==
null || transport.getList().size() == 0)) {
throw new AclPlugRuntimeException("onlyNetAddress and list can't
be all empty");
@@ -135,5 +148,5 @@ void
setBorkerAccessControlTransport(BorkerAccessControlTransport transport) {
}
protected abstract AuthenticationInfo getAuthenticationInfo(AccessControl
accessControl,
- AuthenticationResult authenticationResult);
+
AuthenticationResult authenticationResult);
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
index e8dc59c42..35b568349 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/LoginInfoAclPlugEngine.java
@@ -18,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.entity.AuthenticationInfo;
import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
@@ -53,7 +54,7 @@ public void deleteLoginInfo(String remoteAddr) {
}
protected AuthenticationInfo getAuthenticationInfo(AccessControl
accessControl,
- AuthenticationResult authenticationResult) {
+ AuthenticationResult
authenticationResult) {
LoginInfo loginInfo = getLoginInfo(accessControl);
if (loginInfo != null && loginInfo.getAuthenticationInfo() != null) {
return loginInfo.getAuthenticationInfo();
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
index d1a7d9529..bcb89b8fa 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngine.java
@@ -16,18 +16,19 @@
*/
package org.apache.rocketmq.acl.plug.engine;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
import org.apache.rocketmq.acl.plug.entity.BorkerAccessControlTransport;
import org.apache.rocketmq.acl.plug.entity.ControllerParameters;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
import org.yaml.snakeyaml.Yaml;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
public class PlainAclPlugEngine extends LoginInfoAclPlugEngine {
public PlainAclPlugEngine(
- ControllerParameters controllerParameters) throws
AclPlugRuntimeException {
+ ControllerParameters controllerParameters) throws
AclPlugRuntimeException {
super(controllerParameters);
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
index cf3a736a7..092a97ef4 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AccessControl.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.acl.plug.entity;
-public class AccessControl {
+import org.apache.rocketmq.acl.AccessResource;
+
+public class AccessControl implements AccessResource {
private String account;
@@ -85,8 +87,8 @@ public void setTopic(String topic) {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AccessControl [account=").append(account).append(",
password=").append(password)
- .append(", netaddress=").append(netaddress).append(",
recognition=").append(recognition)
- .append(", code=").append(code).append(",
topic=").append(topic).append("]");
+ .append(", netaddress=").append(netaddress).append(",
recognition=").append(recognition)
+ .append(", code=").append(code).append(",
topic=").append(topic).append("]");
return builder.toString();
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
index 981bef855..a1696e2e4 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/AuthenticationInfo.java
@@ -16,10 +16,11 @@
*/
package org.apache.rocketmq.acl.plug.entity;
+import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
+
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.rocketmq.acl.plug.strategy.NetaddressStrategy;
public class AuthenticationInfo {
@@ -30,7 +31,7 @@
private Map<Integer, Boolean> authority;
public AuthenticationInfo(Map<Integer, Boolean> authority, AccessControl
accessControl,
- NetaddressStrategy netaddressStrategy) {
+ NetaddressStrategy netaddressStrategy) {
super();
this.authority = authority;
this.accessControl = accessControl;
@@ -65,7 +66,7 @@ public void setAuthority(Map<Integer, Boolean> authority) {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AuthenticationInfo
[accessControl=").append(accessControl).append(", netaddressStrategy=")
- .append(netaddressStrategy).append(", authority={");
+ .append(netaddressStrategy).append(", authority={");
Iterator<Entry<Integer, Boolean>> it = authority.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, Boolean> e = it.next();
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
index d40fadfac..b5eb1187d 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/BorkerAccessControl.java
@@ -556,8 +556,8 @@ public void setQueryConsumeQueue(boolean queryConsumeQueue)
{
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("BorkerAccessControl
[permitSendTopic=").append(permitSendTopic).append(", noPermitSendTopic=")
- .append(noPermitSendTopic).append(",
permitPullTopic=").append(permitPullTopic)
- .append(", noPermitPullTopic=").append(noPermitPullTopic);
+ .append(noPermitSendTopic).append(",
permitPullTopic=").append(permitPullTopic)
+ .append(", noPermitPullTopic=").append(noPermitPullTopic);
if (!!sendMessage)
builder.append(", sendMessage=").append(sendMessage);
if (!!sendMessageV2)
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
index 708bcbeb9..94873b5fc 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/ControllerParameters.java
@@ -16,11 +16,12 @@
*/
package org.apache.rocketmq.acl.plug.entity;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode;
public class ControllerParameters {
- private String fileHome;
+ private String fileHome =
System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private Class<?> accessContralAnalysisClass = RequestCode.class;
@@ -44,7 +45,7 @@ public void setAccessContralAnalysisClass(Class<?>
accessContralAnalysisClass) {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ControllerParametersEntity
[fileHome=").append(fileHome).append(", accessContralAnalysisClass=")
- .append(accessContralAnalysisClass).append("]");
+ .append(accessContralAnalysisClass).append("]");
return builder.toString();
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
index e08d7d38b..df1166be6 100644
--- a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
+++ b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/entity/LoginInfo.java
@@ -74,8 +74,8 @@ public void setClear(AtomicBoolean clear) {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("LoginInfo
[recognition=").append(recognition).append(", loginTime=").append(loginTime)
- .append(", operationTime=").append(operationTime).append(",
clear=").append(clear)
- .append(",
authenticationInfo=").append(authenticationInfo).append("]");
+ .append(", operationTime=").append(operationTime).append(",
clear=").append(clear)
+ .append(",
authenticationInfo=").append(authenticationInfo).append("]");
return builder.toString();
}
diff --git
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
index cdb78675e..4be995309 100644
---
a/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
+++
b/acl-plug/src/main/java/org/apache/rocketmq/acl/plug/strategy/NetaddressStrategyFactory.java
@@ -16,13 +16,14 @@
*/
package org.apache.rocketmq.acl.plug.strategy;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.plug.AclUtils;
import org.apache.rocketmq.acl.plug.entity.AccessControl;
import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import java.util.HashSet;
+import java.util.Set;
+
public class NetaddressStrategyFactory {
public static final NullNetaddressStrategy NULL_NET_ADDRESS_STRATEGY = new
NullNetaddressStrategy();
diff --git
a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java
b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java
new file mode 100644
index 000000000..4830d6d75
--- /dev/null
+++
b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclRemotingServiceTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.plug;
+
+import java.util.HashMap;
+
+import org.apache.rocketmq.acl.AccessResource;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.plug.entity.AccessControl;
+import org.apache.rocketmq.acl.plug.entity.AuthenticationResult;
+import org.apache.rocketmq.acl.plug.entity.BorkerAccessControl;
+import org.apache.rocketmq.acl.plug.exception.AclPlugRuntimeException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;;
+
+public class AclRemotingServiceTest {
+
+
+ AclRemotingService defaultAclService;
+
+ AccessValidator accessValidator;
+
+ AccessControl accessControl;
+
+ AccessControl accessControlTwo;
+
+ @Before
+ public void init() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ DefaultAclRemotingServiceImpl aclRemotingServiceImpl = new
DefaultAclRemotingServiceImpl();
+ defaultAclService = aclRemotingServiceImpl;
+ accessValidator = aclRemotingServiceImpl;
+
+ accessControl = new BorkerAccessControl();
+ accessControl.setAccount("RocketMQ");
+ accessControl.setPassword("1234567");
+ accessControl.setNetaddress("192.0.0.1");
+ accessControl.setRecognition("127.0.0.1:1");
+
+ accessControlTwo = new BorkerAccessControl();
+ accessControlTwo.setAccount("RocketMQ");
+ accessControlTwo.setPassword("1234567");
+ accessControlTwo.setNetaddress("192.0.2.1");
+ accessControlTwo.setRecognition("127.0.0.1:2");
+ }
+
+
+ @Test
+ public void defaultConstructorTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ AclRemotingService defaultAclService = new
DefaultAclRemotingServiceImpl();
+ Assert.assertNotNull(defaultAclService);
+ }
+
+ @Test
+ public void parseTest() {
+ RemotingCommand remotingCommand =
RemotingCommand.createResponseCommand(34, "");
+ HashMap<String, String> map = new HashMap<>();
+ map.put("account", "RocketMQ");
+ map.put("password", "123456");
+ map.put("topic", "test");
+ remotingCommand.setExtFields(map);
+
+ AccessResource accessResource = accessValidator.parse(remotingCommand,
"127.0.0.1:123");
+ AccessControl accessControl = (AccessControl) accessResource;
+ AccessControl newAccessControl = new AccessControl();
+ newAccessControl.setAccount("RocketMQ");
+ newAccessControl.setPassword("123456");
+ newAccessControl.setTopic("test");
+ newAccessControl.setCode(34);
+ newAccessControl.setNetaddress("127.0.0.1");
+ newAccessControl.setRecognition("127.0.0.1:123");
+ Assert.assertEquals(accessControl.toString(),
newAccessControl.toString());
+ }
+
+ @Test
+ public void checkTest() {
+ accessControl.setCode(34);
+ AuthenticationResult authenticationResult =
defaultAclService.check(accessControl);
+ Assert.assertTrue(authenticationResult.isSucceed());
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void checkAccessExceptionTest() {
+ accessControl.setCode(34);
+ accessControl.setAccount("Rocketmq");
+ defaultAclService.check(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void checkPasswordTest() {
+ accessControl.setCode(34);
+ accessControl.setPassword("123123123");
+ defaultAclService.check(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void checkCodeTest() {
+ accessControl.setCode(14434);
+ accessControl.setPassword("123123123");
+ defaultAclService.check(accessControl);
+ }
+
+
+ @Test
+ public void validateTest() {
+ accessControl.setCode(34);
+ accessValidator.validate(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void validateAccessExceptionTest() {
+ accessControl.setCode(34);
+ accessControl.setAccount("Rocketmq");
+ accessValidator.validate(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void validatePasswordTest() {
+ accessControl.setCode(34);
+ accessControl.setPassword("123123123");
+ accessValidator.validate(accessControl);
+ }
+
+ @Test(expected = AclPlugRuntimeException.class)
+ public void validateCodeTest() {
+ accessControl.setCode(14434);
+ accessControl.setPassword("123123123");
+ accessValidator.validate(accessControl);
+ }
+}
diff --git
a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
index 806d18089..b0cc4daba 100644
--- a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
+++ b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/AclUtilsTest.java
@@ -18,13 +18,12 @@
import java.util.ArrayList;
import java.util.List;
+
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
-@RunWith(MockitoJUnitRunner.class)
+
public class AclUtilsTest {
@Test
diff --git
a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
index c925ef412..83004bc2c 100644
---
a/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
+++
b/acl-plug/src/test/java/org/apache/rocketmq/acl/plug/engine/PlainAclPlugEngineTest.java
@@ -57,46 +57,14 @@
@Before
public void init() throws NoSuchFieldException, SecurityException,
IOException {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ ControllerParameters controllerParametersEntity = new
ControllerParameters();
Yaml ymal = new Yaml();
- String home = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
- InputStream fis = null;
- if (home == null) {
- URL url = PlainAclPlugEngineTest.class.getResource("/");
- home = url.toString();
- home = home.substring(0, home.length() - 1).replace("file:/",
"").replace("target/test-classes", "");
- home = home + "src/test/resources";
- if (!new File(home + "/conf/transport.yml").exists()) {
- home =
"/home/travis/build/githublaohu/rocketmq/acl-plug/src/test/resources";
- }
- }
- String filePath = home + "/conf/transport.yml";
- try {
- fis = new FileInputStream(new File(filePath));
- transport = ymal.loadAs(fis,
BorkerAccessControlTransport.class);
- }catch(Exception e) {
- AccessControl accessControl = new BorkerAccessControl();
- accessControl.setAccount("onlyNetAddress");
- accessControl.setPassword("aliyun11");
- accessControl.setNetaddress("127.0.0.1");
- accessControl.setRecognition("127.0.0.1:1");
-
- AccessControl accessControlTwo = new BorkerAccessControl();
- accessControlTwo.setAccount("listTransport");
- accessControlTwo.setPassword("aliyun1");
- accessControlTwo.setNetaddress("127.0.0.1");
- accessControlTwo.setRecognition("127.0.0.1:2");
- transport = new BorkerAccessControlTransport();
- transport.setOnlyNetAddress((BorkerAccessControl)accessControl);
-
- }
- ControllerParameters controllerParametersEntity = new
ControllerParameters();
- controllerParametersEntity.setFileHome(null);
- try {
- plainAclPlugEngine = new
PlainAclPlugEngine(controllerParametersEntity);
- plainAclPlugEngine.initialize();
- } catch (Exception e) {
+ transport = ymal.loadAs(new FileInputStream(new
File(controllerParametersEntity.getFileHome()+"/conf/transport.yml")),
BorkerAccessControlTransport.class);
+
+ plainAclPlugEngine = new
PlainAclPlugEngine(controllerParametersEntity);
+ plainAclPlugEngine.initialize();
- }
accessControl = new BorkerAccessControl();
accessControl.setAccount("rokcetmq");
@@ -142,6 +110,7 @@ public void passWordThanTest() {
@Test(expected = AclPlugRuntimeException.class)
public void testPlainAclPlugEngineInit() {
ControllerParameters controllerParametersEntity = new
ControllerParameters();
+ controllerParametersEntity.setFileHome("");
new PlainAclPlugEngine(controllerParametersEntity).initialize();
}
diff --git a/broker/pom.xml b/broker/pom.xml
index c353eb32b..f617d2492 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -50,7 +50,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>rocketmq-acl-plug</artifactId>
+ <artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7a4c105e7..a6da44b64 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -32,7 +32,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
-import org.apache.rocketmq.acl.plug.AclPlugController;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -160,7 +159,6 @@
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener;
- private AclPlugController aclPlugController;
public BrokerController(
final BrokerConfig brokerConfig,
@@ -510,7 +508,7 @@ private void initialAcl() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand
request) {
- validator.validate(validator.parse(request));
+ validator.validate(validator.parse(request, remoteAddr));
}
@Override
@@ -1095,9 +1093,6 @@ public void setTransactionalMessageCheckListener(
this.transactionalMessageCheckListener =
transactionalMessageCheckListener;
}
- public AclPlugController getAclPlugController() {
- return this.aclPlugController;
- }
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index f4ecc2c04..d536db505 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -72,9 +72,6 @@ public void onChannelClose(String remoteAddr, Channel
channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr,
channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr,
channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr,
channel);
- if (this.brokerController.getAclPlugController() != null &&
this.brokerController.getAclPlugController().isStartSucceed()) {
-
this.brokerController.getAclPlugController().doChannelCloseEvent(remoteAddr);
- }
}
@Override
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
index 22228a6e0..a3a35c883 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/util/ServiceProviderTest.java
@@ -17,12 +17,15 @@
package org.apache.rocketmq.broker.util;
+import org.apache.rocketmq.acl.AccessValidator;
import
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
+
public class ServiceProviderTest {
@Test
@@ -38,4 +41,10 @@ public void loadAbstractTransactionListenerTest() {
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
+
+ @Test
+ public void loadAccessValidatorTest() {
+ List<AccessValidator> accessValidators =
ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
+ assertThat(accessValidators).isNotNull();
+ }
}
diff --git
a/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
new file mode 100644
index 000000000..2f26220e5
--- /dev/null
+++
b/broker/src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
@@ -0,0 +1 @@
+org.apache.rocketmq.acl.plug.DefaultAclRemotingServiceImpl
\ No newline at end of file
diff --git a/distribution/conf/broker.conf b/distribution/conf/broker.conf
index 363bcbc03..970395735 100644
--- a/distribution/conf/broker.conf
+++ b/distribution/conf/broker.conf
@@ -20,5 +20,5 @@ deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
-aclPlug=true
+enableAcl=true
namesrvAddr=127.0.0.1:9876
diff --git a/distribution/conf/transport.yml b/distribution/conf/transport.yml
index 25d4902a6..f8180ede0 100644
--- a/distribution/conf/transport.yml
+++ b/distribution/conf/transport.yml
@@ -14,21 +14,21 @@
# limitations under the License.
onlyNetAddress:
- netaddress: 10.10.103.*
+ netaddress: 192.168.0.*
noPermitPullTopic:
- broker-a
list:
- account: RocketMQ
password: 1234567
- netaddress: 192.0.0.*
+ netaddress: 192.168.0.*
permitSendTopic:
- - test1
+ - TopicTest
- test2
- account: RocketMQ
password: 1234567
- netaddress: 192.0.2.1
+ netaddress: 192.168.2.1
permitSendTopic:
- test3
- test4
-
\ No newline at end of file
+
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
new file mode 100644
index 000000000..d696c91a9
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ *
+ * English explain
+ * 1. broker module
src/test/resources/META-INF/service/org.apache.rocketmq.acl.AccessValidator
copy to src/java/resources/META-INF/service.
+ *
+ * 2. view the /conf/transport.yml file under the distribution module, pay
attention to the account password, IP.
+ *
+ * 3. Modify ALC_RCP_HOOK_ACCOUT and ACL_RCP_HOOK_PASSWORD to the
corresponding account password in transport.yml
+ *
+ */
+public class AclClient {
+
+ private static final Map<MessageQueue, Long> OFFSE_TABLE = new
HashMap<MessageQueue, Long>();
+
+ private static final String ACL_RCPHOOK_ACCOUT = "RocketMQ";
+
+ private static final String ACL_RCPHOOK_PASSWORD = "1234567";
+
+ public static void main(String[] args) throws MQClientException,
InterruptedException {
+ producer();
+ pushConsumer();
+ pullConsumer();
+ }
+
+ public static void producer() throws MQClientException {
+ DefaultMQProducer producer = new
DefaultMQProducer("ProducerGroupName", getAalRPCHook());
+ producer.setNamesrvAddr("127.0.0.1:9876");
+ producer.start();
+
+ for (int i = 0; i < 128; i++)
+ try {
+ {
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello
world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+
+ public static void pushConsumer() throws MQClientException {
+
+ DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_5", getAalRPCHook(), new
AllocateMessageQueueAveragely());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.subscribe("TopicTest", "*");
+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ //wrong time format 2017_0422_221800
+ consumer.setConsumeTimestamp("20180422221800");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
+ printBody(msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+
+ public static void pullConsumer() throws MQClientException {
+ DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("please_rename_unique_group_name_6", getAalRPCHook());
+ consumer.setNamesrvAddr("127.0.0.1:9876");
+ consumer.start();
+
+ Set<MessageQueue> mqs =
consumer.fetchSubscribeMessageQueues("TopicTest");
+ for (MessageQueue mq : mqs) {
+ System.out.printf("Consume from the queue: %s%n", mq);
+ SINGLE_MQ:
+ while (true) {
+ try {
+ PullResult pullResult =
+ consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
+ System.out.printf("%s%n", pullResult);
+ putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+ printBody(pullResult);
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ break;
+ case NO_MATCHED_MSG:
+ break;
+ case NO_NEW_MSG:
+ break SINGLE_MQ;
+ case OFFSET_ILLEGAL:
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ consumer.shutdown();
+ }
+
+ private static void printBody(PullResult pullResult) {
+ printBody(pullResult.getMsgFoundList());
+ }
+
+ private static void printBody(List<MessageExt> msg) {
+ if (msg == null || msg.size() == 0)
+ return;
+ for (MessageExt m : msg) {
+ if (m != null) {
+ System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(),
new String(m.getBody()));
+ }
+ }
+ }
+
+ private static long getMessageQueueOffset(MessageQueue mq) {
+ Long offset = OFFSE_TABLE.get(mq);
+ if (offset != null)
+ return offset;
+
+ return 0;
+ }
+
+ private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+ OFFSE_TABLE.put(mq, offset);
+ }
+
+ static RPCHook getAalRPCHook() {
+ return new AalRPCHook(ACL_RCPHOOK_ACCOUT, ACL_RCPHOOK_PASSWORD);
+ }
+
+ static class AalRPCHook implements RPCHook {
+
+ private String account;
+
+ private String password;
+
+ public AalRPCHook(String account, String password) {
+ this.account = account;
+ this.password = password;
+ }
+
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand
request) {
+
+ HashMap<String, String> ext = request.getExtFields();
+ if (ext == null) {
+ ext = new HashMap<>();
+ request.setExtFields(ext);
+ }
+ ext.put("account", this.account);
+ ext.put("password", this.password);
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand
request, RemotingCommand response) {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+}
diff --git a/pom.xml b/pom.xml
index 535893c21..4fe56a4bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -525,7 +525,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>rocketmq-acl-plug</artifactId>
+ <artifactId>rocketmq-acl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 90f51ff05..fc9df37c6 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -282,7 +282,7 @@ public void closeChannel(final String addr, final Channel
channel) {
@Override
public void registerRPCHook(RPCHook rpcHook) {
- if (!rpcHooks.contains(rpcHook)) {
+ if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index ec34a4be7..c2f3ba48d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -265,7 +265,7 @@ public void shutdown() {
@Override
public void registerRPCHook(RPCHook rpcHook) {
- if (!rpcHooks.contains(rpcHook)) {
+ if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 5027a3cce..a05a55a06 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -19,9 +19,13 @@
import java.util.ArrayList;
import java.util.List;
+
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -48,6 +52,7 @@
private static Logger log = Logger.getLogger(BaseConf.class);
static {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,
Integer.toString(MQVersion.CURRENT_VERSION));
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" +
namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services