This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3f48c17 [ISSUE# 2330] Store the properties of MessageBatch (#2343)
3f48c17 is described below
commit 3f48c174343223ec0dc5258595c19c8291d5e21d
Author: AVP42 <[email protected]>
AuthorDate: Mon Oct 19 08:48:45 2020 +0800
[ISSUE# 2330] Store the properties of MessageBatch (#2343)
* [ISSUE# 2330]store the properties of MessageBatch
* [ISSUE# 2330]fix style problem
* [ISSUE# 2330]Undo newly added property in the MessageBatch
Co-authored-by: wufc <[email protected]>
---
.../main/java/org/apache/rocketmq/store/CommitLog.java | 16 +++++++++++++---
.../org/apache/rocketmq/store/BatchPutMessageTest.java | 10 +++++++++-
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index b6d17da..d489e84 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1795,6 +1795,11 @@ public class CommitLog {
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
+ // properties from MessageExtBatch
+ String batchPropStr =
MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
+ final byte[] batchPropData =
batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
+ final short batchPropLen = (short) batchPropData.length;
+
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
messagesByteBuff.getInt();
@@ -1818,7 +1823,8 @@ public class CommitLog {
final int topicLength = topicData.length;
- final int msgLen = calMsgLength(messageExtBatch.getSysFlag(),
bodyLen, topicLength, propertiesLen);
+ final int msgLen = calMsgLength(messageExtBatch.getSysFlag(),
bodyLen, topicLength,
+ propertiesLen + batchPropLen);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
@@ -1871,9 +1877,13 @@ public class CommitLog {
this.msgBatchMemory.put((byte) topicLength);
this.msgBatchMemory.put(topicData);
// 17 PROPERTIES
- this.msgBatchMemory.putShort(propertiesLen);
- if (propertiesLen > 0)
+ this.msgBatchMemory.putShort((short) (propertiesLen +
batchPropLen));
+ if (propertiesLen > 0) {
this.msgBatchMemory.put(messagesByteBuff.array(),
propertiesPos, propertiesLen);
+ }
+ if (batchPropLen > 0) {
+ this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
+ }
}
msgBatchMemory.flip();
return msgBatchMemory;
diff --git
a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 8618dbb..b3a7c19 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -35,6 +35,7 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,6 +81,12 @@ public class BatchPutMessageTest {
@Test
public void testPutMessages() throws Exception {
+ String batchPropK = "extraKey";
+ String batchPropV = "extraValue";
+ Map<String, String> batchProp = new HashMap<>(1);
+ batchProp.put(batchPropK, batchPropV);
+ short batchPropLen = (short)
messageProperties2String(batchProp).getBytes(MessageDecoder.CHARSET_UTF8).length;
+
List<Message> messages = new ArrayList<>();
String topic = "batch-write-topic";
int queue = 0;
@@ -98,7 +105,7 @@ public class BatchPutMessageTest {
short propertiesLength = (short) propertiesBytes.length;
final byte[] topicData =
msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
- msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength,
propertiesLength) + msgLengthArr[j - 1];
+ msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength,
propertiesLength+batchPropLen) + msgLengthArr[j - 1];
j++;
}
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
@@ -106,6 +113,7 @@ public class BatchPutMessageTest {
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(queue);
messageExtBatch.setBody(batchMessageBody);
+ messageExtBatch.putUserProperty(batchPropK,batchPropV);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 125));
messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 126));