This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9aa081b8ac [ISSUE #8988] Support dispatchBehindMilliseconds (#8989)
9aa081b8ac is described below
commit 9aa081b8acfd01a40f50bd9c3face3c0d2c530b1
Author: guyinyou <[email protected]>
AuthorDate: Tue Dec 10 20:05:44 2024 +0800
[ISSUE #8988] Support dispatchBehindMilliseconds (#8989)
* support dispatchBehindMilliseconds
* Modify the initial value of currentReputTimestamp
---------
Co-authored-by: guyinyou <[email protected]>
---
.../apache/rocketmq/store/DefaultMessageStore.java | 25 +++++++++++++++++++++-
.../org/apache/rocketmq/store/MessageStore.java | 7 ++++++
.../store/plugin/AbstractPluginMessageStore.java | 5 +++++
3 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6b8ea0ee8a..9d3c46a438 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1556,6 +1556,10 @@ public class DefaultMessageStore implements MessageStore
{
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}
+ @Override
+ public long dispatchBehindMilliseconds() {
+ return this.reputMessageService.behindMs();
+ }
public long flushBehindBytes() {
if (this.messageStoreConfig.isTransientStorePoolEnable()) {
@@ -2793,6 +2797,7 @@ public class DefaultMessageStore implements MessageStore {
class ReputMessageService extends ServiceThread {
protected volatile long reputFromOffset = 0;
+ protected volatile long currentReputTimestamp =
System.currentTimeMillis();
public long getReputFromOffset() {
return reputFromOffset;
@@ -2802,6 +2807,10 @@ public class DefaultMessageStore implements MessageStore
{
this.reputFromOffset = reputFromOffset;
}
+ public long getCurrentReputTimestamp() {
+ return currentReputTimestamp;
+ }
+
@Override
public void shutdown() {
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
@@ -2824,6 +2833,15 @@ public class DefaultMessageStore implements MessageStore
{
return DefaultMessageStore.this.getConfirmOffset() -
this.reputFromOffset;
}
+ public long behindMs() {
+ long lastCommitLogFileTimeStamp = System.currentTimeMillis();
+ MappedFile lastMappedFile =
DefaultMessageStore.this.commitLog.getMappedFileQueue().getLastMappedFile();
+ if (lastMappedFile != null) {
+ lastCommitLogFileTimeStamp =
lastMappedFile.getStoreTimestamp();
+ }
+ return Math.max(0, lastCommitLogFileTimeStamp -
this.currentReputTimestamp);
+ }
+
public boolean isCommitLogAvailable() {
return this.reputFromOffset < getReputEndOffset();
}
@@ -2838,7 +2856,11 @@ public class DefaultMessageStore implements MessageStore
{
this.reputFromOffset,
DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset =
DefaultMessageStore.this.commitLog.getMinOffset();
}
- for (boolean doNext = true; this.isCommitLogAvailable() && doNext;
) {
+ boolean isCommitLogAvailable = isCommitLogAvailable();
+ if (!isCommitLogAvailable) {
+ currentReputTimestamp = System.currentTimeMillis();
+ }
+ for (boolean doNext = true; isCommitLogAvailable && doNext; ) {
SelectMappedBufferResult result =
DefaultMessageStore.this.commitLog.getData(reputFromOffset);
@@ -2861,6 +2883,7 @@ public class DefaultMessageStore implements MessageStore {
if (dispatchRequest.isSuccess()) {
if (size > 0) {
+ currentReputTimestamp =
dispatchRequest.getStoreTimestamp();
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (!notifyMessageArriveInBatch) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 5c3984e5b2..4bbee142a1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -511,6 +511,13 @@ public interface MessageStore {
*/
long dispatchBehindBytes();
+ /**
+ * Get the dispatch lag in milliseconds: how far the messages dispatched to
consume queue are behind the latest data stored in commit log.
+ *
+ * @return the dispatch lag in milliseconds.
+ */
+ long dispatchBehindMilliseconds();
+
/**
* Flush the message store to persist all data.
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 0f57a17d46..d5d6236458 100644
---
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -293,6 +293,11 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
return next.dispatchBehindBytes();
}
+ @Override
+ public long dispatchBehindMilliseconds() {
+ return next.dispatchBehindMilliseconds();
+ }
+
@Override
public long flush() {
return next.flush();