This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0792474479 [ISSUE #9928] Add Priority IT for GRPC protocol
0792474479 is described below
commit 079247447939e646b7e07bd15b23c70890ad9568
Author: imzs <[email protected]>
AuthorDate: Fri Dec 19 09:39:16 2025 +0800
[ISSUE #9928] Add Priority IT for GRPC protocol
---
.../rocketmq/test/grpc/v2/ClusterGrpcIT.java | 5 +++
.../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 50 ++++++++++++++++++++++
.../apache/rocketmq/test/grpc/v2/LocalGrpcIT.java | 5 +++
3 files changed, 60 insertions(+)
diff --git
a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
index 7c9625ecd5..6eb7a77376 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
@@ -118,4 +118,9 @@ public class ClusterGrpcIT extends GrpcBaseIT {
public void testConsumeOrderly() throws Exception {
super.testConsumeOrderly();
}
+
+ @Test
+ public void testSimpleConsumerSendAndRecvPriorityMessage() throws
Exception {
+ super.testSimpleConsumerSendAndRecvPriorityMessage();
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 2d18637376..724aa114f1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -629,6 +629,56 @@ public class GrpcBaseIT extends BaseConf {
}
}
+ public void testSimpleConsumerSendAndRecvPriorityMessage() throws
Exception {
+ String topic = initTopicOnSampleTopicBroker(BROKER1_NAME,
TopicMessageType.PRIORITY);
+ String group = MQRandomUtils.getRandomConsumerGroup();
+
+ // init consumer offset
+ this.sendClientSettings(stub,
buildSimpleConsumerClientSettings(group)).get();
+ receiveMessage(blockingStub, topic, group, 1);
+
+ this.sendClientSettings(stub,
buildProducerClientSettings(topic)).get();
+ for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
+ String messageId = createUniqID();
+ SendMessageResponse sendResponse =
blockingStub.sendMessage(SendMessageRequest.newBuilder()
+ .addMessages(Message.newBuilder()
+ .setTopic(Resource.newBuilder()
+ .setName(topic)
+ .build())
+ .setSystemProperties(SystemProperties.newBuilder()
+ .setMessageId(messageId)
+ .setQueueId(0)
+ .setMessageType(MessageType.PRIORITY)
+ .setBodyEncoding(Encoding.GZIP)
+
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
+
.setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(),
"127.0.0.1:1234"))
+ .setPriority(i)
+ .build())
+ .setBody(ByteString.copyFromUtf8("hello"))
+ .build())
+ .build());
+ assertSendMessage(sendResponse, messageId);
+ }
+
+ this.sendClientSettings(stub,
buildSimpleConsumerClientSettings(group)).get();
+ List<Message> recvList = new ArrayList<>();
+ try {
+ await().atMost(java.time.Duration.ofSeconds(10)).until(() -> {
+ List<Message> messageList =
getMessageFromReceiveMessageResponse(receiveMessage(blockingStub, topic,
group));
+ if (messageList.isEmpty()) {
+ return false;
+ }
+ recvList.addAll(messageList);
+ return recvList.size() == BaseConf.QUEUE_NUMBERS;
+ });
+ } catch (Exception e) {
+ }
+ for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
+ // default priority order: 0 as lowest priority
+
assertThat(recvList.get(i).getSystemProperties().getPriority()).isEqualTo(BaseConf.QUEUE_NUMBERS
- i - 1);
+ }
+ }
+
public List<ReceiveMessageResponse>
receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub,
String topic, String group) {
return receiveMessage(stub, topic, group, 15);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
index 43471c7b2a..a53f3afc1a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
@@ -106,4 +106,9 @@ public class LocalGrpcIT extends GrpcBaseIT {
public void testConsumeOrderly() throws Exception {
super.testConsumeOrderly();
}
+
+ @Test
+ public void testSimpleConsumerSendAndRecvPriorityMessage() throws
Exception {
+ super.testSimpleConsumerSendAndRecvPriorityMessage();
+ }
}