Copilot commented on code in PR #10540:
URL: https://github.com/apache/rocketmq/pull/10540#discussion_r3503063597
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java:
##########
@@ -183,22 +190,140 @@ public void setBatch(Boolean batch) {
}
public static SendMessageRequestHeader parseRequestHeader(RemotingCommand
request) throws RemotingCommandException {
- SendMessageRequestHeaderV2 requestHeaderV2 = null;
- SendMessageRequestHeader requestHeader = null;
- switch (request.getCode()) {
- case RequestCode.SEND_BATCH_MESSAGE:
- case RequestCode.SEND_MESSAGE_V2:
- requestHeaderV2 =
request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- case RequestCode.SEND_MESSAGE:
- if (null == requestHeaderV2) {
- requestHeader =
request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
- } else {
- requestHeader =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
- }
- default:
- break;
- }
- return requestHeader;
+ return
request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
+ }
+
+ @Override
+ public void encode(ByteBuf out) {
+ writeIfNotNull(out, "a", producerGroup);
+ writeIfNotNull(out, "b", topic);
+ writeIfNotNull(out, "c", defaultTopic);
+ writeIfNotNull(out, "d", defaultTopicQueueNums);
+ writeIfNotNull(out, "e", queueId);
+ writeIfNotNull(out, "f", sysFlag);
+ writeIfNotNull(out, "g", bornTimestamp);
+ writeIfNotNull(out, "h", flag);
+ writeIfNotNull(out, "i", properties);
+ writeIfNotNull(out, "j", reconsumeTimes);
+ writeIfNotNull(out, "k", unitMode);
+ writeIfNotNull(out, "l", maxReconsumeTimes);
+ writeIfNotNull(out, "m", batch);
+ writeIfNotNull(out, "n", getBname());
+ }
Review Comment:
SendMessageRequestHeader.encode() now serializes only short V2 keys
("a".."n"). For RequestCode.SEND_MESSAGE, the broker-side auth path
(DefaultAuthorizationContextBuilder) reads request.getExtFields() directly and
expects the long key "topic" (and old brokers using reflection-based header
decode expect long keys like "producerGroup", "sysFlag", etc.). With fast
ROCKETMQ encoding, extFields won’t contain those long keys, so SEND_MESSAGE
authorization/topic parsing can break and older brokers may fail to decode
required fields.
To preserve V1 compatibility, encode both the short V2 keys and the original
long field names (at least for all `@CFNotNull` fields, plus topic).
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java:
##########
@@ -183,22 +190,140 @@ public void setBatch(Boolean batch) {
}
public static SendMessageRequestHeader parseRequestHeader(RemotingCommand
request) throws RemotingCommandException {
- SendMessageRequestHeaderV2 requestHeaderV2 = null;
- SendMessageRequestHeader requestHeader = null;
- switch (request.getCode()) {
- case RequestCode.SEND_BATCH_MESSAGE:
- case RequestCode.SEND_MESSAGE_V2:
- requestHeaderV2 =
request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- case RequestCode.SEND_MESSAGE:
- if (null == requestHeaderV2) {
- requestHeader =
request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
- } else {
- requestHeader =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
- }
- default:
- break;
- }
- return requestHeader;
+ return
request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
+ }
+
+ @Override
+ public void encode(ByteBuf out) {
+ writeIfNotNull(out, "a", producerGroup);
+ writeIfNotNull(out, "b", topic);
+ writeIfNotNull(out, "c", defaultTopic);
+ writeIfNotNull(out, "d", defaultTopicQueueNums);
+ writeIfNotNull(out, "e", queueId);
+ writeIfNotNull(out, "f", sysFlag);
+ writeIfNotNull(out, "g", bornTimestamp);
+ writeIfNotNull(out, "h", flag);
+ writeIfNotNull(out, "i", properties);
+ writeIfNotNull(out, "j", reconsumeTimes);
+ writeIfNotNull(out, "k", unitMode);
+ writeIfNotNull(out, "l", maxReconsumeTimes);
+ writeIfNotNull(out, "m", batch);
+ writeIfNotNull(out, "n", getBname());
+ }
+
+ @Override
+ public void decode(HashMap<String, String> fields) throws
RemotingCommandException {
+ String str = fields.get("a");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "producerGroup");
+ }
+ if (str != null) {
+ this.producerGroup = str;
+ }
+
+ str = fields.get("b");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "topic");
+ }
+ if (str != null) {
+ this.topic = str;
+ }
+
+ str = fields.get("c");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "defaultTopic");
+ }
+ if (str != null) {
+ this.defaultTopic = str;
+ }
+
+ str = fields.get("d");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "defaultTopicQueueNums");
+ }
+ if (str != null) {
+ this.defaultTopicQueueNums = Integer.parseInt(str);
+ }
+
+ str = fields.get("e");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "queueId");
+ }
+ if (str != null) {
+ this.queueId = Integer.parseInt(str);
+ }
+
+ str = fields.get("f");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "sysFlag");
+ }
+ if (str != null) {
+ this.sysFlag = Integer.parseInt(str);
+ }
+
+ str = fields.get("g");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "bornTimestamp");
+ }
+ if (str != null) {
+ this.bornTimestamp = Long.parseLong(str);
+ }
+
+ str = fields.get("h");
+ if (str == null) {
+ str = getAndCheckNotNull(fields, "flag");
+ }
+ if (str != null) {
+ this.flag = Integer.parseInt(str);
+ }
+
+ str = fields.get("i");
+ if (str == null) {
+ str = fields.get("properties");
+ }
+ if (str != null) {
+ this.properties = str;
+ }
+
+ str = fields.get("j");
+ if (str == null) {
+ str = fields.get("reconsumeTimes");
+ }
+ if (str != null) {
+ this.reconsumeTimes = Integer.parseInt(str);
+ }
+
+ str = fields.get("k");
+ if (str == null) {
+ str = fields.get("unitMode");
+ }
+ if (str != null) {
+ this.unitMode = Boolean.parseBoolean(str);
+ }
+
+ str = fields.get("l");
+ if (str == null) {
+ str = fields.get("maxReconsumeTimes");
+ }
+ if (str != null) {
+ this.maxReconsumeTimes = Integer.parseInt(str);
+ }
+
+ str = fields.get("m");
+ if (str == null) {
+ str = fields.get("batch");
+ }
+ if (str != null) {
+ this.batch = Boolean.parseBoolean(str);
+ }
+
+ str = fields.get("n");
+ if (str == null) {
+ str = fields.get("brokerName");
+ }
+ if (str != null) {
+ this.setBrokerName(str);
+ }
Review Comment:
decode() falls back from short key "n" only to "brokerName", but older V1
reflection-based encoding (makeCustomHeaderToNet) serializes the inherited
RpcRequestHeader field as "bname". After switching parseRequestHeader() to
always decode via SendMessageRequestHeader.decode(), brokerName may be silently
dropped for mixed-version traffic.
Add a fallback for "bname" when decoding broker name.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SendMessageRequestHeader.java:
##########
@@ -183,22 +190,140 @@ public void setBatch(Boolean batch) {
}
public static SendMessageRequestHeader parseRequestHeader(RemotingCommand
request) throws RemotingCommandException {
- SendMessageRequestHeaderV2 requestHeaderV2 = null;
- SendMessageRequestHeader requestHeader = null;
- switch (request.getCode()) {
- case RequestCode.SEND_BATCH_MESSAGE:
- case RequestCode.SEND_MESSAGE_V2:
- requestHeaderV2 =
request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- case RequestCode.SEND_MESSAGE:
- if (null == requestHeaderV2) {
- requestHeader =
request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
- } else {
- requestHeader =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
- }
- default:
- break;
- }
- return requestHeader;
+ return
request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
+ }
+
+ @Override
Review Comment:
SendMessageRequestHeader is now a FastCodesHeader with custom encode/decode
and a new mixed-key decode strategy (short "a".."n" + long names). There is
existing test coverage for SendMessageRequestHeaderV2, but no tests exercising
SendMessageRequestHeader’s new encode/decode behavior or V1/V2 compatibility.
Add unit tests to validate: (1) encoding includes expected keys for
SEND_MESSAGE compatibility, and (2) decoding works with both legacy long keys
and short keys (including brokerName/bname fallback).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]