This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c2c29c2435 [ISSUE #7545] Fix set mapped file to null cause file can
not destroy (#7612)
c2c29c2435 is described below
commit c2c29c2435e0626cfe4f49830fbdc0d9421d82b5
Author: lizhimins <[email protected]>
AuthorDate: Mon Dec 4 16:13:07 2023 +0800
[ISSUE #7545] Fix set mapped file to null cause file can not destroy (#7612)
---
.../org/apache/rocketmq/tieredstore/index/IndexStoreFile.java | 2 --
.../apache/rocketmq/tieredstore/index/IndexStoreService.java | 10 ++++++++++
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 52a686f685..def5c8f2d0 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -457,11 +457,9 @@ public class IndexStoreFile implements IndexFile {
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
- this.mappedFile = null;
}
if (this.compactMappedFile != null) {
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
- this.compactMappedFile = null;
}
} catch (Exception e) {
log.error("IndexStoreFile shutdown failed, timestamp: {}, status:
{}", this.getTimestamp(), fileStatus.get(), e);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 14608aa58d..e99ea0de18 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
@@ -101,6 +102,10 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
private void recover() {
Stopwatch stopwatch = Stopwatch.createStarted();
+ // delete compact file directory
+ UtilAll.deleteFile(new
File(Paths.get(storeConfig.getStorePathRootDir(),
+ FILE_DIRECTORY_NAME, FILE_COMPACTED_DIRECTORY_NAME).toString()));
+
// recover local
File dir = new File(Paths.get(storeConfig.getStorePathRootDir(),
FILE_DIRECTORY_NAME).toString());
this.doConvertOldFormatFile(Paths.get(dir.getPath(),
"0000").toString());
@@ -141,6 +146,10 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
for (TieredFileSegment fileSegment : flatFile.getFileSegmentList()) {
IndexFile indexFile = new IndexStoreFile(storeConfig, fileSegment);
+ IndexFile localFile = timeStoreTable.get(indexFile.getTimestamp());
+ if (localFile != null) {
+ localFile.destroy();
+ }
timeStoreTable.put(indexFile.getTimestamp(), indexFile);
log.info("IndexStoreService recover load remote file, timestamp:
{}", indexFile.getTimestamp());
}
@@ -248,6 +257,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
if
(IndexFile.IndexStatusEnum.UPLOAD.equals(indexFile.getFileStatus())) {
log.error("IndexStoreService file status not correct, so skip,
timestamp: {}, status: {}",
indexFile.getTimestamp(), indexFile.getFileStatus());
+ indexFile.destroy();
return;
}