This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8903b2fd [ISSUE #1152] [RIP-80] [Java] Implementation of Priority
Message (#1153)
8903b2fd is described below
commit 8903b2fd5a04d1385525aa9c13e2b067d0f0b9b1
Author: imzs <[email protected]>
AuthorDate: Mon Dec 29 14:34:51 2025 +0800
[ISSUE #1152] [RIP-80] [Java] Implementation of Priority Message (#1153)
Change-Id: Ib4d1d67573948a5c66ec1e0c83082d2bfb44d9e6
---
README-CN.md | 1 +
README.md | 1 +
.../rocketmq/client/apis/message/Message.java | 7 +++
.../client/apis/message/MessageBuilder.java | 7 +++
.../rocketmq/client/apis/message/MessageView.java | 7 +++
.../example/ProducerPriorityMessageExample.java | 64 ++++++++++++++++++++++
.../client/java/message/GeneralMessage.java | 7 +++
.../client/java/message/GeneralMessageImpl.java | 8 +++
.../client/java/message/MessageBuilderImpl.java | 17 ++++++
.../rocketmq/client/java/message/MessageImpl.java | 14 +++++
.../rocketmq/client/java/message/MessageType.java | 5 ++
.../client/java/message/MessageViewImpl.java | 16 +++++-
.../client/java/message/PublishingMessageImpl.java | 16 +++++-
.../java/message/GeneralMessageImplTest.java | 38 ++++++++++++-
.../client/java/message/MessageImplTest.java | 44 +++++++++++++++
.../apache/rocketmq/client/java/tool/TestBase.java | 2 +-
java/pom.xml | 2 +-
17 files changed, 249 insertions(+), 7 deletions(-)
diff --git a/README-CN.md b/README-CN.md
index 6d7da093..88fb3023 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -28,6 +28,7 @@
| Push consumer with concurrent message listener | โ
| โ
| โ
| ๐ง
| โ
| ๐ง | ๐ง | ๐ง |
| Push consumer with FIFO message listener | โ
| โ
| โ
| ๐ง
| โ
| ๐ง | ๐ง | ๐ง |
| Push consumer with FIFO consume accelerator | โ
| โ
| ๐ง | ๐ง
| ๐ง | ๐ง | ๐ง | ๐ง |
+| Priority Message | โ
| ๐ง | ๐ง | ๐ง
| ๐ง | ๐ง | ๐ง | ๐ง |
## ๅ
ๅณๆกไปถๅๆๅปบ
diff --git a/README.md b/README.md
index f50deaa5..8f985f2d 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,7 @@ Provide cloud-native and robust solutions for Java, C++, C#,
Golang, Rust and al
| Push consumer with concurrent message listener | โ
| โ
| โ
| ๐ง
| โ
| ๐ง | ๐ง | ๐ง |
| Push consumer with FIFO message listener | โ
| โ
| โ
| ๐ง
| โ
| ๐ง | ๐ง | ๐ง |
| Push consumer with FIFO consume accelerator | โ
| โ
| ๐ง | ๐ง
| ๐ง | ๐ง | ๐ง | ๐ง |
+| Priority Message | โ
| ๐ง | ๐ง | ๐ง
| ๐ง | ๐ง | ๐ง | ๐ง |
## Prerequisite and Build
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
index e82be932..129d8898 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/Message.java
@@ -86,4 +86,11 @@ public interface Message {
* timestamp is not specified.
*/
Optional<Long> getDeliveryTimestamp();
+
+ /**
+ * Get the priority of the message, which makes sense only when topic type
is priority.
+ *
+ * @return message priority, which is optional, {@link Optional#empty()}
means priority is not specified.
+ */
+ Optional<Integer> getPriority();
}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
index 3ccea469..09fafb01 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageBuilder.java
@@ -105,6 +105,13 @@ public interface MessageBuilder {
*/
MessageBuilder setDeliveryTimestamp(long deliveryTimestamp);
+ /**
+ * Set the priority for the message, which is optional.
+ * @param priority non-negative number in the range [0, N], regarded as
highest priority if exceeds N
+ * @return the message builder instance.
+ */
+ MessageBuilder setPriority(int priority);
+
/**
* Add user property for the message.
*
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
index 2f7909ae..d5547294 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageView.java
@@ -94,6 +94,13 @@ public interface MessageView {
*/
Optional<Long> getDeliveryTimestamp();
+ /**
+ * Get the priority of the message, which makes sense only when topic type
is priority.
+ *
+ * @return message priority, which is optional, {@link Optional#empty()}
means priority is not specified.
+ */
+ Optional<Integer> getPriority();
+
/**
* Get the born host of the message.
*
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerPriorityMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerPriorityMessageExample.java
new file mode 100644
index 00000000..5d5ee4e8
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerPriorityMessageExample.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerPriorityMessageExample {
+ private static final Logger log =
LoggerFactory.getLogger(ProducerPriorityMessageExample.class);
+
+ private ProducerPriorityMessageExample() {
+ }
+
+ public static void main(String[] args) throws ClientException {
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+
+ String topic = "yourPriorityTopic";
+ final Producer producer = ProducerSingleton.getInstance(topic);
+ // Define your message body.
+ byte[] body = "This is a delay message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
+ String tag = "yourMessageTagA";
+ final Message message = provider.newMessageBuilder()
+ // Set topic for the current message.
+ .setTopic(topic)
+ // Message secondary classifier of message besides topic.
+ .setTag(tag)
+ // Key(s) of the message, another way to mark message besides
message id.
+ .setKeys("yourMessageKey")
+ // Set priority of message.
+ .setPriority(1)
+ .setBody(body)
+ .build();
+ try {
+ final SendReceipt sendReceipt = producer.send(message);
+ log.info("Send message successfully, messageId={}",
sendReceipt.getMessageId());
+ } catch (Throwable t) {
+ log.error("Failed to send message", t);
+ }
+ // Close the producer when you don't need it anymore.
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
index 827d8654..3f1ec5bd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessage.java
@@ -96,6 +96,13 @@ public interface GeneralMessage {
*/
Optional<Long> getDeliveryTimestamp();
+ /**
+ * Get the priority of the message, which makes sense only when topic type
is priority.
+ *
+ * @return message priority, which is optional, {@link Optional#empty()}
means priority is not specified.
+ */
+ Optional<Integer> getPriority();
+
/**
* Get the born host of the message.
*
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
index ffc390e5..db3c63a3 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/GeneralMessageImpl.java
@@ -37,6 +37,7 @@ public class GeneralMessageImpl implements GeneralMessage {
private final String messageGroup;
private final String liteTopic;
private final Long deliveryTimestamp;
+ private final Integer priority;
private final String bornHost;
private final Long bornTimestamp;
private final Integer deliveryAttempt;
@@ -61,6 +62,7 @@ public class GeneralMessageImpl implements GeneralMessage {
this.messageGroup = message.getMessageGroup().orElse(null);
this.liteTopic = message.getLiteTopic().orElse(null);
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
+ this.priority = message.getPriority().orElse(null);
this.bornHost = null;
this.bornTimestamp = null;
this.deliveryAttempt = null;
@@ -96,6 +98,7 @@ public class GeneralMessageImpl implements GeneralMessage {
this.messageGroup = message.getMessageGroup().orElse(null);
this.liteTopic = message.getLiteTopic().orElse(null);
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
+ this.priority = message.getPriority().orElse(null);
this.bornHost = message.getBornHost();
this.bornTimestamp = message.getBornTimestamp();
this.deliveryAttempt = message.getDeliveryAttempt();
@@ -149,6 +152,11 @@ public class GeneralMessageImpl implements GeneralMessage {
return Optional.ofNullable(deliveryTimestamp);
}
+ @Override
+ public Optional<Integer> getPriority() {
+ return Optional.ofNullable(priority);
+ }
+
@Override
public Optional<String> getBornHost() {
return Optional.ofNullable(bornHost);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index 21fcc132..baf6a250 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -41,6 +41,7 @@ public class MessageBuilderImpl implements MessageBuilder {
protected String messageGroup = null;
protected String liteTopic = null;
protected Long deliveryTimestamp = null;
+ protected Integer priority = null;
protected Collection<String> keys = new HashSet<>();
protected final Map<String, String> properties = new HashMap<>();
@@ -100,6 +101,7 @@ public class MessageBuilderImpl implements MessageBuilder {
public MessageBuilder setMessageGroup(String messageGroup) {
checkArgument(null == deliveryTimestamp, "messageGroup and
deliveryTimestamp should not be set at same time");
checkArgument(null == liteTopic, "messageGroup and liteTopic should
not be set at same time");
+ checkArgument(null == priority, "messageGroup and priority should not
be set at same time");
checkArgument(StringUtils.isNotBlank(messageGroup), "messageGroup
should not be blank");
this.messageGroup = messageGroup;
return this;
@@ -109,6 +111,7 @@ public class MessageBuilderImpl implements MessageBuilder {
public MessageBuilder setLiteTopic(String liteTopic) {
checkArgument(null == deliveryTimestamp, "liteTopic and
deliveryTimestamp should not be set at same time");
checkArgument(null == messageGroup, "liteTopic and messageGroup should
not be set at same time");
+ checkArgument(null == priority, "liteTopic and priority should not be
set at same time");
checkArgument(StringUtils.isNotBlank(liteTopic), "liteTopic should not
be blank");
this.liteTopic = liteTopic;
return this;
@@ -121,10 +124,24 @@ public class MessageBuilderImpl implements MessageBuilder
{
public MessageBuilder setDeliveryTimestamp(long deliveryTimestamp) {
checkArgument(null == messageGroup, "deliveryTimestamp and
messageGroup should not be set at same time");
checkArgument(null == liteTopic, "deliveryTimestamp and liteTopic
should not be set at same time");
+ checkArgument(null == priority, "deliveryTimestamp and priority should
not be set at same time");
this.deliveryTimestamp = deliveryTimestamp;
return this;
}
+ /**
+ * See {@link MessageBuilder#setPriority(int)}
+ */
+ @Override
+ public MessageBuilder setPriority(int priority) {
+ checkArgument(null == deliveryTimestamp, "priority and
deliveryTimestamp should not be set at same time");
+ checkArgument(null == messageGroup, "priority and messageGroup should
not be set at same time");
+ checkArgument(null == liteTopic, "priority and liteTopic should not be
set at same time");
+ checkArgument(priority >= 0, "priority must be greater than or equal
to 0");
+ this.priority = priority;
+ return this;
+ }
+
/**
* See {@link MessageBuilder#addProperty(String, String)}
*/
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index adbec6c6..d081f337 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -49,6 +49,8 @@ public class MessageImpl implements Message {
private final String liteTopic;
@Nullable
private final Long deliveryTimestamp;
+ @Nullable
+ private final Integer priority;
/**
* The caller is supposed to have validated the arguments and handled
throwing exception or
@@ -61,6 +63,7 @@ public class MessageImpl implements Message {
this.messageGroup = builder.messageGroup;
this.liteTopic = builder.liteTopic;
this.deliveryTimestamp = builder.deliveryTimestamp;
+ this.priority = builder.priority;
this.keys = builder.keys;
this.properties = builder.properties;
}
@@ -81,6 +84,7 @@ public class MessageImpl implements Message {
this.messageGroup = message.getMessageGroup().orElse(null);
this.deliveryTimestamp = message.getDeliveryTimestamp().orElse(null);
this.liteTopic = message.getLiteTopic().orElse(null);
+ this.priority = message.getPriority().orElse(null);
this.keys = message.getKeys();
this.properties = message.getProperties();
}
@@ -133,6 +137,14 @@ public class MessageImpl implements Message {
return Optional.ofNullable(deliveryTimestamp);
}
+ /**
+ * @see Message#getPriority()
+ */
+ @Override
+ public Optional<Integer> getPriority() {
+ return Optional.ofNullable(priority);
+ }
+
/**
* @see Message#getMessageGroup()
*/
@@ -152,7 +164,9 @@ public class MessageImpl implements Message {
.add("topic", topic)
.add("tag", tag)
.add("messageGroup", messageGroup)
+ .add("liteTopic", liteTopic)
.add("deliveryTimestamp", deliveryTimestamp)
+ .add("priority", priority)
.add("keys", keys)
.add("properties", properties)
.toString();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
index 5748149e..1cce059b 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
@@ -22,6 +22,7 @@ public enum MessageType {
FIFO,
LITE,
DELAY,
+ PRIORITY,
TRANSACTION;
public static MessageType fromProtobuf(apache.rocketmq.v2.MessageType
messageType) {
@@ -36,6 +37,8 @@ public enum MessageType {
return MessageType.DELAY;
case TRANSACTION:
return MessageType.TRANSACTION;
+ case PRIORITY:
+ return MessageType.PRIORITY;
case MESSAGE_TYPE_UNSPECIFIED:
default:
throw new IllegalArgumentException("Message type is not
specified");
@@ -54,6 +57,8 @@ public enum MessageType {
return apache.rocketmq.v2.MessageType.DELAY;
case TRANSACTION:
return apache.rocketmq.v2.MessageType.TRANSACTION;
+ case PRIORITY:
+ return apache.rocketmq.v2.MessageType.PRIORITY;
default:
return apache.rocketmq.v2.MessageType.MESSAGE_TYPE_UNSPECIFIED;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index 420e3d35..2ce74902 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -53,6 +53,7 @@ public class MessageViewImpl implements MessageView {
private final String messageGroup;
private final String liteTopic;
private final Long deliveryTimestamp;
+ private final Integer priority;
private final Collection<String> keys;
private final Map<String, String> properties;
private final String bornHost;
@@ -68,7 +69,7 @@ public class MessageViewImpl implements MessageView {
public MessageViewImpl(MessageId messageId, String topic, byte[] body,
String tag,
String messageGroup, String liteTopic,
- Long deliveryTimestamp, Collection<String> keys, Map<String, String>
properties,
+ Long deliveryTimestamp, Integer priority, Collection<String> keys,
Map<String, String> properties,
String bornHost, long bornTimestamp, int deliveryAttempt,
MessageQueueImpl messageQueue,
String receiptHandle, long offset, boolean corrupted,
Long transportDeliveryTimestamp) {
@@ -79,6 +80,7 @@ public class MessageViewImpl implements MessageView {
this.messageGroup = messageGroup;
this.liteTopic = liteTopic;
this.deliveryTimestamp = deliveryTimestamp;
+ this.priority = priority;
this.keys = checkNotNull(keys, "keys should not be null");
this.properties = checkNotNull(properties, "properties should not be
null");
this.bornHost = checkNotNull(bornHost, "bornHost should not be null");
@@ -165,6 +167,14 @@ public class MessageViewImpl implements MessageView {
return Optional.ofNullable(deliveryTimestamp);
}
+ /**
+ * @see MessageView#getPriority()
+ */
+ @Override
+ public Optional<Integer> getPriority() {
+ return Optional.ofNullable(priority);
+ }
+
/**
* @see MessageView#getBornHost()
*/
@@ -303,6 +313,7 @@ public class MessageViewImpl implements MessageView {
String liteTopic = systemProperties.hasLiteTopic() ?
systemProperties.getLiteTopic() : null;
Long deliveryTimestamp = systemProperties.hasDeliveryTimestamp() ?
Timestamps.toMillis(systemProperties.getDeliveryTimestamp()) :
null;
+ Integer priority = systemProperties.hasPriority() ?
systemProperties.getPriority() : null;
final ProtocolStringList keys = systemProperties.getKeysList();
final String bornHost = systemProperties.getBornHost();
final long bornTimestamp =
Timestamps.toMillis(systemProperties.getBornTimestamp());
@@ -310,7 +321,7 @@ public class MessageViewImpl implements MessageView {
final long offset = systemProperties.getQueueOffset();
final Map<String, String> properties = message.getUserPropertiesMap();
final String receiptHandle = systemProperties.getReceiptHandle();
- return new MessageViewImpl(messageId, topic, body, tag, messageGroup,
liteTopic, deliveryTimestamp,
+ return new MessageViewImpl(messageId, topic, body, tag, messageGroup,
liteTopic, deliveryTimestamp, priority,
keys, properties, bornHost, bornTimestamp, deliveryAttempt,
mq, receiptHandle, offset, corrupted, transportDeliveryTimestamp);
}
@@ -329,6 +340,7 @@ public class MessageViewImpl implements MessageView {
.add("messageGroup", messageGroup)
.add("liteTopic", liteTopic)
.add("deliveryTimestamp", deliveryTimestamp)
+ .add("priority", priority)
.add("properties", properties)
.toString();
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 96987a78..08b281e4 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -50,6 +50,7 @@ public class PublishingMessageImpl extends MessageImpl {
// Normal message.
if (!message.getMessageGroup().isPresent() &&
!message.getLiteTopic().isPresent() &&
+ !message.getPriority().isPresent() &&
!message.getDeliveryTimestamp().isPresent() && !txEnabled) {
messageType = MessageType.NORMAL;
return;
@@ -69,14 +70,22 @@ public class PublishingMessageImpl extends MessageImpl {
messageType = MessageType.DELAY;
return;
}
+ // Priority message.
+ if (message.getPriority().isPresent() && !txEnabled) {
+ messageType = MessageType.PRIORITY;
+ return;
+ }
// Transaction message.
if (!message.getMessageGroup().isPresent() &&
+ !message.getLiteTopic().isPresent() &&
+ !message.getPriority().isPresent() &&
!message.getDeliveryTimestamp().isPresent() && txEnabled) {
messageType = MessageType.TRANSACTION;
return;
}
- // Transaction semantics is conflicted with fifo/delay.
- throw new IllegalArgumentException("Transactional message should not
set messageGroup or deliveryTimestamp");
+ // Transaction semantics is conflicted with fifo/delay/lite/priority.
+ throw new IllegalArgumentException(
+ "Transactional message should not set messageGroup,
deliveryTimestamp, lite and priority");
}
public MessageId getMessageId() {
@@ -117,7 +126,10 @@ public class PublishingMessageImpl extends MessageImpl {
.ifPresent(millis ->
systemPropertiesBuilder.setDeliveryTimestamp(Timestamps.fromMillis(millis)));
// Message group
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
+ // Lite
this.getLiteTopic().ifPresent(systemPropertiesBuilder::setLiteTopic);
+ // Priority
+ this.getPriority().ifPresent(systemPropertiesBuilder::setPriority);
final SystemProperties systemProperties =
systemPropertiesBuilder.build();
Resource topicResource =
Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
index 23f445c7..eae2b8eb 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/GeneralMessageImplTest.java
@@ -176,6 +176,39 @@ public class GeneralMessageImplTest extends TestBase {
assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
}
+ @Test
+ public void testMessagePriority() {
+ String topic = "testTopic";
+ byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
+ int priority = 1;
+
+ final Message message = new MessageBuilderImpl()
+ .setTopic(topic)
+ .setBody(body)
+ .setPriority(priority)
+ .build();
+
+ final GeneralMessageImpl generalMessage = new
GeneralMessageImpl(message);
+ assertFalse(generalMessage.getMessageId().isPresent());
+ assertEquals(topic, generalMessage.getTopic());
+ assertEquals(ByteBuffer.wrap(body), generalMessage.getBody());
+ assertFalse(generalMessage.getTag().isPresent());
+ assertEquals(0, generalMessage.getKeys().size());
+
+ assertTrue(generalMessage.getPriority().isPresent());
+ assertEquals(priority, (int) generalMessage.getPriority().get());
+
+ assertFalse(generalMessage.getMessageGroup().isPresent());
+ assertFalse(generalMessage.getBornHost().isPresent());
+ assertFalse(generalMessage.getBornTimestamp().isPresent());
+ assertFalse(generalMessage.getDeliveryAttempt().isPresent());
+ assertFalse(generalMessage.getLiteTopic().isPresent());
+ assertFalse(generalMessage.getDecodeTimestamp().isPresent());
+
assertFalse(generalMessage.getTransportDeliveryTimestamp().isPresent());
+
+ assertFalse(generalMessage.getDeliveryTimestamp().isPresent());
+ }
+
@Test
public void testMessageView() {
MessageId messageId = MessageIdCodec.getInstance().nextMessageId();
@@ -185,6 +218,7 @@ public class GeneralMessageImplTest extends TestBase {
String messageGroup = "messageGroup0";
String liteTopic = "liteTopic0";
long deliveryTimestamp = System.currentTimeMillis();
+ int priority = 1;
List<String> keys = new ArrayList<>();
keys.add("keyA");
Map<String, String> properties = new HashMap<>();
@@ -200,7 +234,7 @@ public class GeneralMessageImplTest extends TestBase {
final MessageViewImpl messageView = new MessageViewImpl(messageId,
topic, body, tag,
messageGroup, liteTopic,
- deliveryTimestamp, keys, properties, bornHost, bornTimestamp,
deliveryAttempt, mq, receiptHandle,
+ deliveryTimestamp, priority, keys, properties, bornHost,
bornTimestamp, deliveryAttempt, mq, receiptHandle,
offset, corrupted, transportDeliveryTimestamp);
final GeneralMessageImpl generalMessage = new
GeneralMessageImpl(messageView);
assertTrue(generalMessage.getMessageId().isPresent());
@@ -217,6 +251,8 @@ public class GeneralMessageImplTest extends TestBase {
assertEquals(liteTopic, generalMessage.getLiteTopic().get());
assertTrue(generalMessage.getDeliveryTimestamp().isPresent());
assertEquals(deliveryTimestamp, (long)
generalMessage.getDeliveryTimestamp().get());
+ assertTrue(generalMessage.getPriority().isPresent());
+ assertEquals(priority, (int) generalMessage.getPriority().get());
assertTrue(generalMessage.getBornHost().isPresent());
assertEquals(bornHost, generalMessage.getBornHost().get());
assertTrue(generalMessage.getBornTimestamp().isPresent());
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index 88797670..86145f66 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -130,6 +130,8 @@ public class MessageImplTest extends TestBase {
assertArrayEquals(FAKE_MESSAGE_BODY, bytes);
assertFalse(message.getDeliveryTimestamp().isPresent());
assertFalse(message.getMessageGroup().isPresent());
+ assertFalse(message.getLiteTopic().isPresent());
+ assertFalse(message.getPriority().isPresent());
}
@Test(expected = IllegalArgumentException.class)
@@ -148,4 +150,46 @@ public class MessageImplTest extends TestBase {
assertEquals("liteTopicA", message.getLiteTopic().get());
}
+ @Test
+ public void testPrioritySetter() {
+ final Message message =
+
provider.newMessageBuilder().setPriority(1).setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+ assertTrue(message.getPriority().isPresent());
+ assertEquals(1, (int) message.getPriority().get());
+
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+ provider.newMessageBuilder().setPriority(-1));
+ }
+
+ @Test
+ public void testMessageTypeConflict() {
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setDeliveryTimestamp(System.currentTimeMillis()).setMessageGroup("HW"));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setDeliveryTimestamp(System.currentTimeMillis()).setLiteTopic("HW"));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setDeliveryTimestamp(System.currentTimeMillis()).setPriority(1));
+
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setMessageGroup("HW").setLiteTopic("HW"));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setMessageGroup("HW").setDeliveryTimestamp(System.currentTimeMillis()));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+ provider.newMessageBuilder().setMessageGroup("HW").setPriority(1));
+
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setLiteTopic("HW").setMessageGroup("HW"));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setLiteTopic("HW").setDeliveryTimestamp(System.currentTimeMillis()));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+ provider.newMessageBuilder().setLiteTopic("HW").setPriority(1));
+
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+
provider.newMessageBuilder().setPriority(1).setDeliveryTimestamp(System.currentTimeMillis()));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+ provider.newMessageBuilder().setPriority(1).setLiteTopic("HW"));
+ Assert.assertThrows(IllegalArgumentException.class, () ->
+ provider.newMessageBuilder().setPriority(1).setMessageGroup("HW"));
+ }
+
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 1d7e7581..ee316f2b 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -183,7 +183,7 @@ public class TestBase {
final byte[] body = RandomUtils.nextBytes(bodySize);
Map<String, String> properties = new HashMap<>();
List<String> keys = new ArrayList<>();
- return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null,
null, null,
+ return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null,
null, null, null,
keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, 1,
corrupted,
System.currentTimeMillis());
}
diff --git a/java/pom.xml b/java/pom.xml
index f81820e1..4926e128 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -48,7 +48,7 @@
~ 1. Whether it is essential, because the current shaded jar is
fat enough.
~ 2. Make sure that it is compatible with Java 8.
-->
- <rocketmq-proto.version>2.1.0</rocketmq-proto.version>
+ <rocketmq-proto.version>2.1.1</rocketmq-proto.version>
<annotations-api.version>1.3.5</annotations-api.version>
<protobuf.version>3.24.4</protobuf.version>
<grpc.version>1.50.0</grpc.version>