[
https://issues.apache.org/jira/browse/ARTEMIS-3913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573974#comment-17573974
]
gongping.zhu edited comment on ARTEMIS-3913 at 8/2/22 12:45 AM:
----------------------------------------------------------------
My ArtemisBrokerPlugin code:
```
import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {
/** Shared logger. Static and final so it is not part of the Serializable instance state. */
private static final Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
 * Last-will (LWT) topic prefix.
 */
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
 * Prefix of client IDs used by server-side (service) clients.
 */
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
 * Separator inside a device client ID (licId@devId).
 */
private static final String CLIENT_SPLITOR = "@";
/**
 * Account database connection settings.
 */
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;
/**
 * Account authentication switch and query.
 */
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;
/**
 * Device database connection settings.
 */
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
 * Device authorization switch and the SQL statements used for device checks and status sync.
 */
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;
private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;
private String resetUpdateSQL;
/** Connection IDs that have already been through connect validation (value is always 0). */
private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
/** Client ID to the device authorization record bound during connect validation. */
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();
/** When true, the plugin callbacks log every invocation. */
private boolean debug = false;
/**
 * Reads the plugin configuration and creates the JDBC templates that are needed.
 *
 * @param properties plugin properties; values of keys containing "password" are masked in the log
 */
@Override
public void init(Map<String, String> properties) {
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");
    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");

    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");
    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    // The device template is used by the authorization checks AND by the status-sync
    // updates, so it must exist when either feature is switched on.
    if (this.deviceAuthEnabled || this.deviceStatusSyncabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");
    this.resetUpdateSQL = properties.get("resetUpdateSQL");

    log.info("init :[{}] ", describeWithoutSecrets(properties));
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}

/** Renders the properties like {@code Map.toString()} but masks every value whose key contains "password". */
private static String describeWithoutSecrets(Map<String, String> properties) {
    StringBuilder text = new StringBuilder("{");
    for (Map.Entry<String, String> entry : properties.entrySet()) {
        if (text.length() > 1) {
            text.append(", ");
        }
        String key = entry.getKey();
        boolean secret = key != null && key.toLowerCase(java.util.Locale.ROOT).contains("password");
        text.append(key).append('=').append(secret ? "******" : entry.getValue());
    }
    return text.append('}').toString();
}
@Override
public void afterCreateConnection(RemotingConnection connection) throws
ActiveMQException {
if(debug){
log.info("afterCreateConnection
{},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}
/**
*
* @param name
* @param username
* @param minLargeMessageSize
* @param connection
* @param autoCommitSends
* @param autoCommitAcks
* @param preAcknowledge
* @param xa
* @param defaultAddress
* @param callback
* @param autoCreateQueues
* @param context
* @param prefixes
* @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name, String username, int
minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks,
boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean
autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if(debug) {
log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(),
connection.getRemoteAddress(), connection.getID(), name, username);
}
String connId = connection.getID().toString();
if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ //
Debug.print(log); authConnectTables.put(connId,0);
doConnectValidation(connection); }
}
/**
* After a session has been created.
*
* @param session The newly created session
* @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(),
connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled) \{
doAccountValidation(session.getUsername(),session.getPassword()); }
}
/**
 * A connection has been destroyed: records the disconnect and forgets the
 * connection's validation marker.
 *
 * @param connection the destroyed connection
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    if (debug) {
        log.info("afterDestroyConnection {},{},{}", connection.getClientID(),
                connection.getRemoteAddress(), connection.getID());
    }
    String connId = connection.getID().toString();
    try {
        doDisconnect(connection);
    } finally {
        // Always drop the marker, even if the status update fails, so the map cannot leak entries.
        authConnectTables.remove(connId);
    }
}
/**
 * After a message is sent: lets {@code doSendLwt} handle a possible last-will message,
 * then removes the sending connection's entry from {@code authConnectTables}.
 *
 * <p>NOTE(review): the marker is removed after every send, not only after an LWT
 * message; confirm this is intended.
 *
 * @param session the session that sent the message
 * @param tx unused
 * @param message the message that was sent
 * @param direct unused
 * @param noAutoCreateQueue unused
 * @param result unused
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterSend(ServerSession session,
        Transaction tx,
        Message message,
        boolean direct,
        boolean noAutoCreateQueue,
        RoutingStatus result) throws ActiveMQException {
    final RemotingConnection conn = session.getRemotingConnection();
    if (debug) {
        log.info("afterSend {},{},{}", conn.getClientID(), conn.getRemoteAddress(), conn.getID());
    }
    doSendLwt(conn, message);
    authConnectTables.remove(conn.getID().toString());
}
private void doConnectValidation(RemotingConnection connection) throws
ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try
{ /** * 设备授权校验 */ boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId)
{//licId@mac /** * 无效的ClientId */ invalid = true; throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
}
/**
* 连接用户校验
* 有效的ClientId格式
*/
if(!isService(clientId)) \{ String[] clientIdElements =
clientId.split(CLIENT_SPLITOR); String licId = clientIdElements[0]; String
devId = clientIdElements[1]; /** * 开启设备授权 */ if(deviceAuthEnabled) \{ /** * 新设备
*/ if(!authDeviceTables.containsKey(clientId)) \{ /** * 验证设备 */ DeviceAuth auth
= validate(licId,devId); /** * 新设备 */ if(ObjectUtil.isNotEmpty(auth)) \{ /** *
设备第一次 */ if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql =
deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked =
deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId}
);
if(locked==0)
{ throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}
/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp =
IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[]
{connId,clientIp, brokerIp,licId}
);
}
}
success = true;
}
catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; }
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId, locked,eft,error);
}
}
}
}
/**
 * Updates the device status to "disconnected" for a device client.
 *
 * <p>The original compared {@code indexOf(CLIENT_SPLITOR)} with {@code 1} instead of
 * {@code -1}, which skipped the update for IDs such as {@code "a@mac"} and ran it for
 * IDs with no separator. Service clients are skipped, matching the connect and LWT paths.
 *
 * @param connection the connection that was destroyed
 */
private void doDisconnect(RemotingConnection connection) {
    String clientId = connection.getClientID();
    String connId = connection.getID().toString();
    String clientIp = formatClientIP(connection.getRemoteAddress().toString());
    int eft = 0;
    if (invalidClientId(clientId)) {
        return;
    }
    try {
        if (deviceStatusSyncabled && !isService(clientId)
                && clientId.indexOf(CLIENT_SPLITOR) != -1) {
            String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
            // "@".split("@") yields an empty array; nothing to update in that case.
            if (clientIdElements.length > 0) {
                String licId = clientIdElements[0];
                eft = deviceJdbcTemplate.update(disconnectUpdateSQL, licId, connId);
            }
        }
    } finally {
        log.info("断开成功 {} {} {} {}", formatClientId(clientId), clientIp, connId, eft);
    }
}
private void doSendLwt(RemotingConnection connection,Message message)
{ int eft = 0; Boolean success = false; String clientId =
connection.getClientID(); String connId = connection.getID().toString(); String
clientIp = connection.getRemoteAddress().toString(); clientIp =
formatClientIP(clientIp); String address = message.getAddress();
if(invalidClientId(clientId))\\{ return; }
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId))
{ try
{ String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements =
clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft =
deviceJdbcTemplate.update(sql, new Object[]
{licId, connId}
);
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT"
,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId)
,clientIp,connId,"LWT",eft,message);
}
}
}
}
}
private JdbcTemplate build(String driver, String url, String username, String
password)\{ DataSourceBuilder builder = DataSourceBuilder.create();
builder.driverClassName(driver); builder.url(url); builder.username(username);
builder.password(password); return new JdbcTemplate(builder.build()); }
private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1)
{ int length = clientId.length(); if(length>(index+44))
{ return clientId.substring(index,index+44); }
else\{ return clientId.substring(index); }
}
return clientId;
}
return clientId;
}
/**
 * Removes every "/" from a remote-address string.
 *
 * @param clientIp remote address text, may be null or empty
 * @return the text without "/" characters; null or empty input is returned unchanged
 */
private String formatClientIP(String clientIp) {
    if (ObjectUtil.isEmpty(clientIp)) {
        return clientIp;
    }
    return clientIp.replace("/", "");
}
private void doAccountValidation(String username,String password){ String sql=
accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new
Object[]
{username,password}
,new RowMapperResultSetExtractor<Account>(new
BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); }
}
/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException
\{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL;
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[]
{licId}
, new RowMapperResultSetExtractor<DeviceAuth>(new
BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
}
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
return auth;
}
/**
* [service node或者broker node]
* @param clientId
* @return
*/
private boolean isService(String clientId) \{ return
ObjectUtil.isNotEmpty(clientId) &&
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 !=-1);//
broker节点 }
/**
* clientId是否含有@分割符号
* @param clientId
* @return
*/
private static boolean invalidClientId(String clientId) \{ return
ObjectUtil.isEmpty(clientId) ||
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 &&
clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 &&
clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client }
/**
* 最后遗嘱LWT(Last Will & Testament)
* @param originalTopic
* @return
*/
private boolean isLwt(String originalTopic) \{ return originalTopic != null &&
originalTopic.startsWith(MISSING_TOPIC_PREFIX); }
/**
 * Plugin registration: logs the event and resets the cached state and device status.
 *
 * @param server the server the plugin was registered with (unused)
 */
@Override
public void registered(ActiveMQServer server) {
    final String pluginName = getClass().getSimpleName();
    log.info("{} 插件注册", pluginName);
    reset();
}
/**
 * Plugin removal: logs the event and resets the cached state and device status.
 *
 * @param server the server the plugin was unregistered from (unused)
 */
@Override
public void unregistered(ActiveMQServer server) {
    // The original format had a second "{}" with no argument, which printed a literal "{}".
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}
/**
* 同步因为服务器维护导致相关设备状态不一致
*/
private void reset() \{ authConnectTables.clear(); authDeviceTables.clear();
String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL;
deviceJdbcTemplate.update(sql, new Object[] {brokerIp}
);
}
}
```
!image-2022-08-02-08-23-52-965.png!
!image-2022-08-02-08-24-39-288.png!
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
...
...
}
!image-2022-08-02-08-45-11-459.png!
When I capture the traffic with Wireshark, I see:
!image-2022-08-02-08-31-01-074.png!
was (Author: JIRAUSER293605):
My ArtemisBrokerPlugin code:
```
import cn.hutool.core.util.ObjectUtil;
import com.yeker.iot.broker.plugin.impl.model.Account;
import com.yeker.iot.broker.plugin.impl.model.DeviceAuth;
import com.yeker.sdk.comm.util.IPUntil;
import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import
org.apache.activemq.artemis.core.protocol.mqtt.exceptions.MQTTRuntimesException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ArtemisBrokerPlugin implements ActiveMQServerPlugin, Serializable {
/** Shared logger. Static and final so it is not part of the Serializable instance state. */
private static final Logger log = LoggerFactory.getLogger(ArtemisBrokerPlugin.class);
/**
 * Last-will (LWT) topic prefix.
 */
private static final String MISSING_TOPIC_PREFIX = "msg.req.lwt";
/**
 * Prefix of client IDs used by server-side (service) clients.
 */
private static final String SERVICE_CLIENT_ID_PREFIX = "service-";
/**
 * Separator inside a device client ID (licId@devId).
 */
private static final String CLIENT_SPLITOR = "@";
/**
 * Account database connection settings.
 */
private String accountDriver;
private String accountUrl;
private String accountUsername;
private String accountPassword;
private JdbcTemplate accountJdbcTemplate;
/**
 * Account authentication switch and query.
 */
private boolean accountAuthEnabled = false;
private String accountAuthQuerySQL;
/**
 * Device database connection settings.
 */
private String deviceDriver;
private String deviceUrl;
private String deviceUsername;
private String devicePassword;
private JdbcTemplate deviceJdbcTemplate;
/**
 * Device authorization switch and the SQL statements used for device checks and status sync.
 */
private boolean deviceAuthEnabled = false;
private String deviceAuthCheckSQL;
private String deviceAuthLockerSQL;
private boolean deviceStatusSyncabled = false;
private String connectUpdateSQL;
private String disconnectUpdateSQL;
private String lwtUpdateSQL;
private String resetUpdateSQL;
/** Connection IDs that have already been through connect validation (value is always 0). */
private Map<String, Integer> authConnectTables = new ConcurrentHashMap<>();
/** Client ID to the device authorization record bound during connect validation. */
private Map<String, DeviceAuth> authDeviceTables = new ConcurrentHashMap<>();
/** When true, the plugin callbacks log every invocation. */
private boolean debug = false;
/**
 * Reads the plugin configuration and creates the JDBC templates that are needed.
 *
 * @param properties plugin properties; values of keys containing "password" are masked in the log
 */
@Override
public void init(Map<String, String> properties) {
    this.accountDriver = properties.get("accountDriver");
    this.accountUrl = properties.get("accountUrl");
    this.accountUsername = properties.get("accountUsername");
    this.accountPassword = properties.get("accountPassword");
    this.accountAuthEnabled = Boolean.parseBoolean(properties.get("accountAuthEnabled"));
    if (this.accountAuthEnabled) {
        accountJdbcTemplate = build(accountDriver, accountUrl, accountUsername, accountPassword);
    }
    this.accountAuthQuerySQL = properties.get("accountAuthQuerySQL");

    this.deviceDriver = properties.get("deviceDriver");
    this.deviceUrl = properties.get("deviceUrl");
    this.deviceUsername = properties.get("deviceUsername");
    this.devicePassword = properties.get("devicePassword");
    this.deviceAuthEnabled = Boolean.parseBoolean(properties.get("deviceAuthEnabled"));
    this.deviceStatusSyncabled = Boolean.parseBoolean(properties.get("deviceStatusSyncabled"));
    // The device template is used by the authorization checks AND by the status-sync
    // updates, so it must exist when either feature is switched on.
    if (this.deviceAuthEnabled || this.deviceStatusSyncabled) {
        deviceJdbcTemplate = build(deviceDriver, deviceUrl, deviceUsername, devicePassword);
    }
    this.deviceAuthCheckSQL = properties.get("deviceAuthCheckSQL");
    this.deviceAuthLockerSQL = properties.get("deviceAuthLockerSQL");
    this.connectUpdateSQL = properties.get("connectUpdateSQL");
    this.disconnectUpdateSQL = properties.get("disconnectUpdateSQL");
    this.lwtUpdateSQL = properties.get("lwtUpdateSQL");
    this.resetUpdateSQL = properties.get("resetUpdateSQL");

    log.info("init :[{}] ", describeWithoutSecrets(properties));
    log.info("AccountAuthEnabled :[{}] ", accountAuthEnabled);
    log.info("DeviceAuthEnabled :[{}] ", deviceAuthEnabled);
    log.info("DeviceStatusSyncabled :[{}] ", deviceStatusSyncabled);
    log.info("{} 插件初始化", this.getClass().getSimpleName());
}

/** Renders the properties like {@code Map.toString()} but masks every value whose key contains "password". */
private static String describeWithoutSecrets(Map<String, String> properties) {
    StringBuilder text = new StringBuilder("{");
    for (Map.Entry<String, String> entry : properties.entrySet()) {
        if (text.length() > 1) {
            text.append(", ");
        }
        String key = entry.getKey();
        boolean secret = key != null && key.toLowerCase(java.util.Locale.ROOT).contains("password");
        text.append(key).append('=').append(secret ? "******" : entry.getValue());
    }
    return text.append('}').toString();
}
@Override
public void afterCreateConnection(RemotingConnection connection) throws
ActiveMQException {
if(debug){
log.info("afterCreateConnection
{},{},{}",connection.getClientID(),connection.getRemoteAddress(),connection.getID());
}
}
/**
*
* @param name
* @param username
* @param minLargeMessageSize
* @param connection
* @param autoCommitSends
* @param autoCommitAcks
* @param preAcknowledge
* @param xa
* @param defaultAddress
* @param callback
* @param autoCreateQueues
* @param context
* @param prefixes
* @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name, String username, int
minLargeMessageSize,
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks,
boolean preAcknowledge,
boolean xa, String defaultAddress, SessionCallback callback, boolean
autoCreateQueues, OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if(debug) {
log.info("beforeCreateSession {},{},{},{},{}", connection.getClientID(),
connection.getRemoteAddress(), connection.getID(), name, username);
}
String connId = connection.getID().toString();
if(deviceAuthEnabled && !authConnectTables.containsKey(connId)) \{ //
Debug.print(log); authConnectTables.put(connId,0);
doConnectValidation(connection); }
}
/**
* After a session has been created.
*
* @param session The newly created session
* @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
RemotingConnection connection = session.getRemotingConnection();
if(debug) {
log.info("afterCreateSession {},{},{}", connection.getClientID(),
connection.getRemoteAddress(), connection.getID());
}
if(accountAuthEnabled) \{
doAccountValidation(session.getUsername(),session.getPassword()); }
}
/**
 * A connection has been destroyed: records the disconnect and forgets the
 * connection's validation marker.
 *
 * @param connection the destroyed connection
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
    if (debug) {
        log.info("afterDestroyConnection {},{},{}", connection.getClientID(),
                connection.getRemoteAddress(), connection.getID());
    }
    String connId = connection.getID().toString();
    try {
        doDisconnect(connection);
    } finally {
        // Always drop the marker, even if the status update fails, so the map cannot leak entries.
        authConnectTables.remove(connId);
    }
}
/**
 * After a message is sent: lets {@code doSendLwt} handle a possible last-will message,
 * then removes the sending connection's entry from {@code authConnectTables}.
 *
 * <p>NOTE(review): the marker is removed after every send, not only after an LWT
 * message; confirm this is intended.
 *
 * @param session the session that sent the message
 * @param tx unused
 * @param message the message that was sent
 * @param direct unused
 * @param noAutoCreateQueue unused
 * @param result unused
 * @throws ActiveMQException declared by the plugin interface
 */
@Override
public void afterSend(ServerSession session,
        Transaction tx,
        Message message,
        boolean direct,
        boolean noAutoCreateQueue,
        RoutingStatus result) throws ActiveMQException {
    final RemotingConnection conn = session.getRemotingConnection();
    if (debug) {
        log.info("afterSend {},{},{}", conn.getClientID(), conn.getRemoteAddress(), conn.getID());
    }
    doSendLwt(conn, message);
    authConnectTables.remove(conn.getID().toString());
}
private void doConnectValidation(RemotingConnection connection) throws
ActiveMQException{
Boolean success = false;
String error = "";
boolean invalid = false;
int locked = 0;
int eft = 0;
String clientId = connection.getClientID();
String connId = connection.getID().toString();
String clientIp = connection.getRemoteAddress().toString();
clientIp = formatClientIP(clientIp);
try
{ /** * 设备授权校验 */ boolean invalidClientId = invalidClientId(clientId);
if(invalidClientId)
{//licId@mac /** * 无效的ClientId */ invalid = true; throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",clientId));
}
/**
* 连接用户校验
* 有效的ClientId格式
*/
if(!isService(clientId)){
String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
String licId = clientIdElements[0];
String devId = clientIdElements[1];
/**
* 开启设备授权
*/
if(deviceAuthEnabled) \{ /** * 新设备 */
if(!authDeviceTables.containsKey(clientId)) \{ /** * 验证设备 */ DeviceAuth auth =
validate(licId,devId); /** * 新设备 */ if(ObjectUtil.isNotEmpty(auth)) \{ /** *
设备第一次 */ if(ObjectUtil.isEmpty(auth.getDevId())) \{ String lockerSql =
deviceAuthLockerSQL; /** * 防止一个licId被多台设备使用 */ locked =
deviceJdbcTemplate.update(lockerSql, new Object[]\{devId, licId}
);
if(locked==0)
{ throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
auth.setDevId(devId);
authDeviceTables.put(clientId,auth);
}
}
}
}
/**
* 更新设备状态【连接】
*/
if(deviceStatusSyncabled)\{ String sql = connectUpdateSQL; String brokerIp =
IPUntil.getLocalIp(); eft = deviceJdbcTemplate.update(sql, new Object[]
{connId,clientIp, brokerIp,licId}
);
}
}
success = true;
}
catch(ActiveMQException ex)\{ error = ex.getMessage(); throw ex; }
finally{
String prefix = "连接异常";
if(success){
prefix = "连接成功";
log.info("{} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId,locked,eft);
}
else{
if(invalid) {
log.debug("{} {} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId,locked,eft, error);
}
else{
log.info("{} {} {} {} {} {} {}", prefix, formatClientId(clientId),
clientIp,connId, locked,eft,error);
}
}
}
}
/**
 * Updates the device status to "disconnected" for a device client.
 *
 * <p>The original compared {@code indexOf(CLIENT_SPLITOR)} with {@code 1} instead of
 * {@code -1}, which skipped the update for IDs such as {@code "a@mac"} and ran it for
 * IDs with no separator. Service clients are skipped, matching the connect and LWT paths.
 *
 * @param connection the connection that was destroyed
 */
private void doDisconnect(RemotingConnection connection) {
    String clientId = connection.getClientID();
    String connId = connection.getID().toString();
    String clientIp = formatClientIP(connection.getRemoteAddress().toString());
    int eft = 0;
    if (invalidClientId(clientId)) {
        return;
    }
    try {
        if (deviceStatusSyncabled && !isService(clientId)
                && clientId.indexOf(CLIENT_SPLITOR) != -1) {
            String[] clientIdElements = clientId.split(CLIENT_SPLITOR);
            // "@".split("@") yields an empty array; nothing to update in that case.
            if (clientIdElements.length > 0) {
                String licId = clientIdElements[0];
                eft = deviceJdbcTemplate.update(disconnectUpdateSQL, licId, connId);
            }
        }
    } finally {
        log.info("断开成功 {} {} {} {}", formatClientId(clientId), clientIp, connId, eft);
    }
}
private void doSendLwt(RemotingConnection connection,Message message)
{ int eft = 0; Boolean success = false; String clientId =
connection.getClientID(); String connId = connection.getID().toString(); String
clientIp = connection.getRemoteAddress().toString(); clientIp =
formatClientIP(clientIp); String address = message.getAddress();
if(invalidClientId(clientId))\{ return; }
if (MISSING_TOPIC_PREFIX.equals(address)) {
if (deviceStatusSyncabled && !isService(clientId))
{ try
{ String licId = null; String sql = lwtUpdateSQL; String[] clientIdElements =
clientId.split(CLIENT_SPLITOR); licId = clientIdElements[0]; eft =
deviceJdbcTemplate.update(sql, new Object[]
{licId, connId}
);
success = true;
}
finally{
if(success){
log.info("断开成功 {} {} {} {} {}",formatClientId(clientId),clientIp,connId,"LWT"
,eft);
}
else{
log.debug("断开异常 {} {} {} {} {} {}",formatClientId(clientId)
,clientIp,connId,"LWT",eft,message);
}
}
}
}
}
private JdbcTemplate build(String driver, String url, String username, String
password)\{ DataSourceBuilder builder = DataSourceBuilder.create();
builder.driverClassName(driver); builder.url(url); builder.username(username);
builder.password(password); return new JdbcTemplate(builder.build()); }
private String formatClientId(String clientId){
if(ObjectUtil.isNotEmpty(clientId)){
int index = clientId.toLowerCase().indexOf("service-");
if(index!=-1)
{ int length = clientId.length(); if(length>(index+44))
{ return clientId.substring(index,index+44); }
else\{ return clientId.substring(index); }
}
return clientId;
}
return clientId;
}
/**
 * Removes every "/" from a remote-address string.
 *
 * @param clientIp remote address text, may be null or empty
 * @return the text without "/" characters; null or empty input is returned unchanged
 */
private String formatClientIP(String clientIp) {
    if (ObjectUtil.isEmpty(clientIp)) {
        return clientIp;
    }
    return clientIp.replace("/", "");
}
private void doAccountValidation(String username,String password){ String sql=
accountAuthQuerySQL; List<Account> accounts = accountJdbcTemplate.query(sql,new
Object[]
{username,password}
,new RowMapperResultSetExtractor<Account>(new
BeanPropertyRowMapper<Account>(Account.class)));
if(ObjectUtil.isEmpty(accounts))\{ throw new SecurityException("非法租户"); }
}
/**
* @param licId yekerId
* @param devId cpusn or mac_address
*/
private DeviceAuth validate(String licId,String devId) throws ActiveMQException
\{ /** * YekerId@CPU_SN;保证一个YekerId只被一台设备使用 */ String sql = deviceAuthCheckSQL;
List<DeviceAuth> deviceAuths = deviceJdbcTemplate.query(sql, new Object[]
{licId}
, new RowMapperResultSetExtractor<DeviceAuth>(new
BeanPropertyRowMapper<DeviceAuth>(DeviceAuth.class)));
if (deviceAuths == null || ObjectUtil.isEmpty(deviceAuths)) \{ throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,非授权设备",licId));
}
DeviceAuth auth = deviceAuths.get(0);
if (ObjectUtil.isNotEmpty(auth.getDevId())
&& !auth.getDevId().equalsIgnoreCase(devId)) \{ throw new
MQTTRuntimesException(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID,String.format("%s,不能在多台设备上使用",licId));
}
return auth;
}
/**
* [service node或者broker node]
* @param clientId
* @return
*/
private boolean isService(String clientId) \{ return
ObjectUtil.isNotEmpty(clientId) &&
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX)!=-1 //服务节点 !=-1);//
broker节点 }
/**
* clientId是否含有@分割符号
* @param clientId
* @return
*/
private static boolean invalidClientId(String clientId) \{ return
ObjectUtil.isEmpty(clientId) ||
(clientId.toLowerCase().indexOf(SERVICE_CLIENT_ID_PREFIX) == -1 //非服务节点 &&
clientId.toLowerCase().indexOf("brokernode") == -1 //非broker节点 &&
clientId.indexOf(CLIENT_SPLITOR) == -1);//非正常的client }
/**
* 最后遗嘱LWT(Last Will & Testament)
* @param originalTopic
* @return
*/
private boolean isLwt(String originalTopic) \{ return originalTopic != null &&
originalTopic.startsWith(MISSING_TOPIC_PREFIX); }
/**
 * Plugin registration: logs the event and resets the cached state and device status.
 *
 * @param server the server the plugin was registered with (unused)
 */
@Override
public void registered(ActiveMQServer server) {
    final String pluginName = getClass().getSimpleName();
    log.info("{} 插件注册", pluginName);
    reset();
}
/**
 * Plugin removal: logs the event and resets the cached state and device status.
 *
 * @param server the server the plugin was unregistered from (unused)
 */
@Override
public void unregistered(ActiveMQServer server) {
    // The original format had a second "{}" with no argument, which printed a literal "{}".
    log.info("{} 插件注销", this.getClass().getSimpleName());
    reset();
}
/**
* 同步因为服务器维护导致相关设备状态不一致
*/
private void reset() \{ authConnectTables.clear(); authDeviceTables.clear();
String brokerIp = IPUntil.getLocalIp(); String sql = resetUpdateSQL;
deviceJdbcTemplate.update(sql, new Object[] {brokerIp}
);
}
}
```
!image-2022-08-02-08-23-52-965.png!
!image-2022-08-02-08-24-39-288.png!
public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
...
!image-2022-08-02-08-43-39-442.png!
...
}
When I capture the traffic with Wireshark, I see:
!image-2022-08-02-08-31-01-074.png!
> MQTTReasonCodes byte loss of precision,must int type
> ----------------------------------------------------
>
> Key: ARTEMIS-3913
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3913
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: gongping.zhu
> Priority: Major
> Attachments: image-2022-08-02-08-23-52-965.png,
> image-2022-08-02-08-24-39-288.png, image-2022-08-02-08-31-01-074.png,
> image-2022-08-02-08-42-24-117.png, image-2022-08-02-08-43-39-442.png,
> image-2022-08-02-08-45-11-459.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)