This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 6eba1034 adapt to localService (#973)
6eba1034 is described below
commit 6eba1034799d22dfc09a45dba6bbc6afec3f5592
Author: zhouli11 <[email protected]>
AuthorDate: Mon Mar 31 11:05:01 2025 +0800
adapt to localService (#973)
* update to 5.0.3
1、use grpc.use_local_subchannel_pool
2、adapt to localService
3、fifo、delay message example
---
python/example/async_producer_example.py | 6 +
python/example/async_simple_consumer_example.py | 7 +-
...oducer_example.py => delay_producer_example.py} | 13 +-
...roducer_example.py => fifo_producer_example.py} | 12 +-
python/example/normal_producer_example.py | 6 +
python/example/simple_consumer_example.py | 5 +
python/example/transaction_producer_example.py | 9 +-
.../rocketmq/v5/client/balancer/queue_selector.py | 7 +
.../rocketmq/v5/client/connection/rpc_channel.py | 5 +-
python/rocketmq/v5/consumer/simple_consumer.py | 258 ++++++++-------------
python/rocketmq/v5/model/message.py | 22 ++
python/rocketmq/v5/model/topic_route.py | 10 +
python/rocketmq/v5/producer/producer.py | 28 ++-
13 files changed, 208 insertions(+), 180 deletions(-)
diff --git a/python/example/async_producer_example.py
b/python/example/async_producer_example.py
index fd5e5a08..23a9b837 100644
--- a/python/example/async_producer_example.py
+++ b/python/example/async_producer_example.py
@@ -31,6 +31,8 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
producer = Producer(config, (topic,))
@@ -39,10 +41,14 @@ if __name__ == '__main__':
try:
for i in range(10):
msg = Message()
+ # topic for the current message
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
+ # secondary classifier of message besides topic
msg.tag = "rocketmq-send-message"
+ # key(s) of the message, another way to mark message besides
message id
msg.keys = "send_async"
+ # user property for the message
msg.add_property("send", "async")
send_result_future = producer.send_async(msg)
send_result_future.add_done_callback(handle_send_result)
diff --git a/python/example/async_simple_consumer_example.py
b/python/example/async_simple_consumer_example.py
index 5b341d5d..06cf4911 100644
--- a/python/example/async_simple_consumer_example.py
+++ b/python/example/async_simple_consumer_example.py
@@ -35,6 +35,7 @@ def receive_callback(receive_result_future, consumer):
print(f"{consumer.__str__()} receive {len(messages)} messages.")
for msg in messages:
try:
+ # consume message in other thread, don't block the async receive
thread
consume_executor.submit(consume_message, consumer=consumer,
message=msg)
except Exception as exception:
print(f"receive message raise exception: {exception}")
@@ -46,8 +47,11 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
-
+ # in most case, you don't need to create too many consumers, singleton
pattern is recommended
+ # close the simple consumer when you don't need it anymore
simple_consumer = SimpleConsumer(config, "consumer-group")
try:
simple_consumer.startup()
@@ -58,6 +62,7 @@ if __name__ == '__main__':
while True:
try:
time.sleep(1)
+ # max message num for each long polling and message
invisible duration after it is received
future = simple_consumer.receive_async(32, 15)
future.add_done_callback(functools.partial(receive_callback,
consumer=simple_consumer))
except Exception as e:
diff --git a/python/example/normal_producer_example.py
b/python/example/delay_producer_example.py
similarity index 82%
copy from python/example/normal_producer_example.py
copy to python/example/delay_producer_example.py
index 16758d97..13818748 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/delay_producer_example.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import time
from rocketmq import ClientConfiguration, Credentials, Message, Producer
@@ -21,18 +22,22 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
- topic = "topic"
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
+ topic = "delay-topic"
producer = Producer(config, (topic,))
try:
producer.startup()
try:
msg = Message()
+ # topic for the current message
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
- msg.tag = "rocketmq-send-message"
- msg.keys = "send_sync"
- msg.add_property("send", "sync")
+ # secondary classifier of message besides topic
+ msg.tag = "rocketmq-send-delay-message"
+ # delay 10 seconds
+ msg.delivery_timestamp = int(time.time()) + 10
res = producer.send(msg)
print(f"{producer.__str__()} send message success. {res}")
producer.shutdown()
diff --git a/python/example/normal_producer_example.py
b/python/example/fifo_producer_example.py
similarity index 81%
copy from python/example/normal_producer_example.py
copy to python/example/fifo_producer_example.py
index 16758d97..1990fa0d 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/fifo_producer_example.py
@@ -21,18 +21,22 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
- topic = "topic"
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
+ topic = "fifo-topic"
producer = Producer(config, (topic,))
try:
producer.startup()
try:
msg = Message()
+ # topic for the current message
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
- msg.tag = "rocketmq-send-message"
- msg.keys = "send_sync"
- msg.add_property("send", "sync")
+ # secondary classifier of message besides topic
+ msg.tag = "rocketmq-send-fifo-message"
+ # message group decides the message delivery order
+ msg.message_group = "your-message-group0"
res = producer.send(msg)
print(f"{producer.__str__()} send message success. {res}")
producer.shutdown()
diff --git a/python/example/normal_producer_example.py
b/python/example/normal_producer_example.py
index 16758d97..a867a0a3 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/normal_producer_example.py
@@ -21,6 +21,8 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
producer = Producer(config, (topic,))
@@ -28,10 +30,14 @@ if __name__ == '__main__':
producer.startup()
try:
msg = Message()
+ # topic for the current message
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
+ # secondary classifier of message besides topic
msg.tag = "rocketmq-send-message"
+ # key(s) of the message, another way to mark message besides
message id
msg.keys = "send_sync"
+ # user property for the message
msg.add_property("send", "sync")
res = producer.send(msg)
print(f"{producer.__str__()} send message success. {res}")
diff --git a/python/example/simple_consumer_example.py
b/python/example/simple_consumer_example.py
index 77526fe5..4d2eedaa 100644
--- a/python/example/simple_consumer_example.py
+++ b/python/example/simple_consumer_example.py
@@ -21,7 +21,11 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
+ # in most case, you don't need to create too many consumers, singleton
pattern is recommended
+ # close the simple consumer when you don't need it anymore
simple_consumer = SimpleConsumer(config, "consumer-group")
try:
simple_consumer.startup()
@@ -31,6 +35,7 @@ if __name__ == '__main__':
# simple_consumer.subscribe(topic, FilterExpression("tag"))
while True:
try:
+ # max message num for each long polling and message
invisible duration after it is received
messages = simple_consumer.receive(32, 15)
if messages is not None:
print(f"{simple_consumer.__str__()} receive
{len(messages)} messages.")
diff --git a/python/example/transaction_producer_example.py
b/python/example/transaction_producer_example.py
index fa0abdd3..0effc7cd 100644
--- a/python/example/transaction_producer_example.py
+++ b/python/example/transaction_producer_example.py
@@ -30,6 +30,8 @@ if __name__ == '__main__':
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
+ # with namespace
+ # config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
check_from_server = True # commit message from server check
producer = Producer(config, (topic,), checker=TestChecker())
@@ -42,18 +44,23 @@ if __name__ == '__main__':
try:
transaction = producer.begin_transaction()
msg = Message()
+ # topic for the current message
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
+ # secondary classifier of message besides topic
msg.tag = "rocketmq-send-transaction-message"
+ # key(s) of the message, another way to mark message besides message id
msg.keys = "send_transaction"
+ # user property for the message
msg.add_property("send", "transaction")
res = producer.send(msg, transaction)
print(f"send message: {res}")
if check_from_server:
+ # wait for server check in TransactionChecker's check
input("Please Enter to Stop the Application.\r\n")
producer.shutdown()
else:
- # producer directly commit or rollback
+ # direct commit or rollback
transaction.commit()
print(f"producer commit message:{transaction.message_id}")
# transaction.rollback()
diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py
b/python/rocketmq/v5/client/balancer/queue_selector.py
index 94ed97f7..bd48f21c 100644
--- a/python/rocketmq/v5/client/balancer/queue_selector.py
+++ b/python/rocketmq/v5/client/balancer/queue_selector.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import hashlib
import random
from rocketmq.v5.exception import IllegalArgumentException
@@ -64,6 +65,12 @@ class QueueSelector:
self.__index.get_and_increment() % len(self.__message_queues)
]
+ def select_queue_by_hash_key(self, key):
+ hash_object = hashlib.sha256(key.encode('utf-8'))
+ hash_code = int.from_bytes(hash_object.digest(), byteorder='big')
+ print(f"hashcode: {hash_code}")
+ return self.__message_queues[hash_code % len(self.__message_queues)]
+
def all_queues(self):
index = self.__index.get_and_increment() % len(self.__message_queues)
return self.__message_queues[index:] + self.__message_queues[:index]
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py
b/python/rocketmq/v5/client/connection/rpc_channel.py
index 1783026e..b421425e 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -88,7 +88,7 @@ class RpcEndpoints:
or len(self.__addresses) == 0
or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED
):
- return ""
+ return "", ""
prefix = "dns:"
if self.__scheme == AddressScheme.IPv4:
@@ -227,6 +227,7 @@ class RpcChannel:
("grpc.enable_retries", 0),
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
+ ("grpc.use_local_subchannel_pool", 1),
]
if self.__tls_enabled:
self.__async_channel = aio.secure_channel(
@@ -237,7 +238,7 @@ class RpcChannel:
self.__endpoints.facade, options
)
self.__async_stub = MessagingServiceStub(self.__async_channel)
- logger.debug(
+ logger.info(
f"create_aio_channel to [{self.__endpoints.__str__()}]
success. channel state:{self.__async_channel.get_state()}"
)
except Exception as e:
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py
b/python/rocketmq/v5/consumer/simple_consumer.py
index 638416d6..7a19c903 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -103,58 +103,69 @@ class SimpleConsumer(Client):
self._remove_unused_topic_route_data(topic)
def receive(self, max_message_num, invisible_duration):
- if self.is_running is False:
- raise IllegalStateException(
- "unable to receive messages because simple consumer is not
running"
+ try:
+ future, queue = self.__receive(max_message_num, invisible_duration)
+ read_future = asyncio.run_coroutine_threadsafe(
+ self.__receive_message_response(future.result()),
+ self._rpc_channel_io_loop(),
)
-
- return self.__receive(max_message_num, invisible_duration)
+ return
self.__handle_receive_message_response(read_future.result(), queue)
+ except Exception as e:
+ raise e
def receive_async(self, max_message_num, invisible_duration):
- if self.is_running is False:
- raise IllegalStateException(
- "unable to receive messages because simple consumer is not
running"
+ try:
+ future, queue = self.__receive(max_message_num, invisible_duration)
+ read_future = asyncio.run_coroutine_threadsafe(
+ self.__receive_message_response(future.result()),
+ self._rpc_channel_io_loop(),
)
-
- return self.__receive_async(max_message_num, invisible_duration)
-
- def ack(self, message: Message):
- if self.is_running is False:
- raise IllegalStateException(
- "unable to ack message because simple consumer is not running"
+ ret_future = Future()
+ handle_send_receipt_callback = functools.partial(
+ self.__receive_message_callback, ret_future=ret_future,
queue=queue
)
+ read_future.add_done_callback(handle_send_receipt_callback)
+ return ret_future
+ except Exception as e:
+ raise e
- queue = self.__select_topic_queue(message.topic)
- return self.__ack(message, queue)
+ def ack(self, message: Message):
+ try:
+ future = self.__ack(message)
+ self.__handle_ack_result(future)
+ except Exception as e:
+ raise e
def ack_async(self, message: Message):
- if self.is_running is False:
- raise IllegalStateException(
- "unable to ack message because simple consumer is not running"
+ try:
+ future = self.__ack(message)
+ ret_future = Future()
+ ack_callback = functools.partial(
+ self.__handle_ack_result, ret_future=ret_future
)
-
- queue = self.__select_topic_queue(message.topic)
- return self.__ack_async(message, queue)
+ future.add_done_callback(ack_callback)
+ return ret_future
+ except Exception as e:
+ raise e
def change_invisible_duration(self, message: Message, invisible_duration):
- if self.is_running is False:
- raise IllegalStateException(
- "unable to change invisible duration because simple consumer
is not running"
- )
-
- queue = self.__select_topic_queue(message.topic)
- return self.__change_invisible_duration(message, queue,
invisible_duration)
+ try:
+ future = self.__change_invisible_duration(message,
invisible_duration)
+ self.__handle_change_invisible_result(future, message)
+ except Exception as e:
+ raise e
def change_invisible_duration_async(self, message: Message,
invisible_duration):
- if self.is_running is False:
- raise IllegalStateException(
- "unable to change invisible duration because simple consumer
is not running"
+ try:
+ future = self.__change_invisible_duration(message,
invisible_duration)
+ ret_future = Future()
+ change_invisible_callback = functools.partial(
+ self.__handle_change_invisible_result, message=message,
ret_future=ret_future
)
-
- queue = self.__select_topic_queue(message.topic)
- return self.__change_invisible_duration_async(
- message, queue, invisible_duration
- )
+ future.add_done_callback(change_invisible_callback)
+ return ret_future
+ except Exception as e:
+ raise e
""" override """
@@ -245,44 +256,6 @@ class SimpleConsumer(Client):
logger.error(f"simple consumer select topic queue raise exception:
{e}")
raise e
- def __receive(self, max_message_num, invisible_duration):
- self.__receive_pre_check(max_message_num)
- topic = self.__select_topic_for_receive()
- queue = self.__select_topic_queue(topic)
- req = self.__receive_req(topic, queue, max_message_num,
invisible_duration)
- timeout = self.client_configuration.request_timeout +
self.__await_duration
- future = self.rpc_client.receive_message_async(
- queue.endpoints, req, metadata=self._sign(), timeout=timeout
- )
- read_future = asyncio.run_coroutine_threadsafe(
- self.__receive_message_response(future.result()),
- self._rpc_channel_io_loop(),
- )
- return self.__handle_receive_message_response(read_future.result())
-
- def __receive_async(self, max_message_num, invisible_duration):
- try:
- self.__receive_pre_check(max_message_num)
- topic = self.__select_topic_for_receive()
- queue = self.__select_topic_queue(topic)
- req = self.__receive_req(topic, queue, max_message_num,
invisible_duration)
- timeout = self.client_configuration.request_timeout +
self.__await_duration
- future = self.rpc_client.receive_message_async(
- queue.endpoints, req, metadata=self._sign(), timeout=timeout
- )
- read_future = asyncio.run_coroutine_threadsafe(
- self.__receive_message_response(future.result()),
- self._rpc_channel_io_loop(),
- )
- ret_future = Future()
- handle_send_receipt_callback = functools.partial(
- self.__receive_message_callback, ret_future=ret_future
- )
- read_future.add_done_callback(handle_send_receipt_callback)
- return ret_future
- except Exception as e:
- raise e
-
def __receive_pre_check(self, max_message_num):
if self.is_running is False:
raise IllegalStateException("consumer is not running now.")
@@ -305,10 +278,27 @@ class SimpleConsumer(Client):
req.auto_renew = False
return req
- def __receive_message_callback(self, future, ret_future):
+ def __receive(self, max_message_num, invisible_duration):
+ if self.is_running is False:
+ raise IllegalStateException(
+ "unable to receive messages because simple consumer is not
running"
+ )
+ try:
+ self.__receive_pre_check(max_message_num)
+ topic = self.__select_topic_for_receive()
+ queue = self.__select_topic_queue(topic)
+ req = self.__receive_req(topic, queue, max_message_num,
invisible_duration)
+ timeout = self.client_configuration.request_timeout +
self.__await_duration
+ return self.rpc_client.receive_message_async(
+ queue.endpoints, req, metadata=self._sign(), timeout=timeout
+ ), queue
+ except Exception as e:
+ raise e
+
+ def __receive_message_callback(self, future, ret_future, queue):
try:
responses = future.result()
- messages = self.__handle_receive_message_response(responses)
+ messages = self.__handle_receive_message_response(responses, queue)
self._submit_callback(
CallbackResult.async_receive_callback_result(ret_future,
messages)
)
@@ -333,7 +323,7 @@ class SimpleConsumer(Client):
)
raise e
- def __handle_receive_message_response(self, responses):
+ def __handle_receive_message_response(self, responses, queue):
messages = list()
status = None
@@ -344,50 +334,15 @@ class SimpleConsumer(Client):
)
status = res.status
elif res.HasField("message"):
- messages.append(Message().fromProtobuf(res.message))
+ msg = Message().fromProtobuf(res.message)
+ msg.endpoints = queue.endpoints
+ messages.append(msg)
MessagingResultChecker.check(status)
return messages
# ack message
- def __ack(self, message: Message, queue):
- if self.is_running is False:
- raise IllegalStateException("consumer is not running now.")
-
- try:
- req = self.__ack_req(message)
- future = self.rpc_client.ack_message_async(
- queue.endpoints,
- req,
- metadata=self._sign(),
- timeout=self.client_configuration.request_timeout,
- )
- self.__handle_ack_result(future)
- except Exception as e:
- raise e
-
- def __ack_async(self, message: Message, queue):
- if self.is_running is False:
- raise IllegalStateException("consumer is not running now.")
-
- try:
- req = self.__ack_req(message)
- future = self.rpc_client.ack_message_async(
- queue.endpoints,
- req,
- metadata=self._sign(),
- timeout=self.client_configuration.request_timeout,
- )
- ret_future = Future()
- ack_callback = functools.partial(
- self.__handle_ack_result, ret_future=ret_future
- )
- future.add_done_callback(ack_callback)
- return ret_future
- except Exception as e:
- raise e
-
def __ack_req(self, message: Message):
req = AckMessageRequest()
req.group.name = self.__consumer_group
@@ -401,6 +356,21 @@ class SimpleConsumer(Client):
req.entries.append(msg_entry)
return req
+ def __ack(self, message: Message):
+ if self.is_running is False:
+ raise IllegalStateException(
+ "unable to ack message because simple consumer is not running"
+ )
+ try:
+ return self.rpc_client.ack_message_async(
+ message.endpoints,
+ self.__ack_req(message),
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
+ except Exception as e:
+ raise e
+
def __handle_ack_result(self, future, ret_future=None):
try:
res = future.result()
@@ -422,45 +392,6 @@ class SimpleConsumer(Client):
# change_invisible
- def __change_invisible_duration(self, message: Message, queue,
invisible_duration):
- if self.is_running is False:
- raise IllegalStateException("consumer is not running now.")
-
- try:
- req = self.__change_invisible_req(message, invisible_duration)
- future = self.rpc_client.change_invisible_duration_async(
- queue.endpoints,
- req,
- metadata=self._sign(),
- timeout=self.client_configuration.request_timeout,
- )
- self.__handle_change_invisible_result(future, message)
- except Exception as e:
- raise e
-
- def __change_invisible_duration_async(
- self, message: Message, queue, invisible_duration
- ):
- if self.is_running is False:
- raise IllegalArgumentException("consumer is not running now.")
-
- try:
- req = self.__change_invisible_req(message, invisible_duration)
- future = self.rpc_client.change_invisible_duration_async(
- queue.endpoints,
- req,
- metadata=self._sign(),
- timeout=self.client_configuration.request_timeout,
- )
- ret_future = Future()
- change_invisible_callback = functools.partial(
- self.__handle_change_invisible_result, message=message,
ret_future=ret_future
- )
- future.add_done_callback(change_invisible_callback)
- return ret_future
- except Exception as e:
- raise e
-
def __change_invisible_req(self, message: Message, invisible_duration):
req = ChangeInvisibleDurationRequest()
req.topic.name = message.topic
@@ -472,6 +403,21 @@ class SimpleConsumer(Client):
req.message_id = message.message_id
return req
+ def __change_invisible_duration(self, message: Message,
invisible_duration):
+ if self.is_running is False:
+ raise IllegalStateException(
+ "unable to change invisible duration because simple consumer
is not running"
+ )
+ try:
+ return self.rpc_client.change_invisible_duration_async(
+ message.endpoints,
+ self.__change_invisible_req(message, invisible_duration),
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
+ except Exception as e:
+ raise e
+
def __handle_change_invisible_result(self, future, message,
ret_future=None):
try:
res = future.result()
diff --git a/python/rocketmq/v5/model/message.py
b/python/rocketmq/v5/model/message.py
index dee7dad4..ce66a31a 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -35,6 +35,7 @@ class Message:
self.__delivery_attempt = None
self.__receipt_handle = None
self.__message_type = None
+ self.__endpoints = None
def __str__(self) -> str:
return (
@@ -165,6 +166,10 @@ class Message:
def message_type(self):
return self.__message_type
+ @property
+ def endpoints(self):
+ return self.__endpoints
+
@body.setter
def body(self, body):
if body is None or body.strip() == "":
@@ -227,9 +232,26 @@ class Message:
def message_type(self, message_type):
self.__message_type = message_type
+ @endpoints.setter
+ def endpoints(self, endpoints):
+ self.__endpoints = endpoints
+
def add_property(self, key, value):
if key is None or key.strip() == "":
raise IllegalArgumentException("key should not be blank")
if value is None or value.strip() == "":
raise IllegalArgumentException("value should not be blank")
self.__properties[key] = value
+
+ @staticmethod
+ def message_type_desc(message_type):
+ if message_type == 1:
+ return "NORMAL"
+ elif message_type == 2:
+ return "FIFO"
+ elif message_type == 3:
+ return "DELAY"
+ elif message_type == 4:
+ return "TRANSACTION"
+ else:
+ return "MESSAGE_TYPE_UNSPECIFIED"
diff --git a/python/rocketmq/v5/model/topic_route.py
b/python/rocketmq/v5/model/topic_route.py
index b93728e2..8c227b01 100644
--- a/python/rocketmq/v5/model/topic_route.py
+++ b/python/rocketmq/v5/model/topic_route.py
@@ -15,6 +15,7 @@
from rocketmq.grpc_protocol import Permission, definition_pb2
from rocketmq.v5.client.connection import RpcEndpoints
+from rocketmq.v5.model import Message
class MessageQueue:
@@ -77,6 +78,15 @@ class MessageQueue:
queue.accept_message_types.extend(self.__accept_message_types)
return queue
+ def accept_message_types_desc(self):
+ ret = ""
+ for access_type in self.__accept_message_types:
+ ret = ret + Message.message_type_desc(access_type) + ","
+ if len(ret) == 0:
+ return ret
+ else:
+ return ret[:len(ret) - 1]
+
""" property """
@property
diff --git a/python/rocketmq/v5/producer/producer.py
b/python/rocketmq/v5/producer/producer.py
index 2a9bba70..5072d90a 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -148,10 +148,10 @@ class Producer(Client):
raise IllegalStateException("producer is not running now.")
self.__wrap_sending_message(message, False if transaction is None else
True)
- topic_queue = self.__select_send_queue(message.topic)
+ topic_queue = self.__select_send_queue(message)
if message.message_type not in topic_queue.accept_message_types:
raise IllegalArgumentException(
- f"current message type not match with queue accept message
types, topic:{message.topic}, message_type:{message.message_type}, queue access
type:{topic_queue.accept_message_types}"
+ f"current message type not match with queue accept message
types, topic:{message.topic},
message_type:{Message.message_type_desc(message.message_type)}, queue access
type:{topic_queue.accept_message_types_desc()}"
)
if transaction is None:
@@ -180,12 +180,12 @@ class Producer(Client):
raise IllegalStateException("producer is not running now.")
self.__wrap_sending_message(message, False)
- topic_queue = self.__select_send_queue(message.topic)
+ topic_queue = self.__select_send_queue(message)
if message.message_type not in topic_queue.accept_message_types:
raise IllegalArgumentException(
f"current message type not match with queue accept message
types, "
- f"topic:{message.topic}, message_type:{message.message_type}, "
- f"queue access type:{topic_queue.accept_message_types}"
+ f"topic:{message.topic},
message_type:{Message.message_type_desc(message.message_type)}, "
+ f"queue access type:{topic_queue.accept_message_types_desc()}"
)
try:
@@ -303,6 +303,7 @@ class Producer(Client):
def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt:
req = self.__send_req(message)
send_context = self.client_metrics.send_before(message.topic)
+ print(f"{topic_queue}")
send_message_future = self.rpc_client.send_message_async(
topic_queue.endpoints,
req,
@@ -338,7 +339,7 @@ class Producer(Client):
raise retry_exception_future.exception()
# resend message
- topic_queue = self.__select_send_queue(message.topic)
+ topic_queue = self.__select_send_queue(message)
return self.__send(message, topic_queue, attempt)
def __send_async(self, message: Message, topic_queue, attempt=1,
ret_future=None):
@@ -395,7 +396,7 @@ class Producer(Client):
)
return
# resend message
- topic_queue = self.__select_send_queue(message.topic)
+ topic_queue = self.__select_send_queue(message)
self.__send_async(message, topic_queue, attempt, ret_future)
def __process_send_message_response(self, send_message_future,
topic_queue):
@@ -501,15 +502,18 @@ class Producer(Client):
"transactional message should not set messageGroup or
deliveryTimestamp"
)
- def __select_send_queue(self, topic):
+ def __select_send_queue(self, message):
try:
- route = self._retrieve_topic_route_data(topic)
+ route = self._retrieve_topic_route_data(message.topic)
queue_selector = self.__send_queue_selectors.put_if_absent(
- topic, QueueSelector.producer_queue_selector(route)
+ message.topic, QueueSelector.producer_queue_selector(route)
)
- return queue_selector.select_next_queue()
+ if message.message_group is None:
+ return queue_selector.select_next_queue()
+ else:
+ return
queue_selector.select_queue_by_hash_key(message.message_group)
except Exception as e:
- logger.error(f"producer select topic:{topic} queue raise
exception, {e}")
+ logger.error(f"producer select topic:{message.topic} queue raise
exception, {e}")
raise e
def __end_transaction_req(self, message: Message, transaction_id, result,
source):