This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c99a593f4 Upgrade proto version to 2.0.1, and adapt to parse dlq
information. (#5343)
c99a593f4 is described below
commit c99a593f43f9a6b742c29b037d98d42a23f02793
Author: Jixiang Jin <[email protected]>
AuthorDate: Tue Oct 18 17:20:35 2022 +0800
Upgrade proto version to 2.0.1, and adapt to parse dlq information. (#5343)
* Upgrade proto version to 2.0.1, and adapt to parse dlq information.
* Upgrade proto version to 2.0.1 in bazel config.
---
WORKSPACE | 2 +-
.../java/org/apache/rocketmq/common/message/MessageConst.java | 8 ++++++++
pom.xml | 2 +-
.../apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java | 10 ++++++++++
4 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 01a6bbb3d..e2971459b 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -69,7 +69,7 @@ maven_install(
"org.bouncycastle:bcpkix-jdk15on:1.69",
"com.google.code.gson:gson:2.8.9",
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
- "org.apache.rocketmq:rocketmq-proto:2.0.0",
+ "org.apache.rocketmq:rocketmq-proto:2.0.1",
"com.google.protobuf:protobuf-java:3.20.1",
"com.google.protobuf:protobuf-java-util:3.20.1",
"com.conversantmedia:disruptor:1.2.10",
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 43f47efc4..87fed7c19 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -98,6 +98,12 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DELAY_LEVEL =
"TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
+ /**
+ * properties for DLQ
+ */
+ public static final String PROPERTY_DLQ_ORIGIN_TOPIC = "DLQ_ORIGIN_TOPIC";
+ public static final String PROPERTY_DLQ_ORIGIN_MESSAGE_ID =
"DLQ_ORIGIN_MESSAGE_ID";
+
static {
STRING_HASH_SET.add(PROPERTY_TRACE_SWITCH);
STRING_HASH_SET.add(PROPERTY_MSG_REGION);
@@ -147,5 +153,7 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
STRING_HASH_SET.add(PROPERTY_BORN_HOST);
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
+ STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
+ STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
}
}
diff --git a/pom.xml b/pom.xml
index aab37e0ab..974502576 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
<annotations-api.version>6.0.53</annotations-api.version>
<extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
<concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
- <rocketmq-proto.version>2.0.0</rocketmq-proto.version>
+ <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
<grpc.version>1.45.0</grpc.version>
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index cc5a60ca6..b629e01b8 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.proxy.grpc.v2.common;
import apache.rocketmq.v2.Broker;
+import apache.rocketmq.v2.DeadLetterQueue;
import apache.rocketmq.v2.Digest;
import apache.rocketmq.v2.DigestType;
import apache.rocketmq.v2.Encoding;
@@ -237,6 +238,15 @@ public class GrpcConverter {
systemPropertiesBuilder.setTraceContext(traceContext);
}
+ String dlqOriginTopic =
messageExt.getProperty(MessageConst.PROPERTY_DLQ_ORIGIN_TOPIC);
+ String dlqOriginMessageId =
messageExt.getProperty(MessageConst.PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
+ if (dlqOriginTopic != null && dlqOriginMessageId != null) {
+ DeadLetterQueue dlq = DeadLetterQueue.newBuilder()
+ .setTopic(dlqOriginTopic)
+ .setMessageId(dlqOriginMessageId)
+ .build();
+ systemPropertiesBuilder.setDeadLetterQueue(dlq);
+ }
return systemPropertiesBuilder.build();
}