This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eba1034 adapt to localService (#973)
6eba1034 is described below

commit 6eba1034799d22dfc09a45dba6bbc6afec3f5592
Author: zhouli11 <[email protected]>
AuthorDate: Mon Mar 31 11:05:01 2025 +0800

    adapt to localService (#973)
    
    * update to 5.0.3
    1、use grpc.use_local_subchannel_pool
    2、adapt to localService
    3、fifo、delay message example
---
 python/example/async_producer_example.py           |   6 +
 python/example/async_simple_consumer_example.py    |   7 +-
 ...oducer_example.py => delay_producer_example.py} |  13 +-
 ...roducer_example.py => fifo_producer_example.py} |  12 +-
 python/example/normal_producer_example.py          |   6 +
 python/example/simple_consumer_example.py          |   5 +
 python/example/transaction_producer_example.py     |   9 +-
 .../rocketmq/v5/client/balancer/queue_selector.py  |   7 +
 .../rocketmq/v5/client/connection/rpc_channel.py   |   5 +-
 python/rocketmq/v5/consumer/simple_consumer.py     | 258 ++++++++-------------
 python/rocketmq/v5/model/message.py                |  22 ++
 python/rocketmq/v5/model/topic_route.py            |  10 +
 python/rocketmq/v5/producer/producer.py            |  28 ++-
 13 files changed, 208 insertions(+), 180 deletions(-)

diff --git a/python/example/async_producer_example.py b/python/example/async_producer_example.py
index fd5e5a08..23a9b837 100644
--- a/python/example/async_producer_example.py
+++ b/python/example/async_producer_example.py
@@ -31,6 +31,8 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
     topic = "topic"
     producer = Producer(config, (topic,))
 
@@ -39,10 +41,14 @@ if __name__ == '__main__':
         try:
             for i in range(10):
                 msg = Message()
+                # topic for the current message
                 msg.topic = topic
                 msg.body = "hello, rocketmq.".encode('utf-8')
+                # secondary classifier of message besides topic
                 msg.tag = "rocketmq-send-message"
+                # key(s) of the message, another way to mark message besides message id
                 msg.keys = "send_async"
+                # user property for the message
                 msg.add_property("send", "async")
                 send_result_future = producer.send_async(msg)
                 send_result_future.add_done_callback(handle_send_result)
diff --git a/python/example/async_simple_consumer_example.py b/python/example/async_simple_consumer_example.py
index 5b341d5d..06cf4911 100644
--- a/python/example/async_simple_consumer_example.py
+++ b/python/example/async_simple_consumer_example.py
@@ -35,6 +35,7 @@ def receive_callback(receive_result_future, consumer):
     print(f"{consumer.__str__()} receive {len(messages)} messages.")
     for msg in messages:
         try:
+            # consume message in other thread, don't block the async receive thread
             consume_executor.submit(consume_message, consumer=consumer, message=msg)
         except Exception as exception:
             print(f"receive message raise exception: {exception}")
@@ -46,8 +47,11 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
     topic = "topic"
-
+    # in most case, you don't need to create too many consumers, singleton pattern is recommended
+    # close the simple consumer when you don't need it anymore
     simple_consumer = SimpleConsumer(config, "consumer-group")
     try:
         simple_consumer.startup()
@@ -58,6 +62,7 @@ if __name__ == '__main__':
             while True:
                 try:
                     time.sleep(1)
+                    # max message num for each long polling and message invisible duration after it is received
                     future = simple_consumer.receive_async(32, 15)
                     future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer))
                 except Exception as e:
diff --git a/python/example/normal_producer_example.py b/python/example/delay_producer_example.py
similarity index 82%
copy from python/example/normal_producer_example.py
copy to python/example/delay_producer_example.py
index 16758d97..13818748 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/delay_producer_example.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
 
 from rocketmq import ClientConfiguration, Credentials, Message, Producer
 
@@ -21,18 +22,22 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
-    topic = "topic"
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
+    topic = "delay-topic"
     producer = Producer(config, (topic,))
 
     try:
         producer.startup()
         try:
             msg = Message()
