This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c080e6bbf2 [ISSUE #8438] Fix broker return two messages when query
message and index service bug (#8439)
c080e6bbf2 is described below
commit c080e6bbf208bc91f06ba97beb37241d7d0c20a0
Author: lizhimins <[email protected]>
AuthorDate: Thu Jul 25 11:38:31 2024 +0800
[ISSUE #8438] Fix broker return two messages when query message and index
service bug (#8439)
---
.../main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java | 3 +++
.../java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java | 1 +
.../java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java | 4 +++-
3 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 99d586ae23..9a25f85a6b 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -460,6 +460,9 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
if (flatFileStore != null) {
flatFileStore.shutdown();
}
+ if (indexService != null) {
+ indexService.shutdown();
+ }
if (storeExecutor != null) {
storeExecutor.shutdown();
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index b9ba80d08d..0c20a1cfb4 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -90,6 +90,7 @@ public class FlatAppendFile {
public void initOffset(long offset) {
if (this.fileSegmentTable.isEmpty()) {
FileSegment fileSegment =
fileSegmentFactory.createSegment(fileType, filePath, offset);
+ fileSegment.initPosition(fileSegment.getSize());
this.flushFileSegmentMeta(fileSegment);
this.fileSegmentTable.add(fileSegment);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 180399332e..f9604b43e6 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -287,7 +287,9 @@ public class IndexStoreFile implements IndexFile {
buffer.position(this.getItemPosition(slotValue));
buffer.get(bytes);
IndexItem indexItem = new IndexItem(bytes);
- if (hashCode == indexItem.getHashCode()) {
+ long storeTimestamp = indexItem.getTimeDiff() +
beginTimestamp.get();
+ if (hashCode == indexItem.getHashCode() &&
+ beginTime <= storeTimestamp && storeTimestamp <=
endTime) {
result.add(indexItem);
if (result.size() > maxCount) {
break;