This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8d5d765d Release python sdk 5.1.1 (#1192)
8d5d765d is described below
commit 8d5d765dee11a36727698b1e7e22ef34e30c6660
Author: zhouli11 <[email protected]>
AuthorDate: Mon Feb 9 15:07:09 2026 +0800
Release python sdk 5.1.1 (#1192)
---
README-CN.md | 2 +-
README.md | 2 +-
python/example/async_producer_example.py | 4 +-
python/example/async_simple_consumer_example.py | 78 ----------------
python/example/lite_producer_example.py | 3 +-
python/example/lite_push_consumer_example.py | 3 +-
python/example/normal_producer_example.py | 7 +-
...cer_example.py => priority_producer_example.py} | 11 ++-
python/example/push_consumer_example.py | 5 +-
python/example/simple_consumer_example.py | 1 -
python/example/transaction_producer_example.py | 6 +-
python/rocketmq/grpc_protocol/definition_pb2.py | 100 +++++++++++----------
.../rocketmq/grpc_protocol/proto/definition.proto | 21 +++++
python/rocketmq/grpc_protocol/proto/service.proto | 7 +-
python/rocketmq/grpc_protocol/service_pb2.py | 61 ++++++-------
python/rocketmq/v5/model/message.py | 76 ++++++++++++----
python/rocketmq/v5/producer/producer.py | 10 ++-
python/rocketmq/v5/util/misc.py | 2 +-
18 files changed, 194 insertions(+), 205 deletions(-)
diff --git a/README-CN.md b/README-CN.md
index 0431f496..0e2c3acd 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -28,7 +28,7 @@
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅
| ✅ | ✅ | 🚧 | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅
| ✅ | ✅ | 🚧 | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | ✅
| 🚧 | ✅ | 🚧 | 🚧 |
-| Priority Message | ✅ | 🚧 | 🚧 | 🚧
| 🚧 | 🚧 | 🚧 | 🚧 |
+| Priority Message | ✅ | 🚧 | 🚧 | 🚧
| 🚧 | ✅ | 🚧 | 🚧 |
## 先决条件和构建
diff --git a/README.md b/README.md
index 9eee0e85..9eb40f97 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ Provide cloud-native and robust solutions for Java, C++, C#,
Golang, Rust and al
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅
| ✅ | ✅ | 🚧 | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅
| ✅ | ✅ | 🚧 | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | 🚧 | ✅
| 🚧 | ✅ | 🚧 | 🚧 |
-| Priority Message | ✅ | 🚧 | 🚧 | 🚧
| 🚧 | 🚧 | 🚧 | 🚧 |
+| Priority Message | ✅ | 🚧 | 🚧 | 🚧
| 🚧 | ✅ | 🚧 | 🚧 |
## Prerequisite and Build
diff --git a/python/example/async_producer_example.py
b/python/example/async_producer_example.py
index b9ae878c..f1b18e4b 100644
--- a/python/example/async_producer_example.py
+++ b/python/example/async_producer_example.py
@@ -45,9 +45,9 @@ if __name__ == '__main__':
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
# secondary classifier of message besides topic
- msg.tag = "rocketmq-send-message"
+ msg.tag = "tag"
# key(s) of the message, another way to mark message besides
message id
- msg.keys = "send_async"
+ msg.keys = "keys"
# user property for the message
msg.add_property("send", "async")
send_result_future = producer.send_async(msg)
diff --git a/python/example/async_simple_consumer_example.py
b/python/example/async_simple_consumer_example.py
deleted file mode 100644
index 827dde48..00000000
--- a/python/example/async_simple_consumer_example.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import functools
-import time
-from concurrent.futures.thread import ThreadPoolExecutor
-
-from rocketmq import ClientConfiguration, Credentials, SimpleConsumer
-
-consume_executor = ThreadPoolExecutor(max_workers=2,
thread_name_prefix="consume-message")
-
-
-def consume_message(consumer, message):
- try:
- consumer.ack(message)
- print(f"ack message:{message.message_id}.")
- except Exception as exception:
- print(f"consume message raise exception: {exception}")
-
-
-def receive_callback(receive_result_future, consumer):
- messages = receive_result_future.result()
- print(f"{consumer} receive {len(messages)} messages.")
- for msg in messages:
- try:
- # consume message in other thread, don't block the async receive
thread
- consume_executor.submit(consume_message, consumer=consumer,
message=msg)
- except Exception as exception:
- print(f"receive message raise exception: {exception}")
-
-
-if __name__ == '__main__':
- endpoints = "foobar.com:8080"
- credentials = Credentials()
- # if auth enable
- # credentials = Credentials("ak", "sk")
- config = ClientConfiguration(endpoints, credentials)
- # with namespace
- # config = ClientConfiguration(endpoints, credentials, "namespace")
- topic = "topic"
- consumer_group = "consumer-group"
- # in most case, you don't need to create too many consumers, singleton
pattern is recommended
- # close the simple consumer when you don't need it anymore
- simple_consumer = SimpleConsumer(config, consumer_group)
- try:
- simple_consumer.startup()
- try:
- simple_consumer.subscribe(topic)
- # use tag filter
- # simple_consumer.subscribe(topic, FilterExpression("tag"))
- while True:
- try:
- time.sleep(1)
- # max message num for each long polling and message
invisible duration after it is received
- future = simple_consumer.receive_async(32, 15)
-
future.add_done_callback(functools.partial(receive_callback,
consumer=simple_consumer))
- except Exception as e:
- print(f"{simple_consumer} raise exception: {e}")
- except Exception as e:
- print(f"{simple_consumer} exception: {e}")
- simple_consumer.shutdown()
- print(f"{simple_consumer} shutdown.")
- except Exception as e:
- print(f"{simple_consumer} startup raise exception: {e}")
- simple_consumer.shutdown()
- print(f"{simple_consumer} shutdown.")
diff --git a/python/example/lite_producer_example.py
b/python/example/lite_producer_example.py
index 6782cd51..522dabe1 100644
--- a/python/example/lite_producer_example.py
+++ b/python/example/lite_producer_example.py
@@ -18,7 +18,6 @@ from rocketmq import ClientConfiguration, Credentials,
Message, Producer
if __name__ == '__main__':
endpoints = "foobar.com:8080"
credentials = Credentials()
-
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
@@ -36,7 +35,7 @@ if __name__ == '__main__':
msg.body = "hello, rocketmq.".encode('utf-8')
for i in range(0, 10):
# set lite_topic
- msg.lite_topic("lite-test-" + str(i))
+ msg.lite_topic = "lite-test-" + str(i)
res = producer.send(msg)
print(f"{producer} send message success. {res}")
producer.shutdown()
diff --git a/python/example/lite_push_consumer_example.py
b/python/example/lite_push_consumer_example.py
index f398e474..5535af36 100644
--- a/python/example/lite_push_consumer_example.py
+++ b/python/example/lite_push_consumer_example.py
@@ -20,14 +20,13 @@ from rocketmq import (ClientConfiguration, ConsumeResult,
Credentials,
class LiteTopicTestMessageListener(MessageListener):
def consume(self, message: Message) -> ConsumeResult:
- print(f"consume message, topic:{message.topic},
lite_topic:{message.lite_topic}, message_id: {message.message_id}.")
+ print(f"consume message, {message}.")
return ConsumeResult.SUCCESS
if __name__ == '__main__':
endpoints = "foobar.com:8080"
credentials = Credentials()
-
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
diff --git a/python/example/normal_producer_example.py
b/python/example/normal_producer_example.py
index 756b0d44..1e7a2111 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/normal_producer_example.py
@@ -18,7 +18,6 @@ from rocketmq import ClientConfiguration, Credentials,
Message, Producer
if __name__ == '__main__':
endpoints = "foobar.com:8080"
credentials = Credentials()
-
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
@@ -35,11 +34,11 @@ if __name__ == '__main__':
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
# secondary classifier of message besides topic
- msg.tag = "rocketmq-send-message"
+ msg.tag = "tag"
# key(s) of the message, another way to mark message besides
message id
- msg.keys = "send_sync"
+ msg.keys = "keys"
# user property for the message
- msg.add_property("send", "sync")
+ msg.add_property("key", "value")
for i in range(0, 10):
res = producer.send(msg)
print(f"{producer} send message success. {res}")
diff --git a/python/example/normal_producer_example.py
b/python/example/priority_producer_example.py
similarity index 91%
copy from python/example/normal_producer_example.py
copy to python/example/priority_producer_example.py
index 756b0d44..d8d111c6 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/priority_producer_example.py
@@ -18,13 +18,12 @@ from rocketmq import ClientConfiguration, Credentials,
Message, Producer
if __name__ == '__main__':
endpoints = "foobar.com:8080"
credentials = Credentials()
-
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
# with namespace
# config = ClientConfiguration(endpoints, credentials, "namespace")
- topic = "topic"
+ topic = "priority-topic"
producer = Producer(config, (topic,))
try:
@@ -35,12 +34,12 @@ if __name__ == '__main__':
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
# secondary classifier of message besides topic
- msg.tag = "rocketmq-send-message"
+ msg.tag = "tag"
# key(s) of the message, another way to mark message besides
message id
- msg.keys = "send_sync"
- # user property for the message
- msg.add_property("send", "sync")
+ msg.keys = "keys"
for i in range(0, 10):
+ # priority of message
+ msg.priority = i
res = producer.send(msg)
print(f"{producer} send message success. {res}")
producer.shutdown()
diff --git a/python/example/push_consumer_example.py
b/python/example/push_consumer_example.py
index 0baa5099..32ea8360 100644
--- a/python/example/push_consumer_example.py
+++ b/python/example/push_consumer_example.py
@@ -20,21 +20,20 @@ from rocketmq import (ClientConfiguration, ConsumeResult,
Credentials,
class TestMessageListener(MessageListener):
def consume(self, message: Message) -> ConsumeResult:
- print(f"consume message, topic:{message.topic}, message_id:
{message.message_id}.")
+ print(f"consume message, {message}.")
return ConsumeResult.SUCCESS
if __name__ == '__main__':
endpoints = "foobar.com:8080"
credentials = Credentials()
-
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
# with namespace
# config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
- consumer_group = "consumer_group"
+ consumer_group = "consumer-group"
# in most case, you don't need to create too many consumers, singleton
pattern is recommended
# close the push consumer when you don't need it anymore
push_consumer = PushConsumer(config, consumer_group,
TestMessageListener(), {topic: FilterExpression(), })
diff --git a/python/example/simple_consumer_example.py
b/python/example/simple_consumer_example.py
index 9e0a944c..0fdf980b 100644
--- a/python/example/simple_consumer_example.py
+++ b/python/example/simple_consumer_example.py
@@ -19,7 +19,6 @@ from rocketmq import (ClientConfiguration, Credentials,
FilterExpression,
if __name__ == '__main__':
endpoints = "foobar.com:8080"
credentials = Credentials()
-
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
diff --git a/python/example/transaction_producer_example.py
b/python/example/transaction_producer_example.py
index 8250b9fe..83c272b2 100644
--- a/python/example/transaction_producer_example.py
+++ b/python/example/transaction_producer_example.py
@@ -20,7 +20,7 @@ from rocketmq import (ClientConfiguration, Credentials,
Message, Producer,
class TestChecker(TransactionChecker):
def check(self, message: Message) -> TransactionResolution:
- print(f"do TestChecker check, topic:{message.topic}, message_id:
{message.message_id}, commit message.")
+ print(f"do TestChecker check, {message}, commit message.")
return TransactionResolution.COMMIT
@@ -49,10 +49,6 @@ if __name__ == '__main__':
msg.body = "hello, rocketmq.".encode('utf-8')
# secondary classifier of message besides topic
msg.tag = "rocketmq-send-transaction-message"
- # key(s) of the message, another way to mark message besides message id
- msg.keys = "send_transaction"
- # user property for the message
- msg.add_property("send", "transaction")
res = producer.send(msg, transaction)
print(f"{producer} send message success. {res}")
if check_from_server:
diff --git a/python/rocketmq/grpc_protocol/definition_pb2.py
b/python/rocketmq/grpc_protocol/definition_pb2.py
index 924979a7..2b3d42b6 100644
--- a/python/rocketmq/grpc_protocol/definition_pb2.py
+++ b/python/rocketmq/grpc_protocol/definition_pb2.py
@@ -26,7 +26,7 @@ from google.protobuf import timestamp_pb2 as
google_dot_protobuf_dot_timestamp__
from google.protobuf import duration_pb2 as
google_dot_protobuf_dot_duration__pb2
-DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65\x66inition.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"T\n\x10\x46ilterExpression\x12,\n\x04type\x18\x01
\x01(\x0e\x32\x1e.apache.rocketmq.v2.FilterType\x12\x12\n\nexpression\x18\x02
\x01(\t\"\xbb\x01\n\x0bRetryPolicy\x12\x14\n\x0cmax_attempts\x18\x01
\x01(\x05\x12\x45\n\x13\x65xponential_backoff\x18\x02
\x01(\x0b\x32&.apache.rocketmq.v2.Exponent [...]
+DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65\x66inition.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"T\n\x10\x46ilterExpression\x12,\n\x04type\x18\x01
\x01(\x0e\x32\x1e.apache.rocketmq.v2.FilterType\x12\x12\n\nexpression\x18\x02
\x01(\t\"\xbb\x01\n\x0bRetryPolicy\x12\x14\n\x0cmax_attempts\x18\x01
\x01(\x05\x12\x45\n\x13\x65xponential_backoff\x18\x02
\x01(\x0b\x32&.apache.rocketmq.v2.Exponent [...]
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -36,32 +36,32 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._serialized_options =
b'\n\022apache.rocketmq.v2B\010MQDomainP\001\240\001\001\330\001\001\252\002\022Apache.Rocketmq.V2'
_globals['_MESSAGE_USERPROPERTIESENTRY']._loaded_options = None
_globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_options = b'8\001'
- _globals['_TRANSACTIONRESOLUTION']._serialized_start=4107
- _globals['_TRANSACTIONRESOLUTION']._serialized_end=4196
- _globals['_TRANSACTIONSOURCE']._serialized_start=4198
- _globals['_TRANSACTIONSOURCE']._serialized_end=4285
- _globals['_PERMISSION']._serialized_start=4287
- _globals['_PERMISSION']._serialized_end=4374
- _globals['_FILTERTYPE']._serialized_start=4376
- _globals['_FILTERTYPE']._serialized_end=4435
- _globals['_ADDRESSSCHEME']._serialized_start=4437
- _globals['_ADDRESSSCHEME']._serialized_end=4521
- _globals['_MESSAGETYPE']._serialized_start=4523
- _globals['_MESSAGETYPE']._serialized_end=4626
- _globals['_DIGESTTYPE']._serialized_start=4628
- _globals['_DIGESTTYPE']._serialized_end=4699
- _globals['_CLIENTTYPE']._serialized_start=4702
- _globals['_CLIENTTYPE']._serialized_end=4866
- _globals['_ENCODING']._serialized_start=4868
- _globals['_ENCODING']._serialized_end=4928
- _globals['_CODE']._serialized_start=4931
- _globals['_CODE']._serialized_end=6380
- _globals['_LANGUAGE']._serialized_start=6383
- _globals['_LANGUAGE']._serialized_end=6556
- _globals['_LITESUBSCRIPTIONACTION']._serialized_start=6558
- _globals['_LITESUBSCRIPTIONACTION']._serialized_end=6658
- _globals['_QUERYOFFSETPOLICY']._serialized_start=6660
- _globals['_QUERYOFFSETPOLICY']._serialized_end=6718
+ _globals['_TRANSACTIONRESOLUTION']._serialized_start=4329
+ _globals['_TRANSACTIONRESOLUTION']._serialized_end=4418
+ _globals['_TRANSACTIONSOURCE']._serialized_start=4420
+ _globals['_TRANSACTIONSOURCE']._serialized_end=4507
+ _globals['_PERMISSION']._serialized_start=4509
+ _globals['_PERMISSION']._serialized_end=4596
+ _globals['_FILTERTYPE']._serialized_start=4598
+ _globals['_FILTERTYPE']._serialized_end=4657
+ _globals['_ADDRESSSCHEME']._serialized_start=4659
+ _globals['_ADDRESSSCHEME']._serialized_end=4743
+ _globals['_MESSAGETYPE']._serialized_start=4745
+ _globals['_MESSAGETYPE']._serialized_end=4862
+ _globals['_DIGESTTYPE']._serialized_start=4864
+ _globals['_DIGESTTYPE']._serialized_end=4935
+ _globals['_CLIENTTYPE']._serialized_start=4938
+ _globals['_CLIENTTYPE']._serialized_end=5102
+ _globals['_ENCODING']._serialized_start=5104
+ _globals['_ENCODING']._serialized_end=5164
+ _globals['_CODE']._serialized_start=5167
+ _globals['_CODE']._serialized_end=6616
+ _globals['_LANGUAGE']._serialized_start=6619
+ _globals['_LANGUAGE']._serialized_end=6792
+ _globals['_LITESUBSCRIPTIONACTION']._serialized_start=6794
+ _globals['_LITESUBSCRIPTIONACTION']._serialized_end=6894
+ _globals['_QUERYOFFSETPOLICY']._serialized_start=6896
+ _globals['_QUERYOFFSETPOLICY']._serialized_end=6954
_globals['_FILTEREXPRESSION']._serialized_start=105
_globals['_FILTEREXPRESSION']._serialized_end=189
_globals['_RETRYPOLICY']._serialized_start=192
@@ -85,25 +85,29 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals['_DIGEST']._serialized_start=1217
_globals['_DIGEST']._serialized_end=1289
_globals['_SYSTEMPROPERTIES']._serialized_start=1292
- _globals['_SYSTEMPROPERTIES']._serialized_end=2371
- _globals['_DEADLETTERQUEUE']._serialized_start=2373
- _globals['_DEADLETTERQUEUE']._serialized_end=2425
- _globals['_MESSAGE']._serialized_start=2428
- _globals['_MESSAGE']._serialized_end=2690
- _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_start=2637
- _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_end=2690
- _globals['_ASSIGNMENT']._serialized_start=2692
- _globals['_ASSIGNMENT']._serialized_end=2761
- _globals['_STATUS']._serialized_start=2763
- _globals['_STATUS']._serialized_end=2828
- _globals['_UA']._serialized_start=2830
- _globals['_UA']._serialized_end=2935
- _globals['_SETTINGS']._serialized_start=2938
- _globals['_SETTINGS']._serialized_end=3466
- _globals['_PUBLISHING']._serialized_start=3468
- _globals['_PUBLISHING']._serialized_end=3580
- _globals['_SUBSCRIPTION']._serialized_start=3583
- _globals['_SUBSCRIPTION']._serialized_end=4014
- _globals['_METRIC']._serialized_start=4016
- _globals['_METRIC']._serialized_end=4105
+ _globals['_SYSTEMPROPERTIES']._serialized_end=2407
+ _globals['_DEADLETTERQUEUE']._serialized_start=2409
+ _globals['_DEADLETTERQUEUE']._serialized_end=2461
+ _globals['_MESSAGE']._serialized_start=2464
+ _globals['_MESSAGE']._serialized_end=2726
+ _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_start=2673
+ _globals['_MESSAGE_USERPROPERTIESENTRY']._serialized_end=2726
+ _globals['_ASSIGNMENT']._serialized_start=2728
+ _globals['_ASSIGNMENT']._serialized_end=2797
+ _globals['_STATUS']._serialized_start=2799
+ _globals['_STATUS']._serialized_end=2864
+ _globals['_UA']._serialized_start=2866
+ _globals['_UA']._serialized_end=2971
+ _globals['_SETTINGS']._serialized_start=2974
+ _globals['_SETTINGS']._serialized_end=3502
+ _globals['_PUBLISHING']._serialized_start=3504
+ _globals['_PUBLISHING']._serialized_end=3616
+ _globals['_SUBSCRIPTION']._serialized_start=3619
+ _globals['_SUBSCRIPTION']._serialized_end=4050
+ _globals['_METRIC']._serialized_start=4052
+ _globals['_METRIC']._serialized_end=4141
+ _globals['_OFFSETOPTION']._serialized_start=4144
+ _globals['_OFFSETOPTION']._serialized_end=4327
+ _globals['_OFFSETOPTION_POLICY']._serialized_start=4276
+ _globals['_OFFSETOPTION_POLICY']._serialized_end=4312
# @@protoc_insertion_point(module_scope)
diff --git a/python/rocketmq/grpc_protocol/proto/definition.proto
b/python/rocketmq/grpc_protocol/proto/definition.proto
index 2513bac0..516474df 100644
--- a/python/rocketmq/grpc_protocol/proto/definition.proto
+++ b/python/rocketmq/grpc_protocol/proto/definition.proto
@@ -149,6 +149,9 @@ enum MessageType {
// lite topic
LITE = 5;
+
+ // Messages that lower prioritised ones may need to wait for higher priority
messages to be processed first
+ PRIORITY = 6;
}
enum DigestType {
@@ -278,6 +281,9 @@ message SystemProperties {
// lite topic
optional string lite_topic = 21;
+
+ // Priority of message, which is optional
+ optional int32 priority = 22;
}
message DeadLetterQueue {
@@ -596,4 +602,19 @@ enum QueryOffsetPolicy {
// Use this option if time-based seek is targeted.
TIMESTAMP = 2;
+}
+
+message OffsetOption {
+ oneof offset_type {
+ Policy policy = 1;
+ int64 offset = 2;
+ int64 tail_n = 3;
+ int64 timestamp = 4;
+ }
+
+ enum Policy {
+ LAST = 0;
+ MIN = 1;
+ MAX = 2;
+ }
}
\ No newline at end of file
diff --git a/python/rocketmq/grpc_protocol/proto/service.proto
b/python/rocketmq/grpc_protocol/proto/service.proto
index ca809ff2..810693e8 100644
--- a/python/rocketmq/grpc_protocol/proto/service.proto
+++ b/python/rocketmq/grpc_protocol/proto/service.proto
@@ -252,6 +252,10 @@ message ChangeInvisibleDurationRequest {
// For message tracing
string message_id = 5;
+
+ optional string lite_topic = 6;
+ // If true, server will not increment the retry times for this message
+ optional bool suspend = 7;
}
message ChangeInvisibleDurationResponse {
@@ -329,6 +333,7 @@ message SyncLiteSubscriptionRequest {
// lite subscription set of lite topics
repeated string lite_topic_set = 4;
optional int64 version = 5;
+ optional OffsetOption offset_option = 6;
}
message SyncLiteSubscriptionResponse {
@@ -468,4 +473,4 @@ service MessagingService {
// Sync lite subscription info, lite push consumer only
rpc SyncLiteSubscription(SyncLiteSubscriptionRequest) returns
(SyncLiteSubscriptionResponse) {}
-}
+}
\ No newline at end of file
diff --git a/python/rocketmq/grpc_protocol/service_pb2.py
b/python/rocketmq/grpc_protocol/service_pb2.py
index 718fae85..f3aed51c 100644
--- a/python/rocketmq/grpc_protocol/service_pb2.py
+++ b/python/rocketmq/grpc_protocol/service_pb2.py
@@ -26,7 +26,8 @@ from google.protobuf import duration_pb2 as
google_dot_protobuf_dot_duration__pb
from google.protobuf import timestamp_pb2 as
google_dot_protobuf_dot_timestamp__pb2
from rocketmq.grpc_protocol import definition_pb2 as definition__pb2
-DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\rservice.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x64\x65\x66inition.proto\"r\n\x11QueryRouteRequest\x12+\n\x05topic\x18\x01
\x01(\x0b\x32\x1c.apache.rocketmq.v2.Resource\x12\x30\n\tendpoints\x18\x02
\x01(\x0b\x32\x1d.apache.rocketmq.v2.Endpoints\"z\n\x12QueryRouteResponse\x12*\n\x06status\x18\x01
\x01(\x0b\x32\x1a.apache.rocketmq.v2.Status\x12\x38 [...]
+
+DESCRIPTOR =
_descriptor_pool.Default().AddSerializedFile(b'\n\rservice.proto\x12\x12\x61pache.rocketmq.v2\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x64\x65\x66inition.proto\"r\n\x11QueryRouteRequest\x12+\n\x05topic\x18\x01
\x01(\x0b\x32\x1c.apache.rocketmq.v2.Resource\x12\x30\n\tendpoints\x18\x02
\x01(\x0b\x32\x1d.apache.rocketmq.v2.Endpoints\"z\n\x12QueryRouteResponse\x12*\n\x06status\x18\x01
\x01(\x0b\x32\x1a.apache.rocketmq.v2.Status\x12\x38 [...]
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -93,33 +94,33 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_start=4309
_globals['_NOTIFYCLIENTTERMINATIONRESPONSE']._serialized_end=4386
_globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_start=4389
- _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_end=4610
- _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_start=4612
- _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_end=4713
- _globals['_PULLMESSAGEREQUEST']._serialized_start=4716
- _globals['_PULLMESSAGEREQUEST']._serialized_end=4996
- _globals['_PULLMESSAGERESPONSE']._serialized_start=4999
- _globals['_PULLMESSAGERESPONSE']._serialized_end=5148
- _globals['_UPDATEOFFSETREQUEST']._serialized_start=5151
- _globals['_UPDATEOFFSETREQUEST']._serialized_end=5290
- _globals['_UPDATEOFFSETRESPONSE']._serialized_start=5292
- _globals['_UPDATEOFFSETRESPONSE']._serialized_end=5358
- _globals['_GETOFFSETREQUEST']._serialized_start=5360
- _globals['_GETOFFSETREQUEST']._serialized_end=5480
- _globals['_GETOFFSETRESPONSE']._serialized_start=5482
- _globals['_GETOFFSETRESPONSE']._serialized_end=5561
- _globals['_QUERYOFFSETREQUEST']._serialized_start=5564
- _globals['_QUERYOFFSETREQUEST']._serialized_end=5775
- _globals['_QUERYOFFSETRESPONSE']._serialized_start=5777
- _globals['_QUERYOFFSETRESPONSE']._serialized_end=5858
- _globals['_RECALLMESSAGEREQUEST']._serialized_start=5860
- _globals['_RECALLMESSAGEREQUEST']._serialized_end=5950
- _globals['_RECALLMESSAGERESPONSE']._serialized_start=5952
- _globals['_RECALLMESSAGERESPONSE']._serialized_end=6039
- _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_start=6042
- _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_end=6279
- _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_start=6281
- _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_end=6355
- _globals['_MESSAGINGSERVICE']._serialized_start=6358
- _globals['_MESSAGINGSERVICE']._serialized_end=8226
+ _globals['_CHANGEINVISIBLEDURATIONREQUEST']._serialized_end=4684
+ _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_start=4686
+ _globals['_CHANGEINVISIBLEDURATIONRESPONSE']._serialized_end=4787
+ _globals['_PULLMESSAGEREQUEST']._serialized_start=4790
+ _globals['_PULLMESSAGEREQUEST']._serialized_end=5070
+ _globals['_PULLMESSAGERESPONSE']._serialized_start=5073
+ _globals['_PULLMESSAGERESPONSE']._serialized_end=5222
+ _globals['_UPDATEOFFSETREQUEST']._serialized_start=5225
+ _globals['_UPDATEOFFSETREQUEST']._serialized_end=5364
+ _globals['_UPDATEOFFSETRESPONSE']._serialized_start=5366
+ _globals['_UPDATEOFFSETRESPONSE']._serialized_end=5432
+ _globals['_GETOFFSETREQUEST']._serialized_start=5434
+ _globals['_GETOFFSETREQUEST']._serialized_end=5554
+ _globals['_GETOFFSETRESPONSE']._serialized_start=5556
+ _globals['_GETOFFSETRESPONSE']._serialized_end=5635
+ _globals['_QUERYOFFSETREQUEST']._serialized_start=5638
+ _globals['_QUERYOFFSETREQUEST']._serialized_end=5849
+ _globals['_QUERYOFFSETRESPONSE']._serialized_start=5851
+ _globals['_QUERYOFFSETRESPONSE']._serialized_end=5932
+ _globals['_RECALLMESSAGEREQUEST']._serialized_start=5934
+ _globals['_RECALLMESSAGEREQUEST']._serialized_end=6024
+ _globals['_RECALLMESSAGERESPONSE']._serialized_start=6026
+ _globals['_RECALLMESSAGERESPONSE']._serialized_end=6113
+ _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_start=6116
+ _globals['_SYNCLITESUBSCRIPTIONREQUEST']._serialized_end=6433
+ _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_start=6435
+ _globals['_SYNCLITESUBSCRIPTIONRESPONSE']._serialized_end=6509
+ _globals['_MESSAGINGSERVICE']._serialized_start=6512
+ _globals['_MESSAGINGSERVICE']._serialized_end=8380
# @@protoc_insertion_point(module_scope)
diff --git a/python/rocketmq/v5/model/message.py
b/python/rocketmq/v5/model/message.py
index ce63334d..7cffbde6 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -30,12 +30,14 @@ class Message:
self.__tag = None
self.__message_group = None
self.__delivery_timestamp = None
+ self.__priority = None
self.__transport_delivery_timestamp = None
self.__decode_message_timestamp = None
self.__keys = set()
self.__properties = dict()
self.__born_host = None
self.__born_timestamp = None
+ self.__store_timestamp = None
self.__delivery_attempt = None
self.__receipt_handle = None
self.__message_type = None
@@ -44,34 +46,46 @@ class Message:
def __str__(self) -> str:
return (
- f"topic:{self.__topic}, tag:{self.__tag},
messageGroup:{self.__message_group}, "
- f"deliveryTimestamp:{self.__delivery_timestamp},
keys:{self.__keys}, properties:{self.__properties}"
+ f"topic:{self.__topic}, tag:{self.__tag},
message_group:{self.__message_group}, lite_topic:{self.__lite_topic}, "
+ f"priority:{self.__priority},
delivery_timestamp:{self.__delivery_timestamp}, keys:{self.__keys},
properties:{self.__properties}"
)
def fromProtobuf(self, message: definition_pb2.Message): # noqa
try:
self.__message_body_check_sum(message)
self.__topic = message.topic.name
- if message.system_properties.lite_topic:
- self.__lite_topic = message.system_properties.lite_topic
self.__namespace = message.topic.resource_namespace
- self.__message_id = message.system_properties.message_id
self.__body = self.__uncompress_body(message)
- self.__tag = message.system_properties.tag
- if message.system_properties.message_group:
- self.__message_group = message.system_properties.message_group
- self.__born_host = message.system_properties.born_host
+ if message.user_properties:
+ self.__properties.update(message.user_properties)
+
+ # system_properties
+ self.__message_id = message.system_properties.message_id
+ self.__message_type = message.system_properties.message_type
self.__born_timestamp =
Misc.to_mills(message.system_properties.born_timestamp)
- self.__delivery_attempt =
message.system_properties.delivery_attempt
+ self.__born_host = message.system_properties.born_host
+
+ if message.system_properties.tag:
+ self.__tag = message.system_properties.tag
+ if message.system_properties.keys:
+ self.__keys.update(message.system_properties.keys)
+ if message.system_properties.store_timestamp:
+ self.__store_timestamp =
Misc.to_mills(message.system_properties.store_timestamp)
if message.system_properties.delivery_timestamp:
self.__delivery_timestamp =
Misc.to_mills(message.system_properties.delivery_timestamp)
+ if message.system_properties.receipt_handle:
+ self.__receipt_handle =
message.system_properties.receipt_handle
+ if message.system_properties.lite_topic:
+ self.__lite_topic = message.system_properties.lite_topic
+ if message.system_properties.message_group:
+ self.__message_group = message.system_properties.message_group
+ if message.system_properties.delivery_attempt:
+ self.__delivery_attempt =
message.system_properties.delivery_attempt
+ if message.system_properties.priority is not None:
+ self.__priority = message.system_properties.priority
+
+ # decode time
self.__decode_message_timestamp = Misc.current_mills()
- self.__receipt_handle = message.system_properties.receipt_handle
- self.__message_type = message.system_properties.message_type
- if not message.system_properties.keys:
- self.__keys.update(message.system_properties.keys)
- if not message.user_properties:
- self.__properties.update(message.user_properties)
return self
except Exception as e:
raise e
@@ -137,6 +151,10 @@ class Message:
def lite_topic(self):
return self.__lite_topic
+ @property
+ def priority(self):
+ return self.__priority
+
@property
def namespace(self):
return self.__namespace
@@ -224,8 +242,22 @@ class Message:
raise IllegalArgumentException("lite_topic and message_group
should not be set at same time.")
if self.__delivery_timestamp:
raise IllegalArgumentException("lite_topic and delivery_timestamp
should not be set at same time.")
+ if self.__priority:
+ raise IllegalArgumentException("lite_topic and priority should not
be set at same time.")
self.__lite_topic = lite_topic
+ @priority.setter
+ def priority(self, priority: int):
+ if priority < 0:
+ raise IllegalArgumentException("priority must be greater than or
equal to 0.")
+ if self.__delivery_timestamp:
+ raise IllegalArgumentException("priority and delivery_timestamp
should not be set at same time.")
+ if self.__message_group:
+ raise IllegalArgumentException("priority and message_group should
not be set at same time.")
+ if self.__lite_topic:
+ raise IllegalArgumentException("priority and lite_topic should not
be set at same time.")
+ self.__priority = priority
+
@message_id.setter
def message_id(self, message_id):
self.__message_id = message_id
@@ -243,9 +275,11 @@ class Message:
if not message_group or not message_group.strip():
raise IllegalArgumentException("message_group should not be
blank.")
if self.__delivery_timestamp:
- raise IllegalArgumentException("delivery_timestamp and
message_group should not be set at same time.")
+ raise IllegalArgumentException("message_group and
delivery_timestamp should not be set at same time.")
if self.__lite_topic:
- raise IllegalArgumentException("lite_topic and message_group
should not be set at same time.")
+ raise IllegalArgumentException("message_group and lite_topic
should not be set at same time.")
+ if self.__priority:
+ raise IllegalArgumentException("message_group and priority should
not be set at same time.")
self.__message_group = message_group
@delivery_timestamp.setter
@@ -253,7 +287,9 @@ class Message:
if self.__message_group:
raise IllegalArgumentException("delivery_timestamp and
message_group should not be set at same time.")
if self.__lite_topic:
- raise IllegalArgumentException("lite_topic and delivery_timestamp
should not be set at same time.")
+ raise IllegalArgumentException("delivery_timestamp and lite_topic
should not be set at same time.")
+ if self.__priority:
+ raise IllegalArgumentException("delivery_timestamp and priority
should not be set at same time.")
self.__delivery_timestamp = delivery_timestamp
@transport_delivery_timestamp.setter
@@ -298,5 +334,7 @@ class Message:
return "TRANSACTION"
elif message_type == 5:
return "LITE"
+ elif message_type == 6:
+ return "PRIORITY"
else:
return "MESSAGE_TYPE_UNSPECIFIED"
diff --git a/python/rocketmq/v5/producer/producer.py
b/python/rocketmq/v5/producer/producer.py
index ec325005..e044187d 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -498,6 +498,8 @@ class Producer(Client):
msg.body = message.body
if message.lite_topic:
msg.system_properties.lite_topic = message.lite_topic
+ if message.priority is not None and message.priority >= 0:
+ msg.system_properties.priority = message.priority
if message.tag:
msg.system_properties.tag = message.tag
if message.keys:
@@ -525,6 +527,7 @@ class Producer(Client):
not message.message_group
and not message.lite_topic
and not message.delivery_timestamp
+ and message.priority is None
and not is_transaction
):
return MessageType.NORMAL
@@ -538,9 +541,14 @@ class Producer(Client):
if message.delivery_timestamp and not is_transaction:
return MessageType.DELAY
+ if message.priority is not None and message.priority >= 0 and not
is_transaction:
+ return MessageType.PRIORITY
+
if (
not message.message_group
and not message.delivery_timestamp
+ and not message.lite_topic
+ and message.priority is None
and is_transaction
):
return MessageType.TRANSACTION
@@ -550,7 +558,7 @@ class Producer(Client):
f"{self} set send message type exception, message: {str(message)}"
)
raise IllegalArgumentException(
- "transactional message should not set messageGroup or
deliveryTimestamp"
+ "transactional message should not set message_group,
delivery_timestamp, lite_topic and priority"
)
def __select_send_queue(self, message):
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index a377ab4f..24cb29ff 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -31,7 +31,7 @@ class Misc:
__OS_NAME = None
TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
- SDK_VERSION = "5.1.0"
+ SDK_VERSION = "5.1.1"
@staticmethod
def sdk_language():