This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b6efbb11ca [ISSUE #7878] Fix query message offset return wrong offset 
with boundary type (#7962)
b6efbb11ca is described below

commit b6efbb11ca599cdf0c0899479d1221d9cc65eeea
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 25 18:55:05 2024 +0800

    [ISSUE #7878] Fix query message offset return wrong offset with boundary 
type (#7962)
---
 .../rocketmq/tieredstore/file/FlatMessageFile.java | 56 ++++++++++++++--------
 .../core/MessageStoreFetcherImplTest.java          |  4 +-
 .../tieredstore/file/FlatMessageFileTest.java      | 25 ++++++----
 3 files changed, 54 insertions(+), 31 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 7123332410..a214059442 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -289,38 +289,54 @@ public class FlatMessageFile implements FlatFileInterface 
{
             return CompletableFuture.completedFuture(cqMin);
         }
 
+        ByteBuffer buffer = getMessageAsync(cqMax).join();
+        long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+        if (storeTime < timestamp) {
+            log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded 
maximum time, " +
+                "filePath={}, timestamp={}, result={}", filePath, timestamp, 
cqMax + 1);
+            return CompletableFuture.completedFuture(cqMax + 1);
+        }
+
+        buffer = getMessageAsync(cqMin).join();
+        storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+        if (storeTime > timestamp) {
+            log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than 
minimum time, " +
+                "filePath={}, timestamp={}, result={}", filePath, timestamp, 
cqMin);
+            return CompletableFuture.completedFuture(cqMin);
+        }
+
+        // binary search lower bound index in a sorted array
         long minOffset = cqMin;
         long maxOffset = cqMax;
         List<String> queryLog = new ArrayList<>();
         while (minOffset < maxOffset) {
             long middle = minOffset + (maxOffset - minOffset) / 2;
-            ByteBuffer buffer = this.getMessageAsync(middle).join();
-            long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
-            queryLog.add(String.format(
-                "(range=%d-%d, middle=%d, timestamp=%d)", minOffset, 
maxOffset, middle, storeTime));
-            if (storeTime == timestamp) {
-                minOffset = middle;
-                break;
-            } else if (storeTime < timestamp) {
+            buffer = this.getMessageAsync(middle).join();
+            storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+            queryLog.add(String.format("(range=%d-%d, middle=%d, timestamp=%d, 
diff=%dms)",
+                minOffset, maxOffset, middle, storeTime, timestamp - 
storeTime));
+            if (storeTime < timestamp) {
                 minOffset = middle + 1;
             } else {
-                maxOffset = middle - 1;
+                maxOffset = middle;
             }
         }
 
         long offset = minOffset;
-        while (true) {
-            long next = boundaryType == BoundaryType.LOWER ? offset - 1 : 
offset + 1;
-            if (next < cqMin || next > cqMax) {
-                break;
-            }
-            ByteBuffer buffer = this.getMessageAsync(next).join();
-            long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
-            if (storeTime == timestamp) {
-                offset = next;
-                continue;
+        if (boundaryType == BoundaryType.UPPER) {
+            while (true) {
+                long next = offset + 1;
+                if (next > cqMax) {
+                    break;
+                }
+                buffer = this.getMessageAsync(next).join();
+                storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
+                if (storeTime == timestamp) {
+                    offset = next;
+                } else {
+                    break;
+                }
             }
-            break;
         }
 
         log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={}, 
timestamp={}, result={}, log={}",
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
index ce380776ae..7b8b17d5bb 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
@@ -208,11 +208,11 @@ public class MessageStoreFetcherImplTest {
 
         Assert.assertEquals(100L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.LOWER));
         Assert.assertEquals(100L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.LOWER));
-        Assert.assertEquals(199L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER));
+        Assert.assertEquals(200L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER));
 
         Assert.assertEquals(100L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.UPPER));
         Assert.assertEquals(199L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.UPPER));
-        Assert.assertEquals(199L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER));
+        Assert.assertEquals(200L, 
fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER));
     }
 
     @Test
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 95245aa27e..8a417f54a7 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -188,24 +188,31 @@ public class FlatMessageFileTest {
         // commit message will increase max consume queue offset
         Assert.assertTrue(flatFile.commitAsync().join());
 
-        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3 
+ 1, BoundaryType.UPPER).join().longValue());
-        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, 
BoundaryType.UPPER).join().longValue());
+        // offset:            50,  51,   52,   53,   54
+        // inject store time: 0, +100, +100, +100, +200
+        Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0, 
BoundaryType.LOWER).join().longValue());
+        Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0, 
BoundaryType.UPPER).join().longValue());
 
         Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 
- 1, BoundaryType.LOWER).join().longValue());
+        Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 
- 1, BoundaryType.UPPER).join().longValue());
+
         Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, 
BoundaryType.LOWER).join().longValue());
+        Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, 
BoundaryType.UPPER).join().longValue());
 
         Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1 
+ 1, BoundaryType.LOWER).join().longValue());
-        Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2, 
BoundaryType.LOWER).join().longValue());
-        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 
+ 1, BoundaryType.LOWER).join().longValue());
-        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, 
BoundaryType.LOWER).join().longValue());
-
-        Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, 
BoundaryType.UPPER).join().longValue());
         Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1 
+ 1, BoundaryType.UPPER).join().longValue());
+
+        Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2, 
BoundaryType.LOWER).join().longValue());
         Assert.assertEquals(53, flatFile.getQueueOffsetByTimeAsync(timestamp2, 
BoundaryType.UPPER).join().longValue());
+
         Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 
+ 1, BoundaryType.UPPER).join().longValue());
+        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 
+ 1, BoundaryType.LOWER).join().longValue());
 
-        Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 
- 1, BoundaryType.UPPER).join().longValue());
-        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3 
+ 1, BoundaryType.LOWER).join().longValue());
+        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, 
BoundaryType.LOWER).join().longValue());
+        Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, 
BoundaryType.UPPER).join().longValue());
+
+        Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3 
+ 1, BoundaryType.LOWER).join().longValue());
+        Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3 
+ 1, BoundaryType.UPPER).join().longValue());
 
         flatFile.destroy();
     }

Reply via email to