This is an automated email from the ASF dual-hosted git repository.
caigy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 86d59d2485 Add the ability to write ConsumeQueue using fileChannel to
prevent JVM crashes in some situations (#8403)
86d59d2485 is described below
commit 86d59d2485b5fed162db3743e11c0902de3e34ad
Author: rongtong <[email protected]>
AuthorDate: Mon Jul 22 17:20:11 2024 +0800
Add the ability to write ConsumeQueue using fileChannel to prevent JVM
crashes in some situations (#8403)
---
.../org/apache/rocketmq/store/ConsumeQueue.java | 15 ++++++++++--
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++
.../rocketmq/store/logfile/DefaultMappedFile.java | 27 ++++++++++++++++++----
.../apache/rocketmq/store/logfile/MappedFile.java | 11 +++++++++
.../rocketmq/store/queue/BatchConsumeQueue.java | 7 +++++-
.../rocketmq/store/queue/SparseConsumeQueue.java | 10 +++++++-
6 files changed, 71 insertions(+), 9 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 569cc3cfaa..eb8af4ab19 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -833,7 +833,13 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
}
}
this.setMaxPhysicOffset(offset + size);
- return mappedFile.appendMessage(this.byteBufferIndex.array());
+ boolean appendResult;
+ if
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+ appendResult =
mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
+ } else {
+ appendResult =
mappedFile.appendMessage(this.byteBufferIndex.array());
+ }
+ return appendResult;
}
return false;
}
@@ -846,7 +852,12 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
int until = (int) (untilWhere %
this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
- mappedFile.appendMessage(byteBuffer.array());
+ if
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+ mappedFile.appendMessageUsingFileChannel(byteBuffer.array());
+ } else {
+ mappedFile.appendMessage(byteBuffer.array());
+ }
+
}
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 0060b144cf..5b2a1931b3 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -419,6 +419,8 @@ public class MessageStoreConfig {
*/
private boolean readUnCommitted = false;
+ private boolean putConsumeQueueDataByFileChannel = true;
+
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
@@ -1832,4 +1834,12 @@ public class MessageStoreConfig {
public void setReadUnCommitted(boolean readUnCommitted) {
this.readUnCommitted = readUnCommitted;
}
+
+ public boolean isPutConsumeQueueDataByFileChannel() {
+ return putConsumeQueueDataByFileChannel;
+ }
+
+ public void setPutConsumeQueueDataByFileChannel(boolean
putConsumeQueueDataByFileChannel) {
+ this.putConsumeQueueDataByFileChannel =
putConsumeQueueDataByFileChannel;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 03477c3324..c490d093a1 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -97,14 +97,14 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
/**
- * If this mapped file belongs to consume queue, this field stores
store-timestamp of first message referenced
- * by this logical queue.
+ * If this mapped file belongs to consume queue, this field stores
store-timestamp of first message referenced by
+ * this logical queue.
*/
private long startTimestamp = -1;
/**
- * If this mapped file belongs to consume queue, this field stores
store-timestamp of last message referenced
- * by this logical queue.
+ * If this mapped file belongs to consume queue, this field stores
store-timestamp of last message referenced by
+ * this logical queue.
*/
private long stopTimestamp = -1;
@@ -357,6 +357,24 @@ public class DefaultMappedFile extends AbstractMappedFile {
return false;
}
+ @Override
+ public boolean appendMessageUsingFileChannel(byte[] data) {
+ int currentPos = WROTE_POSITION_UPDATER.get(this);
+
+ if ((currentPos + data.length) <= this.fileSize) {
+ try {
+ this.fileChannel.position(currentPos);
+ this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length));
+ } catch (Throwable e) {
+ log.error("Error occurred when append message to mappedFile.",
e);
+ }
+ WROTE_POSITION_UPDATER.addAndGet(this, data.length);
+ return true;
+ }
+
+ return false;
+ }
+
/**
* @return The current flushed position
*/
@@ -840,7 +858,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
this.stopTimestamp = stopTimestamp;
}
-
public Iterator<SelectMappedBufferResult> iterator(int startPos) {
return new Itr(startPos);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index dfcf66f088..fd70d6c563 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -101,12 +101,23 @@ public interface MappedFile {
/**
* Appends a raw message data represents by a byte array to the current
{@code MappedFile}.
+ * Using mappedByteBuffer
*
* @param data the byte array to append
* @return true if success; false otherwise.
*/
boolean appendMessage(byte[] data);
+
+ /**
+ * Appends a raw message data represents by a byte array to the current
{@code MappedFile}.
+ * Using fileChannel
+ *
+ * @param data the byte array to append
+ * @return true if success; false otherwise.
+ */
+ boolean appendMessageUsingFileChannel(byte[] data);
+
/**
* Appends a raw message data represents by a byte array to the current
{@code MappedFile}.
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 7108c835c8..1617182724 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -587,7 +587,12 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
MappedFile mappedFile =
this.mappedFileQueue.getLastMappedFile(this.mappedFileQueue.getMaxOffset());
if (mappedFile != null) {
boolean isNewFile = isNewFile(mappedFile);
- boolean appendRes =
mappedFile.appendMessage(this.byteBufferItem.array());
+ boolean appendRes;
+ if
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+ appendRes =
mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array());
+ } else {
+ appendRes =
mappedFile.appendMessage(this.byteBufferItem.array());
+ }
if (appendRes) {
maxMsgPhyOffsetInCommitLog = offset;
maxOffsetInQueue = msgBaseOffset + batchSize;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index 4a5f3a93b1..7e14de30ab 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -262,7 +262,15 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
this.byteBufferItem.putShort((short)0);
this.byteBufferItem.putInt(INVALID_POS);
this.byteBufferItem.putInt(0); // 4 bytes reserved
- boolean appendRes =
mappedFile.appendMessage(this.byteBufferItem.array());
+
+ boolean appendRes;
+
+ if
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+ appendRes =
mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array());
+ } else {
+ appendRes =
mappedFile.appendMessage(this.byteBufferItem.array());
+ }
+
if (!appendRes) {
log.error("append end position info into {} failed",
mappedFile.getFileName());
}