This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch test-release in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 98b520dbde13f2e3e9868d5759837b74a62764b8 Author: huangli <[email protected]> AuthorDate: Wed Dec 11 10:30:39 2019 +0800 Avoid message in schedule queue enter half message queue (#1626) --- .../org/apache/rocketmq/store/schedule/ScheduleMessageService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 50a48d4..3be8cbc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; @@ -305,6 +306,11 @@ public class ScheduleMessageService extends ConfigManager { if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); + if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { + log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", + msgInner.getTopic(), msgInner); + continue; + } PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner);
