This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3183122 Eliminate array copy (#2886)
3183122 is described below
commit 3183122c01cc2d0dc005634d028ccbcb62495bf6
Author: huangli <[email protected]>
AuthorDate: Tue Jul 6 21:47:12 2021 +0800
Eliminate array copy (#2886)
[Part C] Improve produce performance in M/S mode.
---
.../rocketmq/broker/plugin/AbstractPluginMessageStore.java | 4 ++--
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 4 ++--
.../java/org/apache/rocketmq/store/DefaultMessageStore.java | 4 ++--
.../src/main/java/org/apache/rocketmq/store/MessageStore.java | 4 +++-
.../org/apache/rocketmq/store/dledger/DLedgerCommitLog.java | 2 +-
.../src/main/java/org/apache/rocketmq/store/ha/HAService.java | 10 ++++------
6 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 1db019b..b95bab6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public boolean appendToCommitLog(long startOffset, byte[] data) {
- return next.appendToCommitLog(startOffset, data);
+ public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
+ return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 43b01f0..57fa363 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1185,7 +1185,7 @@ public class CommitLog {
this.mappedFileQueue.destroy();
}
- public boolean appendData(long startOffset, byte[] data) {
+ public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
@@ -1194,7 +1194,7 @@ public class CommitLog {
return false;
}
- return mappedFile.appendMessage(data);
+ return mappedFile.appendMessage(data, dataStart, dataLength);
} finally {
putMessageLock.unlock();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b8ecdee..7dd5a32 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public boolean appendToCommitLog(long startOffset, byte[] data) {
+ public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false;
}
- boolean result = this.commitLog.appendData(startOffset, data);
+ boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
if (result) {
this.reputMessageService.wakeup();
} else {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 64eb525..a8c658b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -245,9 +245,11 @@ public interface MessageStore {
*
* @param startOffset starting offset.
* @param data data to append.
+ * @param dataStart the index within the data array of the first byte to append
+ * @param dataLength the number of bytes to append, starting at dataStart (not the length of the whole array)
* @return true if success; false otherwise.
*/
- boolean appendToCommitLog(final long startOffset, final byte[] data);
+ boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
/**
* Execute file deletion manually.
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 3b98760..ea791bd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -905,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog {
}
@Override
- public boolean appendData(long startOffset, byte[] data) {
+ public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
//the old ha service will invoke method, here to prevent it
return false;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index d4d4109..845935b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -440,7 +440,6 @@ public class HAService {
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
- int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
@@ -459,13 +458,12 @@ public class HAService {
}
if (diff >= (msgHeaderSize + bodySize)) {
- byte[] bodyData = new byte[bodySize];
- this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
- this.byteBufferRead.get(bodyData);
+ byte[] bodyData = byteBufferRead.array();
+ int dataStart = this.dispatchPosition + msgHeaderSize;
- HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
+ HAService.this.defaultMessageStore.appendToCommitLog(
+ masterPhyOffset, bodyData, dataStart, bodySize);
- this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {