This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch fix_transaction_sendback
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 118bf22a4c1fe8d17495cc71f64c3f1ed74412ce
Author: duhenglucky <[email protected]>
AuthorDate: Fri Dec 13 02:22:26 2019 +0800

    fix(transaction): keep send-back (retry) messages from being written to the transaction half topic
---
 .../org/apache/rocketmq/broker/processor/EndTransactionProcessor.java  | 1 +
 .../org/apache/rocketmq/broker/processor/SendMessageProcessor.java     | 3 ++-
 .../rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java    | 1 +
 .../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java       | 1 +
 4 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index 1d5943d..9844cae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -132,6 +132,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
                     
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                     
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                     
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
+                    MessageAccessor.clearProperty(msgInner, 
MessageConst.PROPERTY_TRANSACTION_PREPARED);
                     RemotingCommand sendResult = sendFinalMessage(msgInner);
                     if (sendResult.getCode() == ResponseCode.SUCCESS) {
                         
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2589a75..f753ebb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -353,7 +353,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         PutMessageResult putMessageResult = null;
         Map<String, String> oriProps = 
MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String traFlag = 
oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
+        if (traFlag != null && Boolean.parseBoolean(traFlag)
+            && !(msgInner.getReconsumeTimes() > 0 && 
msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
             if 
(this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark(
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index edc2647..e440bd9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -374,6 +374,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             MessageAccessor.putProperty(newMsg, 
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
             MessageAccessor.setReconsumeTime(newMsg, 
String.valueOf(msg.getReconsumeTimes()));
             MessageAccessor.setMaxReconsumeTimes(newMsg, 
String.valueOf(getMaxReconsumeTimes()));
+            MessageAccessor.clearProperty(newMsg, 
MessageConst.PROPERTY_TRANSACTION_PREPARED);
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
             
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 807e9c6..25a81a0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -528,6 +528,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             MessageAccessor.putProperty(newMsg, 
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
             MessageAccessor.setReconsumeTime(newMsg, 
String.valueOf(msg.getReconsumeTimes() + 1));
             MessageAccessor.setMaxReconsumeTimes(newMsg, 
String.valueOf(getMaxReconsumeTimes()));
+            MessageAccessor.clearProperty(newMsg, 
MessageConst.PROPERTY_TRANSACTION_PREPARED);
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
             this.mQClientFactory.getDefaultMQProducer().send(newMsg);

Reply via email to