bxfjb opened a new issue, #8065: URL: https://github.com/apache/rocketmq/issues/8065
### Before Creating the Bug Report

- [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

CentOS 7

### RocketMQ version

develop

### JDK Version

OpenJDK 8

### Describe the Bug

## Symptoms

- When tiered storage is enabled and messages are produced to a newly created topic, whether those messages are later dispatched to tiered storage depends on when each batch of messages is produced.
- Logs:

```
2024-04-24 10:38:03 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:12 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0
2024-04-24 10:40:21 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0
2024-04-24 10:41:40 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:41:52 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:42:12 INFO MessageStoreDispatcher - MessageDispatcher#dispatch, topic=order-test1, queueId=0, offset=0-3000, current=0, remain=3000
```

- Three batches were produced, with 1,000 messages in each:
  1. At 10:38:03 the first batch reached the broker and a commitlog file was created in tiered storage. At 10:38:12 a consumequeue file was also created. However, the corresponding `FlatMessageFile` in `FlatFileStore` was destroyed at 10:38:31, so no dispatch took place.
  2. At 10:40:21 the second batch reached the broker and a commitlog file was created in tiered storage, but no consumequeue file was created. The `FlatMessageFile` was destroyed at 10:40:31, and again no dispatch took place.
  3. At 10:41:40 the third batch reached the broker and a commitlog file was created in tiered storage. At 10:41:52 a consumequeue file was created, and at 10:42:12 the messages were dispatched normally. All 3,000 messages were dispatched and none were lost.

## Analysis

The problem can be summarized as follows:

- Tiered storage runs two scheduled tasks, `dispatch` and `destroyExpiredFile`, with periods of 20 s and 60 s respectively. Three `dispatch` runs therefore occur between two consecutive `destroyExpiredFile` runs.
- The bug follows the timeline below:

[Timeline figure: each `destroyExpiredFile` interval divided into periods 1, 2 and 3]

  1. If the topic has not yet written any data to tiered storage and the messages are produced during period 2 or 3, dispatching for this topic does not proceed.
  2. If any batch is produced during period 1, dispatching proceeds normally, including for the previously stuck messages, so no messages are lost.

## The `dispatch` task

- When messages are produced, `MessageStoreDispatcherImpl#dispatch()` initializes the `FlatMessageFile` for the `MessageQueue` to be dispatched, together with its commitlog and consumequeue. Their constructors are:

```
public FlatCommitLogFile(FileSegmentFactory fileSegmentFactory, String filePath) {
    super(fileSegmentFactory, FileSegmentType.COMMIT_LOG, filePath);
    this.initOffset(0L);
}
```

`initOffset()` creates the file in tiered storage. Concretely, it adds an empty `FileSegment` to `FlatMessageFile.commitLog.fileSegmentTable`.

```
public FlatConsumeQueueFile(FileSegmentFactory fileSegmentFactory, String filePath) {
    super(fileSegmentFactory, FileSegmentType.CONSUME_QUEUE, filePath);
}
```

This constructor does not call `initOffset()`, so `consumeQueue.fileSegmentTable` is still empty at this point. That is why the first production of messages creates only the commitlog and not the consumequeue.

- The consumequeue is created in the first `dispatchWithSemaphore` call after the `FlatMessageFile` has been initialized:

```
// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) {
    // isFlatFileInit(): return !this.consumeQueue.fileSegmentTable.isEmpty();
    currentOffset = Math.max(minOffsetInQueue,
        maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
    flatFile.initOffset(currentOffset);
    // initOffset(): this.commitLog.initOffset(0L);
    //               this.consumeQueue.initOffset(offset * MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE);
    return CompletableFuture.completedFuture(true);
}
```

The call `flatFile.initOffset(currentOffset);` creates the consumequeue file in tiered storage and then returns immediately. The actual dispatch happens only in the next `dispatch` run, 20 s later.

## The `destroyExpiredFile` task

- Expired files are cleaned up in `FlatAppendFile#destroyExpiredFile()`:

```
public void destroyExpiredFile(long expireTimestamp) {
    fileSegmentLock.writeLock().lock();
    try {
        while (!fileSegmentTable.isEmpty()) {
            // first remove expired file from fileSegmentTable
            // then close and delete expired file
            FileSegment fileSegment = fileSegmentTable.get(0);
            if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
                fileSegment.getMaxTimestamp() > expireTimestamp) {
                log.debug("FileSegment has not expired, filePath={}, fileType={}, " +
                        "offset={}, expireTimestamp={}, maxTimestamp={}", filePath, fileType,
                    fileSegment.getBaseOffset(), expireTimestamp, fileSegment.getMaxTimestamp());
                break;
            }
            fileSegment.destroyFile();
            if (!fileSegment.exists()) {
                fileSegmentTable.remove(0);
                metadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
            }
        }
    } finally {
        fileSegmentLock.writeLock().unlock();
    }
}
```

1. The main problem is the line `if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE && fileSegment.getMaxTimestamp() > expireTimestamp)`. Before any message is written, `maxTimestamp` of both the commitlog and the consumequeue defaults to `Long.MAX_VALUE`. For a freshly created segment the whole condition is therefore false, the loop does not `break`, and the segment is destroyed. (@lizhimins, what is the intent of this condition?) As a result, at least two `dispatch` runs must occur between the initialization of a `FlatMessageFile` and the next expired-file cleanup: one to create the consumequeue and one to update `maxTimestamp` to a real value. Otherwise the `FlatMessageFile` is cleaned up before it has been dispatched.
2. If only the timestamp check were changed, at least one `dispatch` run would still be required between the initialization of the `FlatMessageFile` and the next cleanup, to create the consumequeue. Otherwise the `FlatMessageFile` is deleted by the following code in `FlatStore#load()`:

```
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
    this.destroyFile(flatFile.getMessageQueue());
}
```

## Solution

Remove the `fileSegment.getMaxTimestamp() != Long.MAX_VALUE` part of the condition. Also create both the commitlog and the consumequeue files immediately when the `FlatMessageFile` is initialized.

### Steps to Reproduce

Enable tiered storage and produce messages at different points on the timeline above.

### What Did You Expect to See?

Messages are dispatched normally, and the `FlatMessageFile` is not deleted.

### What Did You See Instead?

The `FlatMessageFile` is deleted before the messages are dispatched.

### Additional Context

_No response_
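The interaction between the two scheduled tasks described in the analysis can be checked with a small model. The Java sketch below is hypothetical and is not RocketMQ code; the class and method names are invented for illustration. It assumes that both tasks run at a fixed rate. It also assumes their phases, which are inferred from the log timestamps in this issue: `dispatch` runs at :12, :32 and :52, and `destroyExpiredFile` runs at :31 of each minute. The model counts how many `dispatch` runs fall between the initialization of a `FlatMessageFile` and the next cleanup. It also contrasts the current expiry condition with the one that remains after removing the `Long.MAX_VALUE` check.

```java
/**
 * Hypothetical model (not RocketMQ code) of the race between the 20 s
 * dispatch task and the 60 s destroyExpiredFile task.
 */
public class TieredDispatchRaceSketch {

    static final long DISPATCH_PERIOD = 20; // seconds
    static final long CLEANUP_PERIOD = 60;  // seconds
    // Assumed phases, inferred from the log timestamps in the issue:
    // dispatch at :12, :32, :52 and cleanup at :31 of every minute.
    static final long DISPATCH_PHASE = 12;
    static final long CLEANUP_PHASE = 31;

    /** Condition quoted from destroyExpiredFile(): true means "not expired, break". */
    public static boolean keptByCurrentCheck(long maxTimestamp, long expireTimestamp) {
        return maxTimestamp != Long.MAX_VALUE && maxTimestamp > expireTimestamp;
    }

    /** The same condition with the Long.MAX_VALUE check removed, as proposed. */
    public static boolean keptByProposedCheck(long maxTimestamp, long expireTimestamp) {
        return maxTimestamp > expireTimestamp;
    }

    /** First run of a fixed-rate task strictly after time t (seconds). */
    static long nextRun(long t, long phase, long period) {
        long k = Math.floorDiv(t - phase, period) + 1;
        return phase + k * period;
    }

    /** Number of dispatch runs strictly between init time t and the next cleanup. */
    public static int dispatchRunsBeforeCleanup(long t) {
        long cleanup = nextRun(t, CLEANUP_PHASE, CLEANUP_PERIOD);
        int runs = 0;
        for (long d = nextRun(t, DISPATCH_PHASE, DISPATCH_PERIOD); d < cleanup; d += DISPATCH_PERIOD) {
            runs++;
        }
        return runs;
    }

    /**
     * Outcome under the current code for a FlatMessageFile initialized at t:
     * the 1st dispatch run only creates the consumequeue, and the 2nd run
     * writes data and sets a real maxTimestamp.
     */
    public static String outcome(long t) {
        switch (dispatchRunsBeforeCleanup(t)) {
            case 0:
                return "destroyed: consumequeue never created";
            case 1:
                return "destroyed: maxTimestamp still Long.MAX_VALUE";
            default:
                return "dispatched";
        }
    }

    public static void main(String[] args) {
        // Init times of the three batches, as seconds since 10:00:00.
        long[] batches = {38 * 60 + 3, 40 * 60 + 21, 41 * 60 + 40};
        for (long t : batches) {
            System.out.printf("init at 10:%02d:%02d -> %s%n", t / 60, t % 60, outcome(t));
        }
        long expire = 1_000L; // arbitrary expiry threshold, for illustration only
        System.out.println("empty segment kept (current):  " + keptByCurrentCheck(Long.MAX_VALUE, expire));
        System.out.println("empty segment kept (proposed): " + keptByProposedCheck(Long.MAX_VALUE, expire));
    }
}
```

Running `main` prints the following outcomes, which match the three batches in the logs:

- 10:38:03: destroyed with `maxTimestamp` still `Long.MAX_VALUE`.
- 10:40:21: destroyed before the consumequeue exists.
- 10:41:40: dispatched.

An empty segment is destroyed by the current check and kept by the proposed one. Under the proposed solution, both files exist from initialization and empty segments survive cleanup, so the outcome should no longer depend on when a batch is produced.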
