This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 587e9767e9 [ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage (#8654)
587e9767e9 is described below

commit 587e9767e93ed7734dac4cf2b2d64d61529128f1
Author: lizhimins <[email protected]>
AuthorDate: Mon Sep 9 16:57:12 2024 +0800

    [ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage (#8654)
    
    * [ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage
    
    * [ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage
    
    * remove unused import
---
 .../rocketmq/tieredstore/TieredMessageStore.java   | 16 +++++--
 .../tieredstore/index/IndexStoreService.java       | 56 ++++++++++++----------
 .../tieredstore/index/IndexStoreServiceTest.java   |  4 +-
 3 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 7b63e16696..0e3ede871c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -180,9 +180,15 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
         }
 
         // determine whether tiered storage path conditions are met
-        if 
(storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
-            && !next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
-            return true;
+        if 
(storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) {
+            // return true to read from tiered storage if the CommitLog is 
empty
+            if (next != null && next.getCommitLog() != null &&
+                next.getCommitLog().getMinOffset() < 0L) {
+                return true;
+            }
+            if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
+                return true;
+            }
         }
 
         if 
(storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
@@ -208,10 +214,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
         }
 
         if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
-            log.trace("GetMessageAsync from current store, " +
+            log.trace("GetMessageAsync from remote store, " +
                 "topic: {}, queue: {}, offset: {}, maxCount: {}", topic, 
queueId, offset, maxMsgNums);
         } else {
-            log.trace("GetMessageAsync from remote store, " +
+            log.trace("GetMessageAsync from next store, " +
                 "topic: {}, queue: {}, offset: {}, maxCount: {}", topic, 
queueId, offset, maxMsgNums);
             return next.getMessageAsync(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
         }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 020b9f3b06..0db5dc5c4c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -42,8 +42,6 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
 import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
 import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
 import org.apache.rocketmq.tieredstore.provider.FileSegment;
@@ -271,23 +269,23 @@ public class IndexStoreService extends ServiceThread implements IndexService {
     public void forceUpload() {
         try {
             readWriteLock.writeLock().lock();
-            if (this.currentWriteFile == null) {
-                log.warn("IndexStoreService no need force upload current write 
file");
-                return;
-            }
-            // note: current file has been shutdown before
-            IndexStoreFile lastFile = new IndexStoreFile(storeConfig, 
currentWriteFile.getTimestamp());
-            if (this.doCompactThenUploadFile(lastFile)) {
-                this.setCompactTimestamp(lastFile.getTimestamp());
-            } else {
-                throw new TieredStoreException(
-                    TieredStoreErrorCode.UNKNOWN, "IndexStoreService force 
compact current file error");
+            while (true) {
+                Map.Entry<Long, IndexFile> entry =
+                    
this.timeStoreTable.higherEntry(this.compactTimestamp.get());
+                if (entry == null) {
+                    break;
+                }
+                if (this.doCompactThenUploadFile(entry.getValue())) {
+                    this.setCompactTimestamp(entry.getValue().getTimestamp());
+                    // The total number of files will not too much, prevent io 
too fast.
+                    TimeUnit.MILLISECONDS.sleep(50);
+                }
             }
         } catch (Exception e) {
             log.error("IndexStoreService force upload error", e);
             throw new RuntimeException(e);
         } finally {
-            readWriteLock.writeLock().lock();
+            readWriteLock.writeLock().unlock();
         }
     }
 
@@ -393,19 +391,13 @@ public class IndexStoreService extends ServiceThread implements IndexService {
     @Override
     public void shutdown() {
         super.shutdown();
-        readWriteLock.writeLock().lock();
-        try {
-            for (Map.Entry<Long /* timestamp */, IndexFile> entry : 
timeStoreTable.entrySet()) {
-                entry.getValue().shutdown();
-            }
-            if (!autoCreateNewFile) {
-                this.forceUpload();
+        // Wait index service upload then clear time store table
+        while (!this.timeStoreTable.isEmpty()) {
+            try {
+                TimeUnit.MILLISECONDS.sleep(50);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
             }
-            this.timeStoreTable.clear();
-        } catch (Exception e) {
-            log.error("IndexStoreService shutdown error", e);
-        } finally {
-            readWriteLock.writeLock().unlock();
         }
     }
 
@@ -424,6 +416,18 @@ public class IndexStoreService extends ServiceThread implements IndexService {
             }
             this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
         }
+        readWriteLock.writeLock().lock();
+        try {
+            if (autoCreateNewFile) {
+                this.forceUpload();
+            }
+            this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
+            this.timeStoreTable.clear();
+        } catch (Exception e) {
+            log.error("IndexStoreService shutdown error", e);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
         log.info(this.getServiceName() + " service shutdown");
     }
 }
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index fb563f7c6c..83b407e73b 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -120,7 +120,7 @@ public class IndexStoreServiceTest {
         indexService = new IndexStoreService(fileAllocator, filePath);
         indexService.start();
         ConcurrentSkipListMap<Long, IndexFile> timeStoreTable = 
indexService.getTimeStoreTable();
-        Assert.assertEquals(1, timeStoreTable.size());
+        Assert.assertEquals(2, timeStoreTable.size());
         Assert.assertEquals(Long.valueOf(timestamp), 
timeStoreTable.firstKey());
         mappedFile.destroy(10 * 1000);
     }
@@ -232,7 +232,7 @@ public class IndexStoreServiceTest {
         indexService = new IndexStoreService(fileAllocator, filePath);
         indexService.start();
         Assert.assertEquals(timestamp, 
indexService.getTimeStoreTable().firstKey().longValue());
-        Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+        Assert.assertEquals(4, indexService.getTimeStoreTable().size());
         Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
             
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
     }

Reply via email to