This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fd32dae2ab [ISSUE #6633] Not clear uninitialized files and fix
metadata recover (#7342)
fd32dae2ab is described below
commit fd32dae2ab59f86dd215eeec405bf4fa6212bcb3
Author: lizhimins <[email protected]>
AuthorDate: Tue Sep 12 19:58:08 2023 +0800
[ISSUE #6633] Not clear uninitialized files and fix metadata recover (#7342)
---
.../rocketmq/tieredstore/file/TieredFlatFile.java | 53 ++++++++--------------
.../tieredstore/file/TieredFlatFileManager.java | 10 ++--
2 files changed, 22 insertions(+), 41 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index d973179eed..d96eb6e8f3 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.tieredstore.file;
-import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -25,13 +24,13 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.AppendResult;
@@ -43,7 +42,6 @@ import
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
public class TieredFlatFile {
@@ -177,7 +175,10 @@ public class TieredFlatFile {
}
}
- private FileSegmentMetadata
getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
+ /**
+ * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended
&& Not Full
+ */
+ public void updateFileSegment(TieredFileSegment fileSegment) {
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
this.filePath, fileSegment.getFileType(),
fileSegment.getBaseOffset());
@@ -186,45 +187,24 @@ public class TieredFlatFile {
if (metadata == null) {
metadata = new FileSegmentMetadata(
this.filePath, fileSegment.getBaseOffset(),
fileSegment.getFileType().getType());
- metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
- if (fileSegment.isClosed()) {
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
- }
- this.tieredMetadataStore.updateFileSegment(metadata);
+ metadata.setCreateTimestamp(System.currentTimeMillis());
}
- return metadata;
- }
-
- /**
- * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended
&& Not Full
- */
- public void updateFileSegment(TieredFileSegment fileSegment) {
- FileSegmentMetadata segmentMetadata =
getOrCreateFileSegmentMetadata(fileSegment);
- if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW
- && fileSegment.isFull()
- && !fileSegment.needCommit()) {
+ metadata.setSize(fileSegment.getCommitPosition());
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
- segmentMetadata.markSealed();
+ if (fileSegment.isFull() && !fileSegment.needCommit()) {
+ if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) {
+ metadata.markSealed();
+ }
}
if (fileSegment.isClosed()) {
- segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
}
- segmentMetadata.setSize(fileSegment.getCommitPosition());
- segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-
- FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
- this.filePath, fileSegment.getFileType(),
fileSegment.getBaseOffset());
-
- if (!Objects.equals(metadata, segmentMetadata)) {
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
- logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {},
content: {}",
- segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
- }
+ this.tieredMetadataStore.updateFileSegment(metadata);
}
private void checkAndFixFileSize() {
@@ -598,6 +578,9 @@ public class TieredFlatFile {
logger.error("TieredFlatFile#destroy: mark file segment:
{} is deleted failed", fileSegment.getPath(), e);
}
fileSegment.destroyFile();
+ if (!fileSegment.exists()) {
+ tieredMetadataStore.deleteFileSegment(filePath, fileType,
fileSegment.getBaseOffset());
+ }
}
fileSegmentList.clear();
needCommitFileSegmentList.clear();
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index 7c744af3b9..087ea8c9ce 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -136,15 +136,13 @@ public class TieredFlatFileManager {
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
- flatFile.getCompositeFlatFileLock().lock();
try {
+ flatFile.getCompositeFlatFileLock().lock();
flatFile.cleanExpiredFile(expiredTimeStamp);
flatFile.destroyExpiredFile();
- if (flatFile.getConsumeQueueBaseOffset() == -1) {
- logger.info("Clean flatFile because file not
initialized, topic={}, queueId={}",
- flatFile.getMessageQueue().getTopic(),
flatFile.getMessageQueue().getQueueId());
- destroyCompositeFile(flatFile.getMessageQueue());
- }
+ } catch (Throwable t) {
+ logger.error("Do Clean expired file error, topic={},
queueId={}",
+ flatFile.getMessageQueue().getTopic(),
flatFile.getMessageQueue().getQueueId(), t);
} finally {
flatFile.getCompositeFlatFileLock().unlock();
}