zergduan opened a new issue #3885:
URL: https://github.com/apache/rocketmq/issues/3885
**BUG REPORT**
1. Please describe the issue you observed:
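With 2 masters and 2 slaves, synchronous replication and synchronous flush: when a slave node is down, transactional messages do not follow the local transaction logic and are forcibly rolled back.
With 2 masters and 2 slaves, asynchronous replication and synchronous flush: when a slave node is down, transactional messages follow the local transaction logic.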
- What did you do (The steps to reproduce)?
2 masters and 2 slaves, synchronous replication, synchronous flush, RocketMQ 4.9.2.
The four nodes are paired as follows:
broker-a-master broker-a-slave
broker-b-master broker-b-slave
The following code was used to verify the transactional message feature:
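import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransProducer {
    public static void main(String[] args) throws Exception {
        // Arguments: namespace, producer group, RPC hook, enable message trace, custom trace topic.
        TransactionMQProducer producer =
                new TransactionMQProducer(null, "My-Producer-YYY", null, true, null);
        producer.setNamesrvAddr("10.177.96.111:19876;10.177.96.112:19876");
        TransactionListenerImpl transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        // Thread pool the client uses to answer transaction check requests from the broker.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                });
        producer.setExecutorService(executorService);
        producer.start();
        System.out.println("Producer started");
        // One message per tag; the listener returns a different state for each tag.
        String[] tags = { "TAGA", "TAGB", "TAGC" };
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TP-E-APP-YYY", tags[i], ("Hello xuzhu" + i).getBytes());
            SendResult result = producer.sendMessageInTransaction(msg, "hello-xuzhu_transaction");
            SendStatus status = result.getSendStatus();
            System.out.println("Send result: " + result);
            System.out.println("Send status: " + status);
            TimeUnit.SECONDS.sleep(2);
        }
        producer.shutdown();
        System.out.println("Producer finished");
    }
}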
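// StringUtils is assumed to come from Apache Commons Lang 3; the report does not show its import.
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // Called by the client after the half message has been sent; the tag selects the outcome.
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("Executing local transaction----");
        if (StringUtils.equals("TAGA", message.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TAGB", message.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TAGC", message.getTags())) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    // Called when the broker checks back on a half message whose state is still unknown.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("Message tag: " + messageExt.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}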
Scenario 1: synchronous master-slave replication, all 4 nodes healthy. Running the code above prints:
Producer started
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001E28318B4AAC273CA85360000, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=1], queueOffset=232]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001E28318B4AAC273CA8D210004, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=2], queueOffset=233]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001E28318B4AAC273CA94F50008, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=3], queueOffset=234]
Send status: SEND_OK
Producer finished
In rocketmq-dashboard, using the msgTrace feature on RMQ_SYS_TRANS_HALF_TOPIC, looking up each msgId from the output above shows the following state and related information:
Send Message Info : ( Message Id 7F000001E28318B4AAC273CA85360000 ) TAGA
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 11:37:32.239  COMMIT_MESSAGE    false                 10.177.96.117  10.177.96.115:22922
Send Message Info : ( Message Id 7F000001E28318B4AAC273CA8D210004 ) TAGB
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 11:37:34.244  ROLLBACK_MESSAGE  false                 10.177.96.117  10.177.96.115:22922
Send Message Info : ( Message Id 7F000001E28318B4AAC273CA94F50008 ) TAGC
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 11:37:36.247  UNKNOW            false                 10.177.96.117  10.177.96.115:22922
Conclusion: consistent with the code logic.
Scenario 2: synchronous master-slave replication, with the broker-a-master and broker-b-slave nodes shut down to simulate a failure and verify high availability.
Running the code above prints:
Producer started
Send result: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F000001EA3918B4AAC273ED20440000, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=1], queueOffset=250]
Send status: SLAVE_NOT_AVAILABLE
Send result: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F000001EA3918B4AAC273ED28330004, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=2], queueOffset=251]
Send status: SLAVE_NOT_AVAILABLE
Send result: SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=7F000001EA3918B4AAC273ED30070008, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=3], queueOffset=252]
Send status: SLAVE_NOT_AVAILABLE
Producer finished
In rocketmq-dashboard, using the msgTrace feature on RMQ_SYS_TRANS_HALF_TOPIC, looking up each msgId from the output above shows the following state and related information:
Send Message Info : ( Message Id 7F000001EA3918B4AAC273ED20440000 ) TAGA
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 12:15:20.161  ROLLBACK_MESSAGE  false                 10.177.96.117  10.177.96.115:22922
Send Message Info : ( Message Id 7F000001EA3918B4AAC273ED28330004 ) TAGB
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 12:15:22.167  ROLLBACK_MESSAGE  false                 10.177.96.117  10.177.96.115:22922
Send Message Info : ( Message Id 7F000001EA3918B4AAC273ED30070008 ) TAGC
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 12:15:24.171  ROLLBACK_MESSAGE  false                 10.177.96.117  10.177.96.115:22922
Conclusion: inconsistent with the code logic; every message was rolled back. Tracing the code shows that the local transaction method public LocalTransactionState executeLocalTransaction was never executed, and all half messages were forcibly rolled back.
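The forced rollback in scenario 2 appears to match how the client handles the send status of the half message. As far as can be told from the 4.9.x client source (DefaultMQProducerImpl.sendMessageInTransaction), the listener is invoked only when the half message returns SEND_OK, while FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE are mapped directly to ROLLBACK_MESSAGE. The sketch below is a dependency-free model of that mapping, not the library code: the class HalfMessageOutcomeSketch, the method resolve and both enums are stand-ins declared here for illustration.

```java
import java.util.function.Supplier;

public class HalfMessageOutcomeSketch {
    // Stand-ins for RocketMQ's SendStatus and LocalTransactionState,
    // redeclared so the sketch compiles without the client library.
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Models the decision taken after the half message is sent (assumed behavior).
    static LocalTransactionState resolve(SendStatus halfStatus,
                                         Supplier<LocalTransactionState> localTransaction) {
        switch (halfStatus) {
            case SEND_OK:
                // Only here does the local transaction (the listener) run.
                LocalTransactionState state = localTransaction.get();
                return state != null ? state : LocalTransactionState.UNKNOW;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The listener is skipped and the half message is rolled back.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.UNKNOW;
        }
    }

    public static void main(String[] args) {
        // A listener that would commit, as for TAGA in this report.
        Supplier<LocalTransactionState> commit = () -> LocalTransactionState.COMMIT_MESSAGE;
        for (SendStatus status : SendStatus.values()) {
            System.out.println(status + " -> " + resolve(status, commit));
        }
    }
}
```

Under this model, a SLAVE_NOT_AVAILABLE half message ends in ROLLBACK_MESSAGE regardless of the tag, which is what the trace for scenario 2 shows.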
Scenario 3: asynchronous master-slave replication (the master's brokerRole parameter changed to ASYNC_MASTER), with the broker-a-master and broker-b-slave nodes shut down to simulate a failure and verify high availability. Running the code above prints:
Producer started
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001EC5618B4AAC273F5C2B70000, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=1], queueOffset=253]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001EC5618B4AAC273F5CAA00004, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=2], queueOffset=254]
Send status: SEND_OK
Executing local transaction----
Send result: SendResult [sendStatus=SEND_OK, msgId=7F000001EC5618B4AAC273F5D2730008, offsetMsgId=null, messageQueue=MessageQueue [topic=TP-E-APP-YYY, brokerName=broker-b, queueId=3], queueOffset=255]
Send status: SEND_OK
Producer finished
In rocketmq-dashboard, using the msgTrace feature on RMQ_SYS_TRANS_HALF_TOPIC, looking up each msgId from the output above shows the following state and related information:
Send Message Info : ( Message Id 7F000001EC5618B4AAC273F5C2B70000 ) TAGA
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 12:24:46.030  COMMIT_MESSAGE    false                 10.177.96.117  10.177.96.115:22922
Send Message Info : ( Message Id 7F000001EC5618B4AAC273F5CAA00004 ) TAGB
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 12:24:48.035  ROLLBACK_MESSAGE  false                 10.177.96.117  10.177.96.115:22922
Send Message Info : ( Message Id 7F000001EC5618B4AAC273F5D2730008 ) TAGC
Check Transaction Info :
Timestamp                TransactionState  FromTransactionCheck  ClientHost     StoreHost
2022-02-23 12:24:50.038  UNKNOW            false                 10.177.96.117  10.177.96.115:22922
Conclusion: consistent with the code logic.
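Scenarios 2 and 3 differ only in the master's brokerRole. The sketch below spells out that difference using the two broker properties involved, brokerRole and flushDiskType, with values taken from this report. It assumes that only a SYNC_MASTER waits for a slave before acknowledging a send, which is why SLAVE_NOT_AVAILABLE shows up in scenario 2 only. The class BrokerRoleSketch and its methods are hypothetical helpers, not part of RocketMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class BrokerRoleSketch {
    // True when the master replicates synchronously, i.e. a missing slave
    // can surface to the producer as a non-SEND_OK status (assumption).
    static boolean waitsForSlave(Properties brokerConf) {
        return "SYNC_MASTER".equals(brokerConf.getProperty("brokerRole"));
    }

    // Parses broker.conf-style text into Properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Master settings for scenario 2 (synchronous replication, synchronous flush).
        Properties scenario2 = parse("brokerRole=SYNC_MASTER\nflushDiskType=SYNC_FLUSH\n");
        // Master settings for scenario 3 (asynchronous replication, synchronous flush).
        Properties scenario3 = parse("brokerRole=ASYNC_MASTER\nflushDiskType=SYNC_FLUSH\n");
        System.out.println("scenario 2 waits for slave: " + waitsForSlave(scenario2));
        System.out.println("scenario 3 waits for slave: " + waitsForSlave(scenario3));
    }
}
```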
- What did you expect to see?
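In a master-slave cluster with synchronous replication, a slave going down should not affect how transactional messages are processed; the local transaction code should not be skipped and all messages forcibly rolled back.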
- What did you see instead?
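Transactional messages are processed normally when a slave is down only in a master-slave cluster with asynchronous replication; with synchronous replication they are all rolled back (scenario 2).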
2. Please tell us about your environment:
Oracle Linux 8.4
openjdk version "1.8.0_322"
rocketmq-4.9.2-bin-all.zip
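Deployment: 2 masters and 2 slaves with synchronous flush. With synchronous replication, transactional messages are forcibly rolled back when a slave node is down; with asynchronous replication, they follow the local transaction logic.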
3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):