This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b2deef179d [ISSUE #7144] Accelerate the recovery speed of the tiered
storage module (#7145)
b2deef179d is described below
commit b2deef179dbc6a9eb1a2b6dd7b652d95cb768295
Author: lizhimins <[email protected]>
AuthorDate: Thu Aug 10 10:38:47 2023 +0800
[ISSUE #7144] Accelerate the recovery speed of the tiered storage module
(#7145)
---
.../rocketmq/tieredstore/TieredDispatcher.java | 3 +
.../rocketmq/tieredstore/TieredMessageStore.java | 2 +-
.../tieredstore/common/TieredStoreExecutor.java | 25 ++--
.../tieredstore/file/CompositeFlatFile.java | 15 +--
.../tieredstore/file/CompositeQueueFlatFile.java | 20 ++-
.../rocketmq/tieredstore/file/TieredCommitLog.java | 24 ++--
.../rocketmq/tieredstore/file/TieredFlatFile.java | 42 ++++---
.../tieredstore/file/TieredFlatFileManager.java | 135 ++++++++++++---------
.../tieredstore/metadata/FileSegmentMetadata.java | 26 +++-
.../rocketmq/tieredstore/TieredDispatcherTest.java | 15 ++-
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../file/TieredFlatFileManagerTest.java | 7 +-
13 files changed, 194 insertions(+), 124 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index bb58ea7dd5..1746190cdb 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
long upperBound = Math.min(dispatchOffset + maxCount,
maxOffsetInQueue);
ConsumeQueue consumeQueue = (ConsumeQueue)
defaultStore.getConsumeQueue(topic, queueId);
+ logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq
range={}-{}, dispatch offset={}-{}",
+ topic, queueId, minOffsetInQueue, maxOffsetInQueue,
dispatchOffset, upperBound - 1);
+
for (; dispatchOffset < upperBound; dispatchOffset++) {
// get consume queue
SelectMappedBufferResult cqItem =
consumeQueue.getIndexBuffer(dispatchOffset);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 1f12410f2e..ced1fb8181 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -147,7 +147,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter)
{
if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
- logger.debug("GetMessageAsync from next store topic: {}, queue:
{}, offset: {}", topic, queueId, offset);
+ logger.trace("GetMessageAsync from next store topic: {}, queue:
{}, offset: {}", topic, queueId, offset);
return next.getMessageAsync(group, topic, queueId, offset,
maxMsgNums, messageFilter);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6eb3478b3d..6dd0e8846e 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -43,18 +43,9 @@ public class TieredStoreExecutor {
public static ExecutorService compactIndexFileExecutor;
public static void init() {
- dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- dispatchExecutor = new ThreadPoolExecutor(
- Math.max(2, Runtime.getRuntime().availableProcessors()),
- Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- dispatchThreadPoolQueue,
- new ThreadFactoryImpl("TieredCommonExecutor_"));
-
commonScheduledExecutor = new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
- new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
+ new ThreadFactoryImpl("TieredCommonExecutor_"));
commitExecutor = new ScheduledThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
@@ -62,7 +53,17 @@ public class TieredStoreExecutor {
cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
- new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
+ new ThreadFactoryImpl("TieredCleanFileExecutor_"));
+
+ dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+ dispatchExecutor = new ThreadPoolExecutor(
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ dispatchThreadPoolQueue,
+ new ThreadFactoryImpl("TieredDispatchExecutor_"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
fetchDataExecutor = new ThreadPoolExecutor(
@@ -71,7 +72,7 @@ public class TieredStoreExecutor {
1000 * 60,
TimeUnit.MILLISECONDS,
fetchDataThreadPoolQueue,
- new ThreadFactoryImpl("TieredFetchDataExecutor_"));
+ new ThreadFactoryImpl("TieredFetchExecutor_"));
compactIndexFileThreadPoolQueue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
compactIndexFileExecutor = new ThreadPoolExecutor(
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index df4baf33f4..5ad3a6ff32 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
this.metadataStore =
TieredStoreUtil.getMetadataStore(this.storeConfig);
- this.dispatchOffset = new AtomicLong();
this.compositeFlatFileLock = new ReentrantLock();
this.inFlightRequestMap = new ConcurrentHashMap<>();
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath);
+ this.dispatchOffset = new AtomicLong(
+ this.consumeQueue.isInitialized() ?
this.getConsumeQueueCommitOffset() : -1L);
this.groupOffsetCache = this.initOffsetCache();
}
- protected void recoverMetadata() {
- if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
- consumeQueue.setBaseOffset(this.dispatchOffset.get() *
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
- }
- }
-
private Cache<String, Long> initOffsetCache() {
return Caffeine.newBuilder()
.expireAfterWrite(2, TimeUnit.MINUTES)
@@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess
{
@Override
public void initOffset(long offset) {
- if (!consumeQueue.isInitialized()) {
+ if (consumeQueue.isInitialized()) {
+ dispatchOffset.set(this.getConsumeQueueCommitOffset());
+ } else {
consumeQueue.setBaseOffset(offset *
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ dispatchOffset.set(offset);
}
- dispatchOffset.set(offset);
}
@Override
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index f6c0afed05..0a797f465f 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile
{
public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory,
MessageQueue messageQueue) {
super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
this.messageQueue = messageQueue;
- this.recoverTopicMetadata();
- super.recoverMetadata();
+ this.recoverQueueMetadata();
this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
}
@@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends
CompositeFlatFile {
if (!consumeQueue.isInitialized()) {
queueMetadata.setMinOffset(offset);
queueMetadata.setMaxOffset(offset);
+ metadataStore.updateQueue(queueMetadata);
}
super.initOffset(offset);
}
- public void recoverTopicMetadata() {
+ public void recoverQueueMetadata() {
TopicMetadata topicMetadata =
this.metadataStore.getTopic(messageQueue.getTopic());
if (topicMetadata == null) {
topicMetadata =
this.metadataStore.addTopic(messageQueue.getTopic(), -1L);
@@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends
CompositeFlatFile {
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
}
- this.dispatchOffset.set(queueMetadata.getMaxOffset());
}
- public void persistMetadata() {
+ public void flushMetadata() {
try {
- if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset())
{
- return;
- }
- queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() /
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ queueMetadata.setMinOffset(super.getConsumeQueueMinOffset());
+ queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset());
metadataStore.updateQueue(queueMetadata);
} catch (Exception e) {
- LOGGER.error("CompositeFlatFile#flushMetadata: update queue
metadata failed: topic: {}, queue: {}", messageQueue.getTopic(),
messageQueue.getQueueId(), e);
+ LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {},
queue: {}",
+ messageQueue.getTopic(), messageQueue.getQueueId(), e);
}
}
@@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends
CompositeFlatFile {
@Override
public void shutdown() {
super.shutdown();
- metadataStore.updateQueue(queueMetadata);
+ this.flushMetadata();
}
@Override
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 80e1bce506..0e5f79132f 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -50,7 +50,7 @@ public class TieredCommitLog {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
- this.correctMinOffset();
+ this.correctMinOffsetAsync();
}
@VisibleForTesting
@@ -91,17 +91,26 @@ public class TieredCommitLog {
return flatFile.getFileToWrite().getMaxTimestamp();
}
- public synchronized long correctMinOffset() {
+ public long correctMinOffset() {
+ try {
+ return correctMinOffsetAsync().get();
+ } catch (Exception e) {
+ log.error("Correct min offset failed in clean expired file", e);
+ }
+ return NOT_EXIST_MIN_OFFSET;
+ }
+
+ public synchronized CompletableFuture<Long> correctMinOffsetAsync() {
if (flatFile.getFileSegmentCount() == 0) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
- return NOT_EXIST_MIN_OFFSET;
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
}
// queue offset field length is 8
int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
- return NOT_EXIST_MIN_OFFSET;
+ return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
}
try {
@@ -109,7 +118,8 @@ public class TieredCommitLog {
.thenApply(buffer -> {
long offset = MessageBufferUtil.getQueueOffset(buffer);
minConsumeQueueOffset.set(offset);
- log.info("Correct commitlog min cq offset success,
filePath={}, min cq offset={}, range={}-{}",
+ log.debug("Correct commitlog min cq offset success, " +
+ "filePath={}, min cq offset={}, commitlog
range={}-{}",
flatFile.getFilePath(), offset,
flatFile.getMinOffset(), flatFile.getCommitOffset());
return offset;
})
@@ -117,11 +127,11 @@ public class TieredCommitLog {
log.warn("Correct commitlog min cq offset error,
filePath={}, range={}-{}",
flatFile.getFilePath(), flatFile.getMinOffset(),
flatFile.getCommitOffset(), throwable);
return minConsumeQueueOffset.get();
- }).get();
+ });
} catch (Exception e) {
log.error("Correct commitlog min cq offset error, filePath={}",
flatFile.getFilePath(), e);
}
- return minConsumeQueueOffset.get();
+ return CompletableFuture.completedFuture(minConsumeQueueOffset.get());
}
public AppendResult append(ByteBuffer byteBuf) {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 75ce8d89f2..426c4e09d3 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -178,32 +180,26 @@ public class TieredFlatFile {
private FileSegmentMetadata
getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
- fileSegment.getPath(), fileSegment.getFileType(),
fileSegment.getBaseOffset());
-
- if (metadata != null) {
- return metadata;
- }
+ this.filePath, fileSegment.getFileType(),
fileSegment.getBaseOffset());
// Note: file segment path may not the same as file base path, use
base path here.
- metadata = new FileSegmentMetadata(
- this.filePath, fileSegment.getBaseOffset(),
fileSegment.getFileType().getType());
-
- if (fileSegment.isClosed()) {
- metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ if (metadata == null) {
+ metadata = new FileSegmentMetadata(
+ this.filePath, fileSegment.getBaseOffset(),
fileSegment.getFileType().getType());
+ metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
+ metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
+ metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
+ if (fileSegment.isClosed()) {
+ metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+ }
+ this.tieredMetadataStore.updateFileSegment(metadata);
}
-
- metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
- metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-
- // Submit to persist
- this.tieredMetadataStore.updateFileSegment(metadata);
return metadata;
}
/**
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended
&& Not Full
*/
- @VisibleForTesting
public void updateFileSegment(TieredFileSegment fileSegment) {
FileSegmentMetadata segmentMetadata =
getOrCreateFileSegmentMetadata(fileSegment);
@@ -219,9 +215,16 @@ public class TieredFlatFile {
}
segmentMetadata.setSize(fileSegment.getCommitPosition());
- segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp());
segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
- this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+
+ FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
+ this.filePath, fileSegment.getFileType(),
fileSegment.getBaseOffset());
+
+ if (!Objects.equals(metadata, segmentMetadata)) {
+ this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+ logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {},
content: {}",
+ segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
+ }
}
private void checkAndFixFileSize() {
@@ -257,6 +260,7 @@ public class TieredFlatFile {
logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file
{} size: origin: {}, actual: {}",
lastFile.getPath(), lastFile.getCommitOffset() -
lastFile.getBaseOffset(), lastFileSize);
lastFile.initPosition(lastFileSize);
+ this.updateFileSegment(lastFile);
}
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index aeca44b8cb..e9ae4a5a52 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
public class TieredFlatFileManager {
+ private static final Logger BROKER_LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger logger =
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
private static volatile TieredFlatFileManager instance;
@@ -44,7 +48,7 @@ public class TieredFlatFileManager {
private final TieredMetadataStore metadataStore;
private final TieredMessageStoreConfig storeConfig;
private final TieredFileAllocator tieredFileAllocator;
- private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile>
queueFlatFileMap;
+ private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile>
flatFileConcurrentMap;
public TieredFlatFileManager(TieredMessageStoreConfig storeConfig)
throws ClassNotFoundException, NoSuchMethodException {
@@ -52,23 +56,20 @@ public class TieredFlatFileManager {
this.storeConfig = storeConfig;
this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
this.tieredFileAllocator = new TieredFileAllocator(storeConfig);
- this.queueFlatFileMap = new ConcurrentHashMap<>();
+ this.flatFileConcurrentMap = new ConcurrentHashMap<>();
this.doScheduleTask();
}
public static TieredFlatFileManager getInstance(TieredMessageStoreConfig
storeConfig) {
- if (storeConfig == null) {
+ if (storeConfig == null || instance != null) {
return instance;
}
-
- if (instance == null) {
- synchronized (TieredFlatFileManager.class) {
- if (instance == null) {
- try {
- instance = new TieredFlatFileManager(storeConfig);
- } catch (Exception e) {
- logger.error("TieredFlatFileManager#getInstance:
create flat file manager failed", e);
- }
+ synchronized (TieredFlatFileManager.class) {
+ if (instance == null) {
+ try {
+ instance = new TieredFlatFileManager(storeConfig);
+ } catch (Exception e) {
+ logger.error("Construct FlatFileManager instance error",
e);
}
}
}
@@ -88,7 +89,7 @@ public class TieredFlatFileManager {
TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC,
storeConfig.getBrokerName(), 0));
indexFile = new TieredIndexFile(new
TieredFileAllocator(storeConfig), filePath);
} catch (Exception e) {
- logger.error("TieredFlatFileManager#getIndexFile:
create index file failed", e);
+ logger.error("Construct FlatFileManager indexFile
error", e);
}
}
}
@@ -105,7 +106,7 @@ public class TieredFlatFileManager {
flatFile.commitCommitLog();
} catch (Throwable e) {
MessageQueue mq = flatFile.getMessageQueue();
- logger.error("commit commitLog periodically failed: topic:
{}, queue: {}",
+ logger.error("Commit commitLog periodically failed: topic:
{}, queue: {}",
mq.getTopic(), mq.getQueueId(), e);
}
}, delay, TimeUnit.MILLISECONDS);
@@ -114,7 +115,7 @@ public class TieredFlatFileManager {
flatFile.commitConsumeQueue();
} catch (Throwable e) {
MessageQueue mq = flatFile.getMessageQueue();
- logger.error("commit consumeQueue periodically failed:
topic: {}, queue: {}",
+ logger.error("Commit consumeQueue periodically failed:
topic: {}, queue: {}",
mq.getTopic(), mq.getQueueId(), e);
}
}, delay, TimeUnit.MILLISECONDS);
@@ -125,7 +126,7 @@ public class TieredFlatFileManager {
indexFile.commit(true);
}
} catch (Throwable e) {
- logger.error("commit indexFile periodically failed", e);
+ logger.error("Commit indexFile periodically failed", e);
}
}, 0, TimeUnit.MILLISECONDS);
}
@@ -160,7 +161,7 @@ public class TieredFlatFileManager {
try {
doCommit();
} catch (Throwable e) {
- logger.error("commit flat file periodically failed: ", e);
+ logger.error("Commit flat file periodically failed: ", e);
}
}, 60, 60, TimeUnit.SECONDS);
@@ -168,49 +169,73 @@ public class TieredFlatFileManager {
try {
doCleanExpiredFile();
} catch (Throwable e) {
- logger.error("clean expired flat file failed: ", e);
+ logger.error("Clean expired flat file failed: ", e);
}
}, 30, 30, TimeUnit.SECONDS);
}
public boolean load() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
try {
- AtomicLong topicSequenceNumber = new AtomicLong();
- List<Future<?>> futureList = new ArrayList<>();
- queueFlatFileMap.clear();
- metadataStore.iterateTopic(topicMetadata -> {
+ flatFileConcurrentMap.clear();
+ this.recoverSequenceNumber();
+ this.recoverTieredFlatFile();
+ logger.info("Message store recover end, total cost={}ms",
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ logger.info("Message store recover error, total cost={}ms",
costTime);
+ BROKER_LOG.error("Message store recover error, total cost={}ms",
costTime, e);
+ return false;
+ }
+ return true;
+ }
+
+ public void recoverSequenceNumber() {
+ AtomicLong topicSequenceNumber = new AtomicLong();
+ metadataStore.iterateTopic(topicMetadata -> {
+ if (topicMetadata != null && topicMetadata.getTopicId() > 0) {
topicSequenceNumber.set(Math.max(topicSequenceNumber.get(),
topicMetadata.getTopicId()));
- Future<?> future =
TieredStoreExecutor.dispatchExecutor.submit(() -> {
- if (topicMetadata.getStatus() != 0) {
- return;
- }
+ }
+ });
+
metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
+ }
+
+ public void recoverTieredFlatFile() {
+ Semaphore semaphore = new Semaphore((int)
(TieredStoreExecutor.QUEUE_CAPACITY * 0.75));
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ metadataStore.iterateTopic(topicMetadata -> {
+ try {
+ semaphore.acquire();
+ CompletableFuture<Void> future = CompletableFuture.runAsync(()
-> {
try {
- metadataStore.iterateQueue(topicMetadata.getTopic(),
- queueMetadata -> getOrCreateFlatFileIfAbsent(
- new MessageQueue(topicMetadata.getTopic(),
- storeConfig.getBrokerName(),
- queueMetadata.getQueue().getQueueId())));
+ Stopwatch subWatch = Stopwatch.createStarted();
+ if (topicMetadata.getStatus() != 0) {
+ return;
+ }
+ AtomicLong queueCount = new AtomicLong();
+ metadataStore.iterateQueue(topicMetadata.getTopic(),
queueMetadata -> {
+ this.getOrCreateFlatFileIfAbsent(new
MessageQueue(topicMetadata.getTopic(),
+ storeConfig.getBrokerName(),
queueMetadata.getQueue().getQueueId()));
+ queueCount.incrementAndGet();
+ });
+ logger.info("Recover TopicFlatFile, topic: {},
queueCount: {}, cost: {}ms",
+ topicMetadata.getTopic(), queueCount.get(),
subWatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
- logger.error("load mq composite flat file from
metadata failed", e);
+ logger.error("Recover TopicFlatFile error, topic: {}",
topicMetadata.getTopic(), e);
+ } finally {
+ semaphore.release();
}
- });
- futureList.add(future);
- });
-
- // Wait for load all metadata done
- for (Future<?> future : futureList) {
- future.get();
+ }, TieredStoreExecutor.commitExecutor);
+ futures.add(future);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
-
metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
- } catch (Exception e) {
- logger.error("load mq composite flat file from metadata failed",
e);
- return false;
- }
- return true;
+ });
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
}
public void cleanup() {
- queueFlatFileMap.clear();
+ flatFileConcurrentMap.clear();
cleanStaticReference();
}
@@ -221,27 +246,25 @@ public class TieredFlatFileManager {
@Nullable
public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue
messageQueue) {
- return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
+ return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> {
try {
-
logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
- "try to create new flat file: topic: {}, queueId: {}",
+ logger.debug("Create new TopicFlatFile, topic: {}, queueId:
{}",
messageQueue.getTopic(), messageQueue.getQueueId());
return new CompositeQueueFlatFile(tieredFileAllocator, mq);
} catch (Exception e) {
-
logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
- "create new flat file: topic: {}, queueId: {}",
+ logger.debug("Create new TopicFlatFile failed, topic: {},
queueId: {}",
messageQueue.getTopic(), messageQueue.getQueueId(), e);
- return null;
}
+ return null;
});
}
public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) {
- return queueFlatFileMap.get(messageQueue);
+ return flatFileConcurrentMap.get(messageQueue);
}
public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
- return ImmutableList.copyOf(queueFlatFileMap.values());
+ return ImmutableList.copyOf(flatFileConcurrentMap.values());
}
public void shutdown() {
@@ -270,7 +293,7 @@ public class TieredFlatFileManager {
}
// delete memory reference
- CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
+ CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq);
if (flatFile != null) {
MessageQueue messageQueue = flatFile.getMessageQueue();
logger.info("TieredFlatFileManager#destroyCompositeFile: " +
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
index 1b232fc750..2f0fd71deb 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.tieredstore.metadata;
+import java.util.Objects;
+
public class FileSegmentMetadata {
public static final int STATUS_NEW = 0;
@@ -43,7 +45,6 @@ public class FileSegmentMetadata {
this.baseOffset = baseOffset;
this.type = type;
this.status = STATUS_NEW;
- this.createTimestamp = System.currentTimeMillis();
}
public void markSealed() {
@@ -122,4 +123,27 @@ public class FileSegmentMetadata {
public void setSealTimestamp(long sealTimestamp) {
this.sealTimestamp = sealTimestamp;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ FileSegmentMetadata metadata = (FileSegmentMetadata) o;
+ return size == metadata.size
+ && baseOffset == metadata.baseOffset
+ && status == metadata.status
+ && path.equals(metadata.path)
+ && type == metadata.type
+ && createTimestamp == metadata.createTimestamp
+ && beginTimestamp == metadata.beginTimestamp
+ && endTimestamp == metadata.endTimestamp
+ && sealTimestamp == metadata.sealTimestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, path, baseOffset, status, size,
createTimestamp, beginTimestamp, endTimestamp, sealTimestamp);
+ }
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index e6adef1d1d..5791dc9a4e 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -116,19 +116,20 @@ public class TieredDispatcherTest {
buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
flatFile.appendCommitLog(buffer3);
flatFile.commitCommitLog();
- Assert.assertEquals(10, flatFile.getDispatchOffset());
+ Assert.assertEquals(9 + 1, flatFile.getDispatchOffset());
+ Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset());
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS,
flatFile, 8, 8, 0, 0, buffer1);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS,
flatFile, 9, 9, 0, 0, buffer2);
dispatcher.buildConsumeQueueAndIndexFile();
Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
- Assert.assertEquals(7, flatFile.getDispatchOffset());
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS,
flatFile, 7, 7, 0, 0, buffer1);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS,
flatFile, 8, 8, 0, 0, buffer2);
dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS,
flatFile, 9, 9, 0, 0, buffer3);
dispatcher.buildConsumeQueueAndIndexFile();
- Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
+ Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset());
+ Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset());
}
@Test
@@ -142,6 +143,7 @@ public class TieredDispatcherTest {
Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(),
mq.getQueueId())).thenReturn(0L);
Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(),
mq.getQueueId())).thenReturn(9L);
+ // mock cq item, position = 7
ByteBuffer cqItem =
ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
cqItem.putLong(7);
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
@@ -150,13 +152,13 @@ public class TieredDispatcherTest {
SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0,
cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
Mockito.when(((ConsumeQueue)
defaultStore.getConsumeQueue(mq.getTopic(),
mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult);
+ // mock cq item, position = 8
cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
cqItem.putLong(8);
cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
cqItem.putLong(1);
cqItem.flip();
mockResult = new SelectMappedBufferResult(0, cqItem,
ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
-
Mockito.when(((ConsumeQueue)
defaultStore.getConsumeQueue(mq.getTopic(),
mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
mockResult = new SelectMappedBufferResult(0,
MessageBufferUtilTest.buildMockedMessageBuffer(),
MessageBufferUtilTest.MSG_LEN, null);
@@ -167,7 +169,10 @@ public class TieredDispatcherTest {
mockResult = new SelectMappedBufferResult(0, msg,
MessageBufferUtilTest.MSG_LEN, null);
Mockito.when(defaultStore.selectOneMessageByOffset(8,
MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
-
dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq));
+ CompositeQueueFlatFile flatFile =
flatFileManager.getOrCreateFlatFileIfAbsent(mq);
+ Assert.assertNotNull(flatFile);
+ flatFile.initOffset(7);
+ dispatcher.dispatchFlatFile(flatFile);
Assert.assertEquals(8,
flatFileManager.getFlatFile(mq).getDispatchOffset());
}
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d75b2f9164..774c6cf646 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Triple;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult;
@@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 27efe111e6..2e028ada32 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest {
Assert.assertEquals(AppendResult.SUCCESS, result);
file.commit(true);
- file.persistMetadata();
+ file.flushMetadata();
QueueMetadata queueMetadata = metadataStore.getQueue(mq);
Assert.assertEquals(53, queueMetadata.getMaxOffset());
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
index b7528c5e4f..20fe4dd702 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
@@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest {
CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
Assert.assertNotNull(flatFile);
- Assert.assertEquals(100, flatFile.getDispatchOffset());
+ Assert.assertEquals(-1L, flatFile.getDispatchOffset());
+ flatFile.initOffset(100L);
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
+ flatFile.initOffset(200L);
+ Assert.assertEquals(100L, flatFile.getDispatchOffset());
CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1);
Assert.assertNotNull(flatFile1);
+ flatFile1.initOffset(200L);
Assert.assertEquals(200, flatFile1.getDispatchOffset());
flatFileManager.destroyCompositeFile(mq);