This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c292574cd  [ISSUE #4758] Timer Message adapts to slaveActingMaster 
mode (#4757)
c292574cd is described below

commit c292574cd5bb1cdcb4d07542570a80bc01255b0d
Author: rongtong <[email protected]>
AuthorDate: Thu Aug 4 10:11:02 2022 +0800

     [ISSUE #4758] Timer Message adapts to slaveActingMaster mode (#4757)
    
    * Timer Message adapts to slaveActingMaster mode
    
    * Add asf-header to TimerCheckPointTest
---
 .../apache/rocketmq/broker/BrokerController.java   |   5 +
 .../rocketmq/broker/BrokerPreOnlineService.java    |  11 ++
 .../rocketmq/store/timer/TimerCheckpoint.java      |  36 +++++-
 .../rocketmq/store/timer/TimerMessageStore.java    | 103 ++++++++++-----
 .../rocketmq/store/timer/TimerCheckPointTest.java  | 127 +++++++++++++++++++
 .../store/timer/TimerMessageStoreTest.java         |  20 ++-
 .../test/container/GetMetadataReverseIT.java       |  72 +++++++++++
 .../container/ScheduleSlaveActingMasterIT.java     | 140 ++++++++++++++++++++-
 .../test/container/ScheduledMessageIT.java         |  36 +++++-
 9 files changed, 502 insertions(+), 48 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 53129e8a5..6d4fe3d80 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -732,6 +732,7 @@ public class BrokerController {
                     this.timerCheckpoint = new 
TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
                     TimerMetrics timerMetrics = new 
TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
                     this.timerMessageStore = new 
TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, 
timerMetrics, brokerStatsManager);
+                    this.timerMessageStore.registerEscapeBridgeHook(msg -> 
escapeBridge.putMessage(msg));
                     
this.messageStore.setTimerMessageStore(this.timerMessageStore);
                 }
             } catch (IOException e) {
@@ -1962,6 +1963,10 @@ public class BrokerController {
                 this.scheduleMessageService.stop();
             }
             isScheduleServiceStart = shouldStart;
+
+            if (timerMessageStore != null) {
+                timerMessageStore.setShouldRunningDequeue(shouldStart);
+            }
         }
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
index 10757c0db..35740f7cc 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.broker.schedule.DelayOffsetSerializeWrapper;
 import org.apache.rocketmq.store.ha.HAConnectionState;
 import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
 
 public class BrokerPreOnlineService extends ServiceThread {
     private static final InternalLogger LOGGER = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -159,6 +160,8 @@ public class BrokerPreOnlineService extends ServiceThread {
 
             ConsumerOffsetSerializeWrapper consumerOffsetSerializeWrapper = 
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(brokerAddr);
 
+            TimerCheckpoint timerCheckpoint = 
this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(brokerAddr);
+
             if (null != consumerOffsetSerializeWrapper && 
brokerController.getConsumerOffsetManager().getDataVersion().compare(consumerOffsetSerializeWrapper.getDataVersion())
 <= 0) {
                 LOGGER.info("{}'s consumerOffset data version is larger than 
master broker, {}'s consumerOffset will be used.", brokerAddr, brokerAddr);
                 
this.brokerController.getConsumerOffsetManager().getOffsetTable()
@@ -180,6 +183,14 @@ public class BrokerPreOnlineService extends ServiceThread {
                 }
             }
 
+            if (null != this.brokerController.getTimerCheckpoint() && 
this.brokerController.getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion())
 <= 0) {
+                LOGGER.info("{}'s timerCheckpoint data version is larger than 
master broker, {}'s timerCheckpoint will be used.", brokerAddr, brokerAddr);
+                
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs());
+                
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset());
+                
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion());
+                this.brokerController.getTimerCheckpoint().flush();
+            }
+
             for (BrokerAttachedPlugin brokerAttachedPlugin : 
brokerController.getBrokerAttachedPlugins()) {
                 if (brokerAttachedPlugin != null) {
                     brokerAttachedPlugin.syncMetadataReverse(brokerAddr);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
index 2d24b9d7f..158264049 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.store.timer;
 
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -38,6 +40,7 @@ public class TimerCheckpoint {
     private volatile long lastTimerLogFlushPos = 0;
     private volatile long lastTimerQueueOffset = 0;
     private volatile long masterTimerQueueOffset = 0; // read from master
+    private final DataVersion dataVersion = new DataVersion();
 
     public TimerCheckpoint() {
         this.randomAccessFile = null;
@@ -60,12 +63,21 @@ public class TimerCheckpoint {
             this.lastTimerLogFlushPos = this.mappedByteBuffer.getLong(8);
             this.lastTimerQueueOffset = this.mappedByteBuffer.getLong(16);
             this.masterTimerQueueOffset = this.mappedByteBuffer.getLong(24);
+            // new add to record dataVersion
+            if (this.mappedByteBuffer.hasRemaining()) {
+                dataVersion.setStateVersion(this.mappedByteBuffer.getLong(32));
+                dataVersion.setTimestamp(this.mappedByteBuffer.getLong(40));
+                dataVersion.setCounter(new 
AtomicLong(this.mappedByteBuffer.getLong(48)));
+            }
 
             log.info("timer checkpoint file lastReadTimeMs " + 
this.lastReadTimeMs + ", "
                 + UtilAll.timeMillisToHumanString(this.lastReadTimeMs));
             log.info("timer checkpoint file lastTimerLogFlushPos " + 
this.lastTimerLogFlushPos);
             log.info("timer checkpoint file lastTimerQueueOffset " + 
this.lastTimerQueueOffset);
             log.info("timer checkpoint file masterTimerQueueOffset " + 
this.masterTimerQueueOffset);
+            log.info("timer checkpoint file data version state version " + 
this.dataVersion.getStateVersion());
+            log.info("timer checkpoint file data version timestamp " + 
this.dataVersion.getTimestamp());
+            log.info("timer checkpoint file data version counter " + 
this.dataVersion.getCounter());
         } else {
             log.info("timer checkpoint file not exists, " + scpPath);
         }
@@ -96,6 +108,10 @@ public class TimerCheckpoint {
         this.mappedByteBuffer.putLong(8, this.lastTimerLogFlushPos);
         this.mappedByteBuffer.putLong(16, this.lastTimerQueueOffset);
         this.mappedByteBuffer.putLong(24, this.masterTimerQueueOffset);
+        // new add to record dataVersion
+        this.mappedByteBuffer.putLong(32, this.dataVersion.getStateVersion());
+        this.mappedByteBuffer.putLong(40, this.dataVersion.getTimestamp());
+        this.mappedByteBuffer.putLong(48, this.dataVersion.getCounter().get());
         this.mappedByteBuffer.force();
     }
 
@@ -104,11 +120,15 @@ public class TimerCheckpoint {
     }
 
     public static ByteBuffer encode(TimerCheckpoint another) {
-        ByteBuffer byteBuffer = ByteBuffer.allocate(32);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(56);
         byteBuffer.putLong(another.getLastReadTimeMs());
         byteBuffer.putLong(another.getLastTimerLogFlushPos());
         byteBuffer.putLong(another.getLastTimerQueueOffset());
         byteBuffer.putLong(another.getMasterTimerQueueOffset());
+        // new add to record dataVersion
+        byteBuffer.putLong(another.getDataVersion().getStateVersion());
+        byteBuffer.putLong(another.getDataVersion().getTimestamp());
+        byteBuffer.putLong(another.getDataVersion().getCounter().get());
         byteBuffer.flip();
         return byteBuffer;
     }
@@ -119,6 +139,12 @@ public class TimerCheckpoint {
         tmp.setLastTimerLogFlushPos(byteBuffer.getLong());
         tmp.setLastTimerQueueOffset(byteBuffer.getLong());
         tmp.setMasterTimerQueueOffset(byteBuffer.getLong());
+        // new add to record dataVersion
+        if (byteBuffer.hasRemaining()) {
+            tmp.getDataVersion().setStateVersion(byteBuffer.getLong());
+            tmp.getDataVersion().setTimestamp(byteBuffer.getLong());
+            tmp.getDataVersion().setCounter(new 
AtomicLong(byteBuffer.getLong()));
+        }
         return tmp;
     }
 
@@ -149,4 +175,12 @@ public class TimerCheckpoint {
     public void setMasterTimerQueueOffset(final long masterTimerQueueOffset) {
         this.masterTimerQueueOffset = masterTimerQueueOffset;
     }
+
+    public void updateDateVersion(long stateVersion) {
+        dataVersion.nextVersion(stateVersion);
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index fa2af4086..ac0facff7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.store.timer;
 
 import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import java.util.function.Function;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -118,6 +119,8 @@ public class TimerMessageStore {
     private volatile long commitReadTimeMs;
     private volatile long currQueueOffset; //only one queue that is 0
     private volatile long commitQueueOffset;
+    private volatile long lastCommitReadTimeMs;
+    private volatile long lastCommitQueueOffset;
 
     private long lastEnqueueButExpiredTime;
     private long lastEnqueueButExpiredStoreTime;
@@ -136,10 +139,14 @@ public class TimerMessageStore {
     private boolean dequeueStatusChangeFlag = false;
     private long shouldStartTime;
 
+    // True if current store is master or current brokerId is equal to the 
minimum brokerId of the replica group in slaveActingMaster mode.
+    private volatile boolean shouldRunningDequeue;
     private final BrokerStatsManager brokerStatsManager;
+    private Function<MessageExtBrokerInner, PutMessageResult> escapeBridgeHook;
 
     public TimerMessageStore(final MessageStore messageStore, final 
MessageStoreConfig storeConfig,
-                             TimerCheckpoint timerCheckpoint, TimerMetrics 
timerMetrics, final BrokerStatsManager brokerStatsManager) throws IOException {
+        TimerCheckpoint timerCheckpoint, TimerMetrics timerMetrics,
+        final BrokerStatsManager brokerStatsManager) throws IOException {
         this.messageStore = messageStore;
         this.storeConfig = storeConfig;
         this.commitLogFileSize = storeConfig.getMappedFileSizeCommitLog();
@@ -148,7 +155,7 @@ public class TimerMessageStore {
         // TimerWheel contains the fixed number of slots regardless of 
precision.
         this.slotsTotal = TIMER_WHELL_TTL_DAY * DAY_SECS;
         this.timerWheel = new 
TimerWheel(getTimerWheelPath(storeConfig.getStorePathRootDir()),
-                this.slotsTotal, precisionMs);
+            this.slotsTotal, precisionMs);
         this.timerLog = new 
TimerLog(getTimerLogPath(storeConfig.getStorePathRootDir()), timerLogFileSize);
         this.timerMetrics = timerMetrics;
         this.timerCheckpoint = timerCheckpoint;
@@ -186,7 +193,9 @@ public class TimerMessageStore {
             dequeueGetMessageServices[i] = new TimerDequeueGetMessageService();
         }
         int putThreadNum = storeConfig.getTimerPutMessageThreadNum();
-        if (putThreadNum <= 0) putThreadNum = 1;
+        if (putThreadNum <= 0) {
+            putThreadNum = 1;
+        }
         dequeuePutMessageServices = new 
TimerDequeuePutMessageService[putThreadNum];
         for (int i = 0; i < dequeuePutMessageServices.length; i++) {
             dequeuePutMessageServices[i] = new TimerDequeuePutMessageService();
@@ -200,7 +209,6 @@ public class TimerMessageStore {
             dequeueGetQueue = new 
LinkedBlockingDeque<List<TimerRequest>>(1024);
             dequeuePutQueue = new LinkedBlockingDeque<TimerRequest>(1024);
         }
-
         this.brokerStatsManager = brokerStatsManager;
     }
 
@@ -280,11 +288,12 @@ public class TimerMessageStore {
             recoverAndRevise(minFirst, false);
         }
         LOGGER.info("Timer recover ok currReadTimerMs:{} currQueueOffset:{} 
checkQueueOffset:{} processOffset:{}",
-                currReadTimeMs, currQueueOffset, 
timerCheckpoint.getLastTimerQueueOffset(), processOffset);
+            currReadTimeMs, currQueueOffset, 
timerCheckpoint.getLastTimerQueueOffset(), processOffset);
 
         commitReadTimeMs = currReadTimeMs;
         commitQueueOffset = currQueueOffset;
 
+        prepareTimerCheckPoint();
     }
 
     public long reviseQueueOffset(long processOffset) {
@@ -314,7 +323,7 @@ public class TimerMessageStore {
             while (maxCount-- > 0) {
                 if (tmpOffset < 0) {
                     LOGGER.warn("reviseQueueOffset check cq offset fail, msg 
in cq is not found.{}, {}",
-                            offsetPy, sizePy);
+                        offsetPy, sizePy);
                     break;
                 }
                 SelectMappedBufferResult bufferCQ = 
cq.getIndexBuffer(tmpOffset);
@@ -328,7 +337,7 @@ public class TimerMessageStore {
                     int sizePyTemp = bufferCQ.getByteBuffer().getInt();
                     if (offsetPyTemp == offsetPy && sizePyTemp == sizePy) {
                         LOGGER.info("reviseQueueOffset check cq offset ok. {}, 
{}, {}",
-                                tmpOffset, offsetPyTemp, sizePyTemp);
+                            tmpOffset, offsetPyTemp, sizePyTemp);
                         cqOffset = tmpOffset;
                         break;
                     }
@@ -349,7 +358,7 @@ public class TimerMessageStore {
     //recover timerlog and revise timerwheel
     //return process offset
     private long recoverAndRevise(long beginOffset, boolean checkTimerLog) {
-        LOGGER.info("Begin fto recover timerlog offset:{} check:{}", 
beginOffset, checkTimerLog);
+        LOGGER.info("Begin to recover timerlog offset:{} check:{}", 
beginOffset, checkTimerLog);
         MappedFile lastFile = 
timerLog.getMappedFileQueue().getLastMappedFile();
         if (null == lastFile) {
             return 0;
@@ -472,6 +481,11 @@ public class TimerMessageStore {
         LOGGER.info("Timer start ok currReadTimerMs:[{}] queueOffset:[{}]", 
new Timestamp(currReadTimeMs), currQueueOffset);
     }
 
+    public void start(boolean shouldRunningDequeue) {
+        this.shouldRunningDequeue = shouldRunningDequeue;
+        this.start();
+    }
+
     public void shutdown() {
         if (SHUTDOWN == state) {
             return;
@@ -541,7 +555,7 @@ public class TimerMessageStore {
 
     private boolean isRunningEnqueue() {
         checkBrokerRole();
-        if (!isMaster() && currQueueOffset >= 
timerCheckpoint.getMasterTimerQueueOffset()) {
+        if (!shouldRunningDequeue && !isMaster() && currQueueOffset >= 
timerCheckpoint.getMasterTimerQueueOffset()) {
             return false;
         }
 
@@ -549,14 +563,22 @@ public class TimerMessageStore {
     }
 
     private boolean isRunningDequeue() {
-        if (!isMaster()) {
-            currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
-            commitReadTimeMs = currReadTimeMs;
+        if (!this.shouldRunningDequeue) {
+            syncLastReadTimeMs();
             return false;
         }
         return isRunning();
     }
 
+    public void syncLastReadTimeMs() {
+        currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
+        commitReadTimeMs = currReadTimeMs;
+    }
+
+    public void setShouldRunningDequeue(final boolean shouldRunningDequeue) {
+        this.shouldRunningDequeue = shouldRunningDequeue;
+    }
+
     public void addMetric(MessageExt msg, int value) {
         try {
             if (null == msg || null == 
msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
@@ -694,7 +716,7 @@ public class TimerMessageStore {
             // If it's a delete message, then slot's total num -1
             // TODO: check if the delete msg is in the same slot with "the msg 
to be deleted".
             timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : 
slot.firstPos, ret,
-                    isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
+                isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
             addMetric(messageExt, isDelete ? -1 : 1);
         }
         return -1 != ret;
@@ -807,8 +829,8 @@ public class TimerMessageStore {
         int checkNum = 0;
         while (true) {
             if (dequeuePutQueue.size() > 0
-                    || !checkStateForGetMessages(AbstractStateService.WAITING)
-                    || 
!checkStateForPutMessages(AbstractStateService.WAITING)) {
+                || !checkStateForGetMessages(AbstractStateService.WAITING)
+                || !checkStateForPutMessages(AbstractStateService.WAITING)) {
                 //let it go
             } else {
                 checkNum++;
@@ -1006,15 +1028,19 @@ public class TimerMessageStore {
 
     //0 succ; 1 fail, need retry; 2 fail, do not retry;
     private int doPut(MessageExtBrokerInner message, boolean roll) throws 
Exception {
-        if (lastBrokerRole == BrokerRole.SLAVE) {
-            LOGGER.warn("Trying do put timer msg in slave, [{}]", message);
-            return PUT_NO_RETRY;
-        }
+
         if (!roll && null != 
message.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
             LOGGER.warn("Trying do put delete timer msg:[{}] roll:[{}]", 
message, roll);
             return PUT_NO_RETRY;
         }
-        PutMessageResult putMessageResult = messageStore.putMessage(message);
+
+        PutMessageResult putMessageResult = null;
+        if (escapeBridgeHook != null) {
+            putMessageResult = escapeBridgeHook.apply(message);
+        } else {
+            putMessageResult = messageStore.putMessage(message);
+        }
+
         int retryNum = 0;
         while (retryNum < 3) {
             if (null == putMessageResult || null == 
putMessageResult.getPutMessageStatus()) {
@@ -1025,7 +1051,7 @@ public class TimerMessageStore {
                         if (brokerStatsManager != null) {
                             
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
                             
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
-                                    
putMessageResult.getAppendMessageResult().getWroteBytes());
+                                
putMessageResult.getAppendMessageResult().getWroteBytes());
                             this.brokerStatsManager.incBrokerPutNums(1);
                         }
                         return PUT_OK;
@@ -1058,7 +1084,7 @@ public class TimerMessageStore {
         MessageAccessor.setProperties(msgInner, msgExt.getProperties());
         TopicFilterType topicFilterType = 
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
         long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, 
msgInner.getTags());
+            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, 
msgInner.getTags());
         msgInner.setTagsCode(tagsCodeValue);
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
 
@@ -1101,8 +1127,8 @@ public class TimerMessageStore {
                 int hash = hashTopicForMetrics(entry.getKey());
                 if (smallHashs.containsKey(hash)) {
                     LOGGER.warn("[CheckAndReviseMetrics]Metric hash collision 
between small-small code:{} small topic:{}{} small topic:{}{}", hash,
-                            entry.getKey(), entry.getValue(),
-                            smallHashs.get(hash), 
smallOnes.get(smallHashs.get(hash)));
+                        entry.getKey(), entry.getValue(),
+                        smallHashs.get(hash), 
smallOnes.get(smallHashs.get(hash)));
                     smallHashCollisions.add(hash);
                 }
                 smallHashs.put(hash, entry.getKey());
@@ -1118,8 +1144,8 @@ public class TimerMessageStore {
                     Map.Entry<String, TimerMetrics.Metric> smallEntry = 
smalllIt.next();
                     if (hashTopicForMetrics(smallEntry.getKey()) == 
hashTopicForMetrics(bjgEntry.getKey())) {
                         LOGGER.warn("[CheckAndReviseMetrics]Metric hash 
collision between small-big code:{} small topic:{}{} big topic:{}{}", 
hashTopicForMetrics(smallEntry.getKey()),
-                                smallEntry.getKey(), smallEntry.getValue(),
-                                bjgEntry.getKey(), bjgEntry.getValue());
+                            smallEntry.getKey(), smallEntry.getValue(),
+                            bjgEntry.getKey(), bjgEntry.getValue());
                         smalllIt.remove();
                     }
                 }
@@ -1276,7 +1302,7 @@ public class TimerMessageStore {
                             req.setLatch(latch);
                             try {
                                 perfs.startTick("enqueue_put");
-                                if (isMaster() && req.getDelayTime() < 
currWriteTimeMs) {
+                                if (shouldRunningDequeue && req.getDelayTime() 
< currWriteTimeMs) {
                                     dequeuePutQueue.put(req);
                                 } else {
                                     boolean doEnqueueRes = 
doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
@@ -1574,11 +1600,11 @@ public class TimerMessageStore {
                         ConsumeQueue cq = (ConsumeQueue) 
messageStore.getConsumeQueue(TIMER_TOPIC, 0);
                         long maxOffsetInQueue = cq == null ? 0 : 
cq.getMaxOffsetInQueue();
                         TimerMessageStore.LOGGER.info("[{}]Timer 
progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} 
currReadOffset:{} offsetBehind:{} behindMaster:{} " +
-                                        "enqPutQueue:{} deqGetQueue:{} 
deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}",
-                                storeConfig.getBrokerRole(),
-                                format(commitReadTimeMs), 
format(currReadTimeMs), format(currWriteTimeMs), getReadBehind(),
-                                tmpQueueOffset, maxOffsetInQueue - 
tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
-                                enqueuePutQueue.size(), 
dequeueGetQueue.size(), dequeuePutQueue.size(), getALlCongestNum(), 
format(lastEnqueueButExpiredStoreTime));
+                                "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} 
allCongestNum:{} enqExpiredStoreTime:{}",
+                            storeConfig.getBrokerRole(),
+                            format(commitReadTimeMs), format(currReadTimeMs), 
format(currWriteTimeMs), getReadBehind(),
+                            tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, 
timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
+                            enqueuePutQueue.size(), dequeueGetQueue.size(), 
dequeuePutQueue.size(), getALlCongestNum(), 
format(lastEnqueueButExpiredStoreTime));
                     }
                     timerMetrics.persist();
                     waitForRunning(storeConfig.getTimerFlushIntervalMs());
@@ -1640,13 +1666,22 @@ public class TimerMessageStore {
 
     public void prepareTimerCheckPoint() {
         
timerCheckpoint.setLastTimerLogFlushPos(timerLog.getMappedFileQueue().getFlushedWhere());
-        if (isMaster()) {
-            timerCheckpoint.setLastReadTimeMs(commitReadTimeMs);
+        timerCheckpoint.setLastReadTimeMs(commitReadTimeMs);
+        if (shouldRunningDequeue) {
             timerCheckpoint.setMasterTimerQueueOffset(commitQueueOffset);
+            if (commitReadTimeMs != lastCommitReadTimeMs || commitQueueOffset 
!= lastCommitQueueOffset) {
+                
timerCheckpoint.updateDateVersion(messageStore.getStateMachineVersion());
+                lastCommitReadTimeMs = commitReadTimeMs;
+                lastCommitQueueOffset = commitQueueOffset;
+            }
         }
         timerCheckpoint.setLastTimerQueueOffset(Math.min(commitQueueOffset, 
timerCheckpoint.getMasterTimerQueueOffset()));
     }
 
+    public void registerEscapeBridgeHook(Function<MessageExtBrokerInner, 
PutMessageResult> escapeBridgeHook) {
+        this.escapeBridgeHook = escapeBridgeHook;
+    }
+
     public boolean isMaster() {
         return BrokerRole.SLAVE != lastBrokerRole;
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerCheckPointTest.java 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerCheckPointTest.java
new file mode 100644
index 000000000..f72874af2
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerCheckPointTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.timer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimerCheckPointTest {
+
+    private String baseDir;
+
+    @Before
+    public void init() throws IOException {
+        baseDir = StoreTestUtils.createBaseDir();
+    }
+
+    @Test
+    public void testCheckPoint() throws IOException {
+        String baseSrc = baseDir + File.separator + "timercheck";
+        TimerCheckpoint first = new TimerCheckpoint(baseSrc);
+        assertEquals(0, first.getLastReadTimeMs());
+        assertEquals(0, first.getLastTimerLogFlushPos());
+        assertEquals(0, first.getLastTimerQueueOffset());
+        assertEquals(0, first.getMasterTimerQueueOffset());
+        first.setLastReadTimeMs(1000);
+        first.setLastTimerLogFlushPos(1100);
+        first.setLastTimerQueueOffset(1200);
+        first.setMasterTimerQueueOffset(1300);
+        first.shutdown();
+        TimerCheckpoint second = new TimerCheckpoint(baseSrc);
+        assertEquals(1000, second.getLastReadTimeMs());
+        assertEquals(1100, second.getLastTimerLogFlushPos());
+        assertEquals(1200, second.getLastTimerQueueOffset());
+        assertEquals(1300, second.getMasterTimerQueueOffset());
+    }
+
+    @Test
+    public void testNewCheckPoint() throws IOException {
+        String baseSrc = baseDir + File.separator + "timercheck2";
+        TimerCheckpoint first = new TimerCheckpoint(baseSrc);
+        assertEquals(0, first.getLastReadTimeMs());
+        assertEquals(0, first.getLastTimerLogFlushPos());
+        assertEquals(0, first.getLastTimerQueueOffset());
+        assertEquals(0, first.getMasterTimerQueueOffset());
+        assertEquals(0, first.getDataVersion().getStateVersion());
+        assertEquals(0, first.getDataVersion().getCounter().get());
+        first.setLastReadTimeMs(1000);
+        first.setLastTimerLogFlushPos(1100);
+        first.setLastTimerQueueOffset(1200);
+        first.setMasterTimerQueueOffset(1300);
+        first.getDataVersion().setStateVersion(1400);
+        first.getDataVersion().setTimestamp(1500);
+        first.getDataVersion().setCounter(new AtomicLong(1600));
+        first.shutdown();
+        TimerCheckpoint second = new TimerCheckpoint(baseSrc);
+        assertEquals(1000, second.getLastReadTimeMs());
+        assertEquals(1100, second.getLastTimerLogFlushPos());
+        assertEquals(1200, second.getLastTimerQueueOffset());
+        assertEquals(1300, second.getMasterTimerQueueOffset());
+        assertEquals(1400, second.getDataVersion().getStateVersion());
+        assertEquals(1500, second.getDataVersion().getTimestamp());
+        assertEquals(1600, second.getDataVersion().getCounter().get());
+    }
+
+    @Test
+    public void testEncodeDecode() throws IOException {
+        TimerCheckpoint first = new TimerCheckpoint();
+        first.setLastReadTimeMs(1000);
+        first.setLastTimerLogFlushPos(1100);
+        first.setLastTimerQueueOffset(1200);
+        first.setMasterTimerQueueOffset(1300);
+
+        TimerCheckpoint second = 
TimerCheckpoint.decode(TimerCheckpoint.encode(first));
+        assertEquals(first.getLastReadTimeMs(), second.getLastReadTimeMs());
+        assertEquals(first.getLastTimerLogFlushPos(), 
second.getLastTimerLogFlushPos());
+        assertEquals(first.getLastTimerQueueOffset(), 
second.getLastTimerQueueOffset());
+        assertEquals(first.getMasterTimerQueueOffset(), 
second.getMasterTimerQueueOffset());
+    }
+
+    @Test
+    public void testNewEncodeDecode() throws IOException {
+        TimerCheckpoint first = new TimerCheckpoint();
+        first.setLastReadTimeMs(1000);
+        first.setLastTimerLogFlushPos(1100);
+        first.setLastTimerQueueOffset(1200);
+        first.setMasterTimerQueueOffset(1300);
+        first.getDataVersion().setStateVersion(1400);
+        first.getDataVersion().setTimestamp(1500);
+        first.getDataVersion().setCounter(new AtomicLong(1600));
+        TimerCheckpoint second = 
TimerCheckpoint.decode(TimerCheckpoint.encode(first));
+        assertEquals(first.getLastReadTimeMs(), second.getLastReadTimeMs());
+        assertEquals(first.getLastTimerLogFlushPos(), 
second.getLastTimerLogFlushPos());
+        assertEquals(first.getLastTimerQueueOffset(), 
second.getLastTimerQueueOffset());
+        assertEquals(first.getMasterTimerQueueOffset(), 
second.getMasterTimerQueueOffset());
+        assertEquals(1400, second.getDataVersion().getStateVersion()); // check the decoded copy, not the input
+        assertEquals(1500, second.getDataVersion().getTimestamp());
+        assertEquals(1600, second.getDataVersion().getCounter().get());
+    }
+
+    @After
+    public void shutdown() {
+        if (null != baseDir) {
+            StoreTestUtils.deleteFile(baseDir);
+        }
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index e893d482a..cc7ea41c7 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -64,8 +64,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-//// Timer unit tests are very unstable, ignore these temporarily
-//@Ignore
 public class TimerMessageStoreTest {
     private final byte[] msgBody = new byte[1024];
     private static MessageStore messageStore;
@@ -173,7 +171,7 @@ public class TimerMessageStoreTest {
 
         final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
         timerMessageStore.load();
-        timerMessageStore.start();
+        timerMessageStore.start(true);
 
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
         long delayMs = curr + 3000;
@@ -220,7 +218,7 @@ public class TimerMessageStoreTest {
         storeConfig.setTimerCongestNumEachSlot(100);
         TimerMessageStore timerMessageStore = createTimerMessageStore(null);
         timerMessageStore.load();
-        timerMessageStore.start();
+        timerMessageStore.start(true);
 
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
         // Make sure delayMs won't be over.
@@ -265,7 +263,7 @@ public class TimerMessageStoreTest {
 
         TimerMessageStore timerMessageStore = createTimerMessageStore(null);
         timerMessageStore.load();
-        timerMessageStore.start();
+        timerMessageStore.start(true);
 
         long delayMs = System.currentTimeMillis() - 2 * precisionMs;
         for (int i = 0; i < 10; i++) {
@@ -289,7 +287,7 @@ public class TimerMessageStoreTest {
 
         TimerMessageStore timerMessageStore = createTimerMessageStore(null);
         timerMessageStore.load();
-        timerMessageStore.start();
+        timerMessageStore.start(true);
 
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
         long delayMs = curr + 1000;
@@ -326,7 +324,7 @@ public class TimerMessageStoreTest {
 
         final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
         timerMessageStore.load();
-        timerMessageStore.start();
+        timerMessageStore.start(true);
 
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
         final long delayMs = curr + 1000;
@@ -373,7 +371,7 @@ public class TimerMessageStoreTest {
         String base = StoreTestUtils.createBaseDir();
         final TimerMessageStore first = createTimerMessageStore(base);
         first.load();
-        first.start();
+        first.start(true);
 
         final int msgNum = 250;
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
@@ -423,7 +421,7 @@ public class TimerMessageStoreTest {
         assertEquals(second.getCommitQueueOffset(), second.getQueueOffset());
         assertEquals(second.getCurrReadTimeMs(), second.getCommitReadTimeMs());
         assertEquals(first.getCommitReadTimeMs(), 
second.getCommitReadTimeMs());
-        second.start();
+        second.start(true);
 
         // Wait until all messages have wrote back to commitLog and 
consumeQueue.
         await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
@@ -447,7 +445,7 @@ public class TimerMessageStoreTest {
 
         TimerMessageStore first = createTimerMessageStore(null);
         first.load();
-        first.start();
+        first.start(true);
 
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
         long delaySec = storeConfig.getTimerMaxDelaySec() + 20;
@@ -469,7 +467,7 @@ public class TimerMessageStoreTest {
 
         TimerMessageStore timerMessageStore = createTimerMessageStore(null);
         timerMessageStore.load();
-        timerMessageStore.start();
+        timerMessageStore.start(true);
 
         long curr = System.currentTimeMillis() / precisionMs * precisionMs;
         long delayMs = curr + 4 * precisionMs;
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
index 7138f4212..d1b38405e 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
@@ -228,4 +228,76 @@ public class GetMetadataReverseIT extends 
ContainerIntegrationTestBase {
 
         pushConsumer.shutdown();
     }
+
+    @Test
+    public void testGetMetadataReverse_timerCheckPoint() throws Exception { // Sends timer messages to master1, removes master1, expects delivery, then re-adds master1 and checks its timer checkpoint.
+        String topic = GetMetadataReverseIT.class.getSimpleName() + "_timerCheckPoint" + random.nextInt(65535);
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+        // Wait until a slave broker in brokerContainer2 knows the topic config
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            InnerSalveBrokerController slaveBroker = brokerContainer2.getSlaveBrokers().iterator().next();
+            return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
+        });
+
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf("%s%n", x)); // never use the message text as the format string: a '%' in it would break printf
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, Integer.toString(i).getBytes());
+            msg.setDelayTimeSec(30);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); // the count is already final here, so this only passes when every send returned SEND_OK
+        System.out.printf("send success%n");
+
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+        System.out.printf("Remove master%n");
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> { // poll until the offsets recorded on master2 for this group/topic sum to at least MESSAGE_COUNT
+            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+            Map<Integer, Long> offsetTable = master2With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+            if (offsetTable != null) {
+                long totalOffset = 0;
+                for (final Long offset : offsetTable.values()) {
+                    totalOffset += offset;
+                }
+                return totalOffset >= MESSAGE_COUNT;
+            }
+            return false;
+        });
+
+        // Add master1 back to brokerContainer1 and lift its isolation
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master%n");
+
+        awaitUntilSlaveOK();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> master1With3Replicas.getTimerCheckpoint().getMasterTimerQueueOffset() >= MESSAGE_COUNT);
+
+        pushConsumer.shutdown();
+    }
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
index c2850f3b9..ac87ed722 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
@@ -39,7 +39,7 @@ import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
 
-//The test is correct, but it takes too much time, so it is ignored for the 
time being
+//The test is correct, but it takes too much time and does not exercise core functionality, so it is ignored for the time being
 @Ignore
 public class ScheduleSlaveActingMasterIT extends ContainerIntegrationTestBase {
 
@@ -135,6 +135,65 @@ public class ScheduleSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
         Thread.sleep(30000);
     }
 
+    @Test
+    public void testLocalActing_timerMsg() throws Exception { // Sends 30s timer messages to master1, removes master1, and expects them to still arrive on time.
+        awaitUntilSlaveOK();
+        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - msgs.get(0).getBornTimestamp(); // elapsed time is taken from the first message of the batch
+            if (Math.abs(period - 30000) <= 1000) { // "in time" means within 1s of the 30s delay
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf("%s%n", x)); // never use the message text as the format string: a '%' in it would break printf
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            msg.setDelayTimeSec(30);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(2)).until(() -> finalSendSuccess >= MESSAGE_COUNT); // the count is already final here, so this only passes when every send returned SEND_OK
+        System.out.printf("send success%n");
+
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+        System.out.printf("Remove master1%n");
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95); // all received, at least 95% in time
+
+        System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+
+        pushConsumer.shutdown();
+
+        // Add master1 back to brokerContainer1 and lift its isolation
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master1%n");
+
+        awaitUntilSlaveOK();
+        // sleep a while to recover
+        Thread.sleep(20000);
+    }
+
     @Test
     public void testRemoteActing_delayMsg() throws Exception {
         awaitUntilSlaveOK();
@@ -217,4 +276,83 @@ public class ScheduleSlaveActingMasterIT extends 
ContainerIntegrationTestBase {
         Thread.sleep(30000);
     }
 
+    @Test
+    public void testRemoteActing_timerMsg() throws Exception { // Sends 30s timer messages to master1, removes master1 and master2, and expects delivery from master3's broker name.
+        awaitUntilSlaveOK();
+
+        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
+        createTopic(topic);
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        AtomicInteger master3MsgCount = new AtomicInteger(0);
+
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            msg.setDelayTimeSec(30);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT); // the count is already final here, so this only passes when every send returned SEND_OK
+        long sendCompleteTimeStamp = System.currentTimeMillis();
+        System.out.printf("send success%n");
+
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - sendCompleteTimeStamp;
+            // Remote acting changes the born timestamp and msgId, so elapsed time is measured from send completion; this check still needs polishing.
+            if (Math.abs(period - 30000) <= 3000) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            if (msgs.get(0).getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                master3MsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf("cost %d %s%n", period, x)); // never use the message text as the format string: a '%' in it would break printf
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove master1%n");
+
+        isolateBroker(master2With3Replicas);
+        brokerContainer2.removeBroker(new BrokerIdentity(
+            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master2With3Replicas.getBrokerConfig().getBrokerName(),
+            master2With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove master2%n");
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && master3MsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95); // all received, all attributed to master3, at least 95% in time
+
+        System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+
+        pushConsumer.shutdown();
+
+        // Add master1 back to brokerContainer1 and lift its isolation
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master1%n");
+
+        // Add master2 back to brokerContainer2 and lift its isolation
+        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
+        master2With3Replicas.start();
+        cancelIsolatedBroker(master2With3Replicas);
+        System.out.printf("Add back master2%n");
+
+        awaitUntilSlaveOK();
+        // sleep a while to recover
+        Thread.sleep(20000);
+    }
+
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java 
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
index 617d72233..82bceebfd 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
@@ -96,7 +96,7 @@ public class ScheduledMessageIT extends 
ContainerIntegrationTestBase {
                 inTimeMsgCount.addAndGet(msgs.size());
             }
             receivedMsgCount.addAndGet(msgs.size());
-            msgs.forEach(x -> System.out.printf(receivedMsgCount.get()+" cost 
" + period + " " + x + "%n"));
+            msgs.forEach(x -> System.out.printf(receivedMsgCount.get() + " 
cost " + period + " " + x + "%n"));
 
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
@@ -150,4 +150,38 @@ public class ScheduledMessageIT extends 
ContainerIntegrationTestBase {
             .until(() -> ((DefaultMessageStore) 
master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get()
 == 2);
     }
 
+    @Test
+    public void consumeTimerMsgFromSlave() throws MQClientException, 
RemotingException, InterruptedException, MQBrokerException { // Sends 3s timer messages, isolates master1, then expects the consumer to receive all of them.
+        String topic = TOPIC_PREFIX + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) 
(msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, String.valueOf(i).getBytes());
+            msg.setDelayTimeSec(3);
+            producer.send(msg);
+        }
+
+        isolateBroker(master1With3Replicas); // after this, the producer is expected to find no publish address for the topic (asserted below)
+
+        
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+        
assertThat(producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
+
+        pushConsumer.start(); // the consumer is started only after master1 has been isolated
+
+        await().atMost(Duration.ofSeconds(MESSAGE_COUNT * 2)).until(() -> 
receivedMsgCount.get() >= MESSAGE_COUNT);
+
+        pushConsumer.shutdown();
+        cancelIsolatedBroker(master1With3Replicas);
+
+        await().atMost(100, TimeUnit.SECONDS) // wait until master1's HA service reports 2 connections
+            .until(() -> ((DefaultMessageStore) 
master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get()
 == 2);
+    }
+
 }

Reply via email to