This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 90bf88634 [ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
90bf88634 is described below
commit 90bf886340215b1c45e43bf740c67317fdf9665e
Author: lk <[email protected]>
AuthorDate: Mon Feb 27 12:56:26 2023 +0800
[ISSUE #6192] Set a default value when UniqID is empty in Proxy (#6193)
---
.../apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java | 2 +-
.../apache/rocketmq/proxy/processor/ConsumerProcessor.java | 12 ++++++++++++
.../apache/rocketmq/proxy/processor/ProducerProcessor.java | 4 ++++
3 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index 96a214750..21526054a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -139,7 +139,7 @@ public class GrpcConverter {
}
// message_id
- String uniqKey =
messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ String uniqKey = messageExt.getMsgId();
if (uniqKey != null) {
systemPropertiesBuilder.setMessageId(uniqKey);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 37c2e54d6..d67f4b855 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -144,6 +146,7 @@ public class ConsumerProcessor extends AbstractProcessor {
List<MessageExt> messageExtList = new ArrayList<>();
for (MessageExt messageExt :
popResult.getMsgFoundList()) {
try {
+ fillUniqIDIfNeed(messageExt);
String handleString =
createHandle(messageExt.getProperty(MessageConst.PROPERTY_POP_CK),
messageExt.getCommitLogOffset());
if (handleString == null) {
log.error("[BUG] pop message from broker
but handle is empty. requestHeader:{}, msg:{}", requestHeader, messageExt);
@@ -193,6 +196,15 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
+ private void fillUniqIDIfNeed(MessageExt messageExt) {
+ if (StringUtils.isBlank(MessageClientIDSetter.getUniqID(messageExt))) {
+ if (messageExt instanceof MessageClientExt) {
+ MessageClientExt clientExt = (MessageClientExt) messageExt;
+ MessageAccessor.putProperty(messageExt,
MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
clientExt.getOffsetMsgId());
+ }
+ }
+ }
+
public CompletableFuture<AckResult> ackMessage(
ProxyContext ctx,
ReceiptHandle handle,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 2fce78d31..749f9da2b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageId;
@@ -84,6 +85,9 @@ public class ProducerProcessor extends AbstractProcessor {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no
writable queue");
}
+ for (Message msg : messageList) {
+ MessageClientIDSetter.setUniqID(msg);
+ }
SendMessageRequestHeader requestHeader =
buildSendMessageRequestHeader(messageList, producerGroup, sysFlag,
messageQueue.getQueueId());
future = this.serviceManager.getMessageService().sendMessage(