This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b6efbb11ca [ISSUE #7878] Fix query message offset return wrong offset
with boundary type (#7962)
b6efbb11ca is described below
commit b6efbb11ca599cdf0c0899479d1221d9cc65eeea
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 25 18:55:05 2024 +0800
[ISSUE #7878] Fix query message offset return wrong offset with boundary
type (#7962)
---
.../rocketmq/tieredstore/file/FlatMessageFile.java | 56 ++++++++++++++--------
.../core/MessageStoreFetcherImplTest.java | 4 +-
.../tieredstore/file/FlatMessageFileTest.java | 25 ++++++----
3 files changed, 54 insertions(+), 31 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 7123332410..a214059442 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -289,38 +289,54 @@ public class FlatMessageFile implements FlatFileInterface
{
return CompletableFuture.completedFuture(cqMin);
}
+ ByteBuffer buffer = getMessageAsync(cqMax).join();
+ long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+ if (storeTime < timestamp) {
+ log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded
maximum time, " +
+ "filePath={}, timestamp={}, result={}", filePath, timestamp,
cqMax + 1);
+ return CompletableFuture.completedFuture(cqMax + 1);
+ }
+
+ buffer = getMessageAsync(cqMin).join();
+ storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+ if (storeTime > timestamp) {
+ log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than
minimum time, " +
+ "filePath={}, timestamp={}, result={}", filePath, timestamp,
cqMin);
+ return CompletableFuture.completedFuture(cqMin);
+ }
+
+ // binary search for the first offset whose store time >= timestamp
long minOffset = cqMin;
long maxOffset = cqMax;
List<String> queryLog = new ArrayList<>();
while (minOffset < maxOffset) {
long middle = minOffset + (maxOffset - minOffset) / 2;
- ByteBuffer buffer = this.getMessageAsync(middle).join();
- long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
- queryLog.add(String.format(
- "(range=%d-%d, middle=%d, timestamp=%d)", minOffset,
maxOffset, middle, storeTime));
- if (storeTime == timestamp) {
- minOffset = middle;
- break;
- } else if (storeTime < timestamp) {
+ buffer = this.getMessageAsync(middle).join();
+ storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+ queryLog.add(String.format("(range=%d-%d, middle=%d, timestamp=%d,
diff=%dms)",
+ minOffset, maxOffset, middle, storeTime, timestamp -
storeTime));
+ if (storeTime < timestamp) {
minOffset = middle + 1;
} else {
- maxOffset = middle - 1;
+ maxOffset = middle;
}
}
long offset = minOffset;
- while (true) {
- long next = boundaryType == BoundaryType.LOWER ? offset - 1 :
offset + 1;
- if (next < cqMin || next > cqMax) {
- break;
- }
- ByteBuffer buffer = this.getMessageAsync(next).join();
- long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
- if (storeTime == timestamp) {
- offset = next;
- continue;
+ if (boundaryType == BoundaryType.UPPER) {
+ while (true) {
+ long next = offset + 1;
+ if (next > cqMax) {
+ break;
+ }
+ buffer = this.getMessageAsync(next).join();
+ storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+ if (storeTime == timestamp) {
+ offset = next;
+ } else {
+ break;
+ }
}
- break;
}
log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={},
timestamp={}, result={}, log={}",
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
index ce380776ae..7b8b17d5bb 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
@@ -208,11 +208,11 @@ public class MessageStoreFetcherImplTest {
Assert.assertEquals(100L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.LOWER));
Assert.assertEquals(100L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.LOWER));
- Assert.assertEquals(199L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER));
+ Assert.assertEquals(200L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER));
Assert.assertEquals(100L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.UPPER));
Assert.assertEquals(199L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.UPPER));
- Assert.assertEquals(199L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER));
+ Assert.assertEquals(200L,
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER));
}
@Test
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 95245aa27e..8a417f54a7 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -188,24 +188,31 @@ public class FlatMessageFileTest {
// commit message will increase max consume queue offset
Assert.assertTrue(flatFile.commitAsync().join());
- Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3
+ 1, BoundaryType.UPPER).join().longValue());
- Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3,
BoundaryType.UPPER).join().longValue());
+ // offset: 50, 51, 52, 53, 54
+ // store time: 0, +100, +100, +100, +200 (timestamp1, timestamp2 x3, timestamp3)
+ Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0,
BoundaryType.LOWER).join().longValue());
+ Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0,
BoundaryType.UPPER).join().longValue());
Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1
- 1, BoundaryType.LOWER).join().longValue());
+ Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1
- 1, BoundaryType.UPPER).join().longValue());
+
Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1,
BoundaryType.LOWER).join().longValue());
+ Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1,
BoundaryType.UPPER).join().longValue());
Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1
+ 1, BoundaryType.LOWER).join().longValue());
- Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2,
BoundaryType.LOWER).join().longValue());
- Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2
+ 1, BoundaryType.LOWER).join().longValue());
- Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3,
BoundaryType.LOWER).join().longValue());
-
- Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1,
BoundaryType.UPPER).join().longValue());
Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1
+ 1, BoundaryType.UPPER).join().longValue());
+
+ Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2,
BoundaryType.LOWER).join().longValue());
Assert.assertEquals(53, flatFile.getQueueOffsetByTimeAsync(timestamp2,
BoundaryType.UPPER).join().longValue());
+
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2
+ 1, BoundaryType.UPPER).join().longValue());
+ Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2
+ 1, BoundaryType.LOWER).join().longValue());
- Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1
- 1, BoundaryType.UPPER).join().longValue());
- Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3
+ 1, BoundaryType.LOWER).join().longValue());
+ Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3,
BoundaryType.LOWER).join().longValue());
+ Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3,
BoundaryType.UPPER).join().longValue());
+
+ Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3
+ 1, BoundaryType.LOWER).join().longValue());
+ Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3
+ 1, BoundaryType.UPPER).join().longValue());
flatFile.destroy();
}