This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e876bed084 [ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage (#8965)
e876bed084 is described below
commit e876bed084ca9d642011a9d77b82c7f52b582500
Author: lizhimins <[email protected]>
AuthorDate: Fri Nov 22 11:02:50 2024 +0800
[ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage (#8965)
---
.../core/MessageStoreDispatcherImpl.java | 40 +++++++---
.../rocketmq/tieredstore/index/IndexStoreFile.java | 88 +++++++++++-----------
.../tieredstore/provider/PosixFileSegment.java | 3 +-
3 files changed, 72 insertions(+), 59 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 982909c5ee..9b1e53564d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -92,8 +92,10 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
semaphore.acquire();
this.doScheduleDispatch(flatFile, false)
.whenComplete((future, throwable) -> semaphore.release());
- } catch (InterruptedException e) {
+ } catch (Throwable t) {
semaphore.release();
+ log.error("MessageStore dispatch error, topic={}, queueId={}",
+ flatFile.getMessageQueue().getTopic(),
flatFile.getMessageQueue().getQueueId(), t);
}
}
@@ -156,8 +158,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
}
if (currentOffset < minOffsetInQueue) {
- log.warn("MessageDispatcher#dispatch, current offset is too
small, " +
- "topic={}, queueId={}, offset={}-{}, current={}",
+ log.warn("MessageDispatcher#dispatch, current offset is too
small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset);
flatFileStore.destroyFile(flatFile.getMessageQueue());
flatFileStore.computeIfAbsent(new MessageQueue(topic,
brokerName, queueId));
@@ -165,16 +166,14 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
}
if (currentOffset > maxOffsetInQueue) {
- log.warn("MessageDispatcher#dispatch, current offset is too
large, " +
- "topic: {}, queueId: {}, offset={}-{}, current={}",
+ log.warn("MessageDispatcher#dispatch, current offset is too
large, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset);
return CompletableFuture.completedFuture(false);
}
long interval =
TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
if (flatFile.rollingFile(interval)) {
- log.info("MessageDispatcher#dispatch, rolling file, " +
- "topic: {}, queueId: {}, offset={}-{}, current={}",
+ log.info("MessageDispatcher#dispatch, rolling file, topic={},
queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset);
}
@@ -189,8 +188,20 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
ConsumeQueueInterface consumeQueue =
defaultStore.getConsumeQueue(topic, queueId);
CqUnit cqUnit = consumeQueue.get(currentOffset);
+ if (cqUnit == null) {
+ log.warn("MessageDispatcher#dispatch cq not found, topic={},
queueId={}, offset={}-{}, current={}, remain={}",
+ topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset, maxOffsetInQueue - currentOffset);
+ return CompletableFuture.completedFuture(false);
+ }
+
SelectMappedBufferResult message =
defaultStore.selectOneMessageByOffset(cqUnit.getPos(),
cqUnit.getSize());
+ if (message == null) {
+ log.warn("MessageDispatcher#dispatch message not found,
topic={}, queueId={}, offset={}-{}, current={}, remain={}",
+ topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset, maxOffsetInQueue - currentOffset);
+ return CompletableFuture.completedFuture(false);
+ }
+
boolean timeout =
MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
storeConfig.getTieredStoreGroupCommitTimeout() <
System.currentTimeMillis();
boolean bufferFull = maxOffsetInQueue - currentOffset >
storeConfig.getTieredStoreGroupCommitCount();
@@ -198,6 +209,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
if (!timeout && !bufferFull && !force) {
log.debug("MessageDispatcher#dispatch hold, topic={},
queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset, maxOffsetInQueue - currentOffset);
+ message.release();
return CompletableFuture.completedFuture(false);
} else {
if
(MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
@@ -205,11 +217,11 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
log.warn("MessageDispatcher#dispatch behind too much,
topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset, maxOffsetInQueue - currentOffset);
} else {
- log.info("MessageDispatcher#dispatch, topic={},
queueId={}, offset={}-{}, current={}, remain={}",
+ log.info("MessageDispatcher#dispatch success, topic={},
queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue,
currentOffset, maxOffsetInQueue - currentOffset);
}
+ message.release();
}
- message.release();
long offset = currentOffset;
for (; offset < targetOffset; offset++) {
@@ -279,7 +291,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
}
flatFile.release();
}
- }, MessageStoreExecutor.getInstance().bufferCommitExecutor);
+ }, storeExecutor.bufferCommitExecutor);
}
/**
@@ -301,8 +313,12 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
-
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
- this.waitForRunning(Duration.ofSeconds(20).toMillis());
+ try {
+
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
+ this.waitForRunning(Duration.ofSeconds(20).toMillis());
+ } catch (Throwable t) {
+ log.error("MessageStore dispatch error", t);
+ }
}
log.info("{} service shutdown", this.getServiceName());
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index f9604b43e6..25cd634873 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
-import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
@@ -261,56 +260,55 @@ public class IndexStoreFile implements IndexFile {
protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile(
String key, int maxCount, long beginTime, long endTime) {
- return CompletableFuture.supplyAsync(() -> {
- List<IndexItem> result = new ArrayList<>();
- try {
- fileReadWriteLock.readLock().lock();
- if (!UNSEALED.equals(this.fileStatus.get()) &&
!SEALED.equals(this.fileStatus.get())) {
- return result;
- }
+ List<IndexItem> result = new ArrayList<>();
+ try {
+ fileReadWriteLock.readLock().lock();
+ if (!UNSEALED.equals(this.fileStatus.get()) &&
!SEALED.equals(this.fileStatus.get())) {
+ return CompletableFuture.completedFuture(result);
+ }
- if (mappedFile == null || !mappedFile.hold()) {
- return result;
- }
+ if (mappedFile == null || !mappedFile.hold()) {
+ return CompletableFuture.completedFuture(result);
+ }
- int hashCode = this.hashCode(key);
- int slotPosition = this.getSlotPosition(hashCode %
this.hashSlotMaxCount);
- int slotValue = this.getSlotValue(slotPosition);
+ int hashCode = this.hashCode(key);
+ int slotPosition = this.getSlotPosition(hashCode %
this.hashSlotMaxCount);
+ int slotValue = this.getSlotValue(slotPosition);
- int left = MAX_QUERY_COUNT;
- while (left > 0 &&
- slotValue > INVALID_INDEX &&
- slotValue <= this.indexItemCount.get()) {
+ int left = MAX_QUERY_COUNT;
+ while (left > 0 &&
+ slotValue > INVALID_INDEX &&
+ slotValue <= this.indexItemCount.get()) {
- byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
- ByteBuffer buffer = this.byteBuffer.duplicate();
- buffer.position(this.getItemPosition(slotValue));
- buffer.get(bytes);
- IndexItem indexItem = new IndexItem(bytes);
- long storeTimestamp = indexItem.getTimeDiff() +
beginTimestamp.get();
- if (hashCode == indexItem.getHashCode() &&
- beginTime <= storeTimestamp && storeTimestamp <=
endTime) {
- result.add(indexItem);
- if (result.size() > maxCount) {
- break;
- }
+ byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
+ ByteBuffer buffer = this.byteBuffer.duplicate();
+ buffer.position(this.getItemPosition(slotValue));
+ buffer.get(bytes);
+ IndexItem indexItem = new IndexItem(bytes);
+ long storeTimestamp = indexItem.getTimeDiff() +
beginTimestamp.get();
+ if (hashCode == indexItem.getHashCode() &&
+ beginTime <= storeTimestamp && storeTimestamp <= endTime) {
+ result.add(indexItem);
+ if (result.size() > maxCount) {
+ break;
}
- slotValue = indexItem.getItemIndex();
- left--;
}
-
- log.debug("IndexStoreFile query from unsealed mapped file,
timestamp: {}, result size: {}, " +
- "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
- getTimestamp(), result.size(), key, hashCode, maxCount,
beginTime, endTime);
- } catch (Exception e) {
- log.error("IndexStoreFile query from unsealed mapped file
error, timestamp: {}, " +
- "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(),
key, maxCount, beginTime, endTime, e);
- } finally {
- fileReadWriteLock.readLock().unlock();
- mappedFile.release();
+ slotValue = indexItem.getItemIndex();
+ left--;
}
- return result;
- }, MessageStoreExecutor.getInstance().bufferFetchExecutor);
+
+ log.debug("IndexStoreFile query from unsealed mapped file,
timestamp: {}, result size: {}, " +
+ "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+ getTimestamp(), result.size(), key, hashCode, maxCount,
beginTime, endTime);
+ } catch (Exception e) {
+ log.error("IndexStoreFile query from unsealed mapped file error,
timestamp: {}, " +
+ "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key,
maxCount, beginTime, endTime, e);
+ } finally {
+ fileReadWriteLock.readLock().unlock();
+ mappedFile.release();
+ }
+
+ return CompletableFuture.completedFuture(result);
}
protected CompletableFuture<List<IndexItem>> queryAsyncFromSegmentFile(
@@ -465,7 +463,7 @@ public class IndexStoreFile implements IndexFile {
fileReadWriteLock.writeLock().lock();
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
if (this.fileSegment != null && this.fileSegment instanceof
PosixFileSegment) {
- ((PosixFileSegment) this.fileSegment).close();
+ this.fileSegment.close();
}
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index fb150c928c..656af2ba1c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
-import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream;
@@ -230,6 +229,6 @@ public class PosixFileSegment extends FileSegment {
return false;
}
return true;
- }, MessageStoreExecutor.getInstance().bufferCommitExecutor);
+ });
}
}