This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0792474479 [ISSUE #9928] Add Priority IT for GRPC protocol
0792474479 is described below

commit 079247447939e646b7e07bd15b23c70890ad9568
Author: imzs <[email protected]>
AuthorDate: Fri Dec 19 09:39:16 2025 +0800

    [ISSUE #9928] Add Priority IT for GRPC protocol
---
 .../rocketmq/test/grpc/v2/ClusterGrpcIT.java       |  5 +++
 .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java   | 50 ++++++++++++++++++++++
 .../apache/rocketmq/test/grpc/v2/LocalGrpcIT.java  |  5 +++
 3 files changed, 60 insertions(+)

diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
index 7c9625ecd5..6eb7a77376 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
@@ -118,4 +118,9 @@ public class ClusterGrpcIT extends GrpcBaseIT {
     public void testConsumeOrderly() throws Exception {
         super.testConsumeOrderly();
     }
+
+    @Test
+    public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
+        super.testSimpleConsumerSendAndRecvPriorityMessage();
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 2d18637376..724aa114f1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -629,6 +629,56 @@ public class GrpcBaseIT extends BaseConf {
         }
     }
 
+    public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
+        String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.PRIORITY);
+        String group = MQRandomUtils.getRandomConsumerGroup();
+
+        // init consumer offset
+        this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get();
+        receiveMessage(blockingStub, topic, group, 1);
+
+        this.sendClientSettings(stub, buildProducerClientSettings(topic)).get();
+        for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
+            String messageId = createUniqID();
+            SendMessageResponse sendResponse = blockingStub.sendMessage(SendMessageRequest.newBuilder()
+                .addMessages(Message.newBuilder()
+                    .setTopic(Resource.newBuilder()
+                        .setName(topic)
+                        .build())
+                    .setSystemProperties(SystemProperties.newBuilder()
+                        .setMessageId(messageId)
+                        .setQueueId(0)
+                        .setMessageType(MessageType.PRIORITY)
+                        .setBodyEncoding(Encoding.GZIP)
+                        .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
+                        .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
+                        .setPriority(i)
+                        .build())
+                    .setBody(ByteString.copyFromUtf8("hello"))
+                    .build())
+                .build());
+            assertSendMessage(sendResponse, messageId);
+        }
+
+        this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get();
+        List<Message> recvList = new ArrayList<>();
+        try {
+            await().atMost(java.time.Duration.ofSeconds(10)).until(() -> {
+                List<Message> messageList = getMessageFromReceiveMessageResponse(receiveMessage(blockingStub, topic, group));
+                if (messageList.isEmpty()) {
+                    return false;
+                }
+                recvList.addAll(messageList);
+                return recvList.size() == BaseConf.QUEUE_NUMBERS;
+            });
+        } catch (Exception e) {
+        }
+        for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
+            // default priority order: 0 as lowest priority
+            assertThat(recvList.get(i).getSystemProperties().getPriority()).isEqualTo(BaseConf.QUEUE_NUMBERS - i - 1);
+        }
+    }
+
     public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub,
         String topic, String group) {
         return receiveMessage(stub, topic, group, 15);
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
index 43471c7b2a..a53f3afc1a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
@@ -106,4 +106,9 @@ public class LocalGrpcIT extends GrpcBaseIT {
     public void testConsumeOrderly() throws Exception {
         super.testConsumeOrderly();
     }
+
+    @Test
+    public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
+        super.testSimpleConsumerSendAndRecvPriorityMessage();
+    }
 }

Reply via email to