This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c99a593f4 Upgrade proto version to 2.0.1, and adapt to parse dlq information. (#5343)
c99a593f4 is described below

commit c99a593f43f9a6b742c29b037d98d42a23f02793
Author: Jixiang Jin <[email protected]>
AuthorDate: Tue Oct 18 17:20:35 2022 +0800

    Upgrade proto version to 2.0.1, and adapt to parse dlq information. (#5343)
    
    * Upgrade proto version to 2.0.1, and adapt to parse dlq information.
    
    * Upgrade proto version to 2.0.1 in bazel config.
---
 WORKSPACE                                                      |  2 +-
 .../java/org/apache/rocketmq/common/message/MessageConst.java  |  8 ++++++++
 pom.xml                                                        |  2 +-
 .../apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java    | 10 ++++++++++
 4 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index 01a6bbb3d..e2971459b 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -69,7 +69,7 @@ maven_install(
         "org.bouncycastle:bcpkix-jdk15on:1.69",
         "com.google.code.gson:gson:2.8.9",
         "com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
-        "org.apache.rocketmq:rocketmq-proto:2.0.0",
+        "org.apache.rocketmq:rocketmq-proto:2.0.1",
         "com.google.protobuf:protobuf-java:3.20.1",
         "com.google.protobuf:protobuf-java-util:3.20.1",
         "com.conversantmedia:disruptor:1.2.10",
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 43f47efc4..87fed7c19 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -98,6 +98,12 @@ public class MessageConst {
     public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
     public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
 
+    /**
+     * properties for DLQ
+     */
+    public static final String PROPERTY_DLQ_ORIGIN_TOPIC = "DLQ_ORIGIN_TOPIC";
+    public static final String PROPERTY_DLQ_ORIGIN_MESSAGE_ID = "DLQ_ORIGIN_MESSAGE_ID";
+
     static {
         STRING_HASH_SET.add(PROPERTY_TRACE_SWITCH);
         STRING_HASH_SET.add(PROPERTY_MSG_REGION);
@@ -147,5 +153,7 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
         STRING_HASH_SET.add(PROPERTY_BORN_HOST);
         STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
+        STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
+        STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
     }
 }
diff --git a/pom.xml b/pom.xml
index aab37e0ab..974502576 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,7 +126,7 @@
         <annotations-api.version>6.0.53</annotations-api.version>
         <extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version>
         <concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version>
-        <rocketmq-proto.version>2.0.0</rocketmq-proto.version>
+        <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
         <grpc.version>1.45.0</grpc.version>
         <protobuf.version>3.20.1</protobuf.version>
         <disruptor.version>1.2.10</disruptor.version>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
index cc5a60ca6..b629e01b8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.proxy.grpc.v2.common;
 
 import apache.rocketmq.v2.Broker;
+import apache.rocketmq.v2.DeadLetterQueue;
 import apache.rocketmq.v2.Digest;
 import apache.rocketmq.v2.DigestType;
 import apache.rocketmq.v2.Encoding;
@@ -237,6 +238,15 @@ public class GrpcConverter {
             systemPropertiesBuilder.setTraceContext(traceContext);
         }
 
+        String dlqOriginTopic = messageExt.getProperty(MessageConst.PROPERTY_DLQ_ORIGIN_TOPIC);
+        String dlqOriginMessageId = messageExt.getProperty(MessageConst.PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
+        if (dlqOriginTopic != null && dlqOriginMessageId != null) {
+            DeadLetterQueue dlq = DeadLetterQueue.newBuilder()
+                .setTopic(dlqOriginTopic)
+                .setMessageId(dlqOriginMessageId)
+                .build();
+            systemPropertiesBuilder.setDeadLetterQueue(dlq);
+        }
         return systemPropertiesBuilder.build();
     }
 

Reply via email to