**BUG REPORT**

执行发送事务消息,listener代码:

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger count =new AtomicInteger(0);
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object 
arg) {
        System.out.println("执行本地事务..."+msg.toString());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("回查本地事务..."+msg.toString());
        System.out.println("第"+count.incrementAndGet()+"次回查事务");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

如上:
executeLocalTransaction方法返回了LocalTransactionState.COMMIT_MESSAGE,但还是会执行回查本地事务方法checkLocalTransaction,我很疑惑不知道我是否忽视了其他什么配置,调试源码调试到DefaultMQProducerImpl类的
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final 
LocalTransactionExecuter localTransactionExecuter, final Object arg)方法如下部分:

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", 
sendResult.getTransactionId());
                    }
                    String transactionId = 
msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = 
localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != 
LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", 
localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

发现
else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        transactionListener.executeLocalTransaction(msg, arg);
                    }
该段代码未将返回结果赋值给localTransactionState,这是否是导致以上问题的原因?

[ Full content available at: https://github.com/apache/rocketmq/issues/447 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to