+            # topic for the current message
             msg.topic = topic
             msg.body = "hello, rocketmq.".encode('utf-8')
-            msg.tag = "rocketmq-send-message"
-            msg.keys = "send_sync"
-            msg.add_property("send", "sync")
+            # secondary classifier of message besides topic
+            msg.tag = "rocketmq-send-delay-message"
+            # delay 10 seconds
+            msg.delivery_timestamp = int(time.time()) + 10
             res = producer.send(msg)
             print(f"{producer.__str__()} send message success. {res}")
             producer.shutdown()
diff --git a/python/example/normal_producer_example.py b/python/example/fifo_producer_example.py
similarity index 81%
copy from python/example/normal_producer_example.py
copy to python/example/fifo_producer_example.py
index 16758d97..1990fa0d 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/fifo_producer_example.py
@@ -21,18 +21,22 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
-    topic = "topic"
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
+    topic = "fifo-topic"
     producer = Producer(config, (topic,))
 
     try:
         producer.startup()
         try:
             msg = Message()
+            # topic for the current message
             msg.topic = topic
             msg.body = "hello, rocketmq.".encode('utf-8')
-            msg.tag = "rocketmq-send-message"
-            msg.keys = "send_sync"
-            msg.add_property("send", "sync")
+            # secondary classifier of message besides topic
+            msg.tag = "rocketmq-send-fifo-message"
+            # message group decides the message delivery order
+            msg.message_group = "your-message-group0"
             res = producer.send(msg)
             print(f"{producer.__str__()} send message success. {res}")
             producer.shutdown()
diff --git a/python/example/normal_producer_example.py b/python/example/normal_producer_example.py
index 16758d97..a867a0a3 100644
--- a/python/example/normal_producer_example.py
+++ b/python/example/normal_producer_example.py
@@ -21,6 +21,8 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
     topic = "topic"
     producer = Producer(config, (topic,))
 
@@ -28,10 +30,14 @@ if __name__ == '__main__':
         producer.startup()
         try:
             msg = Message()
+            # topic for the current message
             msg.topic = topic
             msg.body = "hello, rocketmq.".encode('utf-8')
+            # secondary classifier of message besides topic
             msg.tag = "rocketmq-send-message"
+            # key(s) of the message, another way to mark message besides message id
             msg.keys = "send_sync"
+            # user property for the message
             msg.add_property("send", "sync")
             res = producer.send(msg)
             print(f"{producer.__str__()} send message success. {res}")
diff --git a/python/example/simple_consumer_example.py b/python/example/simple_consumer_example.py
index 77526fe5..4d2eedaa 100644
--- a/python/example/simple_consumer_example.py
+++ b/python/example/simple_consumer_example.py
@@ -21,7 +21,11 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
     topic = "topic"
+    # in most case, you don't need to create too many consumers, singleton pattern is recommended
+    # close the simple consumer when you don't need it anymore
     simple_consumer = SimpleConsumer(config, "consumer-group")
     try:
         simple_consumer.startup()
@@ -31,6 +35,7 @@ if __name__ == '__main__':
             # simple_consumer.subscribe(topic, FilterExpression("tag"))
             while True:
                 try:
+                    # max message num for each long polling and message invisible duration after it is received
                     messages = simple_consumer.receive(32, 15)
                     if messages is not None:
                         print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")
diff --git a/python/example/transaction_producer_example.py b/python/example/transaction_producer_example.py
index fa0abdd3..0effc7cd 100644
--- a/python/example/transaction_producer_example.py
+++ b/python/example/transaction_producer_example.py
@@ -30,6 +30,8 @@ if __name__ == '__main__':
     # if auth enable
     # credentials = Credentials("ak", "sk")
     config = ClientConfiguration(endpoints, credentials)
+    # with namespace
+    # config = ClientConfiguration(endpoints, credentials, "namespace")
     topic = "topic"
     check_from_server = True  # commit message from server check
     producer = Producer(config, (topic,), checker=TestChecker())
@@ -42,18 +44,23 @@ if __name__ == '__main__':
     try:
         transaction = producer.begin_transaction()
         msg = Message()
+        # topic for the current message
         msg.topic = topic
         msg.body = "hello, rocketmq.".encode('utf-8')
