guyinyou opened a new issue, #10073: URL: https://github.com/apache/rocketmq/issues/10073
### Before Creating the Enhancement Request

- [x] I have confirmed that this should be classified as an enhancement rather than a bug/feature.

### Summary

Make logicsMsgTimestamp in StoreCheckpoint represent the storetime of the last message whose consume queue entry has been flushed to disk, instead of the storetime of the latest entry written to consume queue in memory. Introduce logicsMsgTempTimestamp for the in-memory latest storetime and update logicsMsgTimestamp only when consume queue is actually flushed.

### Motivation

Today, logicsMsgTimestamp is updated in putMessagePositionInfoWrapper whenever a message is written to consume queue (in memory). That makes it the “latest CQ storetime in memory,” not the “latest CQ storetime that has been flushed.” This leads to:

- Recovery: Code that uses getMinTimestamp() (e.g. dispatchFromStoreTimestamp) assumes logicsMsgTimestamp reflects persisted progress. If it is ahead of what is actually on disk, recovery or time-based logic can be wrong.
- Semantic misuse: Any logic that treats logicsMsgTimestamp as “last flushed CQ storetime” is incorrect.

This enhancement is necessary so that checkpoint timestamps match real persistence and recovery/time-based features behave correctly.

### Describe the Solution You'd Like

StoreCheckpoint

- Add a volatile field logicsMsgTempTimestamp (in-memory only, not persisted in the checkpoint file). Add getters/setters for it.
- Keep logicsMsgTimestamp as the “last flushed CQ storetime” and continue to read/write it in the checkpoint file.

Write path (ConsumeQueue & BatchConsumeQueue)

- In putMessagePositionInfoWrapper, after successfully appending to consume queue, call setLogicsMsgTempTimestamp(request.getStoreTimestamp()) instead of setLogicsMsgTimestamp(...).

Flush path (ConsumeQueueStore.FlushConsumeQueueService)

- In doFlush, when a “thorough” flush is triggered (e.g. when flushConsumeQueueThoroughInterval has elapsed), read getLogicsMsgTempTimestamp() at the start of the flush into a local variable.
- After flushing all consume queues (and any related stores), if this run was a thorough flush (flushConsumeQueueLeastPages == 0) and the captured value > 0, call setLogicsMsgTimestamp(capturedValue) and then storeCheckpoint.flush() so the checkpoint file is updated.

Result: logicsMsgTimestamp is only advanced after CQ data is flushed, and it correctly represents the storetime of the last flushed consume queue entry.

### Describe Alternatives You've Considered

Update logicsMsgTimestamp on every CQ flush (e.g. per-MappedFile)

- Rejected: Would require tracking “last flushed storetime” per file or per flush batch and merging into a single checkpoint field, adding complexity without clear benefit over the current “thorough flush” approach.

Keep current behavior and document that logicsMsgTimestamp is “best effort”

- Rejected: Recovery and time-based logic assume it reflects persisted state; documenting it as best-effort does not fix incorrect behavior.

Persist logicsMsgTempTimestamp in the checkpoint file

- Rejected: The checkpoint is meant to record persisted progress. Persisting the in-memory temp value would blur the meaning of the file and could confuse recovery if the process crashes before a flush.

### Additional Context

_No response_
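
For illustration only, the write path and flush path proposed in this issue could be sketched roughly as below. This is a simplified, standalone model and not the actual RocketMQ code: the classes `CheckpointSketch` and `FlushSketch`, the `persistedLogicsMsgTimestamp` field (standing in for the checkpoint file), and the `flushAllConsumeQueues` callback are all hypothetical names introduced for this sketch. Only `logicsMsgTimestamp`, `logicsMsgTempTimestamp`, `putMessagePositionInfoWrapper`, `doFlush`, and `flushConsumeQueueLeastPages` come from the issue text.

```java
/** Hypothetical stand-in for StoreCheckpoint; not the real RocketMQ class. */
class CheckpointSketch {
    // Storetime of the last CQ entry covered by a thorough flush (persisted).
    private volatile long logicsMsgTimestamp = 0L;
    // Storetime of the latest CQ entry appended in memory (never persisted).
    private volatile long logicsMsgTempTimestamp = 0L;
    // Hypothetical stand-in for the value stored in the checkpoint file.
    private volatile long persistedLogicsMsgTimestamp = 0L;

    long getLogicsMsgTimestamp() { return logicsMsgTimestamp; }
    void setLogicsMsgTimestamp(long ts) { this.logicsMsgTimestamp = ts; }
    long getLogicsMsgTempTimestamp() { return logicsMsgTempTimestamp; }
    void setLogicsMsgTempTimestamp(long ts) { this.logicsMsgTempTimestamp = ts; }
    long getPersistedLogicsMsgTimestamp() { return persistedLogicsMsgTimestamp; }

    // Models writing the checkpoint: only logicsMsgTimestamp is persisted.
    void flush() { persistedLogicsMsgTimestamp = logicsMsgTimestamp; }
}

/** Hypothetical model of the write path and the flush service. */
class FlushSketch {
    private final CheckpointSketch checkpoint;
    // Hypothetical callback that flushes every consume queue to disk.
    private final Runnable flushAllConsumeQueues;

    FlushSketch(CheckpointSketch checkpoint, Runnable flushAllConsumeQueues) {
        this.checkpoint = checkpoint;
        this.flushAllConsumeQueues = flushAllConsumeQueues;
    }

    // Write path: after a successful append, record only the in-memory value.
    void putMessagePositionInfoWrapper(long storeTimestamp) {
        checkpoint.setLogicsMsgTempTimestamp(storeTimestamp);
    }

    // Flush path: advance the persisted timestamp only after a thorough flush.
    void doFlush(int flushConsumeQueueLeastPages) {
        boolean thorough = flushConsumeQueueLeastPages == 0;
        // Capture before flushing, so entries appended while the flush is
        // running are not claimed as flushed by this run.
        long captured = thorough ? checkpoint.getLogicsMsgTempTimestamp() : 0L;

        flushAllConsumeQueues.run();

        if (thorough && captured > 0) {
            checkpoint.setLogicsMsgTimestamp(captured);
            checkpoint.flush();
        }
    }
}
```

In this model, a non-thorough flush (`flushConsumeQueueLeastPages > 0`) leaves `logicsMsgTimestamp` untouched, and a storetime written while a thorough flush is in progress only becomes visible in the checkpoint after the next thorough flush.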
