This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c292574cd [ISSUE #4758] Timer Message adapts to slaveActingMaster
mode (#4757)
c292574cd is described below
commit c292574cd5bb1cdcb4d07542570a80bc01255b0d
Author: rongtong <[email protected]>
AuthorDate: Thu Aug 4 10:11:02 2022 +0800
[ISSUE #4758] Timer Message adapts to slaveActingMaster mode (#4757)
* Timer Message adapts to slaveActingMaster mode
* Add asf-header to TimerCheckPointTest
---
.../apache/rocketmq/broker/BrokerController.java | 5 +
.../rocketmq/broker/BrokerPreOnlineService.java | 11 ++
.../rocketmq/store/timer/TimerCheckpoint.java | 36 +++++-
.../rocketmq/store/timer/TimerMessageStore.java | 103 ++++++++++-----
.../rocketmq/store/timer/TimerCheckPointTest.java | 127 +++++++++++++++++++
.../store/timer/TimerMessageStoreTest.java | 20 ++-
.../test/container/GetMetadataReverseIT.java | 72 +++++++++++
.../container/ScheduleSlaveActingMasterIT.java | 140 ++++++++++++++++++++-
.../test/container/ScheduledMessageIT.java | 36 +++++-
9 files changed, 502 insertions(+), 48 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 53129e8a5..6d4fe3d80 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -732,6 +732,7 @@ public class BrokerController {
this.timerCheckpoint = new
TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new
TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new
TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint,
timerMetrics, brokerStatsManager);
+ this.timerMessageStore.registerEscapeBridgeHook(msg ->
escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
} catch (IOException e) {
@@ -1962,6 +1963,10 @@ public class BrokerController {
this.scheduleMessageService.stop();
}
isScheduleServiceStart = shouldStart;
+
+ if (timerMessageStore != null) {
+ timerMessageStore.setShouldRunningDequeue(shouldStart);
+ }
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
index 10757c0db..35740f7cc 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.broker.schedule.DelayOffsetSerializeWrapper;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
public class BrokerPreOnlineService extends ServiceThread {
private static final InternalLogger LOGGER =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -159,6 +160,8 @@ public class BrokerPreOnlineService extends ServiceThread {
ConsumerOffsetSerializeWrapper consumerOffsetSerializeWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(brokerAddr);
+ TimerCheckpoint timerCheckpoint =
this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(brokerAddr);
+
if (null != consumerOffsetSerializeWrapper &&
brokerController.getConsumerOffsetManager().getDataVersion().compare(consumerOffsetSerializeWrapper.getDataVersion())
<= 0) {
LOGGER.info("{}'s consumerOffset data version is larger than
master broker, {}'s consumerOffset will be used.", brokerAddr, brokerAddr);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
@@ -180,6 +183,14 @@ public class BrokerPreOnlineService extends ServiceThread {
}
}
+ if (null != this.brokerController.getTimerCheckpoint() &&
this.brokerController.getTimerCheckpoint().getDataVersion().compare(timerCheckpoint.getDataVersion())
<= 0) {
+ LOGGER.info("{}'s timerCheckpoint data version is larger than
master broker, {}'s timerCheckpoint will be used.", brokerAddr, brokerAddr);
+
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(timerCheckpoint.getLastReadTimeMs());
+
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(timerCheckpoint.getMasterTimerQueueOffset());
+
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(timerCheckpoint.getDataVersion());
+ this.brokerController.getTimerCheckpoint().flush();
+ }
+
for (BrokerAttachedPlugin brokerAttachedPlugin :
brokerController.getBrokerAttachedPlugins()) {
if (brokerAttachedPlugin != null) {
brokerAttachedPlugin.syncMetadataReverse(brokerAddr);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
index 2d24b9d7f..158264049 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerCheckpoint.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.store.timer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -38,6 +40,7 @@ public class TimerCheckpoint {
private volatile long lastTimerLogFlushPos = 0;
private volatile long lastTimerQueueOffset = 0;
private volatile long masterTimerQueueOffset = 0; // read from master
+ private final DataVersion dataVersion = new DataVersion();
public TimerCheckpoint() {
this.randomAccessFile = null;
@@ -60,12 +63,21 @@ public class TimerCheckpoint {
this.lastTimerLogFlushPos = this.mappedByteBuffer.getLong(8);
this.lastTimerQueueOffset = this.mappedByteBuffer.getLong(16);
this.masterTimerQueueOffset = this.mappedByteBuffer.getLong(24);
+ // new add to record dataVersion
+ if (this.mappedByteBuffer.hasRemaining()) {
+ dataVersion.setStateVersion(this.mappedByteBuffer.getLong(32));
+ dataVersion.setTimestamp(this.mappedByteBuffer.getLong(40));
+ dataVersion.setCounter(new
AtomicLong(this.mappedByteBuffer.getLong(48)));
+ }
log.info("timer checkpoint file lastReadTimeMs " +
this.lastReadTimeMs + ", "
+ UtilAll.timeMillisToHumanString(this.lastReadTimeMs));
log.info("timer checkpoint file lastTimerLogFlushPos " +
this.lastTimerLogFlushPos);
log.info("timer checkpoint file lastTimerQueueOffset " +
this.lastTimerQueueOffset);
log.info("timer checkpoint file masterTimerQueueOffset " +
this.masterTimerQueueOffset);
+ log.info("timer checkpoint file data version state version " +
this.dataVersion.getStateVersion());
+ log.info("timer checkpoint file data version timestamp " +
this.dataVersion.getTimestamp());
+ log.info("timer checkpoint file data version counter " +
this.dataVersion.getCounter());
} else {
log.info("timer checkpoint file not exists, " + scpPath);
}
@@ -96,6 +108,10 @@ public class TimerCheckpoint {
this.mappedByteBuffer.putLong(8, this.lastTimerLogFlushPos);
this.mappedByteBuffer.putLong(16, this.lastTimerQueueOffset);
this.mappedByteBuffer.putLong(24, this.masterTimerQueueOffset);
+ // new add to record dataVersion
+ this.mappedByteBuffer.putLong(32, this.dataVersion.getStateVersion());
+ this.mappedByteBuffer.putLong(40, this.dataVersion.getTimestamp());
+ this.mappedByteBuffer.putLong(48, this.dataVersion.getCounter().get());
this.mappedByteBuffer.force();
}
@@ -104,11 +120,15 @@ public class TimerCheckpoint {
}
public static ByteBuffer encode(TimerCheckpoint another) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(32);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(56);
byteBuffer.putLong(another.getLastReadTimeMs());
byteBuffer.putLong(another.getLastTimerLogFlushPos());
byteBuffer.putLong(another.getLastTimerQueueOffset());
byteBuffer.putLong(another.getMasterTimerQueueOffset());
+ // new add to record dataVersion
+ byteBuffer.putLong(another.getDataVersion().getStateVersion());
+ byteBuffer.putLong(another.getDataVersion().getTimestamp());
+ byteBuffer.putLong(another.getDataVersion().getCounter().get());
byteBuffer.flip();
return byteBuffer;
}
@@ -119,6 +139,12 @@ public class TimerCheckpoint {
tmp.setLastTimerLogFlushPos(byteBuffer.getLong());
tmp.setLastTimerQueueOffset(byteBuffer.getLong());
tmp.setMasterTimerQueueOffset(byteBuffer.getLong());
+ // new add to record dataVersion
+ if (byteBuffer.hasRemaining()) {
+ tmp.getDataVersion().setStateVersion(byteBuffer.getLong());
+ tmp.getDataVersion().setTimestamp(byteBuffer.getLong());
+ tmp.getDataVersion().setCounter(new
AtomicLong(byteBuffer.getLong()));
+ }
return tmp;
}
@@ -149,4 +175,12 @@ public class TimerCheckpoint {
public void setMasterTimerQueueOffset(final long masterTimerQueueOffset) {
this.masterTimerQueueOffset = masterTimerQueueOffset;
}
+
+ public void updateDateVersion(long stateVersion) {
+ dataVersion.nextVersion(stateVersion);
+ }
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index fa2af4086..ac0facff7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.timer;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
+import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -118,6 +119,8 @@ public class TimerMessageStore {
private volatile long commitReadTimeMs;
private volatile long currQueueOffset; //only one queue that is 0
private volatile long commitQueueOffset;
+ private volatile long lastCommitReadTimeMs;
+ private volatile long lastCommitQueueOffset;
private long lastEnqueueButExpiredTime;
private long lastEnqueueButExpiredStoreTime;
@@ -136,10 +139,14 @@ public class TimerMessageStore {
private boolean dequeueStatusChangeFlag = false;
private long shouldStartTime;
+ // True if current store is master or current brokerId is equal to the
minimum brokerId of the replica group in slaveActingMaster mode.
+ private volatile boolean shouldRunningDequeue;
private final BrokerStatsManager brokerStatsManager;
+ private Function<MessageExtBrokerInner, PutMessageResult> escapeBridgeHook;
public TimerMessageStore(final MessageStore messageStore, final
MessageStoreConfig storeConfig,
- TimerCheckpoint timerCheckpoint, TimerMetrics
timerMetrics, final BrokerStatsManager brokerStatsManager) throws IOException {
+ TimerCheckpoint timerCheckpoint, TimerMetrics timerMetrics,
+ final BrokerStatsManager brokerStatsManager) throws IOException {
this.messageStore = messageStore;
this.storeConfig = storeConfig;
this.commitLogFileSize = storeConfig.getMappedFileSizeCommitLog();
@@ -148,7 +155,7 @@ public class TimerMessageStore {
// TimerWheel contains the fixed number of slots regardless of
precision.
this.slotsTotal = TIMER_WHELL_TTL_DAY * DAY_SECS;
this.timerWheel = new
TimerWheel(getTimerWheelPath(storeConfig.getStorePathRootDir()),
- this.slotsTotal, precisionMs);
+ this.slotsTotal, precisionMs);
this.timerLog = new
TimerLog(getTimerLogPath(storeConfig.getStorePathRootDir()), timerLogFileSize);
this.timerMetrics = timerMetrics;
this.timerCheckpoint = timerCheckpoint;
@@ -186,7 +193,9 @@ public class TimerMessageStore {
dequeueGetMessageServices[i] = new TimerDequeueGetMessageService();
}
int putThreadNum = storeConfig.getTimerPutMessageThreadNum();
- if (putThreadNum <= 0) putThreadNum = 1;
+ if (putThreadNum <= 0) {
+ putThreadNum = 1;
+ }
dequeuePutMessageServices = new
TimerDequeuePutMessageService[putThreadNum];
for (int i = 0; i < dequeuePutMessageServices.length; i++) {
dequeuePutMessageServices[i] = new TimerDequeuePutMessageService();
@@ -200,7 +209,6 @@ public class TimerMessageStore {
dequeueGetQueue = new
LinkedBlockingDeque<List<TimerRequest>>(1024);
dequeuePutQueue = new LinkedBlockingDeque<TimerRequest>(1024);
}
-
this.brokerStatsManager = brokerStatsManager;
}
@@ -280,11 +288,12 @@ public class TimerMessageStore {
recoverAndRevise(minFirst, false);
}
LOGGER.info("Timer recover ok currReadTimerMs:{} currQueueOffset:{}
checkQueueOffset:{} processOffset:{}",
- currReadTimeMs, currQueueOffset,
timerCheckpoint.getLastTimerQueueOffset(), processOffset);
+ currReadTimeMs, currQueueOffset,
timerCheckpoint.getLastTimerQueueOffset(), processOffset);
commitReadTimeMs = currReadTimeMs;
commitQueueOffset = currQueueOffset;
+ prepareTimerCheckPoint();
}
public long reviseQueueOffset(long processOffset) {
@@ -314,7 +323,7 @@ public class TimerMessageStore {
while (maxCount-- > 0) {
if (tmpOffset < 0) {
LOGGER.warn("reviseQueueOffset check cq offset fail, msg
in cq is not found.{}, {}",
- offsetPy, sizePy);
+ offsetPy, sizePy);
break;
}
SelectMappedBufferResult bufferCQ =
cq.getIndexBuffer(tmpOffset);
@@ -328,7 +337,7 @@ public class TimerMessageStore {
int sizePyTemp = bufferCQ.getByteBuffer().getInt();
if (offsetPyTemp == offsetPy && sizePyTemp == sizePy) {
LOGGER.info("reviseQueueOffset check cq offset ok. {},
{}, {}",
- tmpOffset, offsetPyTemp, sizePyTemp);
+ tmpOffset, offsetPyTemp, sizePyTemp);
cqOffset = tmpOffset;
break;
}
@@ -349,7 +358,7 @@ public class TimerMessageStore {
//recover timerlog and revise timerwheel
//return process offset
private long recoverAndRevise(long beginOffset, boolean checkTimerLog) {
- LOGGER.info("Begin fto recover timerlog offset:{} check:{}",
beginOffset, checkTimerLog);
+ LOGGER.info("Begin to recover timerlog offset:{} check:{}",
beginOffset, checkTimerLog);
MappedFile lastFile =
timerLog.getMappedFileQueue().getLastMappedFile();
if (null == lastFile) {
return 0;
@@ -472,6 +481,11 @@ public class TimerMessageStore {
LOGGER.info("Timer start ok currReadTimerMs:[{}] queueOffset:[{}]",
new Timestamp(currReadTimeMs), currQueueOffset);
}
+ public void start(boolean shouldRunningDequeue) {
+ this.shouldRunningDequeue = shouldRunningDequeue;
+ this.start();
+ }
+
public void shutdown() {
if (SHUTDOWN == state) {
return;
@@ -541,7 +555,7 @@ public class TimerMessageStore {
private boolean isRunningEnqueue() {
checkBrokerRole();
- if (!isMaster() && currQueueOffset >=
timerCheckpoint.getMasterTimerQueueOffset()) {
+ if (!shouldRunningDequeue && !isMaster() && currQueueOffset >=
timerCheckpoint.getMasterTimerQueueOffset()) {
return false;
}
@@ -549,14 +563,22 @@ public class TimerMessageStore {
}
private boolean isRunningDequeue() {
- if (!isMaster()) {
- currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
- commitReadTimeMs = currReadTimeMs;
+ if (!this.shouldRunningDequeue) {
+ syncLastReadTimeMs();
return false;
}
return isRunning();
}
+ public void syncLastReadTimeMs() {
+ currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
+ commitReadTimeMs = currReadTimeMs;
+ }
+
+ public void setShouldRunningDequeue(final boolean shouldRunningDequeue) {
+ this.shouldRunningDequeue = shouldRunningDequeue;
+ }
+
public void addMetric(MessageExt msg, int value) {
try {
if (null == msg || null ==
msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
@@ -694,7 +716,7 @@ public class TimerMessageStore {
// If it's a delete message, then slot's total num -1
// TODO: check if the delete msg is in the same slot with "the msg
to be deleted".
timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret :
slot.firstPos, ret,
- isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
+ isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
addMetric(messageExt, isDelete ? -1 : 1);
}
return -1 != ret;
@@ -807,8 +829,8 @@ public class TimerMessageStore {
int checkNum = 0;
while (true) {
if (dequeuePutQueue.size() > 0
- || !checkStateForGetMessages(AbstractStateService.WAITING)
- ||
!checkStateForPutMessages(AbstractStateService.WAITING)) {
+ || !checkStateForGetMessages(AbstractStateService.WAITING)
+ || !checkStateForPutMessages(AbstractStateService.WAITING)) {
//let it go
} else {
checkNum++;
@@ -1006,15 +1028,19 @@ public class TimerMessageStore {
//0 succ; 1 fail, need retry; 2 fail, do not retry;
private int doPut(MessageExtBrokerInner message, boolean roll) throws
Exception {
- if (lastBrokerRole == BrokerRole.SLAVE) {
- LOGGER.warn("Trying do put timer msg in slave, [{}]", message);
- return PUT_NO_RETRY;
- }
+
if (!roll && null !=
message.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
LOGGER.warn("Trying do put delete timer msg:[{}] roll:[{}]",
message, roll);
return PUT_NO_RETRY;
}
- PutMessageResult putMessageResult = messageStore.putMessage(message);
+
+ PutMessageResult putMessageResult = null;
+ if (escapeBridgeHook != null) {
+ putMessageResult = escapeBridgeHook.apply(message);
+ } else {
+ putMessageResult = messageStore.putMessage(message);
+ }
+
int retryNum = 0;
while (retryNum < 3) {
if (null == putMessageResult || null ==
putMessageResult.getPutMessageStatus()) {
@@ -1025,7 +1051,7 @@ public class TimerMessageStore {
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
-
putMessageResult.getAppendMessageResult().getWroteBytes());
+
putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerStatsManager.incBrokerPutNums(1);
}
return PUT_OK;
@@ -1058,7 +1084,7 @@ public class TimerMessageStore {
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
TopicFilterType topicFilterType =
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
long tagsCodeValue =
- MessageExtBrokerInner.tagsString2tagsCode(topicFilterType,
msgInner.getTags());
+ MessageExtBrokerInner.tagsString2tagsCode(topicFilterType,
msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
@@ -1101,8 +1127,8 @@ public class TimerMessageStore {
int hash = hashTopicForMetrics(entry.getKey());
if (smallHashs.containsKey(hash)) {
LOGGER.warn("[CheckAndReviseMetrics]Metric hash collision
between small-small code:{} small topic:{}{} small topic:{}{}", hash,
- entry.getKey(), entry.getValue(),
- smallHashs.get(hash),
smallOnes.get(smallHashs.get(hash)));
+ entry.getKey(), entry.getValue(),
+ smallHashs.get(hash),
smallOnes.get(smallHashs.get(hash)));
smallHashCollisions.add(hash);
}
smallHashs.put(hash, entry.getKey());
@@ -1118,8 +1144,8 @@ public class TimerMessageStore {
Map.Entry<String, TimerMetrics.Metric> smallEntry =
smalllIt.next();
if (hashTopicForMetrics(smallEntry.getKey()) ==
hashTopicForMetrics(bjgEntry.getKey())) {
LOGGER.warn("[CheckAndReviseMetrics]Metric hash
collision between small-big code:{} small topic:{}{} big topic:{}{}",
hashTopicForMetrics(smallEntry.getKey()),
- smallEntry.getKey(), smallEntry.getValue(),
- bjgEntry.getKey(), bjgEntry.getValue());
+ smallEntry.getKey(), smallEntry.getValue(),
+ bjgEntry.getKey(), bjgEntry.getValue());
smalllIt.remove();
}
}
@@ -1276,7 +1302,7 @@ public class TimerMessageStore {
req.setLatch(latch);
try {
perfs.startTick("enqueue_put");
- if (isMaster() && req.getDelayTime() <
currWriteTimeMs) {
+ if (shouldRunningDequeue && req.getDelayTime()
< currWriteTimeMs) {
dequeuePutQueue.put(req);
} else {
boolean doEnqueueRes =
doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
@@ -1574,11 +1600,11 @@ public class TimerMessageStore {
ConsumeQueue cq = (ConsumeQueue)
messageStore.getConsumeQueue(TIMER_TOPIC, 0);
long maxOffsetInQueue = cq == null ? 0 :
cq.getMaxOffsetInQueue();
TimerMessageStore.LOGGER.info("[{}]Timer
progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{}
currReadOffset:{} offsetBehind:{} behindMaster:{} " +
- "enqPutQueue:{} deqGetQueue:{}
deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}",
- storeConfig.getBrokerRole(),
- format(commitReadTimeMs),
format(currReadTimeMs), format(currWriteTimeMs), getReadBehind(),
- tmpQueueOffset, maxOffsetInQueue -
tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
- enqueuePutQueue.size(),
dequeueGetQueue.size(), dequeuePutQueue.size(), getALlCongestNum(),
format(lastEnqueueButExpiredStoreTime));
+ "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{}
allCongestNum:{} enqExpiredStoreTime:{}",
+ storeConfig.getBrokerRole(),
+ format(commitReadTimeMs), format(currReadTimeMs),
format(currWriteTimeMs), getReadBehind(),
+ tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset,
timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
+ enqueuePutQueue.size(), dequeueGetQueue.size(),
dequeuePutQueue.size(), getALlCongestNum(),
format(lastEnqueueButExpiredStoreTime));
}
timerMetrics.persist();
waitForRunning(storeConfig.getTimerFlushIntervalMs());
@@ -1640,13 +1666,22 @@ public class TimerMessageStore {
public void prepareTimerCheckPoint() {
timerCheckpoint.setLastTimerLogFlushPos(timerLog.getMappedFileQueue().getFlushedWhere());
- if (isMaster()) {
- timerCheckpoint.setLastReadTimeMs(commitReadTimeMs);
+ timerCheckpoint.setLastReadTimeMs(commitReadTimeMs);
+ if (shouldRunningDequeue) {
timerCheckpoint.setMasterTimerQueueOffset(commitQueueOffset);
+ if (commitReadTimeMs != lastCommitReadTimeMs || commitQueueOffset
!= lastCommitQueueOffset) {
+
timerCheckpoint.updateDateVersion(messageStore.getStateMachineVersion());
+ lastCommitReadTimeMs = commitReadTimeMs;
+ lastCommitQueueOffset = commitQueueOffset;
+ }
}
timerCheckpoint.setLastTimerQueueOffset(Math.min(commitQueueOffset,
timerCheckpoint.getMasterTimerQueueOffset()));
}
+ public void registerEscapeBridgeHook(Function<MessageExtBrokerInner,
PutMessageResult> escapeBridgeHook) {
+ this.escapeBridgeHook = escapeBridgeHook;
+ }
+
public boolean isMaster() {
return BrokerRole.SLAVE != lastBrokerRole;
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerCheckPointTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerCheckPointTest.java
new file mode 100644
index 000000000..f72874af2
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerCheckPointTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.timer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimerCheckPointTest {
+
+ private String baseDir;
+
+ @Before
+ public void init() throws IOException {
+ baseDir = StoreTestUtils.createBaseDir();
+ }
+
+ @Test
+ public void testCheckPoint() throws IOException {
+ String baseSrc = baseDir + File.separator + "timercheck";
+ TimerCheckpoint first = new TimerCheckpoint(baseSrc);
+ assertEquals(0, first.getLastReadTimeMs());
+ assertEquals(0, first.getLastTimerLogFlushPos());
+ assertEquals(0, first.getLastTimerQueueOffset());
+ assertEquals(0, first.getMasterTimerQueueOffset());
+ first.setLastReadTimeMs(1000);
+ first.setLastTimerLogFlushPos(1100);
+ first.setLastTimerQueueOffset(1200);
+ first.setMasterTimerQueueOffset(1300);
+ first.shutdown();
+ TimerCheckpoint second = new TimerCheckpoint(baseSrc);
+ assertEquals(1000, second.getLastReadTimeMs());
+ assertEquals(1100, second.getLastTimerLogFlushPos());
+ assertEquals(1200, second.getLastTimerQueueOffset());
+ assertEquals(1300, second.getMasterTimerQueueOffset());
+ }
+
+ @Test
+ public void testNewCheckPoint() throws IOException {
+ String baseSrc = baseDir + File.separator + "timercheck2";
+ TimerCheckpoint first = new TimerCheckpoint(baseSrc);
+ assertEquals(0, first.getLastReadTimeMs());
+ assertEquals(0, first.getLastTimerLogFlushPos());
+ assertEquals(0, first.getLastTimerQueueOffset());
+ assertEquals(0, first.getMasterTimerQueueOffset());
+ assertEquals(0, first.getDataVersion().getStateVersion());
+ assertEquals(0, first.getDataVersion().getCounter().get());
+ first.setLastReadTimeMs(1000);
+ first.setLastTimerLogFlushPos(1100);
+ first.setLastTimerQueueOffset(1200);
+ first.setMasterTimerQueueOffset(1300);
+ first.getDataVersion().setStateVersion(1400);
+ first.getDataVersion().setTimestamp(1500);
+ first.getDataVersion().setCounter(new AtomicLong(1600));
+ first.shutdown();
+ TimerCheckpoint second = new TimerCheckpoint(baseSrc);
+ assertEquals(1000, second.getLastReadTimeMs());
+ assertEquals(1100, second.getLastTimerLogFlushPos());
+ assertEquals(1200, second.getLastTimerQueueOffset());
+ assertEquals(1300, second.getMasterTimerQueueOffset());
+ assertEquals(1400, second.getDataVersion().getStateVersion());
+ assertEquals(1500, second.getDataVersion().getTimestamp());
+ assertEquals(1600, second.getDataVersion().getCounter().get());
+ }
+
+ @Test
+ public void testEncodeDecode() throws IOException {
+ TimerCheckpoint first = new TimerCheckpoint();
+ first.setLastReadTimeMs(1000);
+ first.setLastTimerLogFlushPos(1100);
+ first.setLastTimerQueueOffset(1200);
+ first.setMasterTimerQueueOffset(1300);
+
+ TimerCheckpoint second =
TimerCheckpoint.decode(TimerCheckpoint.encode(first));
+ assertEquals(first.getLastReadTimeMs(), second.getLastReadTimeMs());
+ assertEquals(first.getLastTimerLogFlushPos(),
second.getLastTimerLogFlushPos());
+ assertEquals(first.getLastTimerQueueOffset(),
second.getLastTimerQueueOffset());
+ assertEquals(first.getMasterTimerQueueOffset(),
second.getMasterTimerQueueOffset());
+ }
+
+ @Test
+ public void testNewEncodeDecode() throws IOException {
+ TimerCheckpoint first = new TimerCheckpoint();
+ first.setLastReadTimeMs(1000);
+ first.setLastTimerLogFlushPos(1100);
+ first.setLastTimerQueueOffset(1200);
+ first.setMasterTimerQueueOffset(1300);
+ first.getDataVersion().setStateVersion(1400);
+ first.getDataVersion().setTimestamp(1500);
+ first.getDataVersion().setCounter(new AtomicLong(1600));
+ TimerCheckpoint second =
TimerCheckpoint.decode(TimerCheckpoint.encode(first));
+ assertEquals(first.getLastReadTimeMs(), second.getLastReadTimeMs());
+ assertEquals(first.getLastTimerLogFlushPos(),
second.getLastTimerLogFlushPos());
+ assertEquals(first.getLastTimerQueueOffset(),
second.getLastTimerQueueOffset());
+ assertEquals(first.getMasterTimerQueueOffset(),
second.getMasterTimerQueueOffset());
+ assertEquals(first.getDataVersion().getStateVersion(), 1400);
+ assertEquals(first.getDataVersion().getTimestamp(), 1500);
+ assertEquals(first.getDataVersion().getCounter().get(), 1600);
+ }
+
+ @After
+ public void shutdown() {
+ if (null != baseDir) {
+ StoreTestUtils.deleteFile(baseDir);
+ }
+ }
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index e893d482a..cc7ea41c7 100644
---
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -64,8 +64,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-//// Timer unit tests are very unstable, ignore these temporarily
-//@Ignore
public class TimerMessageStoreTest {
private final byte[] msgBody = new byte[1024];
private static MessageStore messageStore;
@@ -173,7 +171,7 @@ public class TimerMessageStoreTest {
final TimerMessageStore timerMessageStore =
createTimerMessageStore(null);
timerMessageStore.load();
- timerMessageStore.start();
+ timerMessageStore.start(true);
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
long delayMs = curr + 3000;
@@ -220,7 +218,7 @@ public class TimerMessageStoreTest {
storeConfig.setTimerCongestNumEachSlot(100);
TimerMessageStore timerMessageStore = createTimerMessageStore(null);
timerMessageStore.load();
- timerMessageStore.start();
+ timerMessageStore.start(true);
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
// Make sure delayMs won't be over.
@@ -265,7 +263,7 @@ public class TimerMessageStoreTest {
TimerMessageStore timerMessageStore = createTimerMessageStore(null);
timerMessageStore.load();
- timerMessageStore.start();
+ timerMessageStore.start(true);
long delayMs = System.currentTimeMillis() - 2 * precisionMs;
for (int i = 0; i < 10; i++) {
@@ -289,7 +287,7 @@ public class TimerMessageStoreTest {
TimerMessageStore timerMessageStore = createTimerMessageStore(null);
timerMessageStore.load();
- timerMessageStore.start();
+ timerMessageStore.start(true);
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
long delayMs = curr + 1000;
@@ -326,7 +324,7 @@ public class TimerMessageStoreTest {
final TimerMessageStore timerMessageStore =
createTimerMessageStore(null);
timerMessageStore.load();
- timerMessageStore.start();
+ timerMessageStore.start(true);
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
final long delayMs = curr + 1000;
@@ -373,7 +371,7 @@ public class TimerMessageStoreTest {
String base = StoreTestUtils.createBaseDir();
final TimerMessageStore first = createTimerMessageStore(base);
first.load();
- first.start();
+ first.start(true);
final int msgNum = 250;
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
@@ -423,7 +421,7 @@ public class TimerMessageStoreTest {
assertEquals(second.getCommitQueueOffset(), second.getQueueOffset());
assertEquals(second.getCurrReadTimeMs(), second.getCommitReadTimeMs());
assertEquals(first.getCommitReadTimeMs(),
second.getCommitReadTimeMs());
- second.start();
+ second.start(true);
// Wait until all messages have wrote back to commitLog and
consumeQueue.
await().atMost(5000, TimeUnit.MILLISECONDS).until(new
Callable<Boolean>() {
@@ -447,7 +445,7 @@ public class TimerMessageStoreTest {
TimerMessageStore first = createTimerMessageStore(null);
first.load();
- first.start();
+ first.start(true);
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
long delaySec = storeConfig.getTimerMaxDelaySec() + 20;
@@ -469,7 +467,7 @@ public class TimerMessageStoreTest {
TimerMessageStore timerMessageStore = createTimerMessageStore(null);
timerMessageStore.load();
- timerMessageStore.start();
+ timerMessageStore.start(true);
long curr = System.currentTimeMillis() / precisionMs * precisionMs;
long delayMs = curr + 4 * precisionMs;
diff --git
a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
index 7138f4212..d1b38405e 100644
---
a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
@@ -228,4 +228,76 @@ public class GetMetadataReverseIT extends
ContainerIntegrationTestBase {
pushConsumer.shutdown();
}
+
+ @Test
+ public void testGetMetadataReverse_timerCheckPoint() throws Exception {
+ String topic = GetMetadataReverseIT.class.getSimpleName() +
"_timerCheckPoint" + random.nextInt(65535);
+ createTopicTo(master1With3Replicas, topic, 1, 1);
+ createTopicTo(master2With3Replicas, topic, 1, 1);
+ createTopicTo(master3With3Replicas, topic, 1, 1);
+ // Wait topic synchronization
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ InnerSalveBrokerController slaveBroker =
brokerContainer2.getSlaveBrokers().iterator().next();
+ return
slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
+ });
+
+ DefaultMQPushConsumer pushConsumer =
createPushConsumer(CONSUMER_GROUP);
+ pushConsumer.subscribe(topic, "*");
+ AtomicInteger receivedMsgCount = new AtomicInteger(0);
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ receivedMsgCount.addAndGet(msgs.size());
+ msgs.forEach(x -> System.out.printf(x + "%n"));
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, Integer.toString(i).getBytes());
+ msg.setDelayTimeSec(30);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendSuccess++;
+ }
+ }
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+ System.out.printf("send success%n");
+
+ isolateBroker(master1With3Replicas);
+ brokerContainer1.removeBroker(new BrokerIdentity(
+ master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master1With3Replicas.getBrokerConfig().getBrokerName(),
+ master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+ System.out.printf("Remove master%n");
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
receivedMsgCount.get() >= MESSAGE_COUNT);
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+
pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+ Map<Integer, Long> OffsetTable =
master2With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP,
topic);
+ if (OffsetTable != null) {
+ long totalOffset = 0;
+ for (final Long offset : OffsetTable.values()) {
+ totalOffset += offset;
+ }
+ return totalOffset >= MESSAGE_COUNT;
+ }
+ return false;
+ });
+
+ //Add back master
+ master1With3Replicas =
brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(),
master1With3Replicas.getMessageStoreConfig());
+ master1With3Replicas.start();
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Add back master%n");
+
+ awaitUntilSlaveOK();
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
master1With3Replicas.getTimerCheckpoint().getMasterTimerQueueOffset() >=
MESSAGE_COUNT);
+
+ pushConsumer.shutdown();
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
index c2850f3b9..ac87ed722 100644
---
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
@@ -39,7 +39,7 @@ import org.junit.Test;
import static org.awaitility.Awaitility.await;
-//The test is correct, but it takes too much time, so it is ignored for the
time being
+//The test is correct, but it takes too much time and not core functions, so
it is ignored for the time being
@Ignore
public class ScheduleSlaveActingMasterIT extends ContainerIntegrationTestBase {
@@ -135,6 +135,65 @@ public class ScheduleSlaveActingMasterIT extends
ContainerIntegrationTestBase {
Thread.sleep(30000);
}
+ @Test
+ public void testLocalActing_timerMsg() throws Exception {
+ awaitUntilSlaveOK();
+ String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(topic, "*");
+ AtomicInteger receivedMsgCount = new AtomicInteger(0);
+ AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ long period = System.currentTimeMillis() -
msgs.get(0).getBornTimestamp();
+ if (Math.abs(period - 30000) <= 1000) {
+ inTimeMsgCount.addAndGet(msgs.size());
+ }
+ receivedMsgCount.addAndGet(msgs.size());
+ msgs.forEach(x -> System.out.printf(x + "%n"));
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ msg.setDelayTimeSec(30);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendSuccess++;
+ }
+ }
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(2)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+ System.out.printf("send success%n");
+
+ isolateBroker(master1With3Replicas);
+ brokerContainer1.removeBroker(new BrokerIdentity(
+ master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master1With3Replicas.getBrokerConfig().getBrokerName(),
+ master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+ System.out.printf("Remove master1%n");
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >=
MESSAGE_COUNT * 0.95);
+
+ System.out.printf("consumer received %d msg, %d in time%n",
receivedMsgCount.get(), inTimeMsgCount.get());
+
+ pushConsumer.shutdown();
+
+ //Add back master
+ master1With3Replicas =
brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(),
master1With3Replicas.getMessageStoreConfig());
+ master1With3Replicas.start();
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Add back master1%n");
+
+ awaitUntilSlaveOK();
+ // sleep a while to recover
+ Thread.sleep(20000);
+ }
+
@Test
public void testRemoteActing_delayMsg() throws Exception {
awaitUntilSlaveOK();
@@ -217,4 +276,83 @@ public class ScheduleSlaveActingMasterIT extends
ContainerIntegrationTestBase {
Thread.sleep(30000);
}
+ @Test
+ public void testRemoteActing_timerMsg() throws Exception {
+ awaitUntilSlaveOK();
+
+ String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() +
random.nextInt(65535);
+ createTopic(topic);
+ AtomicInteger receivedMsgCount = new AtomicInteger(0);
+ AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+ AtomicInteger master3MsgCount = new AtomicInteger(0);
+
+ MessageQueue messageQueue = new MessageQueue(topic,
master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, MESSAGE_BODY);
+ msg.setDelayTimeSec(30);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendSuccess++;
+ }
+ }
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >=
MESSAGE_COUNT);
+ long sendCompleteTimeStamp = System.currentTimeMillis();
+ System.out.printf("send success%n");
+
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(topic, "*");
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ long period = System.currentTimeMillis() - sendCompleteTimeStamp;
+ // Remote Acting lead to born timestamp, msgId changed, it need to
polish.
+ if (Math.abs(period - 30000) <= 3000) {
+ inTimeMsgCount.addAndGet(msgs.size());
+ }
+ if
(msgs.get(0).getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName()))
{
+ master3MsgCount.addAndGet(msgs.size());
+ }
+ receivedMsgCount.addAndGet(msgs.size());
+ msgs.forEach(x -> System.out.printf("cost " + period + " " + x +
"%n"));
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ isolateBroker(master1With3Replicas);
+ brokerContainer1.removeBroker(new BrokerIdentity(
+ master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master1With3Replicas.getBrokerConfig().getBrokerName(),
+ master1With3Replicas.getBrokerConfig().getBrokerId()));
+ System.out.printf("Remove master1%n");
+
+ isolateBroker(master2With3Replicas);
+ brokerContainer2.removeBroker(new BrokerIdentity(
+ master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master2With3Replicas.getBrokerConfig().getBrokerName(),
+ master2With3Replicas.getBrokerConfig().getBrokerId()));
+ System.out.printf("Remove master2%n");
+
+ await().atMost(Duration.ofMinutes(1)).until(() ->
receivedMsgCount.get() >= MESSAGE_COUNT && master3MsgCount.get() >=
MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+ System.out.printf("consumer received %d msg, %d in time%n",
receivedMsgCount.get(), inTimeMsgCount.get());
+
+ pushConsumer.shutdown();
+
+ //Add back master
+ master1With3Replicas =
brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(),
master1With3Replicas.getMessageStoreConfig());
+ master1With3Replicas.start();
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Add back master1%n");
+
+ //Add back master
+ master2With3Replicas =
brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(),
master2With3Replicas.getMessageStoreConfig());
+ master2With3Replicas.start();
+ cancelIsolatedBroker(master2With3Replicas);
+ System.out.printf("Add back master2%n");
+
+ awaitUntilSlaveOK();
+ // sleep a while to recover
+ Thread.sleep(20000);
+ }
+
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
index 617d72233..82bceebfd 100644
---
a/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
@@ -96,7 +96,7 @@ public class ScheduledMessageIT extends
ContainerIntegrationTestBase {
inTimeMsgCount.addAndGet(msgs.size());
}
receivedMsgCount.addAndGet(msgs.size());
- msgs.forEach(x -> System.out.printf(receivedMsgCount.get()+" cost
" + period + " " + x + "%n"));
+ msgs.forEach(x -> System.out.printf(receivedMsgCount.get() + "
cost " + period + " " + x + "%n"));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
@@ -150,4 +150,38 @@ public class ScheduledMessageIT extends
ContainerIntegrationTestBase {
.until(() -> ((DefaultMessageStore)
master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get()
== 2);
}
+ @Test
+ public void consumeTimerMsgFromSlave() throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
+ String topic = TOPIC_PREFIX + random.nextInt(65535);
+ createTopic(topic);
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+ pushConsumer.subscribe(topic, "*");
+ AtomicInteger receivedMsgCount = new AtomicInteger(0);
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
+ receivedMsgCount.addAndGet(msgs.size());
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, String.valueOf(i).getBytes());
+ msg.setDelayTimeSec(3);
+ producer.send(msg);
+ }
+
+ isolateBroker(master1With3Replicas);
+
+
producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+
assertThat(producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
+
+ pushConsumer.start();
+
+ await().atMost(Duration.ofSeconds(MESSAGE_COUNT * 2)).until(() ->
receivedMsgCount.get() >= MESSAGE_COUNT);
+
+ pushConsumer.shutdown();
+ cancelIsolatedBroker(master1With3Replicas);
+
+ await().atMost(100, TimeUnit.SECONDS)
+ .until(() -> ((DefaultMessageStore)
master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get()
== 2);
+ }
+
}