+        # secondary classifier of message besides topic
         msg.tag = "rocketmq-send-transaction-message"
+        # key(s) of the message, another way to mark message besides message id
         msg.keys = "send_transaction"
+        # user property for the message
         msg.add_property("send", "transaction")
         res = producer.send(msg, transaction)
         print(f"send message: {res}")
         if check_from_server:
+            # wait for server check in TransactionChecker's check
             input("Please Enter to Stop the Application.\r\n")
             producer.shutdown()
         else:
-            # producer directly commit or rollback
+            # direct commit or rollback
             transaction.commit()
             print(f"producer commit message:{transaction.message_id}")
             # transaction.rollback()
diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py b/python/rocketmq/v5/client/balancer/queue_selector.py
index 94ed97f7..bd48f21c 100644
--- a/python/rocketmq/v5/client/balancer/queue_selector.py
+++ b/python/rocketmq/v5/client/balancer/queue_selector.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import hashlib
 import random
 
 from rocketmq.v5.exception import IllegalArgumentException
@@ -64,6 +65,12 @@ class QueueSelector:
             self.__index.get_and_increment() % len(self.__message_queues)
         ]
 
+    def select_queue_by_hash_key(self, key):
+        hash_object = hashlib.sha256(key.encode('utf-8'))
+        hash_code = int.from_bytes(hash_object.digest(), byteorder='big')
+        print(f"hashcode: {hash_code}")
+        return self.__message_queues[hash_code % len(self.__message_queues)]
+
     def all_queues(self):
         index = self.__index.get_and_increment() % len(self.__message_queues)
         return self.__message_queues[index:] + self.__message_queues[:index]
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py b/python/rocketmq/v5/client/connection/rpc_channel.py
index 1783026e..b421425e 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -88,7 +88,7 @@ class RpcEndpoints:
             or len(self.__addresses) == 0
             or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED
         ):
-            return ""
+            return "", ""
 
         prefix = "dns:"
         if self.__scheme == AddressScheme.IPv4:
@@ -227,6 +227,7 @@ class RpcChannel:
                     ("grpc.enable_retries", 0),
                     ("grpc.max_send_message_length", -1),
                     ("grpc.max_receive_message_length", -1),
+                    ("grpc.use_local_subchannel_pool", 1),
                 ]
                 if self.__tls_enabled:
                     self.__async_channel = aio.secure_channel(
@@ -237,7 +238,7 @@ class RpcChannel:
                         self.__endpoints.facade, options
                     )
                 self.__async_stub = MessagingServiceStub(self.__async_channel)
