vongosling closed pull request #413: No 4. add acl URL: https://github.com/apache/rocketmq/pull/413
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/acl/acl-center/pom.xml b/acl/acl-center/pom.xml new file mode 100644 index 000000000..9e7c3c9a1 --- /dev/null +++ b/acl/acl-center/pom.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>acl</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.4.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>acl-center</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>acl-dao</artifactId> + <version>4.4.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>2.0.3.RELEASE</version> + </dependency> + + </dependencies> + + +</project> \ No newline at end of file diff --git a/acl/acl-center/src/main/java/org/apache/rocketmq/acl/AclService.java b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/AclService.java new file mode 100644 index 000000000..982393dc3 --- /dev/null +++ b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/AclService.java @@ -0,0 +1,9 @@ +package org.apache.rocketmq.acl; + +public interface AclService { + + public boolean aclCheck(String topic, String appName); + + public boolean aclCheck(String topic, String appName,String userInfo); + +} diff --git a/acl/acl-center/src/main/java/org/apache/rocketmq/acl/Application.java b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/Application.java new file mode 100644 index 000000000..b67b4555f --- /dev/null +++ b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/Application.java @@ -0,0 +1,11 @@ +package org.apache.rocketmq.acl; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/acl/acl-center/src/main/java/org/apache/rocketmq/acl/controller/AclController.java b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/controller/AclController.java new file mode 100644 index 000000000..c4c7703d6 --- /dev/null +++ b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/controller/AclController.java @@ -0,0 +1,29 @@ +package org.apache.rocketmq.acl.controller; + + +import org.apache.rocketmq.acl.AclService; +import org.apache.rocketmq.acl.service.AclServiceImpl; +import org.springframework.web.bind.annotation.*; + +import java.io.IOException; + +@RestController +@RequestMapping("api/acl") +public class AclController { + private AclService aclService ; + + @RequestMapping(method = RequestMethod.GET, value = "/aclCheck/{topic}/{appName}") + @ResponseBody + public Boolean aclCheck(@PathVariable String topic, @PathVariable String appName) throws IOException { + aclService = new AclServiceImpl(); + return aclService.aclCheck(topic, appName); + } + + @RequestMapping(method = RequestMethod.GET, value = "/aclCheck/{topic}/{appName}/{userInfo}") + @ResponseBody + public Boolean aclCheck(@PathVariable String topic, @PathVariable String appName,@PathVariable String userInfo) + throws IOException { + aclService = new AclServiceImpl(); + return aclService.aclCheck(topic, appName,userInfo); + } +} diff --git a/acl/acl-center/src/main/java/org/apache/rocketmq/acl/service/AclServiceImpl.java b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/service/AclServiceImpl.java new file mode 100644 index 000000000..b9a3c02db --- /dev/null +++ b/acl/acl-center/src/main/java/org/apache/rocketmq/acl/service/AclServiceImpl.java @@ -0,0 +1,33 @@ +package org.apache.rocketmq.acl.service; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.rocketmq.acl.AclService; +import org.apache.rocketmq.acl.dao.domain.AclDO; +import org.apache.rocketmq.acl.dao.manager.BaseDAO; +import org.apache.rocketmq.acl.dao.query.AclQuery; + +import java.util.List; + +public class AclServiceImpl implements AclService { + + @Override + public boolean aclCheck(String topic, String appName) { + AclQuery aclQuery = new AclQuery(); + aclQuery.setTopic(topic); + aclQuery.setAppName(appName); + List<AclDO> list = BaseDAO.getInstance().select(aclQuery); + + return CollectionUtils.isEmpty(list); + } + + @Override + public boolean aclCheck(String topic, String appName, String userInfo) { + AclQuery aclQuery = new AclQuery(); + aclQuery.setTopic(topic); + aclQuery.setAppName(appName); + aclQuery.setUserInfo(userInfo); + List<AclDO> list = BaseDAO.getInstance().select(aclQuery); + + return CollectionUtils.isEmpty(list); + } +} diff --git a/acl/acl-dao/pom.xml b/acl/acl-dao/pom.xml new file mode 100644 index 000000000..ae376820f --- /dev/null +++ b/acl/acl-dao/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>acl</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.4.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>acl-dao</artifactId> + + <dependencies> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>org.mybatis</groupId> + <artifactId>mybatis</artifactId> + <version>3.4.6</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>5.1.42</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + </dependencies> + + +</project> \ No newline at end of file diff --git a/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/domain/AclDO.java b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/domain/AclDO.java new file mode 100644 index 000000000..6e32e2f08 --- /dev/null +++ b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/domain/AclDO.java @@ -0,0 +1,105 @@ +package org.apache.rocketmq.acl.dao.domain; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +import java.io.Serializable; +import java.util.Date; +import java.util.Map; + +/** + * Acldo + * @author chujie.gcj + * @date 2018-08-14 + */ +public class AclDO implements Serializable { + private static final long serialVersionUID = 1884828297962130471L; + + private Long id; + private Date gmtCreate; + private Date gmtModify; + private Integer rowStatus; + private Long rowVersion; + + private String topic; + private String appName; + private String userInfo; + + private Map<String,Object> attributeMap; + + public Long getId(){ + return this.id; + } + public void setId(Long id){ + this.id = id; + } + + public Date getGmtCreate(){ + return this.gmtCreate; + } + public void setGmtCreate(Date gmtCreate){ + this.gmtCreate = gmtCreate; + } + + public Date getGmtModify(){ + return this.gmtModify; + } + public void setGmtModify(Date gmtModify){ + this.gmtModify = gmtModify; + } + + public Long getRowVersion(){ + return this.rowVersion; + } + public void setRowVersion(Long rowVersion){ + this.rowVersion = rowVersion; + } + + public Integer getRowStatus(){ + return this.rowStatus; + } + public void setRowStatus(Integer rowStatus){ + this.rowStatus = rowStatus; + } + + public String getTopic(){ + return this.topic; + } + public void setTopic(String topic){ + this.topic = topic; + } + + public String getAppName(){ + return this.appName; + } + public void setAppName(String appName){ + this.appName = appName; + } + + public String getUserInfo(){ + return this.userInfo; + } + public void setUserInfo(String userInfo){ + this.userInfo = userInfo; + } + + public String getAttribute(){ + if(MapUtils.isEmpty(attributeMap)){ + return null ; + } + return new Gson().toJson(attributeMap); + } + public void setAttribute(String attribute){ + this.attributeMap = new Gson().fromJson(attribute,new TypeToken<Map<String,Object>>(){}.getType()); + } + + @Override + public String toString(){ + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } + +} + diff --git a/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/manager/BaseDAO.java b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/manager/BaseDAO.java new file mode 100644 index 000000000..cd4940f36 --- /dev/null +++ b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/manager/BaseDAO.java @@ -0,0 +1,85 @@ +package org.apache.rocketmq.acl.dao.manager; + +import org.apache.ibatis.session.SqlSession; +import org.apache.rocketmq.acl.dao.domain.AclDO; +import org.apache.rocketmq.acl.dao.mapper.DataMapper; +import org.apache.rocketmq.acl.dao.query.AclQuery; + +import java.util.List; + +/** + * 基础dao + * @author chjie.gcj + * @date 2018-08-13 + */ +public class BaseDAO { + private static BaseDAO instance=new BaseDAO(); + private BaseDAO(){ } + public static BaseDAO getInstance(){ + return instance; + } + + private SqlSession session; + private DataMapper mapper; + + private void init(){ + session = DBtool.getSession(); + mapper = session.getMapper(DataMapper.class); + } + private void commit(){ + session.commit(); + } + private void close(){ + session.close(); + } + + public List<AclDO> select(AclQuery query){ + this.init(); + try { + List<AclDO> info=mapper.select(query); + this.commit(); + return info; + } catch (Exception e) { + e.printStackTrace(); + return null; + }finally { + this.close(); + } + } + + public Long insert(AclDO aclDO){ + this.init(); + try { + aclDO.setRowStatus(0); + aclDO.setRowVersion(1L); + Long id = mapper.insert(aclDO); + this.commit(); + return id; + } catch (Exception e) { + e.printStackTrace(); + return null; + }finally { + this.close(); + } + } + + public Integer delete(){ + this.init(); + try { + AclDO aclDO = new AclDO(); + aclDO.setRowStatus(-1); + Integer res = mapper.update(aclDO); + this.commit(); + return res; + } catch (Exception e) { + e.printStackTrace(); + return null; + }finally { + this.close(); + } + } + + + + +} diff --git a/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/manager/DBtool.java b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/manager/DBtool.java new file mode 100644 index 000000000..e63a12122 --- /dev/null +++ b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/manager/DBtool.java @@ -0,0 +1,29 @@ +package org.apache.rocketmq.acl.dao.manager; + +import org.apache.ibatis.io.Resources; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; + +import java.io.Reader; + +class DBtool { + + private static SqlSessionFactory sessionFactory; + + static{ + try { + //使用MyBatis提供的Resources类加载mybatis的配置文件 + Reader reader = Resources.getResourceAsReader("mybatis/mybatis-config.xml"); + //构建sqlSession的工厂 + sessionFactory = new SqlSessionFactoryBuilder().build(reader); + } catch (Exception e) { + e.printStackTrace(); + } + + } + //创建能执行映射文件中sql的sqlSession + static SqlSession getSession(){ + return sessionFactory.openSession(); + } +} diff --git a/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/mapper/DataMapper.java b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/mapper/DataMapper.java new file mode 100644 index 000000000..addb2a955 --- /dev/null +++ b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/mapper/DataMapper.java @@ -0,0 +1,18 @@ +package org.apache.rocketmq.acl.dao.mapper; + +import org.apache.rocketmq.acl.dao.domain.AclDO; +import org.apache.rocketmq.acl.dao.query.AclQuery; + +import java.util.List; + +public interface DataMapper { + + Long insert(AclDO dataObject); + + Integer update(AclDO dataObject); + + List<AclDO> select(AclQuery query); + + Integer count(AclQuery query); + +} diff --git a/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/query/AclQuery.java b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/query/AclQuery.java new file mode 100644 index 000000000..9de55a04a --- /dev/null +++ b/acl/acl-dao/src/main/java/org/apache/rocketmq/acl/dao/query/AclQuery.java @@ -0,0 +1,35 @@ +package org.apache.rocketmq.acl.dao.query; + +import lombok.Setter; + +import java.io.Serializable; + +public class AclQuery implements Serializable { + private static final long serialVersionUID = -8324968145787319789L; + + private String topic; + private String appName; + private String userInfo; + + public String getTopic(){ + return this.topic; + } + public void setTopic(String topic){ + this.topic = topic; + } + + public String getAppName(){ + return this.appName; + } + public void setAppName(String appName){ + this.appName = appName; + } + + public String getUserInfo(){ + return this.userInfo; + } + public void setUserInfo(String userInfo){ + this.userInfo = userInfo; + } + +} diff --git a/acl/acl-dao/src/main/resources/mybatis/AclDAOMapper.xml b/acl/acl-dao/src/main/resources/mybatis/AclDAOMapper.xml new file mode 100644 index 000000000..a702978e9 --- /dev/null +++ b/acl/acl-dao/src/main/resources/mybatis/AclDAOMapper.xml @@ -0,0 +1,88 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> +<mapper namespace="org.apache.rocketmq.acl.dao.mapper.DataMapper"> + + <sql id="acl_info_list"> + id, + gmt_create, + gmt_modify, + row_version, + row_status, + topic, + app_name, + user_info, + attribute + </sql> + + <sql id="table">acl</sql> + + <select id="count" resultType="java.lang.Integer"> + select count(1) + from <include refid="table"/> + <where> + row_status = 0 + and topic = #{topic} + and app_name = #{appName} + and user_info = #{userInfo} + </where> + </select> + + <select id="select" resultType="AclDO" parameterType="AclQuery"> + select <include refid="acl_info_list"/> + from <include refid="table"/> + <where> + row_status = 0 + and topic =#{topic} + and app_name = #{appName} + <if test="userInfo != null">and user_info = #{userInfo}</if> + </where> + </select> + + <insert id="insert" parameterType = "AclDO" useGeneratedKeys="true" keyProperty="id"> + insert into <include refid="table"/> + (<include refid="acl_info_list"/>) + values( + #{id}, + <choose> + <when test="gmtCreate != null">#{gmtCreate}</when> + <otherwise>now()</otherwise> + </choose>, + <choose> + <when test="gmtModify != null">#{gmtModify}</when> + <otherwise>now()</otherwise> + </choose>, + <choose> + <when test="rowStatus != null">#{rowStatus}</when> + <otherwise>0</otherwise> + </choose>, + <choose> + <when test="rowVersion != null">#{rowVersion}</when> + <otherwise>1</otherwise> + </choose>, + #{topic}, + #{appName}, + #{userInfo}, + #{attribute} + ) + </insert> + + <update id="update" parameterType="AclDO"> + update <include refid="table"/> + <set> + gmt_modify = now(), + row_version = row_version + 1, + <if test="topic != null">topic = #{topic},</if> + <if test="appName != null">app_name = #{appName},</if> + <if test="rowStatus != null">row_status = #{rowStatus},</if> + <if test="attribute != null">attribute = concat(attribute,#{attribute})</if> + </set> + <where> + row_status = 0 + and topic = #{topic} + and app_name = #{appName} + <if test="userInfo != null">user_info = #{userInfo},</if> + </where> + </update> + +</mapper> \ No newline at end of file diff --git a/acl/acl-dao/src/main/resources/mybatis/mybatis-config.xml b/acl/acl-dao/src/main/resources/mybatis/mybatis-config.xml new file mode 100644 index 000000000..4a6d41a21 --- /dev/null +++ b/acl/acl-dao/src/main/resources/mybatis/mybatis-config.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- mybatis的配置文件 --> +<!DOCTYPE configuration + PUBLIC "-//mybatis.org//DTD Config 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-config.dtd"> +<configuration> + <settings> + <setting name="mapUnderscoreToCamelCase" value="true"/> + </settings> + + <typeAliases> + <package name="org.apache.rocketmq.acl.dao.domain"/> + <package name="org.apache.rocketmq.acl.dao.query"/> + </typeAliases> + + <!-- 配置Mybatis运行环境 --> + <environments default="development"> + <environment id="development"> + <!-- type="JDBC" 代表使用JDBC的提交和回滚来管理事务 --> + <transactionManager type="JDBC"/> + <!-- mybatis提供了3种数据源类型,分别是:POOLED,UNPOOLED,JNDI --> + <!-- POOLED 表示支持JDBC数据源连接池 --> + <!-- UNPOOLED 表示不支持数据源连接池 --> + <!-- JNDI 表示支持外部数据源连接池 --> + <dataSource type="POOLED"> + <!-- 数据库的具体配置 --> + <!-- url表示你数据库的地址 + username表示的是数据库的用户 + password表示的是数据库的密码--> + <property name="driver" value="com.mysql.jdbc.Driver"/> + <property name="url" value="jdbc:mysql://localhost:3306/rocketmq"/> + <property name="username" value="root"/> + <property name="password" value="root"/> + </dataSource> + </environment> + </environments> + + <!-- 注册StudentMapper.xml映射文件 --> + <mappers> + <mapper resource="mybatis/AclDAOMapper.xml"/> + </mappers> + +</configuration> \ No newline at end of file diff --git a/acl/acl-dao/src/test/java/testDAO.java b/acl/acl-dao/src/test/java/testDAO.java new file mode 100644 index 000000000..8058f6868 --- /dev/null +++ b/acl/acl-dao/src/test/java/testDAO.java @@ -0,0 +1,35 @@ +import org.apache.rocketmq.acl.dao.domain.AclDO; +import org.apache.rocketmq.acl.dao.manager.BaseDAO; +import org.apache.rocketmq.acl.dao.query.AclQuery; +import org.junit.Test; + +import java.util.List; + +public class testDAO { + @Test + public void testInsert(){ + AclDO aclDO = new AclDO(); + aclDO.setTopic("TestTopic2"); + aclDO.setAppName("TestAppName2"); + + Long res = BaseDAO.getInstance().insert(aclDO); + System.err.println(res); + } + + @Test + public void select(){ + AclQuery query = new AclQuery(); + query.setTopic("TestTopic2"); + query.setAppName("TestAppName2"); + List<AclDO> list = BaseDAO.getInstance().select(query); + System.err.println(list); + } + + @Test + public void testUpdate(){ + AclDO aclDO = new AclDO(); + aclDO.setTopic("TestUpdateTopic"); + aclDO.setAppName("TestUpdateAppName"); + + } +} diff --git a/acl/acl-hook/pom.xml b/acl/acl-hook/pom.xml new file mode 100644 index 000000000..cb6c35de9 --- /dev/null +++ b/acl/acl-hook/pom.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>acl</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.4.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>acl-hook</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-remoting</artifactId> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <!--<version>3.2.0</version>--> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/AbstractKeyGenerator.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/AbstractKeyGenerator.java new file mode 100644 index 000000000..d3543185e --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/AbstractKeyGenerator.java @@ -0,0 +1,5 @@ +package org.apache.rocketmq.hook; + +public interface AbstractKeyGenerator { + public String genKey(String appName, String topic) ; +} diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/AclClientHook.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/AclClientHook.java new file mode 100644 index 000000000..ccc68d076 --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/AclClientHook.java @@ -0,0 +1,68 @@ +package org.apache.rocketmq.hook; + +import org.apache.rocketmq.hook.cache.Cache; +import org.apache.rocketmq.hook.cache.Invoker; +import org.apache.rocketmq.hook.cache.MemoryCache; +import org.apache.rocketmq.hook.exception.AclFailException; +import org.apache.rocketmq.hook.rpc.AclService; +import org.apache.rocketmq.hook.rpc.impl.AclServiceHttpProxy; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class AclClientHook implements RPCHook { + private String info; + private String appName; + private String topic; + private Cache cache = MemoryCache.getInstance(); + private AbstractKeyGenerator kg = new KeyGenerator(); + private AclService acl_center; + + private String acl_host = "127.0.0.1"; + private int acl_port = 8080; + + private boolean isAcl = false; + + public AclClientHook(String info) { + this.info = info; + } + + public AclClientHook(String appName, String topic, AclService aclService) { + this.appName = appName; + this.topic = topic; + this.acl_center = AclServiceHttpProxy.getInstance(acl_host, acl_port); + } + + @Override + public void doBeforeRequest(String remoteAddr, RemotingCommand request) { + Boolean isVailtd = (Boolean) cache.getObject(kg.genKey(appName, topic), new Invoker<Boolean>() { + @Override + public Boolean invoke() { + boolean b = acl_center.aclCheck(topic, appName); + return b; + } + }, 5); + this.isAcl = isVailtd; + } + + @Override + public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { + // do nothing. + } + + class KeyGenerator implements AbstractKeyGenerator { + @Override + public String genKey(String appName, String topic) { + return appName + "_" + topic; + } + } + + public boolean getIsAcl() { + return this.isAcl; + } + + public void checkAcl() throws AclFailException { + if (!this.getIsAcl()) { + throw new AclFailException("can't pass acl."); + } + } +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/AbstractCache.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/AbstractCache.java new file mode 100644 index 000000000..2b7ceaac8 --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/AbstractCache.java @@ -0,0 +1,28 @@ +package org.apache.rocketmq.hook.cache; + +public abstract class AbstractCache implements Cache { + + @Override + public <T> T getObject(String key, Invoker<T> invoker, int second) { + T object = this.getObject(key); + if(object != null){ + if(NULL.equals(object)){ + return null; + }else{ + return object; + } + }else{ + object = invoker.invoke(); + if(object == null){ + object = (T) NULL; + } + this.set(key,object); + this.expire(key,second * 1000); + if(NULL.equals(object)){ + return null; + }else{ + return object; + } + } + } +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/Cache.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/Cache.java new file mode 100644 index 000000000..7c7f68c93 --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/Cache.java @@ -0,0 +1,27 @@ +package org.apache.rocketmq.hook.cache; + +import java.util.Date; + +public interface Cache { + String NULL = "0x00"; + + boolean exist(String key); + + void delete(String key); + + void expire(String key, long millisecond); + + void set(String key, Object value); + <T> T getObject(String key); + + String getString(String key); + Integer getInt(String key); + Long getLong(String key); + Double getDouble(String key); + Date getDate(String key); + + <T> T getObject(String key, Invoker<T> invoker, int second); + + + +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/Invoker.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/Invoker.java new file mode 100644 index 000000000..379a6668a --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/Invoker.java @@ -0,0 +1,6 @@ +package org.apache.rocketmq.hook.cache; + +public interface Invoker<T> { + + T invoke(); +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/MemoryCache.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/MemoryCache.java new file mode 100644 index 000000000..c679be105 --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/cache/MemoryCache.java @@ -0,0 +1,182 @@ +package org.apache.rocketmq.hook.cache; + +import java.util.Date; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MemoryCache extends AbstractCache { + private final static Map<String,Object> DATA = new ConcurrentHashMap<>(); + private final static Map<String,Long> EXPIRE = new ConcurrentHashMap<>(); + + private long cleanIntervalTimeSecond = 15; + + @Override + public synchronized boolean exist(String key) { + Object o = DATA.get(key); + if(o != null){ + return !isExpire(key); + }else{ + return false; + } + } + + private synchronized boolean isExpire(String key){ + Long expireTime = EXPIRE.get(key); + if(expireTime == null){ + //if null,then it will not set expire time + return false; + }else{ + if(System.currentTimeMillis() > expireTime){ + //expire + DATA.remove(key); + EXPIRE.remove(key); + return true; + }else{ + return false; + } + } + } + + @Override + public synchronized void delete(String key) { + DATA.remove(key); + EXPIRE.remove(key); + } + + @Override + public synchronized void expire(String key, long millisecond) { + if(DATA.get(key) != null){ + EXPIRE.put(key,System.currentTimeMillis() + millisecond); + } + } + + @Override + public synchronized void set(String key, Object value) { + DATA.put(key,value); + } + + @Override + public synchronized <T> T getObject(String key) { + if(exist(key)){ + return (T) DATA.get(key); + }else{ + return null; + } + + } + + @Override + public synchronized String getString(String key) { + if(exist(key)){ + return String.valueOf(DATA.get(key)); + }else{ + return null; + } + + } + + @Override + public synchronized Integer getInt(String key) { + if(exist(key)){ + return (Integer) DATA.get(key); + }else{ + return null; + } + + } + + @Override + public synchronized Long getLong(String key) { + if(exist(key)){ + return (Long) DATA.get(key); + }else{ + return null; + } + + } + + @Override + public synchronized Double getDouble(String key) { + if(exist(key)){ + return (Double) DATA.get(key); + }else { + return null; + } + + } + + @Override + public synchronized Date getDate(String key) { + if(exist(key)){ + return (Date) DATA.get(key); + }else { + return null; + } + + } + + public synchronized void flushDb() { + DATA.clear(); + EXPIRE.clear(); + } + + public MemoryCache(){ + this.startCleaner(); + } + + protected void startCleaner(){ + new Thread(new Runnable() { + @Override + public void run() { + while (true){ + try { + Thread.sleep(cleanIntervalTimeSecond * 1000); + for(String key : DATA.keySet()){ + if(isExpire(key)){ + delete(key); + } + } + + + } catch (Exception e) { + e.printStackTrace(); + } + } + + } + }).start(); + } + + + public long getCleanIntervalTimeSecond() { + return cleanIntervalTimeSecond; + } + + public void setCleanIntervalTimeSecond(long cleanIntervalTimeSecond) { + this.cleanIntervalTimeSecond = cleanIntervalTimeSecond; + } + + public static Cache getInstance(){ + return Instance.cache; + } + + + private static class Instance{ + private static Cache cache = new MemoryCache(); + } + + public static void main(String[] args) { + final String key = "ni"; + Boolean val = false; + + Cache instance = MemoryCache.getInstance(); + Boolean object = instance.getObject(key, new Invoker<Boolean>() { + @Override + public Boolean invoke() { + return true; + } + }, 1); + + System.out.println(object); + } +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/exception/AclFailException.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/exception/AclFailException.java new file mode 100644 index 000000000..6c8f5762d --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/exception/AclFailException.java @@ -0,0 +1,23 @@ +package org.apache.rocketmq.hook.exception; + +public class AclFailException extends RuntimeException { + + public AclFailException() { + } + + public AclFailException(String message) { + super(message); + } + + public AclFailException(String message, Throwable cause) { + super(message, cause); + } + + public AclFailException(Throwable cause) { + super(cause); + } + + public AclFailException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/rpc/AclService.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/rpc/AclService.java new file mode 100644 index 000000000..2ac8857b3 --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/rpc/AclService.java @@ -0,0 +1,9 @@ +package org.apache.rocketmq.hook.rpc; + +public interface AclService { + + public boolean aclCheck(String topic, String appName); + + public boolean aclCheck(String topic, String appName,String userInfo); + +} \ No newline at end of file diff --git a/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/rpc/impl/AclServiceHttpProxy.java b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/rpc/impl/AclServiceHttpProxy.java new file mode 100644 index 000000000..be8029bfb --- /dev/null +++ b/acl/acl-hook/src/main/java/org/apache/rocketmq/hook/rpc/impl/AclServiceHttpProxy.java @@ -0,0 +1,47 @@ +package org.apache.rocketmq.hook.rpc.impl; + +import org.apache.rocketmq.hook.rpc.AclService; + +public class AclServiceHttpProxy implements AclService { + + private String host; + private int port; + OkHttpClient client = new OkHttpClient(); + + private static Object innerLock = new Object(); + private static AclServiceHttpProxy instance; + + public static AclService getInstance(String host, int port) { + if (instance == null) { + synchronized (innerLock) { + if (instance == null) { + instance = new AclServiceHttpProxy(host, port); + } + } + } + return instance; + } + + private AclServiceHttpProxy(String host, int port) { + this.host = host; + this.port = port; + } + + + @Override + public boolean aclCheck(String topic, String appName) { + String url = "http://"+host+":"+port +"/api/acl/checkacl/"+topic+"/"+appName; + Request request = new Request.Builder().url(url).build(); + Response response = client.newCall(request).execute(); + if (response.isSuccessful()) { + return response.body().string(); + } else { + throw new IOException("Unexpected code " + response); + } + } + + @Override + public boolean aclCheck(String topic, String appName, String userInfo) { + // todo + } +} diff --git a/acl/acl-starter/pom.xml b/acl/acl-starter/pom.xml new file mode 100644 index 000000000..43d060153 --- /dev/null +++ b/acl/acl-starter/pom.xml @@ -0,0 +1,15 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>acl</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.4.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>acl-starter</artifactId> + + +</project> \ No newline at end of file diff --git a/acl/pom.xml b/acl/pom.xml new file mode 100644 index 000000000..377ec3d99 --- /dev/null +++ b/acl/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-all</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>4.4.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>acl</artifactId> + <packaging>pom</packaging> + <modules> + <module>acl-center</module> + <module>acl-dao</module> + <module>acl-starter</module> + <module>acl-hook</module> + </modules> + + <properties> + <lombok.version>1.16.20</lombok.version> + <gson.version>2.8.5</gson.version> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + <version>4.2</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>21.0</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.2</version> + </dependency> + <dependency> + <!-- Import dependency management from Spring Boot --> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-dependencies</artifactId> + <version>2.0.3.RELEASE</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>2.0.3.RELEASE</version> + </dependency> + </dependencies> + </dependencyManagement> + + +</project> \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index 5d6acc0f8..31bbe05b3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -98,6 +98,7 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } + } /** diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index 53a1d4dd6..a95afbd5c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -16,12 +16,15 @@ */ package org.apache.rocketmq.example.quickstart; +import com.google.common.collect.Maps; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; +import java.util.Map; + /** * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}. */ @@ -56,6 +59,8 @@ public static void main(String[] args) throws MQClientException, InterruptedExce /* * Create a message instance, specifying topic, tag and message body. */ + Map<String,String> map = Maps.newHashMap(); + map.put("acl","hanxiACL"); Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ diff --git a/pom.xml b/pom.xml index 1f71cd4b7..c6a43b04a 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,7 @@ <module>distribution</module> <module>openmessaging</module> <module>logging</module> + <module>acl</module> </modules> <build> @@ -605,6 +606,16 @@ <artifactId>log4j-slf4j-impl</artifactId> <version>2.7</version> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>acl</artifactId> + <version>4.4.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>acl-hook</artifactId> + <version>4.4.0-SNAPSHOT</version> + </dependency> </dependencies> </dependencyManagement> </project> diff --git a/remoting/pom.xml b/remoting/pom.xml index 55d92f370..566b41ba5 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -50,5 +50,16 @@ <artifactId>netty-tcnative-boringssl-static</artifactId> <version>1.1.33.Fork26</version> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>acl</artifactId> + <version>4.4.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>acl-hook</artifactId> + <version>4.4.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> </dependencies> </project> diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 33c2eed8d..bcd6ac871 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -366,6 +366,7 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); + ((AclClientHook) this.rpcHook).checkAcl(); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { @@ -524,6 +525,7 @@ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); + ((AclClientHook) this.rpcHook).checkAcl(); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { @@ -549,6 +551,7 @@ public void invokeOneway(String addr, RemotingCommand request, long timeoutMilli try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); + ((AclClientHook) this.rpcHook).checkAcl(); } this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
