[
https://issues.apache.org/jira/browse/ARTEMIS-3913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573974#comment-17573974
]
gongping.zhu commented on ARTEMIS-3913:
---------------------------------------
My ArtemisBrokerPlugin code:
```
package com.yeker.iot.broker.plugin.impl;
import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {
/**
 * Shared SLF4J logger. Static and final so that it is created once and is
 * never part of the serialized state of this {@link Serializable} plugin.
 */
private static final Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
 * Last-will (LWT) address: compared for equality in doSendLwt and used as a
 * prefix in isLwt.
 */
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
 * Marker that identifies the client id of a server-side (service) client.
 */
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
 * Separator between the parts of a device client id (licId@devId).
 */
private static final String CLIENT_SPLITOR = "@";
/**
 * Account database connection settings, read from the plugin properties.
 */
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
/** Transient: JdbcTemplate is not serializable; it is rebuilt by init(). */
private transient JdbcTemplate accountJdbcTemplate;
/**
 * Account authentication switch and the SQL used to look an account up.
 */
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;
/**
 * Device database connection settings, read from the plugin properties.
 */
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
/** Transient: JdbcTemplate is not serializable; it is rebuilt by init(). */
private transient JdbcTemplate deviceJdbcTemplate;
/**
 * Device authorization switch and the SQL statements used to check a license
 * and to bind (lock) it to a device.
 */
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;
/**
 * Device status synchronization switch and the SQL statements run on
 * connect, disconnect, last-will and plugin reset.
 */
private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;
private String resetUpdateSQL;
/** Connection ids for which connect validation has been started (value is always 0). */
private final Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
/** Client id to device authorization record, filled when a license is first bound to a device. */
private final Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();
/** Enables the verbose per-callback log lines; never switched on in this class. */
private boolean debug = false;
/**
 * Reads the plugin configuration and creates the JDBC templates it requires.
 *
 * <p>The device template is created when either device authorization or
 * device status synchronization is enabled, because both features issue
 * statements through it.
 *
 * @param properties plugin properties supplied by the broker configuration
 */
@Override
public void init(Map<String, String> properties) {
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");
    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");

    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");
    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    // Status synchronization also goes through deviceJdbcTemplate, so it must
    // exist even when device authorization itself is switched off.
    if (this.deviceAuthEnabled || this.deviceStatusSyncabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");
    this.resetUpdateSQL = properties.get("resetUpdateSQL");

    // Only the property names are logged: the values include the database passwords.
    log.info("init :[{}] ", properties.keySet());
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}
@Override
public void afterCreateConnection(RemotingConnection connection) throws
ActiveMQException {
if(debug){
log.info("afterCreateConnection
{},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}
/**
 * Called before a session is created. When device authorization is enabled,
 * runs the connect validation once per connection id.
 *
 * <p>The connection id is claimed atomically so that two sessions created
 * concurrently on the same connection cannot both run the validation. If the
 * validation fails, the claim is released so that the connection is not left
 * marked as validated.
 *
 * @param name session name
 * @param username user name supplied for the session
 * @param minLargeMessageSize not used by this plugin
 * @param connection the connection the session is being created on
 * @param autoCommitSends not used by this plugin
 * @param autoCommitAcks not used by this plugin
 * @param preAcknowledge not used by this plugin
 * @param xa not used by this plugin
 * @param defaultAddress not used by this plugin
 * @param callback not used by this plugin
 * @param autoCreateQueues not used by this plugin
 * @param context not used by this plugin
 * @param prefixes not used by this plugin
 * @throws ActiveMQException if the connect validation rejects the client
 */
@Override
public void beforeCreateSession(String name, String username, int minLargeMessageSize,
        RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks,
        boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback,
        boolean autoCreateQueues, OperationContext context,
        Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
    if (debug) {
        log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(),
                connection.getRemoteAddress(), connection.getID(), name, username);
    }
    String connId = connection.getID().toString();
    // putIfAbsent returns null only for the caller that actually inserted the key.
    if (deviceAuthEnabled && authConnectTables.putIfAbsent(connId, 0) == null) {
        try {
            doConnectValidation(connection);
        } catch (ActiveMQException | RuntimeException e) {
            // A rejected connection must not stay recorded as validated.
            authConnectTables.remove(connId);
            throw e;
        }
    }
}
/**
 * Called after a session has been created. When account authentication is
 * enabled, checks the session's user name and password against the account
 * database.
 *
 * @param session the newly created session
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
    final RemotingConnection conn = session.getRemotingConnection();
    if (debug) {
        log.info("afterCreateSession {},{},{}",
                conn.getClientID(), conn.getRemoteAddress(), conn.getID());
    }
    if (!accountAuthEnabled) {
        return;
    }
    doAccountValidation(session.getUsername(), session.getPassword());
}
/**
 * Called after a connection has been destroyed. Runs the disconnect status
 * update and forgets the connection id.
 *
 * <p>The connection id is removed in a {@code finally} block so that a
 * failure inside the disconnect update cannot leave a stale entry behind.
 *
 * @param connection the destroyed connection
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    if (debug) {
        log.info("afterDestroyConnection {},{},{}", connection.getClientID(),
                connection.getRemoteAddress(), connection.getID());
    }
    try {
        doDisconnect(connection);
    } finally {
        authConnectTables.remove(connection.getID().toString());
    }
}
/**
 * Called after a message has been sent. Hands the message to the last-will
 * handling and then removes the sending connection's id from the table of
 * validated connections.
 *
 * @param session the session that sent the message
 * @param tx not used by this plugin
 * @param message the message that was sent
 * @param direct not used by this plugin
 * @param noAutoCreateQueue not used by this plugin
 * @param result not used by this plugin
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterSend(ServerSession session,
                      Transaction tx,
                      Message message,
                      boolean direct,
                      boolean noAutoCreateQueue,
                      RoutingStatus result) throws ActiveMQException {
    final RemotingConnection conn = session.getRemotingConnection();
    if (debug) {
        log.info("afterSend {},{},{}",
                conn.getClientID(), conn.getRemoteAddress(), conn.getID());
    }
    doSendLwt(conn, message);
    // NOTE(review): this runs for every sent message, not only for last-will
    // messages, so the next session created on this connection is validated
    // again - confirm that this is intended.
    authConnectTables.remove(conn.getID().toString());
}
private void doConnectValidation(RemotingConnection connection) throws
ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try{
/**
* 设备授权校验
*/
boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId){//licId@mac
/**
* 无效的ClientId
*/
invalid = true;
throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
}
/**
* 连接用户校验
* 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];
/**
* 开启设备授权
*/
if(deviceAuthEnabled){
/**
* 新设备
*/
if(!authDeviceTables.containsKey(clientId)){
/**
* 验证设备
*/
DeviceAuth auth = validate(licId,devId);
/**
* 新设备
*/
if(ObjectUtil.isNotEmpty(auth)){
/**
* 设备第一次
*/
if(ObjectUtil.isEmpty(auth.getDevId())){
String lockerSql = deviceAuthLockerSQL;
/**
* 防止一个licId被多台设备使用
*/
locked = deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId});
if(locked==0){
throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}
/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled){
String sql = connectUpdateSQL;
String brokerIp = IPUntil.getLocalIp();
eft = deviceJdbcTemplate.update(sql, new Object[]\{connId,clientIp,
brokerIp,licId});
}
}
success = true;
}
catch(ActiveMQException ex){
error = ex.getMessage();
throw ex;
}
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId, locked,eft,error);
}
}
}
}
private void doDisconnect(RemotingConnection connection){
/**
* 更新设备状态【断开】
*/
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
int eft = 0;
if(invalidClientId(clientId)){
return;
}
try {
if (clientId.indexOf(CLIENT_SPLITOR) != 1) {
if (deviceStatusSyncabled) {
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String sql = disconnectUpdateSQL;
eft = deviceJdbcTemplate.update(sql, new Object[]\{licId,connId});
}
}
}
finally {
log.info("断开成功 {} {} {} {}", formatClientId(clientId),clientIp,connId,eft);
}
}
private void doSendLwt(RemotingConnection connection,Message message){
int eft = 0;
Boolean success = false;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
String address = message.getAddress();
if(invalidClientId(clientId)){
return;
}
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId)) {
try {
String licId = null;
String sql = lwtUpdateSQL;
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
licId = clientIdElements[0];
eft = deviceJdbcTemplate.update(sql, new Object[]\{licId, connId});
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT"
,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId)
,clientIp,connId,"LWT",eft,message);
}
}
}
}
}
/**
 * Creates a JdbcTemplate on top of a data source built from the given
 * connection settings.
 *
 * @param driver JDBC driver class name
 * @param url JDBC URL
 * @param username database user
 * @param password database password
 * @return a new JdbcTemplate bound to the built data source
 */
private JdbcTemplate build(String driver, String url, String username, String password) {
    return new JdbcTemplate(
            DataSourceBuilder.create()
                    .driverClassName(driver)
                    .url(url)
                    .username(username)
                    .password(password)
                    .build());
}
/**
 * Shortens a client id for logging. If the id contains "service-"
 * (case-insensitive), returns at most 44 characters starting at that marker;
 * otherwise returns the id unchanged. Empty and null ids are returned as-is.
 *
 * @param clientId the client id to format, may be null
 * @return the formatted client id
 */
private String formatClientId(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return clientId;
    }
    final int start = clientId.toLowerCase().indexOf("service-");
    if (start == -1) {
        return clientId;
    }
    final int end = Math.min(clientId.length(), start + 44);
    return clientId.substring(start, end);
}
/**
 * Removes every "/" from a remote address string. Empty and null values are
 * returned unchanged.
 *
 * @param clientIp the remote address text, may be null
 * @return the address without slashes
 */
private String formatClientIP(String clientIp) {
    return ObjectUtil.isEmpty(clientIp) ? clientIp : clientIp.replace("/", "");
}
private void doAccountValidation(String username,String password){
String sql= accountAuthQuerySQL;
List<Account> accounts = accountJdbcTemplate.query(sql,new
Object[]\{username,password},new RowMapperResultSetExtractor<Account>(new
BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts)){
throw new SecurityException("非法租户");
}
}
/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException
{
/**
* YekerId@CPU_SN;保证一个YekerId只被一台设备使用
*/
String sql = deviceAuthCheckSQL;
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new
Object[]\{licId}, new RowMapperResultSetExtractor<DeviceAuth>(new
BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) {
throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
}
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) {
throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
return auth;
}
/**
 * Tells whether a client id belongs to a service node or a broker node,
 * i.e. contains the service prefix or "brokernode" (case-insensitive).
 *
 * @param clientId the client id to test, may be null
 * @return true for service and broker node client ids
 */
private boolean isService(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return false;
    }
    if (clientId.toLowerCase().contains(SERVICE_CLIENT_ID_PREFIX)) {
        return true; // service node
    }
    return clientId.toLowerCase().contains("brokernode"); // broker node
}
/**
 * Tells whether a client id is unusable: it is empty, or it is neither a
 * service id, nor a broker node id, nor an id containing the separator.
 *
 * @param clientId the client id to test, may be null
 * @return true when the client id must be rejected
 */
private static boolean invalidClientId(String clientId) {
    if (ObjectUtil.isEmpty(clientId)) {
        return true;
    }
    boolean serviceNode = clientId.toLowerCase().contains(SERVICE_CLIENT_ID_PREFIX);
    if (serviceNode) {
        return false;
    }
    boolean brokerNode = clientId.toLowerCase().contains("brokernode");
    if (brokerNode) {
        return false;
    }
    // Anything else is only valid if it carries the separator.
    return !clientId.contains(CLIENT_SPLITOR);
}
/**
 * Tells whether a topic is a last-will (LWT, Last Will and Testament) topic,
 * i.e. starts with the last-will prefix.
 *
 * @param originalTopic the topic to test, may be null
 * @return true when the topic is non-null and starts with the prefix
 */
private boolean isLwt(String originalTopic) {
    if (originalTopic == null) {
        return false;
    }
    return originalTopic.startsWith(MISSING_TOPIC_PREFIX);
}
/**
 * Called when the plugin is registered with the server; logs the event and
 * resets the plugin state.
 *
 * @param server the server the plugin was registered with
 */
@Override
public void registered(ActiveMQServer server) {
    final String pluginName = this.getClass().getSimpleName();
    log.info("{} 插件注册", pluginName);
    reset();
}
/**
 * Called when the plugin is unregistered from the server; logs the event and
 * resets the plugin state.
 *
 * @param server the server the plugin was unregistered from
 */
@Override
public void unregistered(ActiveMQServer server) {
    // The format previously had a second "{}" without a matching argument,
    // which was printed literally.
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}
/**
* 同步因为服务器维护导致相关设备状态不一致
*/
private void reset(){
authConnectTables.clear();
authDeviceTables.clear();
String brokerIp = IPUntil.getLocalIp();
String sql = resetUpdateSQL;
deviceJdbcTemplate.update(sql, new Object[]\{brokerIp});
}
}
```
!image-2022-08-02-08-23-52-965.png!
!image-2022-08-02-08-24-39-288.png!
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
...
public void act(MqttMessage message) {
try {
switch (message.fixedHeader().messageType()) {
case AUTH:
handleAuth(message);
break;
case CONNECT:
handleConnect((MqttConnectMessage) message);
break;
case PUBLISH:
handlePublish((MqttPublishMessage) message);
break;
case PUBACK:
handlePuback((MqttPubAckMessage) message);
break;
case PUBREC:
handlePubrec(message);
break;
case PUBREL:
handlePubrel(message);
break;
case PUBCOMP:
handlePubcomp(message);
break;
case SUBSCRIBE:
handleSubscribe((MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribe((MqttUnsubscribeMessage) message);
break;
case DISCONNECT:
disconnect(false, message);
break;
case UNSUBACK:
case SUBACK:
case PINGREQ: // These are actually handled by the Netty thread
directly so this packet should never make it here
case PINGRESP:
case CONNACK: // The server does not instantiate connections
therefore any CONNACK received over a connection is an invalid control message.
default:
disconnect(true);
}
} catch(MQTTRuntimesException e) {// Customer code throw zhe reason code
to zhe mqtt client
logger.info("@@@@@@@@@@@@@@@:"+e.getCode());
session.getProtocolHandler().sendConnack(e.getCode());
disconnect(true);
}
catch (Exception e) {
logger.info("@@@@@@@@@@@@@@@:"+e.getMessage());
MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
} finally {
ReferenceCountUtil.release(message);
}
}
...
}
When I capture the traffic with Wireshark:
!image-2022-08-02-08-31-01-074.png!
> MQTTReasonCodes byte loss of precision,must int type
> ----------------------------------------------------
>
> Key: ARTEMIS-3913
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3913
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: gongping.zhu
> Priority: Major
> Attachments: image-2022-08-02-08-23-52-965.png,
> image-2022-08-02-08-24-39-288.png, image-2022-08-02-08-31-01-074.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)