This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 587e9767e9 [ISSUE #8653] Fix index service upload last file when
broker shutdown and fetcher check in tiered storage (#8654)
587e9767e9 is described below
commit 587e9767e93ed7734dac4cf2b2d64d61529128f1
Author: lizhimins <[email protected]>
AuthorDate: Mon Sep 9 16:57:12 2024 +0800
[ISSUE #8653] Fix index service upload last file when broker shutdown and
fetcher check in tiered storage (#8654)
* [ISSUE #8653] Fix index service upload last file when broker shutdown and
fetcher check in tiered storage
* [ISSUE #8653] Fix index service upload last file when broker shutdown and
fetcher check in tiered storage
* remove unused import
---
.../rocketmq/tieredstore/TieredMessageStore.java | 16 +++++--
.../tieredstore/index/IndexStoreService.java | 56 ++++++++++++----------
.../tieredstore/index/IndexStoreServiceTest.java | 4 +-
3 files changed, 43 insertions(+), 33 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 7b63e16696..0e3ede871c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -180,9 +180,15 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
// determine whether tiered storage path conditions are met
- if
(storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
- && !next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
- return true;
+ if
(storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) {
+ // return true to read from tiered storage if the CommitLog is
empty
+ if (next != null && next.getCommitLog() != null &&
+ next.getCommitLog().getMinOffset() < 0L) {
+ return true;
+ }
+ if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
+ return true;
+ }
}
if
(storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
@@ -208,10 +214,10 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
- log.trace("GetMessageAsync from current store, " +
+ log.trace("GetMessageAsync from remote store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic,
queueId, offset, maxMsgNums);
} else {
- log.trace("GetMessageAsync from remote store, " +
+ log.trace("GetMessageAsync from next store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic,
queueId, offset, maxMsgNums);
return next.getMessageAsync(group, topic, queueId, offset,
maxMsgNums, messageFilter);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 020b9f3b06..0db5dc5c4c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -42,8 +42,6 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
-import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
@@ -271,23 +269,23 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
public void forceUpload() {
try {
readWriteLock.writeLock().lock();
- if (this.currentWriteFile == null) {
- log.warn("IndexStoreService no need force upload current write
file");
- return;
- }
- // note: current file has been shutdown before
- IndexStoreFile lastFile = new IndexStoreFile(storeConfig,
currentWriteFile.getTimestamp());
- if (this.doCompactThenUploadFile(lastFile)) {
- this.setCompactTimestamp(lastFile.getTimestamp());
- } else {
- throw new TieredStoreException(
- TieredStoreErrorCode.UNKNOWN, "IndexStoreService force
compact current file error");
+ while (true) {
+ Map.Entry<Long, IndexFile> entry =
+
this.timeStoreTable.higherEntry(this.compactTimestamp.get());
+ if (entry == null) {
+ break;
+ }
+ if (this.doCompactThenUploadFile(entry.getValue())) {
+ this.setCompactTimestamp(entry.getValue().getTimestamp());
+ // The total number of files will not too much, prevent io
too fast.
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
}
} catch (Exception e) {
log.error("IndexStoreService force upload error", e);
throw new RuntimeException(e);
} finally {
- readWriteLock.writeLock().lock();
+ readWriteLock.writeLock().unlock();
}
}
@@ -393,19 +391,13 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
@Override
public void shutdown() {
super.shutdown();
- readWriteLock.writeLock().lock();
- try {
- for (Map.Entry<Long /* timestamp */, IndexFile> entry :
timeStoreTable.entrySet()) {
- entry.getValue().shutdown();
- }
- if (!autoCreateNewFile) {
- this.forceUpload();
+ // Wait index service upload then clear time store table
+ while (!this.timeStoreTable.isEmpty()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
- this.timeStoreTable.clear();
- } catch (Exception e) {
- log.error("IndexStoreService shutdown error", e);
- } finally {
- readWriteLock.writeLock().unlock();
}
}
@@ -424,6 +416,18 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
+ readWriteLock.writeLock().lock();
+ try {
+ if (autoCreateNewFile) {
+ this.forceUpload();
+ }
+ this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
+ this.timeStoreTable.clear();
+ } catch (Exception e) {
+ log.error("IndexStoreService shutdown error", e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
log.info(this.getServiceName() + " service shutdown");
}
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index fb563f7c6c..83b407e73b 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -120,7 +120,7 @@ public class IndexStoreServiceTest {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
ConcurrentSkipListMap<Long, IndexFile> timeStoreTable =
indexService.getTimeStoreTable();
- Assert.assertEquals(1, timeStoreTable.size());
+ Assert.assertEquals(2, timeStoreTable.size());
Assert.assertEquals(Long.valueOf(timestamp),
timeStoreTable.firstKey());
mappedFile.destroy(10 * 1000);
}
@@ -232,7 +232,7 @@ public class IndexStoreServiceTest {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
Assert.assertEquals(timestamp,
indexService.getTimeStoreTable().firstKey().longValue());
- Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+ Assert.assertEquals(4, indexService.getTimeStoreTable().size());
Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
}