**BUG REPORT**
When sending a transactional message, I use the following listener code:
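import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // Counts how many times the broker checks back on the transaction.
    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("Executing local transaction..." + msg.toString());
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("Checking local transaction..." + msg.toString());
        System.out.println("Transaction check #" + count.incrementAndGet());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}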
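As shown above, executeLocalTransaction returns LocalTransactionState.COMMIT_MESSAGE, yet the check-back method checkLocalTransaction is still invoked. I am confused and not sure whether I have overlooked some other configuration. While debugging the source, I stepped into the following method of the DefaultMQProducerImpl class:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg)
The relevant part of the method is: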
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}
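I noticed that this branch:
    else if (transactionListener != null) {
        log.debug("Used new transaction API");
        transactionListener.executeLocalTransaction(msg, arg);
    }
does not assign the return value of executeLocalTransaction to localTransactionState, so the state stays at its initial value, LocalTransactionState.UNKNOW. Is this the cause of the problem described above?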
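To make the suspected cause concrete, here is a minimal, self-contained sketch. It does not use the real RocketMQ classes: the State enum, the Listener interface, and both resolve... methods are hypothetical stand-ins that only mirror the control flow quoted above. It compares the branch as it appears in the source with a variant that assigns the listener's return value, which is what a fix might look like.

```java
// Hypothetical stand-ins that mimic the quoted control flow; NOT the RocketMQ API.
class TransactionStateSketch {
    // Mirrors the three values of LocalTransactionState (spelling "UNKNOW" as in the source).
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Simplified stand-in for TransactionListener.executeLocalTransaction.
    interface Listener {
        State executeLocalTransaction(String msg, Object arg);
    }

    // Same shape as the quoted branch: the listener runs, but its result is discarded.
    static State resolveStateAsReported(Listener listener, String msg, Object arg) {
        State state = State.UNKNOW;
        if (listener != null) {
            listener.executeLocalTransaction(msg, arg); // return value is lost here
        }
        return state;
    }

    // Assumed fix: keep the listener's result, falling back to UNKNOW on null.
    static State resolveStateWithAssignment(Listener listener, String msg, Object arg) {
        State state = State.UNKNOW;
        if (listener != null) {
            state = listener.executeLocalTransaction(msg, arg);
        }
        if (state == null) {
            state = State.UNKNOW;
        }
        return state;
    }

    public static void main(String[] args) {
        Listener alwaysCommit = (msg, arg) -> State.COMMIT_MESSAGE;
        System.out.println("as reported:     " + resolveStateAsReported(alwaysCommit, "m", null));
        System.out.println("with assignment: " + resolveStateWithAssignment(alwaysCommit, "m", null));
    }
}
```

With a listener that always commits, the first method still yields UNKNOW while the second yields COMMIT_MESSAGE. If the producer reports UNKNOW when ending the transaction, that would be consistent with the broker later calling checkLocalTransaction, as observed.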
[ Full content available at: https://github.com/apache/rocketmq/issues/447 ]