This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d1fd7af3f1 [ISSUE #8979] Add configurable switch for timer message retry logic (#8980)
d1fd7af3f1 is described below
commit d1fd7af3f12e1bfa30c7baa7c3f687168a9f5dbf
Author: 小陈 <[email protected]>
AuthorDate: Thu Dec 5 17:49:03 2024 +0800
[ISSUE #8979] Add configurable switch for timer message retry logic (#8980)
---
.../rocketmq/store/config/MessageStoreConfig.java | 9 ++
.../rocketmq/store/timer/TimerMessageStore.java | 121 ++++++++++++---------
.../store/timer/TimerMessageStoreTest.java | 87 ++++++++++++---
3 files changed, 149 insertions(+), 68 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 6dfdc0b1c8..0ea5841548 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -99,6 +99,7 @@ public class MessageStoreConfig {
private boolean timerSkipUnknownError = false;
private boolean timerWarmEnable = false;
private boolean timerStopDequeue = false;
+ private boolean timerEnableRetryUntilSuccess = false;
private int timerCongestNumEachSlot = Integer.MAX_VALUE;
private int timerMetricSmallThreshold = 1000000;
@@ -1689,6 +1690,14 @@ public class MessageStoreConfig {
this.timerSkipUnknownError = timerSkipUnknownError;
}
+ public boolean isTimerEnableRetryUntilSuccess() {
+ return timerEnableRetryUntilSuccess;
+ }
+
+ public void setTimerEnableRetryUntilSuccess(boolean
timerEnableRetryUntilSuccess) {
+ this.timerEnableRetryUntilSuccess = timerEnableRetryUntilSuccess;
+ }
+
public boolean isTimerWarmEnable() {
return timerWarmEnable;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index fb166678e6..2b14618eed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1097,46 +1097,44 @@ public class TimerMessageStore {
putMessageResult = messageStore.putMessage(message);
}
- int retryNum = 0;
- while (retryNum < 3) {
- if (null == putMessageResult || null ==
putMessageResult.getPutMessageStatus()) {
- retryNum++;
- } else {
- switch (putMessageResult.getPutMessageStatus()) {
- case PUT_OK:
- if (brokerStatsManager != null) {
-
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
- if (putMessageResult.getAppendMessageResult() !=
null) {
-
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
-
putMessageResult.getAppendMessageResult().getWroteBytes());
- }
-
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
+ if (putMessageResult != null && putMessageResult.getPutMessageStatus()
!= null) {
+ switch (putMessageResult.getPutMessageStatus()) {
+ case PUT_OK:
+ if (brokerStatsManager != null) {
+ brokerStatsManager.incTopicPutNums(message.getTopic(),
1, 1);
+ if (putMessageResult.getAppendMessageResult() != null)
{
+
brokerStatsManager.incTopicPutSize(message.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
}
- return PUT_OK;
- case SERVICE_NOT_AVAILABLE:
- return PUT_NEED_RETRY;
- case MESSAGE_ILLEGAL:
- case PROPERTIES_SIZE_EXCEEDED:
+
brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
+ }
+ return PUT_OK;
+
+ case MESSAGE_ILLEGAL:
+ case PROPERTIES_SIZE_EXCEEDED:
+ case WHEEL_TIMER_NOT_ENABLE:
+ case WHEEL_TIMER_MSG_ILLEGAL:
+ return PUT_NO_RETRY;
+
+ case SERVICE_NOT_AVAILABLE:
+ case FLUSH_DISK_TIMEOUT:
+ case FLUSH_SLAVE_TIMEOUT:
+ case OS_PAGE_CACHE_BUSY:
+ case CREATE_MAPPED_FILE_FAILED:
+ case SLAVE_NOT_AVAILABLE:
+ return PUT_NEED_RETRY;
+
+ case UNKNOWN_ERROR:
+ default:
+ if (storeConfig.isTimerSkipUnknownError()) {
+ LOGGER.warn("Skipping message due to unknown error,
msg: {}", message);
return PUT_NO_RETRY;
- case CREATE_MAPPED_FILE_FAILED:
- case FLUSH_DISK_TIMEOUT:
- case FLUSH_SLAVE_TIMEOUT:
- case OS_PAGE_CACHE_BUSY:
- case SLAVE_NOT_AVAILABLE:
- case UNKNOWN_ERROR:
- default:
- retryNum++;
- }
- }
- Thread.sleep(50);
- if (escapeBridgeHook != null) {
- putMessageResult = escapeBridgeHook.apply(message);
- } else {
- putMessageResult = messageStore.putMessage(message);
+ } else {
+ holdMomentForUnknownError();
+ return PUT_NEED_RETRY;
+ }
}
- LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{}
msg:{}", retryNum, putMessageResult, message);
}
- return PUT_NO_RETRY;
+ return PUT_NEED_RETRY;
}
public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean
needRoll) {
@@ -1471,7 +1469,6 @@ public class TimerMessageStore {
}
public class TimerDequeuePutMessageService extends AbstractStateService {
-
@Override
public String getServiceName() {
return getServiceThreadName() + this.getClass().getSimpleName();
@@ -1481,6 +1478,7 @@ public class TimerMessageStore {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service
start");
+
while (!this.isStopped() || dequeuePutQueue.size() != 0) {
try {
setState(AbstractStateService.WAITING);
@@ -1488,41 +1486,63 @@ public class TimerMessageStore {
if (null == tr) {
continue;
}
+
setState(AbstractStateService.RUNNING);
- boolean doRes = false;
boolean tmpDequeueChangeFlag = false;
+
try {
- while (!isStopped() && !doRes) {
+ while (!isStopped()) {
if (!isRunningDequeue()) {
dequeueStatusChangeFlag = true;
tmpDequeueChangeFlag = true;
break;
}
+
try {
perfCounterTicks.startTick(DEQUEUE_PUT);
+
MessageExt msgExt = tr.getMsg();
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+
if (tr.getEnqueueTime() == Long.MAX_VALUE) {
- // never enqueue, mark it.
+ // Never enqueue, mark it.
MessageAccessor.putProperty(msgExt,
TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
}
+
addMetric(msgExt, -1);
MessageExtBrokerInner msg = convert(msgExt,
tr.getEnqueueTime(), needRoll(tr.getMagic()));
- doRes = PUT_NEED_RETRY != doPut(msg,
needRoll(tr.getMagic()));
- while (!doRes && !isStopped()) {
- if (!isRunningDequeue()) {
- dequeueStatusChangeFlag = true;
- tmpDequeueChangeFlag = true;
- break;
+
+ boolean processed = false;
+ int retryCount = 0;
+
+ while (!processed && !isStopped()) {
+ int result = doPut(msg,
needRoll(tr.getMagic()));
+
+ if (result == PUT_OK) {
+ processed = true;
+ } else if (result == PUT_NO_RETRY) {
+
TimerMessageStore.LOGGER.warn("Skipping message due to unrecoverable error.
Msg: {}", msg);
+ processed = true;
+ } else {
+ retryCount++;
+ // Without enabling TimerEnableRetryUntilSuccess, messages will retry up to 3 times before being discarded
+ if
(!storeConfig.isTimerEnableRetryUntilSuccess() && retryCount >= 3) {
+
TimerMessageStore.LOGGER.error("Message processing failed after {} retries.
Msg: {}", retryCount, msg);
+ processed = true;
+ } else {
+ Thread.sleep(500L * precisionMs /
1000);
+
TimerMessageStore.LOGGER.warn("Retrying to process message. Retry count: {},
Msg: {}", retryCount, msg);
+ }
}
- doRes = PUT_NEED_RETRY != doPut(msg,
needRoll(tr.getMagic()));
- Thread.sleep(500L * precisionMs / 1000);
}
+
perfCounterTicks.endTick(DEQUEUE_PUT);
+ break;
+
} catch (Throwable t) {
- LOGGER.info("Unknown error", t);
+ TimerMessageStore.LOGGER.info("Unknown error",
t);
if (storeConfig.isTimerSkipUnknownError()) {
- doRes = true;
+ break;
} else {
holdMomentForUnknownError();
}
@@ -1531,7 +1551,6 @@ public class TimerMessageStore {
} finally {
tr.idempotentRelease(!tmpDequeueChangeFlag);
}
-
} catch (Throwable e) {
TimerMessageStore.LOGGER.error("Error occurred in " +
getServiceName(), e);
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 4ce3985f6c..52e58efde2 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store.timer;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -30,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
@@ -40,23 +42,26 @@ import
org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
-import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageArrivingListener;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@@ -65,10 +70,16 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
public class TimerMessageStoreTest {
private final byte[] msgBody = new byte[1024];
private static MessageStore messageStore;
+ private MessageStore mockMessageStore;
private SocketAddress bornHost;
private SocketAddress storeHost;
@@ -100,21 +111,23 @@ public class TimerMessageStoreTest {
storeConfig.setTimerInterceptDelayLevel(true);
storeConfig.setTimerPrecisionMs(precisionMs);
+ mockMessageStore = Mockito.mock(MessageStore.class);
messageStore = new DefaultMessageStore(storeConfig, new
BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new
BrokerConfig(), new ConcurrentHashMap<>());
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
}
- public TimerMessageStore createTimerMessageStore(String rootDir) throws
IOException {
+ public TimerMessageStore createTimerMessageStore(String rootDir , boolean
needMock) throws IOException {
if (null == rootDir) {
rootDir = StoreTestUtils.createBaseDir();
}
TimerCheckpoint timerCheckpoint = new TimerCheckpoint(rootDir +
File.separator + "config" + File.separator + "timercheck");
TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator
+ "config" + File.separator + "timermetrics");
- TimerMessageStore timerMessageStore = new
TimerMessageStore(messageStore, storeConfig, timerCheckpoint, timerMetrics,
null);
- messageStore.setTimerMessageStore(timerMessageStore);
+ MessageStore ms = needMock ? mockMessageStore : messageStore;
+ TimerMessageStore timerMessageStore = new TimerMessageStore(ms,
storeConfig, timerCheckpoint, timerMetrics, null);
+ ms.setTimerMessageStore(timerMessageStore);
baseDirs.add(rootDir);
timerStores.add(timerMessageStore);
@@ -170,7 +183,7 @@ public class TimerMessageStoreTest {
Assume.assumeFalse(MixAll.isWindows());
String topic = "TimerTest_testPutTimerMessage";
- final TimerMessageStore timerMessageStore =
createTimerMessageStore(null);
+ final TimerMessageStore timerMessageStore =
createTimerMessageStore(null , false);
timerMessageStore.load();
timerMessageStore.start(true);
@@ -212,12 +225,52 @@ public class TimerMessageStoreTest {
}
}
+ @Test
+ public void testRetryUntilSuccess() throws Exception {
+ storeConfig.setTimerEnableRetryUntilSuccess(true);
+ TimerMessageStore timerMessageStore = createTimerMessageStore(null ,
true);
+ timerMessageStore.load();
+ timerMessageStore.setShouldRunningDequeue(true);
+ Field stateField = TimerMessageStore.class.getDeclaredField("state");
+ stateField.setAccessible(true);
+ stateField.set(timerMessageStore, TimerMessageStore.RUNNING);
+
+ MessageExtBrokerInner msg = buildMessage(3000L, "TestRetry", true);
+ transformTimerMessage(timerMessageStore, msg);
+ TimerRequest timerRequest = new TimerRequest(100, 200, 3000,
System.currentTimeMillis(), 0, msg);
+ boolean offered =
timerMessageStore.dequeuePutQueue.offer(timerRequest);
+ assertTrue(offered);
+ assertFalse(timerMessageStore.dequeuePutQueue.isEmpty());
+
+ // If enableRetryUntilSuccess is set and putMessage return NEED_RETRY type, the message should be retried until success.
+ when(mockMessageStore.putMessage(any(MessageExtBrokerInner.class)))
+ .thenReturn(new
PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, null))
+ .thenReturn(new
PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, null))
+ .thenReturn(new
PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, null))
+ .thenReturn(new
PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, null))
+ .thenReturn(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null))
+ .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ timerMessageStore.getDequeuePutMessageServices()[0].run();
+ } finally {
+ latch.countDown();
+ }
+ }).start();
+ latch.await(10, TimeUnit.SECONDS);
+
+ assertTrue(timerMessageStore.dequeuePutQueue.isEmpty());
+ verify(mockMessageStore,
times(6)).putMessage(any(MessageExtBrokerInner.class));
+ }
+
@Test
public void testTimerFlowControl() throws Exception {
String topic = "TimerTest_testTimerFlowControl";
storeConfig.setTimerCongestNumEachSlot(100);
- TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+ TimerMessageStore timerMessageStore = createTimerMessageStore(null ,
false);
timerMessageStore.load();
timerMessageStore.start(true);
@@ -264,7 +317,7 @@ public class TimerMessageStoreTest {
String topic = "TimerTest_testPutExpiredTimerMessage";
- TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+ TimerMessageStore timerMessageStore = createTimerMessageStore(null
,false);
timerMessageStore.load();
timerMessageStore.start(true);
@@ -288,7 +341,7 @@ public class TimerMessageStoreTest {
public void testDeleteTimerMessage() throws Exception {
String topic = "TimerTest_testDeleteTimerMessage";
- TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+ TimerMessageStore timerMessageStore = createTimerMessageStore(null
,false);
timerMessageStore.load();
timerMessageStore.start(true);
@@ -325,7 +378,7 @@ public class TimerMessageStoreTest {
public void testPutDeleteTimerMessage() throws Exception {
String topic = "TimerTest_testPutDeleteTimerMessage";
- final TimerMessageStore timerMessageStore =
createTimerMessageStore(null);
+ final TimerMessageStore timerMessageStore =
createTimerMessageStore(null , false);
timerMessageStore.load();
timerMessageStore.start(true);
@@ -372,7 +425,7 @@ public class TimerMessageStoreTest {
final String topic = "TimerTest_testStateAndRecover";
String base = StoreTestUtils.createBaseDir();
- final TimerMessageStore first = createTimerMessageStore(base);
+ final TimerMessageStore first = createTimerMessageStore(base , false);
first.load();
first.start(true);
@@ -417,7 +470,7 @@ public class TimerMessageStoreTest {
first.getTimerWheel().flush();
first.shutdown();
- final TimerMessageStore second = createTimerMessageStore(base);
+ final TimerMessageStore second = createTimerMessageStore(base , false);
second.debug = true;
assertTrue(second.load());
assertEquals(msgNum, second.getQueueOffset());
@@ -446,7 +499,7 @@ public class TimerMessageStoreTest {
public void testMaxDelaySec() throws Exception {
String topic = "TimerTest_testMaxDelaySec";
- TimerMessageStore first = createTimerMessageStore(null);
+ TimerMessageStore first = createTimerMessageStore(null , false);
first.load();
first.start(true);
@@ -468,7 +521,7 @@ public class TimerMessageStoreTest {
storeConfig.setTimerRollWindowSlot(2);
String topic = "TimerTest_testRollMessage";
- TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+ TimerMessageStore timerMessageStore = createTimerMessageStore(null ,
false);
timerMessageStore.load();
timerMessageStore.start(true);