This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8903b2fd [ISSUE #1152] [RIP-80] [Java] Implementation of Priority 
Message (#1153)
8903b2fd is described below

commit 8903b2fd5a04d1385525aa9c13e2b067d0f0b9b1
Author: imzs <[email protected]>
AuthorDate: Mon Dec 29 14:34:51 2025 +0800

    [ISSUE #1152] [RIP-80] [Java] Implementation of Priority Message (#1153)
    
    Change-Id: Ib4d1d67573948a5c66ec1e0c83082d2bfb44d9e6
---
 README-CN.md                                       |  1 +
 README.md                                          |  1 +
 .../rocketmq/client/apis/message/Message.java      |  7 +++
 .../client/apis/message/MessageBuilder.java        |  7 +++
 .../rocketmq/client/apis/message/MessageView.java  |  7 +++
 .../example/ProducerPriorityMessageExample.java    | 64 ++++++++++++++++++++++
 .../client/java/message/GeneralMessage.java        |  7 +++
 .../client/java/message/GeneralMessageImpl.java    |  8 +++
 .../client/java/message/MessageBuilderImpl.java    | 17 ++++++
 .../rocketmq/client/java/message/MessageImpl.java  | 14 +++++
 .../rocketmq/client/java/message/MessageType.java  |  5 ++
 .../client/java/message/MessageViewImpl.java       | 16 +++++-
 .../client/java/message/PublishingMessageImpl.java | 16 +++++-
 .../java/message/GeneralMessageImplTest.java       | 38 ++++++++++++-
 .../client/java/message/MessageImplTest.java       | 44 +++++++++++++++
 .../apache/rocketmq/client/java/tool/TestBase.java |  2 +-
 java/pom.xml                                       |  2 +-
 17 files changed, 249 insertions(+), 7 deletions(-)

diff --git a/README-CN.md b/README-CN.md
index 6d7da093..88fb3023 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -28,6 +28,7 @@
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
+| Priority Message                               |   ✅   |   🚧   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
 
 ## 先决条件和构建
 
diff --git a/README.md b/README.md
index f50deaa5..8f985f2d 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, 
Golang, Rust and al
 | Push consumer with concurrent message listener |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO message listener       |   ✅   |   ✅   |   ✅   |   🚧 
   |   ✅   |   🚧    |    🚧    |   🚧   |
 | Push consumer with FIFO consume accelerator    |   ✅   |   ✅   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
+| Priority Message                               |   ✅   |   🚧   |   🚧   |   🚧 
   |   🚧   |   🚧    |    🚧    |   🚧   |
 
 ## Prerequisite and Build
 
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
index e82be932..129d8898 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
@@ -86,4 +86,11 @@ public interface Message {
      * timestamp is not specified.
      */
     Optional<Long> getDeliveryTimestamp();
+
+    /**
+     * Get the priority of the message, which makes sense only when topic type 
is priority.
+     *
+     * @return message priority, which is optional, {@link Optional#empty()} 
means priority is not specified.
+     */
+    Optional<Integer> getPriority();
 }
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
index 3ccea469..09fafb01 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
@@ -105,6 +105,13 @@ public interface MessageBuilder {
      */
     MessageBuilder setDeliveryTimestamp(long deliveryTimestamp);
 
+    /**
+     * Set the priority for the message, which is optional.
+     * @param priority non-negative number in the range [0, N], regarded as 
highest priority if exceeds N
+     * @return the message builder instance.
+     */
+    MessageBuilder setPriority(int priority);
+
     /**
      * Add user property for the message.
      *
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
index 2f7909ae..d5547294 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
@@ -94,6 +94,13 @@ public interface MessageView {
      */
     Optional<Long> getDeliveryTimestamp();
 
+    /**
+     * Get the priority of the message, which makes sense only when topic type 
is priority.
+     *
+     * @return message priority, which is optional, {@link Optional#empty()} 
means priority is not specified.
+     */
+    Optional<Integer> getPriority();
+
     /**
      * Get the born host of the message.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerPriorityMessageExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerPriorityMessageExample.java
new file mode 100644
index 00000000..5d5ee4e8
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerPriorityMessageExample.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerPriorityMessageExample {
+    private static final Logger log = 
LoggerFactory.getLogger(ProducerPriorityMessageExample.class);
+
+    private ProducerPriorityMessageExample() {
+    }
+
+    public static void main(String[] args) throws ClientException {
+        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
+
+        String topic = "yourPriorityTopic";
+        final Producer producer = ProducerSingleton.getInstance(topic);
+        // Define your message body.
+        byte[] body = "This is a delay message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
+        String tag = "yourMessageTagA";
+        final Message message = provider.newMessageBuilder()
+            // Set topic for the current message.
+            .setTopic(topic)
+            // Message secondary classifier of message besides topic.
+            .setTag(tag)
+            // Key(s) of the message, another way to mark message besides 
message id.
+            .setKeys("yourMessageKey")
+            // Set priority of message.
+            .setPriority(1)
+            .setBody(body)
+            .build();
+        try {
+            final SendReceipt sendReceipt = producer.send(message);
+            log.info("Send message successfully, messageId={}", 
sendReceipt.getMessageId());
+        } catch (Throwable t) {
+            log.error("Failed to send message", t);
+        }
+        // Close the producer when you don't need it anymore.
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
index 827d8654..3f1ec5bd 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
@@ -96,6 +96,13 @@ public interface GeneralMessage {
      */
     Optional<Long> getDeliveryTimestamp();
 
+    /**
+     * Get the priority of the message, which makes sense only when topic type 
is priority.
+     *
+     * @return message priority, which is optional, {@link Optional#empty()} 
means priority is not specified.
+     */
+    Optional<Integer> getPriority();
+
     /**
      * Get the born host of the message.
      *
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
index ffc390e5..db3c63a3 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
@@ -37,6 +37,7 @@ public class GeneralMessageImpl implements GeneralMessage {
     private final String messageGroup;
     private final String liteTopic;
     private final Long deliveryTimestamp;
+    private final Integer priority;
     private final String bornHost;
     private final Long bornTimestamp;
     private final Integer deliveryAttempt;
@@ -61,6 +62,7 @@ public class GeneralMessageImpl implements GeneralMessage {
         this.messageGroup = message.getMessageGroup().orElse(null);
         this.liteTopic = message.getLiteTopic().orElse(null);
         this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
+        this.priority = message.getPriority().orElse(null);
         this.bornHost = null;
         this.bornTimestamp = null;
         this.deliveryAttempt = null;
@@ -96,6 +98,7 @@ public class GeneralMessageImpl implements GeneralMessage {
         this.messageGroup = message.getMessageGroup().orElse(null);
         this.liteTopic = message.getLiteTopic().orElse(null);
         this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
+        this.priority = message.getPriority().orElse(null);
         this.bornHost = message.getBornHost();
         this.bornTimestamp = message.getBornTimestamp();
         this.deliveryAttempt = message.getDeliveryAttempt();
@@ -149,6 +152,11 @@ public class GeneralMessageImpl implements GeneralMessage {
         return Optional.ofNullable(deliveryTimestamp);
     }
 
+    @Override
+    public Optional<Integer> getPriority() {
+        return Optional.ofNullable(priority);
+    }
+
     @Override
     public Optional<String> getBornHost() {
         return Optional.ofNullable(bornHost);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index 21fcc132..baf6a250 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -41,6 +41,7 @@ public class MessageBuilderImpl implements MessageBuilder {
     protected String messageGroup = null;
     protected String liteTopic = null;
     protected Long deliveryTimestamp = null;
+    protected Integer priority = null;
     protected Collection<String> keys = new HashSet<>();
     protected final Map<String, String> properties = new HashMap<>();
 
@@ -100,6 +101,7 @@ public class MessageBuilderImpl implements MessageBuilder {
     public MessageBuilder setMessageGroup(String messageGroup) {
         checkArgument(null == deliveryTimestamp, "messageGroup and 
deliveryTimestamp should not be set at same time");
         checkArgument(null == liteTopic, "messageGroup and liteTopic should 
not be set at same time");
+        checkArgument(null == priority, "messageGroup and priority should not 
be set at same time");
         checkArgument(StringUtils.isNotBlank(messageGroup), "messageGroup 
should not be blank");
         this.messageGroup = messageGroup;
         return this;
@@ -109,6 +111,7 @@ public class MessageBuilderImpl implements MessageBuilder {
     public MessageBuilder setLiteTopic(String liteTopic) {
         checkArgument(null == deliveryTimestamp, "liteTopic and 
deliveryTimestamp should not be set at same time");
         checkArgument(null == messageGroup, "liteTopic and messageGroup should 
not be set at same time");
+        checkArgument(null == priority, "liteTopic and priority should not be 
set at same time");
         checkArgument(StringUtils.isNotBlank(liteTopic), "liteTopic should not 
be blank");
         this.liteTopic = liteTopic;
         return this;
@@ -121,10 +124,24 @@ public class MessageBuilderImpl implements MessageBuilder 
{
     public MessageBuilder setDeliveryTimestamp(long deliveryTimestamp) {
         checkArgument(null == messageGroup, "deliveryTimestamp and 
messageGroup should not be set at same time");
         checkArgument(null == liteTopic, "deliveryTimestamp and liteTopic 
should not be set at same time");
+        checkArgument(null == priority, "deliveryTimestamp and priority should 
not be set at same time");
         this.deliveryTimestamp = deliveryTimestamp;
         return this;
     }
 
+    /**
+     * See {@link MessageBuilder#setPriority(int)}
+     */
+    @Override
+    public MessageBuilder setPriority(int priority) {
+        checkArgument(null == deliveryTimestamp, "priority and 
deliveryTimestamp should not be set at same time");
+        checkArgument(null == messageGroup, "priority and messageGroup should 
not be set at same time");
+        checkArgument(null == liteTopic, "priority and liteTopic should not be 
set at same time");
+        checkArgument(priority >= 0, "priority must be greater than or equal 
to 0");
+        this.priority = priority;
+        return this;
+    }
+
     /**
      * See {@link MessageBuilder#addProperty(String, String)}
      */
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index adbec6c6..d081f337 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -49,6 +49,8 @@ public class MessageImpl implements Message {
     private final String liteTopic;
     @Nullable
     private final Long deliveryTimestamp;
+    @Nullable
+    private final Integer priority;
 
     /**
      * The caller is supposed to have validated the arguments and handled 
throwing exception or
@@ -61,6 +63,7 @@ public class MessageImpl implements Message {
         this.messageGroup = builder.messageGroup;
         this.liteTopic = builder.liteTopic;
         this.deliveryTimestamp = builder.deliveryTimestamp;
+        this.priority = builder.priority;
         this.keys = builder.keys;
         this.properties = builder.properties;
     }
@@ -81,6 +84,7 @@ public class MessageImpl implements Message {
         this.messageGroup = message.getMessageGroup().orElse(null);
         this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
         this.liteTopic = message.getLiteTopic().orElse(null);
+        this.priority = message.getPriority().orElse(null);
         this.keys = message.getKeys();
         this.properties = message.getProperties();
     }
@@ -133,6 +137,14 @@ public class MessageImpl implements Message {
         return Optional.ofNullable(deliveryTimestamp);
     }
 
+    /**
+     * @see Message#getPriority()
+     */
+    @Override
+    public Optional<Integer> getPriority() {
+        return Optional.ofNullable(priority);
+    }
+
     /**
      * @see Message#getMessageGroup()
      */
@@ -152,7 +164,9 @@ public class MessageImpl implements Message {
             .add("topic", topic)
             .add("tag", tag)
             .add("messageGroup", messageGroup)
+            .add("liteTopic", liteTopic)
             .add("deliveryTimestamp", deliveryTimestamp)
+            .add("priority", priority)
             .add("keys", keys)
             .add("properties", properties)
             .toString();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
index 5748149e..1cce059b 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
@@ -22,6 +22,7 @@ public enum MessageType {
     FIFO,
     LITE,
     DELAY,
+    PRIORITY,
     TRANSACTION;
 
     public static MessageType fromProtobuf(apache.rocketmq.v2.MessageType 
messageType) {
@@ -36,6 +37,8 @@ public enum MessageType {
                 return MessageType.DELAY;
             case TRANSACTION:
                 return MessageType.TRANSACTION;
+            case PRIORITY:
+                return MessageType.PRIORITY;
             case MESSAGE_TYPE_UNSPECIFIED:
             default:
                 throw new IllegalArgumentException("Message type is not 
specified");
@@ -54,6 +57,8 @@ public enum MessageType {
                 return apache.rocketmq.v2.MessageType.DELAY;
             case TRANSACTION:
                 return apache.rocketmq.v2.MessageType.TRANSACTION;
+            case PRIORITY:
+                return apache.rocketmq.v2.MessageType.PRIORITY;
             default:
                 return apache.rocketmq.v2.MessageType.MESSAGE_TYPE_UNSPECIFIED;
         }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index 420e3d35..2ce74902 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -53,6 +53,7 @@ public class MessageViewImpl implements MessageView {
     private final String messageGroup;
     private final String liteTopic;
     private final Long deliveryTimestamp;
+    private final Integer priority;
     private final Collection<String> keys;
     private final Map<String, String> properties;
     private final String bornHost;
@@ -68,7 +69,7 @@ public class MessageViewImpl implements MessageView {
 
     public MessageViewImpl(MessageId messageId, String topic, byte[] body, 
String tag,
         String messageGroup, String liteTopic,
-        Long deliveryTimestamp, Collection<String> keys, Map<String, String> 
properties,
+        Long deliveryTimestamp, Integer priority, Collection<String> keys, 
Map<String, String> properties,
         String bornHost, long bornTimestamp, int deliveryAttempt, 
MessageQueueImpl messageQueue,
         String receiptHandle, long offset, boolean corrupted,
         Long transportDeliveryTimestamp) {
@@ -79,6 +80,7 @@ public class MessageViewImpl implements MessageView {
         this.messageGroup = messageGroup;
         this.liteTopic = liteTopic;
         this.deliveryTimestamp = deliveryTimestamp;
+        this.priority = priority;
         this.keys = checkNotNull(keys, "keys should not be null");
         this.properties = checkNotNull(properties, "properties should not be 
null");
         this.bornHost = checkNotNull(bornHost, "bornHost should not be null");
@@ -165,6 +167,14 @@ public class MessageViewImpl implements MessageView {
         return Optional.ofNullable(deliveryTimestamp);
     }
 
+    /**
+     * @see MessageView#getPriority()
+     */
+    @Override
+    public Optional<Integer> getPriority() {
+        return Optional.ofNullable(priority);
+    }
+
     /**
      * @see MessageView#getBornHost()
      */
@@ -303,6 +313,7 @@ public class MessageViewImpl implements MessageView {
         String liteTopic = systemProperties.hasLiteTopic() ? 
systemProperties.getLiteTopic() : null;
         Long deliveryTimestamp = systemProperties.hasDeliveryTimestamp() ?
             Timestamps.toMillis(systemProperties.getDeliveryTimestamp()) : 
null;
+        Integer priority = systemProperties.hasPriority() ? 
systemProperties.getPriority() : null;
         final ProtocolStringList keys = systemProperties.getKeysList();
         final String bornHost = systemProperties.getBornHost();
         final long bornTimestamp = 
Timestamps.toMillis(systemProperties.getBornTimestamp());
@@ -310,7 +321,7 @@ public class MessageViewImpl implements MessageView {
         final long offset = systemProperties.getQueueOffset();
         final Map<String, String> properties = message.getUserPropertiesMap();
         final String receiptHandle = systemProperties.getReceiptHandle();
-        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, 
liteTopic, deliveryTimestamp,
+        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, 
liteTopic, deliveryTimestamp, priority,
             keys, properties, bornHost, bornTimestamp, deliveryAttempt,
             mq, receiptHandle, offset, corrupted, transportDeliveryTimestamp);
     }
@@ -329,6 +340,7 @@ public class MessageViewImpl implements MessageView {
             .add("messageGroup", messageGroup)
             .add("liteTopic", liteTopic)
             .add("deliveryTimestamp", deliveryTimestamp)
+            .add("priority", priority)
             .add("properties", properties)
             .toString();
     }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 96987a78..08b281e4 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -50,6 +50,7 @@ public class PublishingMessageImpl extends MessageImpl {
         // Normal message.
         if (!message.getMessageGroup().isPresent() &&
             !message.getLiteTopic().isPresent() &&
+            !message.getPriority().isPresent() &&
             !message.getDeliveryTimestamp().isPresent() && !txEnabled) {
             messageType = MessageType.NORMAL;
             return;
@@ -69,14 +70,22 @@ public class PublishingMessageImpl extends MessageImpl {
             messageType = MessageType.DELAY;
             return;
         }
+        // Priority message.
+        if (message.getPriority().isPresent() && !txEnabled) {
+            messageType = MessageType.PRIORITY;
+            return;
+        }
         // Transaction message.
         if (!message.getMessageGroup().isPresent() &&
+            !message.getLiteTopic().isPresent() &&
+            !message.getPriority().isPresent() &&
             !message.getDeliveryTimestamp().isPresent() && txEnabled) {
             messageType = MessageType.TRANSACTION;
             return;
         }
-        // Transaction semantics is conflicted with fifo/delay.
-        throw new IllegalArgumentException("Transactional message should not 
set messageGroup or deliveryTimestamp");
+        // Transaction semantics is conflicted with fifo/delay/lite/priority.
+        throw new IllegalArgumentException(
+            "Transactional message should not set messageGroup, 
deliveryTimestamp, lite and priority");
     }
 
     public MessageId getMessageId() {
@@ -117,7 +126,10 @@ public class PublishingMessageImpl extends MessageImpl {
             .ifPresent(millis -> 
systemPropertiesBuilder.setDeliveryTimestamp(Timestamps.fromMillis(millis)));
         // Message group
         
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
+        // Lite
         this.getLiteTopic().ifPresent(systemPropertiesBuilder::setLiteTopic);
+        // Priority
+        this.getPriority().ifPresent(systemPropertiesBuilder::setPriority);
         final SystemProperties systemProperties = 
systemPropertiesBuilder.build();
         Resource topicResource = 
Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
         return apache.rocketmq.v2.Message.newBuilder()
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
index 23f445c7..eae2b8eb 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
@@ -176,6 +176,39 @@ public class GeneralMessageImplTest extends TestBase {
         
assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
     }
 
+    @Test
+    public void testMessagePriority() {
+        String topic = "testTopic";
+        byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+        int priority = 1;
+
+        final Message message = new MessageBuilderImpl()
+            .setTopic(topic)
+            .setBody(body)
+            .setPriority(priority)
+            .build();
+
+        final GeneralMessageImpl generalMessage = new 
GeneralMessageImpl(message);
+        assertFalse(generalMessage.getMessageId().isPresent());
+        assertEquals(topic, generalMessage.getTopic());
+        assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+        assertFalse(generalMessage.getTag().isPresent());
+        assertEquals(0, generalMessage.getKeys().size());
+
+        assertTrue(generalMessage.getPriority().isPresent());
+        assertEquals(priority, (int) generalMessage.getPriority().get());
+
+        assertFalse(generalMessage.getMessageGroup().isPresent());
+        assertFalse(generalMessage.getBornHost().isPresent());
+        assertFalse(generalMessage.getBornTimestamp().isPresent());
+        assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+        assertFalse(generalMessage.getLiteTopic().isPresent());
+        assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+        
assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+        assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+    }
+
     @Test
     public void testMessageView() {
         MessageId messageId = MessageIdCodec.getInstance().nextMessageId();
@@ -185,6 +218,7 @@ public class GeneralMessageImplTest extends TestBase {
         String messageGroup = "messageGroup0";
         String liteTopic = "liteTopic0";
         long deliveryTimestamp = System.currentTimeMillis();
+        int priority = 1;
         List<String> keys = new ArrayList<>();
         keys.add("keyA");
         Map<String, String> properties = new HashMap<>();
@@ -200,7 +234,7 @@ public class GeneralMessageImplTest extends TestBase {
 
         final MessageViewImpl messageView = new MessageViewImpl(messageId, 
topic, body, tag,
             messageGroup, liteTopic,
-            deliveryTimestamp, keys, properties, bornHost, bornTimestamp, 
deliveryAttempt, mq, receiptHandle,
+            deliveryTimestamp, priority, keys, properties, bornHost, 
bornTimestamp, deliveryAttempt, mq, receiptHandle,
             offset, corrupted, transportDeliveryTimestamp);
         final GeneralMessageImpl generalMessage = new 
GeneralMessageImpl(messageView);
         assertTrue(generalMessage.getMessageId().isPresent());
@@ -217,6 +251,8 @@ public class GeneralMessageImplTest extends TestBase {
         assertEquals(liteTopic, generalMessage.getLiteTopic().get());
         assertTrue(generalMessage.getDeliveryTimestamp().isPresent());
         assertEquals(deliveryTimestamp, (long) 
generalMessage.getDeliveryTimestamp().get());
+        assertTrue(generalMessage.getPriority().isPresent());
+        assertEquals(priority, (int) generalMessage.getPriority().get());
         assertTrue(generalMessage.getBornHost().isPresent());
         assertEquals(bornHost, generalMessage.getBornHost().get());
         assertTrue(generalMessage.getBornTimestamp().isPresent());
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index 88797670..86145f66 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -130,6 +130,8 @@ public class MessageImplTest extends TestBase {
         assertArrayEquals(FAKE_MESSAGE_BODY, bytes);
         assertFalse(message.getDeliveryTimestamp().isPresent());
         assertFalse(message.getMessageGroup().isPresent());
+        assertFalse(message.getLiteTopic().isPresent());
+        assertFalse(message.getPriority().isPresent());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -148,4 +150,46 @@ public class MessageImplTest extends TestBase {
         assertEquals("liteTopicA", message.getLiteTopic().get());
     }
 
+    @Test
+    public void testPrioritySetter() {
+        final Message message =
+            
provider.newMessageBuilder().setPriority(1).setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+        assertTrue(message.getPriority().isPresent());
+        assertEquals(1, (int) message.getPriority().get());
+
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            provider.newMessageBuilder().setPriority(-1));
+    }
+
+    @Test
+    public void testMessageTypeConflict() {
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setDeliveryTimestamp(System.currentTimeMillis()).setMessageGroup("HW"));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setDeliveryTimestamp(System.currentTimeMillis()).setLiteTopic("HW"));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setDeliveryTimestamp(System.currentTimeMillis()).setPriority(1));
+
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setMessageGroup("HW").setLiteTopic("HW"));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setMessageGroup("HW").setDeliveryTimestamp(System.currentTimeMillis()));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            provider.newMessageBuilder().setMessageGroup("HW").setPriority(1));
+
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setLiteTopic("HW").setMessageGroup("HW"));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setLiteTopic("HW").setDeliveryTimestamp(System.currentTimeMillis()));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            provider.newMessageBuilder().setLiteTopic("HW").setPriority(1));
+
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            
provider.newMessageBuilder().setPriority(1).setDeliveryTimestamp(System.currentTimeMillis()));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            provider.newMessageBuilder().setPriority(1).setLiteTopic("HW"));
+        Assert.assertThrows(IllegalArgumentException.class, () ->
+            provider.newMessageBuilder().setPriority(1).setMessageGroup("HW"));
+    }
+
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 1d7e7581..ee316f2b 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -183,7 +183,7 @@ public class TestBase {
         final byte[] body = RandomUtils.nextBytes(bodySize);
         Map<String, String> properties = new HashMap<>();
         List<String> keys = new ArrayList<>();
-        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, 
null, null,
+        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, 
null, null, null,
             keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, 1, 
corrupted,
             System.currentTimeMillis());
     }
diff --git a/java/pom.xml b/java/pom.xml
index f81820e1..4926e128 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -48,7 +48,7 @@
            ~  1. Whether it is essential, because the current shaded jar is 
fat enough.
            ~  2. Make sure that it is compatible with Java 8.
          -->
-        <rocketmq-proto.version>2.1.0</rocketmq-proto.version>
+        <rocketmq-proto.version>2.1.1</rocketmq-proto.version>
         <annotations-api.version>1.3.5</annotations-api.version>
         <protobuf.version>3.24.4</protobuf.version>
         <grpc.version>1.50.0</grpc.version>

Reply via email to