-                logger.debug(
+                logger.info(
                     f"create_aio_channel to [{self.__endpoints.__str__()}] success. channel state:{self.__async_channel.get_state()}"
                 )
         except Exception as e:
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py b/python/rocketmq/v5/consumer/simple_consumer.py
index 638416d6..7a19c903 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -103,58 +103,69 @@ class SimpleConsumer(Client):
             self._remove_unused_topic_route_data(topic)
 
     def receive(self, max_message_num, invisible_duration):
-        if self.is_running is False:
-            raise IllegalStateException(
-                "unable to receive messages because simple consumer is not running"
+        try:
+            future, queue = self.__receive(max_message_num, invisible_duration)
+            read_future = asyncio.run_coroutine_threadsafe(
+                self.__receive_message_response(future.result()),
+                self._rpc_channel_io_loop(),
             )
-
-        return self.__receive(max_message_num, invisible_duration)
+            return self.__handle_receive_message_response(read_future.result(), queue)
+        except Exception as e:
+            raise e
 
     def receive_async(self, max_message_num, invisible_duration):
-        if self.is_running is False:
-            raise IllegalStateException(
-                "unable to receive messages because simple consumer is not running"
+        try:
+            future, queue = self.__receive(max_message_num, invisible_duration)
+            read_future = asyncio.run_coroutine_threadsafe(
+                self.__receive_message_response(future.result()),
+                self._rpc_channel_io_loop(),
             )
-
-        return self.__receive_async(max_message_num, invisible_duration)
-
-    def ack(self, message: Message):
-        if self.is_running is False:
-            raise IllegalStateException(
-                "unable to ack message because simple consumer is not running"
+            ret_future = Future()
+            handle_send_receipt_callback = functools.partial(
+                self.__receive_message_callback, ret_future=ret_future, queue=queue
             )
+            read_future.add_done_callback(handle_send_receipt_callback)
+            return ret_future
+        except Exception as e:
+            raise e
 
-        queue = self.__select_topic_queue(message.topic)
-        return self.__ack(message, queue)
+    def ack(self, message: Message):
+        try:
+            future = self.__ack(message)
+            self.__handle_ack_result(future)
+        except Exception as e:
+            raise e
 
     def ack_async(self, message: Message):
-        if self.is_running is False:
-            raise IllegalStateException(
-                "unable to ack message because simple consumer is not running"
+        try:
+            future = self.__ack(message)
+            ret_future = Future()
+            ack_callback = functools.partial(
+                self.__handle_ack_result, ret_future=ret_future
             )
-
-        queue = self.__select_topic_queue(message.topic)
-        return self.__ack_async(message, queue)
+            future.add_done_callback(ack_callback)
+            return ret_future
+        except Exception as e:
+            raise e
 
     def change_invisible_duration(self, message: Message, invisible_duration):
-        if self.is_running is False:
-            raise IllegalStateException(
-                "unable to change invisible duration because simple consumer is not running"
-            )
-
-        queue = self.__select_topic_queue(message.topic)
-        return self.__change_invisible_duration(message, queue, invisible_duration)
+        try:
+            future = self.__change_invisible_duration(message, invisible_duration)
+            self.__handle_change_invisible_result(future, message)
+        except Exception as e:
+            raise e
 
     def change_invisible_duration_async(self, message: Message, invisible_duration):
-        if self.is_running is False:
-            raise IllegalStateException(
-                "unable to change invisible duration because simple consumer is not running"
+        try:
+            future = self.__change_invisible_duration(message, invisible_duration)
+            ret_future = Future()
+            change_invisible_callback = functools.partial(
+                self.__handle_change_invisible_result, message=message, ret_future=ret_future
             )
-
-        queue = self.__select_topic_queue(message.topic)
-        return self.__change_invisible_duration_async(
-            message, queue, invisible_duration
-        )
+            future.add_done_callback(change_invisible_callback)
+            return ret_future
+        except Exception as e:
+            raise e
 
     """ override """
 
@@ -245,44 +256,6 @@ class SimpleConsumer(Client):
             logger.error(f"simple consumer select topic queue raise exception: {e}")
             raise e
 
-    def __receive(self, max_message_num, invisible_duration):
-        self.__receive_pre_check(max_message_num)
-        topic = self.__select_topic_for_receive()
-        queue = self.__select_topic_queue(topic)
-        req = self.__receive_req(topic, queue, max_message_num, invisible_duration)
-        timeout = self.client_configuration.request_timeout + self.__await_duration
-        future = self.rpc_client.receive_message_async(
-            queue.endpoints, req, metadata=self._sign(), timeout=timeout
-        )
-        read_future = asyncio.run_coroutine_threadsafe(
-            self.__receive_message_response(future.result()),
-            self._rpc_channel_io_loop(),
-        )
-        return self.__handle_receive_message_response(read_future.result())
-
-    def __receive_async(self, max_message_num, invisible_duration):
-        try:
-            self.__receive_pre_check(max_message_num)
-            topic = self.__select_topic_for_receive()
-            queue = self.__select_topic_queue(topic)
-            req = self.__receive_req(topic, queue, max_message_num, invisible_duration)
-            timeout = self.client_configuration.request_timeout + self.__await_duration
-            future = self.rpc_client.receive_message_async(
-                queue.endpoints, req, metadata=self._sign(), timeout=timeout
-            )
-            read_future = asyncio.run_coroutine_threadsafe(
-                self.__receive_message_response(future.result()),
-                self._rpc_channel_io_loop(),
-            )
-            ret_future = Future()
-            handle_send_receipt_callback = functools.partial(
-                self.__receive_message_callback, ret_future=ret_future
-            )
-            read_future.add_done_callback(handle_send_receipt_callback)
-            return ret_future
-        except Exception as e:
-            raise e
-
     def __receive_pre_check(self, max_message_num):
         if self.is_running is False:
             raise IllegalStateException("consumer is not running now.")
@@ -305,10 +278,27 @@ class SimpleConsumer(Client):
         req.auto_renew = False
         return req
 
-    def __receive_message_callback(self, future, ret_future):
+    def __receive(self, max_message_num, invisible_duration):
+        if self.is_running is False:
+            raise IllegalStateException(
+                "unable to receive messages because simple consumer is not running"
+            )
+        try:
+            self.__receive_pre_check(max_message_num)
+            topic = self.__select_topic_for_receive()
+            queue = self.__select_topic_queue(topic)
+            req = self.__receive_req(topic, queue, max_message_num, invisible_duration)
+            timeout = self.client_configuration.request_timeout + self.__await_duration
+            return self.rpc_client.receive_message_async(
+                queue.endpoints, req, metadata=self._sign(), timeout=timeout
+            ), queue
+        except Exception as e:
+            raise e
+
+    def __receive_message_callback(self, future, ret_future, queue):
         try:
             responses = future.result()
-            messages = self.__handle_receive_message_response(responses)
+            messages = self.__handle_receive_message_response(responses, queue)
             self._submit_callback(
                 CallbackResult.async_receive_callback_result(ret_future, messages)
             )
@@ -333,7 +323,7 @@ class SimpleConsumer(Client):
             )
             raise e
 
-    def __handle_receive_message_response(self, responses):
+    def __handle_receive_message_response(self, responses, queue):
         messages = list()
         status = None
 
@@ -344,50 +334,15 @@ class SimpleConsumer(Client):
                 )
                 status = res.status
             elif res.HasField("message"):
-                messages.append(Message().fromProtobuf(res.message))
+                msg = Message().fromProtobuf(res.message)
+                msg.endpoints = queue.endpoints
+                messages.append(msg)
 
         MessagingResultChecker.check(status)
         return messages
 
     # ack message
 
-    def __ack(self, message: Message, queue):
-        if self.is_running is False:
-            raise IllegalStateException("consumer is not running now.")
-
-        try:
-            req = self.__ack_req(message)
-            future = self.rpc_client.ack_message_async(
-                queue.endpoints,
-                req,
-                metadata=self._sign(),
-                timeout=self.client_configuration.request_timeout,
-            )
-            self.__handle_ack_result(future)
-        except Exception as e:
-            raise e
-
-    def __ack_async(self, message: Message, queue):
-        if self.is_running is False:
-            raise IllegalStateException("consumer is not running now.")
-
-        try:
-            req = self.__ack_req(message)
-            future = self.rpc_client.ack_message_async(
-                queue.endpoints,
-                req,
-                metadata=self._sign(),
-                timeout=self.client_configuration.request_timeout,
-            )
-            ret_future = Future()
-            ack_callback = functools.partial(
-                self.__handle_ack_result, ret_future=ret_future
-            )
-            future.add_done_callback(ack_callback)
-            return ret_future
-        except Exception as e:
-            raise e
-
     def __ack_req(self, message: Message):
         req = AckMessageRequest()
         req.group.name = self.__consumer_group
@@ -401,6 +356,21 @@ class SimpleConsumer(Client):
         req.entries.append(msg_entry)
         return req
 
+    def __ack(self, message: Message):
+        if self.is_running is False:
+            raise IllegalStateException(
+                "unable to ack message because simple consumer is not running"
+            )
+        try:
+            return self.rpc_client.ack_message_async(
+                message.endpoints,
+                self.__ack_req(message),
+                metadata=self._sign(),
+                timeout=self.client_configuration.request_timeout,
+            )
+        except Exception as e:
+            raise e
+
     def __handle_ack_result(self, future, ret_future=None):
         try:
             res = future.result()
@@ -422,45 +392,6 @@ class SimpleConsumer(Client):
 
     # change_invisible
 
-    def __change_invisible_duration(self, message: Message, queue, invisible_duration):
-        if self.is_running is False:
-            raise IllegalStateException("consumer is not running now.")
-
-        try:
-            req = self.__change_invisible_req(message, invisible_duration)
-            future = self.rpc_client.change_invisible_duration_async(
-                queue.endpoints,
-                req,
-                metadata=self._sign(),
-                timeout=self.client_configuration.request_timeout,
-            )
-            self.__handle_change_invisible_result(future, message)
-        except Exception as e:
-            raise e
-
-    def __change_invisible_duration_async(
-        self, message: Message, queue, invisible_duration
-    ):
-        if self.is_running is False:
-            raise IllegalArgumentException("consumer is not running now.")
-
-        try:
-            req = self.__change_invisible_req(message, invisible_duration)
-            future = self.rpc_client.change_invisible_duration_async(
-                queue.endpoints,
-                req,
-                metadata=self._sign(),
-                timeout=self.client_configuration.request_timeout,
-            )
-            ret_future = Future()
-            change_invisible_callback = functools.partial(
-                self.__handle_change_invisible_result, message=message, ret_future=ret_future
-            )
-            future.add_done_callback(change_invisible_callback)
-            return ret_future
-        except Exception as e:
-            raise e
-
     def __change_invisible_req(self, message: Message, invisible_duration):
         req = ChangeInvisibleDurationRequest()
         req.topic.name = message.topic
@@ -472,6 +403,21 @@ class SimpleConsumer(Client):
         req.message_id = message.message_id
         return req
 
+    def __change_invisible_duration(self, message: Message, invisible_duration):
+        if self.is_running is False:
+            raise IllegalStateException(
+                "unable to change invisible duration because simple consumer is not running"
+            )
+        try:
+            return self.rpc_client.change_invisible_duration_async(
+                message.endpoints,
+                self.__change_invisible_req(message, invisible_duration),
+                metadata=self._sign(),
+                timeout=self.client_configuration.request_timeout,
+            )
+        except Exception as e:
+            raise e
+
     def __handle_change_invisible_result(self, future, message, ret_future=None):
         try:
             res = future.result()
diff --git a/python/rocketmq/v5/model/message.py b/python/rocketmq/v5/model/message.py
index dee7dad4..ce66a31a 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -35,6 +35,7 @@ class Message:
         self.__delivery_attempt = None
         self.__receipt_handle = None
         self.__message_type = None
+        self.__endpoints = None
 
     def __str__(self) -> str:
         return (
@@ -165,6 +166,10 @@ class Message:
     def message_type(self):
         return self.__message_type
 
+    @property
+    def endpoints(self):
+        return self.__endpoints
+
     @body.setter
     def body(self, body):
         if body is None or body.strip() == "":
@@ -227,9 +232,26 @@ class Message:
     def message_type(self, message_type):
         self.__message_type = message_type
 
+    @endpoints.setter
+    def endpoints(self, endpoints):
+        self.__endpoints = endpoints
+
     def add_property(self, key, value):
         if key is None or key.strip() == "":
             raise IllegalArgumentException("key should not be blank")
         if value is None or value.strip() == "":
             raise IllegalArgumentException("value should not be blank")
         self.__properties[key] = value
+
+    @staticmethod
+    def message_type_desc(message_type):
+        if message_type == 1:
+            return "NORMAL"
+        elif message_type == 2:
+            return "FIFO"
+        elif message_type == 3:
+            return "DELAY"
+        elif message_type == 4:
+            return "TRANSACTION"
+        else:
+            return "MESSAGE_TYPE_UNSPECIFIED"
diff --git a/python/rocketmq/v5/model/topic_route.py b/python/rocketmq/v5/model/topic_route.py
index b93728e2..8c227b01 100644
--- a/python/rocketmq/v5/model/topic_route.py
+++ b/python/rocketmq/v5/model/topic_route.py
@@ -15,6 +15,7 @@
 
 from rocketmq.grpc_protocol import Permission, definition_pb2
 from rocketmq.v5.client.connection import RpcEndpoints
+from rocketmq.v5.model import Message
 
 
 class MessageQueue:
@@ -77,6 +78,15 @@ class MessageQueue:
         queue.accept_message_types.extend(self.__accept_message_types)
         return queue
 
+    def accept_message_types_desc(self):
+        ret = ""
+        for access_type in self.__accept_message_types:
+            ret = ret + Message.message_type_desc(access_type) + ","
+        if len(ret) == 0:
+            return ret
+        else:
+            return ret[:len(ret) - 1]
+
     """ property """
 
     @property
diff --git a/python/rocketmq/v5/producer/producer.py b/python/rocketmq/v5/producer/producer.py
index 2a9bba70..5072d90a 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -148,10 +148,10 @@ class Producer(Client):
             raise IllegalStateException("producer is not running now.")
 
         self.__wrap_sending_message(message, False if transaction is None else True)
-        topic_queue = self.__select_send_queue(message.topic)
+        topic_queue = self.__select_send_queue(message)
         if message.message_type not in topic_queue.accept_message_types:
             raise IllegalArgumentException(
-                f"current message type not match with queue accept message types, topic:{message.topic}, message_type:{message.message_type}, queue access type:{topic_queue.accept_message_types}"
+                f"current message type not match with queue accept message types, topic:{message.topic}, message_type:{Message.message_type_desc(message.message_type)}, queue access type:{topic_queue.accept_message_types_desc()}"
             )
 
         if transaction is None:
@@ -180,12 +180,12 @@ class Producer(Client):
             raise IllegalStateException("producer is not running now.")
 
         self.__wrap_sending_message(message, False)
-        topic_queue = self.__select_send_queue(message.topic)
+        topic_queue = self.__select_send_queue(message)
         if message.message_type not in topic_queue.accept_message_types:
             raise IllegalArgumentException(
                 f"current message type not match with queue accept message types, "
-                f"topic:{message.topic}, message_type:{message.message_type}, "
-                f"queue access type:{topic_queue.accept_message_types}"
+                f"topic:{message.topic}, message_type:{Message.message_type_desc(message.message_type)}, "
+                f"queue access type:{topic_queue.accept_message_types_desc()}"
             )
 
         try:
@@ -303,6 +303,7 @@ class Producer(Client):
     def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt:
         req = self.__send_req(message)
         send_context = self.client_metrics.send_before(message.topic)
+        print(f"{topic_queue}")
         send_message_future = self.rpc_client.send_message_async(
             topic_queue.endpoints,
             req,
@@ -338,7 +339,7 @@ class Producer(Client):
                 raise retry_exception_future.exception()
 
             # resend message
-            topic_queue = self.__select_send_queue(message.topic)
+            topic_queue = self.__select_send_queue(message)
             return self.__send(message, topic_queue, attempt)
 
     def __send_async(self, message: Message, topic_queue, attempt=1, ret_future=None):
@@ -395,7 +396,7 @@ class Producer(Client):
                 )
                 return
             # resend message
-            topic_queue = self.__select_send_queue(message.topic)
+            topic_queue = self.__select_send_queue(message)
             self.__send_async(message, topic_queue, attempt, ret_future)
 
     def __process_send_message_response(self, send_message_future, topic_queue):
@@ -501,15 +502,18 @@ class Producer(Client):
             "transactional message should not set messageGroup or deliveryTimestamp"
         )
 
-    def __select_send_queue(self, topic):
+    def __select_send_queue(self, message):
         try:
-            route = self._retrieve_topic_route_data(topic)
+            route = self._retrieve_topic_route_data(message.topic)
             queue_selector = self.__send_queue_selectors.put_if_absent(
-                topic, QueueSelector.producer_queue_selector(route)
+                message.topic, QueueSelector.producer_queue_selector(route)
             )
-            return queue_selector.select_next_queue()
+            if message.message_group is None:
+                return queue_selector.select_next_queue()
+            else:
+                return queue_selector.select_queue_by_hash_key(message.message_group)
         except Exception as e:
-            logger.error(f"producer select topic:{topic} queue raise exception, {e}")
+            logger.error(f"producer select topic:{message.topic} queue raise exception, {e}")
             raise e
 
     def __end_transaction_req(self, message: Message, transaction_id, result, source):


Reply via email to