This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 53fdc4ad3d [ISSUE #9213] Fix get the earliest time error when data is
clean up in tiered storage (#9214)
53fdc4ad3d is described below
commit 53fdc4ad3d23a1574aed8112fc2d3f34b7e66ad9
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 3 09:58:06 2025 +0800
[ISSUE #9213] Fix get the earliest time error when data is clean up in
tiered storage (#9214)
* [ISSUE #9213] Fix get the earliest time error when data is clean up in
tiered storage
---
.../rocketmq/tieredstore/TieredMessageStore.java | 26 ++++++++++------------
.../tieredstore/core/MessageStoreFetcherImpl.java | 9 +-------
.../rocketmq/tieredstore/file/FlatMessageFile.java | 12 ++++++----
.../tieredstore/TieredMessageStoreTest.java | 2 +-
4 files changed, 22 insertions(+), 27 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 0e3ede871c..f1c935d00b 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
public class TieredMessageStore extends AbstractPluginMessageStore {
protected static final Logger log =
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
+ protected static final long MIN_STORE_TIME = -1L;
protected final String brokerName;
protected final MessageStore defaultStore;
@@ -310,24 +311,21 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return getEarliestMessageTimeAsync(topic, queueId).join();
}
+ /**
+ * In the original design, getting the earliest time of the first message
+ * would generate two RPC requests. However, using the timestamp stored in
the metadata
+ * avoids these requests, although this approach might introduce some
level of inaccuracy.
+ */
@Override
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic,
int queueId) {
- long nextEarliestMessageTime = next.getEarliestMessageTime(topic,
queueId);
- long finalNextEarliestMessageTime = nextEarliestMessageTime > 0 ?
nextEarliestMessageTime : Long.MAX_VALUE;
- Stopwatch stopwatch = Stopwatch.createStarted();
+ long localMinTime = next.getEarliestMessageTime(topic, queueId);
return fetcher.getEarliestMessageTimeAsync(topic, queueId)
- .thenApply(time -> {
- Attributes latencyAttributes =
TieredStoreMetricsManager.newAttributesBuilder()
- .put(TieredStoreMetricsConstant.LABEL_OPERATION,
TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
- .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
- .build();
-
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
- if (time < 0) {
- log.debug("GetEarliestMessageTimeAsync failed, try to get
earliest message time from next store: topic: {}, queue: {}",
- topic, queueId);
- return finalNextEarliestMessageTime != Long.MAX_VALUE ?
finalNextEarliestMessageTime : -1;
+ .thenApply(remoteMinTime -> {
+ if (localMinTime > MIN_STORE_TIME && remoteMinTime >
MIN_STORE_TIME) {
+ return Math.min(localMinTime, remoteMinTime);
}
- return Math.min(finalNextEarliestMessageTime, time);
+ return localMinTime > MIN_STORE_TIME ? localMinTime :
+ (remoteMinTime > MIN_STORE_TIME ? remoteMinTime :
MIN_STORE_TIME);
});
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index bc347bd5b4..9e5ab01d3b 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -375,14 +375,7 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
@Override
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic,
int queueId) {
FlatMessageFile flatFile = flatFileStore.getFlatFile(new
MessageQueue(topic, brokerName, queueId));
- if (flatFile == null) {
- return CompletableFuture.completedFuture(-1L);
- }
-
- // read from timestamp to timestamp + length
- int length = MessageFormatUtil.STORE_TIMESTAMP_POSITION + 8;
- return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(),
length)
- .thenApply(MessageFormatUtil::getStoreTimeStamp);
+ return CompletableFuture.completedFuture(flatFile == null ? -1L :
flatFile.getMinStoreTimestamp());
}
@Override
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 4510a8a127..ade37149d6 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -178,16 +178,20 @@ public class FlatMessageFile implements FlatFileInterface
{
return consumeQueue.append(buffer, request.getStoreTimestamp());
}
-
-
@Override
public void release() {
-
}
@Override
public long getMinStoreTimestamp() {
- return commitLog.getMinTimestamp();
+ long minStoreTime = -1L;
+ if (Long.MAX_VALUE != commitLog.getMinTimestamp()) {
+ minStoreTime = Math.max(minStoreTime, commitLog.getMinTimestamp());
+ }
+ if (Long.MAX_VALUE != consumeQueue.getMinTimestamp()) {
+ minStoreTime = Math.max(minStoreTime,
consumeQueue.getMinTimestamp());
+ }
+ return minStoreTime;
}
@Override
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 2f39558482..bb259ae811 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -246,7 +246,7 @@ public class TieredMessageStoreTest {
@Test
public void testGetEarliestMessageTimeAsync() {
when(fetcher.getEarliestMessageTimeAsync(anyString(),
anyInt())).thenReturn(CompletableFuture.completedFuture(1L));
- Assert.assertEquals(1, (long)
currentStore.getEarliestMessageTimeAsync(mq.getTopic(),
mq.getQueueId()).join());
+ Assert.assertEquals(0, (long)
currentStore.getEarliestMessageTimeAsync(mq.getTopic(),
mq.getQueueId()).join());
when(fetcher.getEarliestMessageTimeAsync(anyString(),
anyInt())).thenReturn(CompletableFuture.completedFuture(-1L));
when(defaultStore.getEarliestMessageTime(anyString(),
anyInt())).thenReturn(2L);