This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 88a9d939ce [ISSUE #7381] Fix the problem of inaccurate timer message
metric (#7382)
88a9d939ce is described below
commit 88a9d939ce110381b3b418370d4711c0c214dc7f
Author: Ji Juntao <[email protected]>
AuthorDate: Sat Sep 23 17:38:27 2023 +0800
[ISSUE #7381] Fix the problem of inaccurate timer message metric (#7382)
* correct the timerMetrics' result.
* for further extension.
* checkstyle.
* use toLong.
---
.../rocketmq/store/timer/TimerMessageStore.java | 20 ++++++++++++++++----
.../apache/rocketmq/store/timer/TimerMetrics.java | 5 ++++-
.../apache/rocketmq/store/timer/TimerRequest.java | 7 +++++--
.../rocketmq/store/timer/TimerMetricsTest.java | 10 ++++++++--
4 files changed, 33 insertions(+), 9 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 0d50de65ae..ac4c61cd61 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
@@ -599,7 +600,12 @@ public class TimerMessageStore {
if (null == msg || null ==
msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
return;
}
-
timerMetrics.addAndGet(msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC),
value);
+ if (msg.getProperty(TIMER_ENQUEUE_MS) != null
+ && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS))
== Long.MAX_VALUE) {
+ return;
+ }
+ // pass msg into addAndGet, for further more judgement extension.
+ timerMetrics.addAndGet(msg, value);
} catch (Throwable t) {
if (frequency.incrementAndGet() % 1000 == 0) {
LOGGER.error("error in adding metric", t);
@@ -1323,6 +1329,7 @@ public class TimerMessageStore {
perfCounterTicks.startTick(ENQUEUE_PUT);
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
if (shouldRunningDequeue && req.getDelayTime() <
currWriteTimeMs) {
+ req.setEnqueueTime(Long.MAX_VALUE);
dequeuePutQueue.put(req);
} else {
boolean doEnqueueRes = doEnqueue(
@@ -1452,9 +1459,14 @@ public class TimerMessageStore {
}
try {
perfCounterTicks.startTick(DEQUEUE_PUT);
-
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
- addMetric(tr.getMsg(), -1);
- MessageExtBrokerInner msg =
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
+ MessageExt msgExt = tr.getMsg();
+
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+ if (tr.getEnqueueTime() == Long.MAX_VALUE) {
+ // never enqueue, mark it.
+ MessageAccessor.putProperty(msgExt,
TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
+ }
+ addMetric(msgExt, -1);
+ MessageExtBrokerInner msg = convert(msgExt,
tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg,
needRoll(tr.getMagic()));
while (!doRes && !isStopped()) {
if (!isRunningDequeue()) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index e7b00cc073..7f8fedd8a5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -38,6 +38,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -78,7 +80,8 @@ public class TimerMetrics extends ConfigManager {
return distPair.getCount().addAndGet(value);
}
- public long addAndGet(String topic, int value) {
+ public long addAndGet(MessageExt msg, int value) {
+ String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
Metric pair = getTopicPair(topic);
getDataVersion().nextVersion();
pair.setTimeStamp(System.currentTimeMillis());
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
index 1dd64f7592..1b25d355c6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
@@ -27,8 +27,9 @@ public class TimerRequest {
private final int sizePy;
private final long delayTime;
- private final long enqueueTime;
private final int magic;
+
+ private long enqueueTime;
private MessageExt msg;
@@ -94,7 +95,9 @@ public class TimerRequest {
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
-
+ public void setEnqueueTime(long enqueueTime) {
+ this.enqueueTime = enqueueTime;
+ }
public void idempotentRelease() {
idempotentRelease(true);
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
index b7392cc455..3c7b9b67fb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.store.timer;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Assert;
import org.junit.Test;
@@ -31,8 +34,11 @@ public class TimerMetricsTest {
TimerMetrics first = new TimerMetrics(baseDir);
Assert.assertTrue(first.load());
- first.addAndGet("AAA", 1000);
- first.addAndGet("BBB", 2000);
+ MessageExt msg = new MessageExt();
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC,
"AAA");
+ first.addAndGet(msg, 1000);
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC,
"BBB");
+ first.addAndGet(msg, 2000);
Assert.assertEquals(1000, first.getTimingCount("AAA"));
Assert.assertEquals(2000, first.getTimingCount("BBB"));
long curr = System.currentTimeMillis();