This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ad7622c fix(transaction) fix the send back message sent into
transaction half topic (#1649)
ad7622c is described below
commit ad7622c14759dd8c87128de572be219cf3e3c64b
Author: Heng Du <[email protected]>
AuthorDate: Fri Jan 10 20:51:26 2020 +0800
fix(transaction) fix the send back message sent into transaction half topic
(#1649)
---
.../org/apache/rocketmq/broker/processor/EndTransactionProcessor.java | 1 +
.../org/apache/rocketmq/broker/processor/SendMessageProcessor.java | 3 ++-
.../rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java | 1 +
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java | 1 +
4 files changed, 5 insertions(+), 1 deletion(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index 1d5943d..9844cae 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -132,6 +132,7 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
+ MessageAccessor.clearProperty(msgInner,
MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2589a75..f753ebb 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -353,7 +353,8 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
PutMessageResult putMessageResult = null;
Map<String, String> oriProps =
MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag =
oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (traFlag != null && Boolean.parseBoolean(traFlag)) {
+ if (traFlag != null && Boolean.parseBoolean(traFlag)
+ && !(msgInner.getReconsumeTimes() > 0 &&
msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if
(this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index edc2647..e440bd9 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -374,6 +374,7 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
MessageAccessor.putProperty(newMsg,
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg,
String.valueOf(msg.getReconsumeTimes()));
MessageAccessor.setMaxReconsumeTimes(newMsg,
String.valueOf(getMaxReconsumeTimes()));
+ MessageAccessor.clearProperty(newMsg,
MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 807e9c6..25a81a0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -528,6 +528,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
MessageAccessor.putProperty(newMsg,
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg,
String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg,
String.valueOf(getMaxReconsumeTimes()));
+ MessageAccessor.clearProperty(newMsg,
MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);