This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3968c186a5 [ISSUE #7231] Fix: proxy client language error (#7200)
3968c186a5 is described below
commit 3968c186a59db96701ade8c343bc6a5d31ee2d24
Author: weihubeats <[email protected]>
AuthorDate: Fri Oct 20 14:49:00 2023 +0800
[ISSUE #7231] Fix: proxy client language error (#7200)
* Adding null does not update
* add language code
* add language code
* add language code
* add language code
* add language code
* Rerun ci
* Rerun ci
* Rerun ci
* remove redundant package imports
* redundant line
* modify the parameter passed as proxyContext to language
* format
---
.../rocketmq/proxy/service/message/LocalMessageService.java | 12 ++++++------
.../rocketmq/proxy/service/message/LocalRemotingCommand.java | 8 ++++++--
.../org/apache/rocketmq/remoting/protocol/LanguageCode.java | 11 +++++++++++
3 files changed, 23 insertions(+), 8 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index ca7dcc9eb0..aaa688fee6 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService {
body = message.getBody();
messageId = MessageClientIDSetter.getUniqID(message);
}
- RemotingCommand request =
LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE,
requestHeader);
+ RemotingCommand request =
LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE,
requestHeader, ctx.getLanguage());
request.setBody(body);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
@@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService {
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext =
channel.getChannelHandlerContext();
- RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,
requestHeader);
+ RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,
requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response =
brokerController.getSendMessageProcessor()
@@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService {
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext =
channel.getChannelHandlerContext();
- RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION,
requestHeader);
+ RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION,
requestHeader, ctx.getLanguage());
try {
brokerController.getEndTransactionProcessor()
.processRequest(channelHandlerContext, command);
@@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService {
public CompletableFuture<PopResult> popMessage(ProxyContext ctx,
AddressableMessageQueue messageQueue,
PopMessageRequestHeader requestHeader, long timeoutMillis) {
requestHeader.setBornTime(System.currentTimeMillis());
- RemotingCommand request =
LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE,
requestHeader);
+ RemotingCommand request =
LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE,
requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
InvocationContext invocationContext = new InvocationContext(future);
@@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService {
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext =
channel.getChannelHandlerContext();
- RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader);
+ RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response =
brokerController.getChangeInvisibleTimeProcessor()
@@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService {
AckMessageRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext =
channel.getChannelHandlerContext();
- RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE,
requestHeader);
+ RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE,
requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response =
brokerController.getAckMessageProcessor()
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
index 73048dbbc2..915cafcd57 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.proxy.service.message;
-import java.util.HashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.util.HashMap;
+
public class LocalRemotingCommand extends RemotingCommand {
- public static LocalRemotingCommand createRequestCommand(int code,
CommandCustomHeader customHeader) {
+ public static LocalRemotingCommand createRequestCommand(int code,
CommandCustomHeader customHeader, String language) {
LocalRemotingCommand cmd = new LocalRemotingCommand();
cmd.setCode(code);
+ cmd.setLanguage(LanguageCode.getCode(language));
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
@@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand {
Class<? extends CommandCustomHeader> classHeader) throws
RemotingCommandException {
return classHeader.cast(readCustomHeader());
}
+
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 19280f9967..2df9fbf027 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -17,6 +17,11 @@
package org.apache.rocketmq.remoting.protocol;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
public enum LanguageCode {
JAVA((byte) 0),
CPP((byte) 1),
@@ -50,4 +55,10 @@ public enum LanguageCode {
public byte getCode() {
return code;
}
+
+ private static final Map<String, LanguageCode> MAP =
Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name,
Function.identity()));
+
+ public static LanguageCode getCode(String language) {
+ return MAP.get(language);
+ }
}