This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a6d341d13 [ISSUE #3905] Support bname in protocol for 5.0 client
(#5334)
a6d341d13 is described below
commit a6d341d136000372f27fc95b18ce582bd0d19401
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Mon Oct 17 18:28:19 2022 +0800
[ISSUE #3905] Support bname in protocol for 5.0 client (#5334)
* [ISSUE #3905] Support bname in protocol for 5.0 client
* add bname for `CheckTransactionStateRequestHeader`,
`ConsumerSendMsgBackRequestHeader`, `EndTransactionRequestHeader`,
`SendMessageRequestHeader`
* * add bname for `AckMessageRequestHeader`, `PeekMessageRequestHeader`,
`PopMessageRequestHeader`,
`ChangeInvisibleTimeRequestHeader`, `NotificationRequestHeader` and
`PollingInfoRequestHeader`
---
.../AbstractTransactionalMessageCheckListener.java | 1 +
.../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 2 ++
.../client/impl/consumer/DefaultMQPullConsumerImpl.java | 4 ++--
.../client/impl/consumer/DefaultMQPushConsumerImpl.java | 4 +++-
.../apache/rocketmq/client/impl/consumer/PullAPIWrapper.java | 1 +
.../rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 3 +++
.../common/protocol/header/AckMessageRequestHeader.java | 4 ++--
.../protocol/header/ChangeInvisibleTimeRequestHeader.java | 4 ++--
.../protocol/header/CheckTransactionStateRequestHeader.java | 4 ++--
.../protocol/header/ConsumerSendMsgBackRequestHeader.java | 4 ++--
.../common/protocol/header/EndTransactionRequestHeader.java | 4 ++--
.../common/protocol/header/NotificationRequestHeader.java | 8 ++++----
.../common/protocol/header/PeekMessageRequestHeader.java | 8 ++++----
.../common/protocol/header/PollingInfoRequestHeader.java | 8 ++++----
.../common/protocol/header/PopMessageRequestHeader.java | 10 +++++-----
.../common/protocol/header/SendMessageRequestHeaderV2.java | 11 +++++++++++
16 files changed, 50 insertions(+), 30 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index beda6504c..6ed015b99 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -56,6 +56,7 @@ public abstract class
AbstractTransactionalMessageCheckListener {
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
+
checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b327ee28b..5f393cb57 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1451,6 +1451,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
public void consumerSendMessageBack(
final String addr,
+ final String brokerName,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
@@ -1466,6 +1467,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+ requestHeader.setBname(brokerName);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
request, timeoutMillis);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 66f3578fe..96f31724e 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -622,8 +622,8 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
}
-
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
msg, consumerGroup, delayLevel, 3000,
- this.defaultMQPullConsumer.getMaxReconsumeTimes());
+
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
brokerName, msg, consumerGroup,
+ delayLevel, 3000,
this.defaultMQPullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " +
this.defaultMQPullConsumer.getConsumerGroup(), e);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 7dc212dd1..207894c49 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -732,7 +732,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
} else {
String brokerAddr = (null != brokerName) ?
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
:
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
msg,
+
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr,
brokerName, msg,
this.defaultMQPushConsumer.getConsumerGroup(),
delayLevel, 5000, getMaxReconsumeTimes());
}
} catch (Exception e) {
@@ -794,6 +794,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
requestHeader.setOffset(queueOffset);
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setExtraInfo(extraInfo);
+ requestHeader.setBname(brokerName);
this.mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(),
ASYNC_TIMEOUT, new AckCallback() {
@Override
public void onSuccess(AckResult ackResult) {
@@ -837,6 +838,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setExtraInfo(extraInfo);
requestHeader.setInvisibleTime(invisibleTime);
+ requestHeader.setBname(brokerName);
//here the broker should be polished
this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName,
findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback);
return;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 92dded34d..6d782a37a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -379,6 +379,7 @@ public class PullAPIWrapper {
requestHeader.setExpType(expressionType);
requestHeader.setExp(expression);
requestHeader.setOrder(order);
+ requestHeader.setBname(mq.getBrokerName());
//give 1000 ms for server response
if (poll) {
requestHeader.setPollTime(timeout);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index d97266319..b40f536fd 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -375,6 +375,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
+ thisHeader.setBname(checkRequestHeader.getBname());
String uniqueKey =
message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
@@ -835,6 +836,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
+ requestHeader.setBname(brokerName);
if
(requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes =
MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
@@ -1365,6 +1367,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
EndTransactionRequestHeader requestHeader = new
EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
+ requestHeader.setBname(destBrokerName);
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
index a8fea34d9..f9d9e83c2 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AckMessageRequestHeader.java
@@ -17,11 +17,11 @@
package org.apache.rocketmq.common.protocol.header;
import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class AckMessageRequestHeader implements CommandCustomHeader {
+public class AckMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
index 5d06c3ffa..02bf9c081 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ChangeInvisibleTimeRequestHeader.java
@@ -17,11 +17,11 @@
package org.apache.rocketmq.common.protocol.header;
import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ChangeInvisibleTimeRequestHeader implements CommandCustomHeader {
+public class ChangeInvisibleTimeRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
index d62802c06..6ef4099b0 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -21,11 +21,11 @@
package org.apache.rocketmq.common.protocol.header;
import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class CheckTransactionStateRequestHeader implements CommandCustomHeader
{
+public class CheckTransactionStateRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long tranStateTableOffset;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
index 3d65f2392..ee0416f52 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -18,12 +18,12 @@
package org.apache.rocketmq.common.protocol.header;
import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long offset;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
index 80fdc3d4a..eabc4bed6 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
@@ -18,13 +18,13 @@
package org.apache.rocketmq.common.protocol.header;
import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class EndTransactionRequestHeader implements CommandCustomHeader {
+public class EndTransactionRequestHeader extends RpcRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotificationRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotificationRequestHeader.java
index 79db24e44..0f396a843 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotificationRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotificationRequestHeader.java
@@ -16,12 +16,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class NotificationRequestHeader implements CommandCustomHeader {
+public class NotificationRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -70,14 +70,14 @@ public class NotificationRequestHeader implements
CommandCustomHeader {
this.topic = topic;
}
- public int getQueueId() {
+ public Integer getQueueId() {
if (queueId < 0) {
return -1;
}
return queueId;
}
- public void setQueueId(int queueId) {
+ public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PeekMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PeekMessageRequestHeader.java
index ba172f57d..1653bfbcc 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PeekMessageRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PeekMessageRequestHeader.java
@@ -16,11 +16,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PeekMessageRequestHeader implements CommandCustomHeader {
+public class PeekMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
@@ -50,11 +50,11 @@ public class PeekMessageRequestHeader implements
CommandCustomHeader {
this.topic = topic;
}
- public int getQueueId() {
+ public Integer getQueueId() {
return queueId;
}
- public void setQueueId(int queueId) {
+ public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PollingInfoRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PollingInfoRequestHeader.java
index a57fa6e7a..3fbe67933 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PollingInfoRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PollingInfoRequestHeader.java
@@ -17,12 +17,12 @@
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PollingInfoRequestHeader implements CommandCustomHeader {
+public class PollingInfoRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -50,14 +50,14 @@ public class PollingInfoRequestHeader implements
CommandCustomHeader {
this.topic = topic;
}
- public int getQueueId() {
+ public Integer getQueueId() {
if (queueId < 0) {
return -1;
}
return queueId;
}
- public void setQueueId(int queueId) {
+ public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
index a3a186a91..d9d28c62f 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PopMessageRequestHeader.java
@@ -17,11 +17,11 @@
package org.apache.rocketmq.common.protocol.header;
import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PopMessageRequestHeader implements CommandCustomHeader {
+public class PopMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -102,18 +102,18 @@ public class PopMessageRequestHeader implements
CommandCustomHeader {
this.topic = topic;
}
- public int getQueueId() {
+ public Integer getQueueId() {
if (queueId < 0) {
return -1;
}
return queueId;
}
- public void setQueueId(int queueId) {
+ @Override
+ public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
-
public int getMaxMsgNums() {
return maxMsgNums;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index f4771252e..1985f65f4 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -59,6 +59,8 @@ public class SendMessageRequestHeaderV2 implements
CommandCustomHeader, FastCode
@CFNullable
private boolean m; //batch
+ @CFNullable
+ private String n; // brokerName
public static SendMessageRequestHeader
createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
@@ -75,6 +77,7 @@ public class SendMessageRequestHeaderV2 implements
CommandCustomHeader, FastCode
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
+ v1.setBname(v2.n);
return v1;
}
@@ -93,6 +96,7 @@ public class SendMessageRequestHeaderV2 implements
CommandCustomHeader, FastCode
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
+ v2.n = v1.getBname();
return v2;
}
@@ -115,6 +119,7 @@ public class SendMessageRequestHeaderV2 implements
CommandCustomHeader, FastCode
writeIfNotNull(out, "k", k);
writeIfNotNull(out, "l", l);
writeIfNotNull(out, "m", m);
+ writeIfNotNull(out, "n", n);
}
@Override
@@ -184,6 +189,11 @@ public class SendMessageRequestHeaderV2 implements
CommandCustomHeader, FastCode
if (str != null) {
m = Boolean.parseBoolean(str);
}
+
+ str = fields.get("n");
+ if (str != null) {
+ n = str;
+ }
}
public String getA() {
@@ -306,6 +316,7 @@ public class SendMessageRequestHeaderV2 implements
CommandCustomHeader, FastCode
.add("k", k)
.add("l", l)
.add("m", m)
+ .add("n", n)
.toString();
}
}
\ No newline at end of file