This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 04251964 python 5.0.3 (#937)
04251964 is described below
commit 0425196498f9936013c13d6408159ae106e9096d
Author: zhouli11 <[email protected]>
AuthorDate: Tue Feb 11 13:58:00 2025 +0800
python 5.0.3 (#937)
* update to 5.0.3
---
.../rocketmq/v5/client/balancer/queue_selector.py | 44 +++-
python/rocketmq/v5/client/client.py | 235 ++++++++++++------
python/rocketmq/v5/client/client_configuration.py | 17 +-
.../rocketmq/v5/client/connection/rpc_channel.py | 79 ++++--
python/rocketmq/v5/client/connection/rpc_client.py | 275 ++++++++++++++-------
.../rocketmq/v5/client/metrics/client_metrics.py | 60 +++--
python/rocketmq/v5/consumer/simple_consumer.py | 193 +++++++++++----
python/rocketmq/v5/exception/client_exception.py | 1 +
python/rocketmq/v5/log/log_config.py | 38 ++-
python/rocketmq/v5/model/callback_result.py | 24 +-
python/rocketmq/v5/model/filter_expression.py | 6 +-
python/rocketmq/v5/model/message.py | 57 +++--
python/rocketmq/v5/model/metrics.py | 10 +-
python/rocketmq/v5/model/send_receipt.py | 1 +
python/rocketmq/v5/model/topic_route.py | 32 ++-
python/rocketmq/v5/producer/producer.py | 270 ++++++++++++++------
python/rocketmq/v5/test/test_base.py | 25 +-
python/rocketmq/v5/test/test_consumer.py | 40 ++-
python/rocketmq/v5/test/test_producer.py | 14 +-
python/rocketmq/v5/util/client_id.py | 10 +-
python/rocketmq/v5/util/message_id_codec.py | 26 +-
.../rocketmq/v5/util/messaging_result_checker.py | 56 ++++-
python/rocketmq/v5/util/misc.py | 22 +-
python/rocketmq/v5/util/signature.py | 36 +--
python/setup.py | 2 +-
25 files changed, 1113 insertions(+), 460 deletions(-)
diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py
b/python/rocketmq/v5/client/balancer/queue_selector.py
index add42089..94ed97f7 100644
--- a/python/rocketmq/v5/client/balancer/queue_selector.py
+++ b/python/rocketmq/v5/client/balancer/queue_selector.py
@@ -33,18 +33,36 @@ class QueueSelector:
@classmethod
def producer_queue_selector(cls, topic_route: TopicRouteData):
- return cls(list(filter(lambda queue: queue.is_writable() and
queue.is_master_broker(), topic_route.message_queues)),
- QueueSelector.PRODUCER_QUEUE_SELECTOR)
+ return cls(
+ list(
+ filter(
+ lambda queue: queue.is_writable() and
queue.is_master_broker(),
+ topic_route.message_queues,
+ )
+ ),
+ QueueSelector.PRODUCER_QUEUE_SELECTOR,
+ )
@classmethod
def simple_consumer_queue_selector(cls, topic_route: TopicRouteData):
- return cls(list(filter(lambda queue: queue.is_readable() and
queue.is_master_broker(), topic_route.message_queues)),
- QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR)
+ return cls(
+ list(
+ filter(
+ lambda queue: queue.is_readable() and
queue.is_master_broker(),
+ topic_route.message_queues,
+ )
+ ),
+ QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR,
+ )
def select_next_queue(self):
if self.__selector_type == QueueSelector.NONE_TYPE_SELECTOR:
- raise IllegalArgumentException("error type for queue selector,
type is NONE_TYPE_SELECTOR.")
- return self.__message_queues[self.__index.get_and_increment() %
len(self.__message_queues)]
+ raise IllegalArgumentException(
+ "error type for queue selector, type is NONE_TYPE_SELECTOR."
+ )
+ return self.__message_queues[
+ self.__index.get_and_increment() % len(self.__message_queues)
+ ]
def all_queues(self):
index = self.__index.get_and_increment() % len(self.__message_queues)
@@ -54,6 +72,16 @@ class QueueSelector:
if topic_route.message_queues == self.__message_queues:
return
if self.__selector_type == QueueSelector.PRODUCER_QUEUE_SELECTOR:
- self.__message_queues = list(filter(lambda queue:
queue.is_writable() and queue.is_master_broker(), topic_route.message_queues))
+ self.__message_queues = list(
+ filter(
+ lambda queue: queue.is_writable() and
queue.is_master_broker(),
+ topic_route.message_queues,
+ )
+ )
elif self.__selector_type ==
QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR:
- self.__message_queues = list(filter(lambda queue:
queue.is_readable() and queue.is_master_broker(), topic_route.message_queues))
+ self.__message_queues = list(
+ filter(
+ lambda queue: queue.is_readable() and
queue.is_master_broker(),
+ topic_route.message_queues,
+ )
+ )
diff --git a/python/rocketmq/v5/client/client.py
b/python/rocketmq/v5/client/client.py
index 57cf2a0b..b6c40f93 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -36,7 +36,9 @@ class Client:
CALLBACK_THREADS_NUM = 5
- def __init__(self, client_configuration, topics, client_type: ClientType,
tls_enable=False):
+ def __init__(
+ self, client_configuration, topics, client_type: ClientType,
tls_enable=False
+ ):
if client_configuration is None:
raise IllegalArgumentException("clientConfiguration should not be
null.")
self.__client_configuration = client_configuration
@@ -55,7 +57,9 @@ class Client:
self.__sync_setting_scheduler_threading_event = None
self.__clear_idle_rpc_channels_threading_event = None
if topics is not None:
- self.__topics = set(filter(lambda topic:
Misc.is_valid_topic(topic), topics))
+ self.__topics = set(
+ filter(lambda topic: Misc.is_valid_topic(topic), topics)
+ )
else:
self.__topics = set()
self.__callback_result_queue = Queue()
@@ -67,7 +71,9 @@ class Client:
def startup(self):
try:
if self.__had_shutdown is True:
- raise Exception(f"client:{self.__client_id} had shutdown,
can't startup again.")
+ raise Exception(
+ f"client:{self.__client_id} had shutdown, can't startup
again."
+ )
try:
# pre update topic route for producer or consumer
@@ -76,7 +82,8 @@ class Client:
except Exception as e:
# ignore this exception and retrieve again when calling send
or receive
logger.warn(
- f"update topic exception when client startup, ignore it,
try it again in scheduler. exception: {e}")
+ f"update topic exception when client startup, ignore it,
try it again in scheduler. exception: {e}"
+ )
self.__start_scheduler()
self.__start_async_rpc_callback_handler()
self.__is_running = True
@@ -110,27 +117,27 @@ class Client:
""" abstract """
def _start_success(self):
- """ each subclass implements its own actions after a successful
startup """
+ """each subclass implements its own actions after a successful
startup"""
pass
def _start_failure(self):
- """ each subclass implements its own actions after a startup failure
"""
+ """each subclass implements its own actions after a startup failure"""
pass
def _sync_setting_req(self, endpoints):
- """ each subclass implements its own telemetry settings scheme """
+ """each subclass implements its own telemetry settings scheme"""
pass
def _heartbeat_req(self):
- """ each subclass implements its own heartbeat request """
+ """each subclass implements its own heartbeat request"""
pass
def _notify_client_termination_req(self):
- """ each subclass implements its own client termination request """
+ """each subclass implements its own client termination request"""
pass
def _update_queue_selector(self, topic, topic_route):
- """ each subclass implements its own queue selector """
+ """each subclass implements its own queue selector"""
pass
""" scheduler """
@@ -140,29 +147,36 @@ class Client:
try:
# update topic route every 30 seconds
self.__client_thread_task_enabled = True
- self.__topic_route_scheduler =
threading.Thread(target=self.__schedule_update_topic_route_cache,
-
name="update_topic_route_schedule_thread")
+ self.__topic_route_scheduler = threading.Thread(
+ target=self.__schedule_update_topic_route_cache,
+ name="update_topic_route_schedule_thread",
+ )
self.__topic_route_scheduler_threading_event = threading.Event()
self.__topic_route_scheduler.start()
logger.info("start topic route scheduler success.")
# send heartbeat to all endpoints every 10 seconds
- self.__heartbeat_scheduler =
threading.Thread(target=self.__schedule_heartbeat,
-
name="heartbeat_schedule_thread")
+ self.__heartbeat_scheduler = threading.Thread(
+ target=self.__schedule_heartbeat,
name="heartbeat_schedule_thread"
+ )
self.__heartbeat_scheduler_threading_event = threading.Event()
self.__heartbeat_scheduler.start()
logger.info("start heartbeat scheduler success.")
# send client setting to all endpoints every 5 seconds
- self.__sync_setting_scheduler =
threading.Thread(target=self.__schedule_update_setting,
-
name="sync_setting_schedule_thread")
+ self.__sync_setting_scheduler = threading.Thread(
+ target=self.__schedule_update_setting,
+ name="sync_setting_schedule_thread",
+ )
self.__sync_setting_scheduler_threading_event = threading.Event()
self.__sync_setting_scheduler.start()
logger.info("start sync setting scheduler success.")
# clear unused grpc channel(>30 minutes) every 60 seconds
- self.__clear_idle_rpc_channels_scheduler =
threading.Thread(target=self.__schedule_clear_idle_rpc_channels,
-
name="clear_idle_rpc_channel_schedule_thread")
+ self.__clear_idle_rpc_channels_scheduler = threading.Thread(
+ target=self.__schedule_clear_idle_rpc_channels,
+ name="clear_idle_rpc_channel_schedule_thread",
+ )
self.__clear_idle_rpc_channels_threading_event = threading.Event()
self.__clear_idle_rpc_channels_scheduler.start()
logger.info("start clear idle rpc channels scheduler success.")
@@ -176,7 +190,7 @@ class Client:
while True:
if self.__client_thread_task_enabled is True:
self.__topic_route_scheduler_threading_event.wait(30)
- logger.debug(f"{self.__str__()} run scheduler for update topic
route cache.")
+ logger.debug(f"{self.__str__()} run update topic route in
scheduler.")
# update topic route for each topic in cache
topics = self.__topic_route_cache.keys()
for topic in topics:
@@ -185,24 +199,29 @@ class Client:
self.__update_topic_route_async(topic)
except Exception as e:
logger.error(
- f"{self.__str__()} run scheduler for update
topic:{topic} route cache exception: {e}")
+ f"{self.__str__()} scheduler update topic:{topic}
route raise exception: {e}"
+ )
else:
break
- logger.info(f"{self.__str__()} stop scheduler for update topic route
cache success.")
+ logger.info(
+ f"{self.__str__()} stop scheduler for update topic route cache
success."
+ )
def __schedule_heartbeat(self):
asyncio.set_event_loop(self._rpc_channel_io_loop())
while True:
if self.__client_thread_task_enabled is True:
self.__heartbeat_scheduler_threading_event.wait(10)
- logger.debug(f"{self.__str__()} run scheduler for heartbeat.")
+ logger.debug(f"{self.__str__()} run send heartbeat in
scheduler.")
all_endpoints = self.__get_all_endpoints().values()
try:
for endpoints in all_endpoints:
if self.__client_thread_task_enabled is True:
self.__heartbeat_async(endpoints)
except Exception as e:
- logger.error(f"{self.__str__()} run scheduler for
heartbeat exception: {e}")
+ logger.error(
+ f"{self.__str__()} scheduler send heartbeat raise
exception: {e}"
+ )
else:
break
logger.info(f"{self.__str__()} stop scheduler for heartbeat success.")
@@ -212,16 +231,16 @@ class Client:
while True:
if self.__client_thread_task_enabled is True:
self.__sync_setting_scheduler_threading_event.wait(5)
- logger.debug(f"{self.__str__()} run scheduler for update
setting.")
+ logger.debug(f"{self.__str__()} run update setting in
scheduler.")
try:
all_endpoints = self.__get_all_endpoints().values()
for endpoints in all_endpoints:
if self.__client_thread_task_enabled is True:
- # if stream_stream_call for grpc channel is none,
create a new one, otherwise use the existing one
-
self.__retrieve_telemetry_stream_stream_call(endpoints)
self.__setting_write(endpoints)
except Exception as e:
- logger.error(f"{self.__str__()} run scheduler for update
setting exception: {e}")
+ logger.error(
+ f"{self.__str__()} scheduler set setting raise
exception: {e}"
+ )
else:
break
logger.info(f"{self.__str__()} stop scheduler for update setting
success.")
@@ -230,15 +249,21 @@ class Client:
while True:
if self.__client_thread_task_enabled is True:
self.__clear_idle_rpc_channels_threading_event.wait(60)
- logger.debug(f"{self.__str__()} run scheduler for clear idle
rpc channels.")
+ logger.debug(
+ f"{self.__str__()} run scheduler for clear idle rpc
channels."
+ )
try:
if self.__client_thread_task_enabled is True:
self.__rpc_client.clear_idle_rpc_channels()
except Exception as e:
- logger.error(f"{self.__str__()} run scheduler for clear
idle rpc channels: {e}")
+ logger.error(
+ f"{self.__str__()} run scheduler for clear idle rpc
channels: {e}"
+ )
else:
break
- logger.info(f"{self.__str__()} stop scheduler for clear idle rpc
channels success.")
+ logger.info(
+ f"{self.__str__()} stop scheduler for clear idle rpc channels
success."
+ )
""" callback handler for async method """
@@ -247,11 +272,15 @@ class Client:
# this handler switches user's callback thread from RpcClient's
_io_loop_thread to client's callback_handler_thread
try:
for i in range(Client.CALLBACK_THREADS_NUM):
- th = threading.Thread(name=f"callback_handler_thread-{i}",
target=self.__handle_callback)
+ th = threading.Thread(
+ name=f"callback_handler_thread-{i}",
target=self.__handle_callback
+ )
th.daemon = True
self.__callback_threads.append(th)
th.start()
- logger.info(f"{self.__str__()} start async rpc callback
thread:{th} success.")
+ logger.info(
+ f"{self.__str__()} start async rpc callback thread:{th}
success."
+ )
except Exception as e:
print(f"{self.__str__()} start async rpc callback raise exception:
{e}")
raise e
@@ -260,7 +289,10 @@ class Client:
while True:
if self.__client_thread_task_enabled is True:
callback_result = self.__callback_result_queue.get()
- if callback_result.result_type ==
CallbackResultType.END_CALLBACK_THREAD_RESULT:
+ if (
+ callback_result.result_type
+ == CallbackResultType.END_CALLBACK_THREAD_RESULT
+ ):
# end infinite loop when client shutdown
self.__callback_result_queue.task_done()
break
@@ -272,7 +304,9 @@ class Client:
self.__callback_result_queue.task_done()
else:
break
- logger.info(f"{self.__str__()} stop client callback result handler
thread:{threading.current_thread()} success.")
+ logger.info(
+ f"{self.__str__()} stop client callback result handler
thread:{threading.current_thread()} success."
+ )
""" protect """
@@ -282,10 +316,12 @@ class Client:
return route
else:
route = self.__update_topic_route(topic)
- logger.info(f"{self.__str__()} update topic:{topic} route
success.")
if route is not None:
+ logger.info(f"{self.__str__()} update topic:{topic} route
success.")
self.__topics.add(topic)
- return route
+ return route
+ else:
+ raise Exception(f"failed to fetch topic:{topic} route.")
def _remove_unused_topic_route_data(self, topic):
self.__topic_route_cache.remove(topic)
@@ -306,31 +342,41 @@ class Client:
# topic route #
def __update_topic_route(self, topic):
- try:
- future =
self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints,
-
self.__topic_route_req(topic), metadata=self._sign(),
-
timeout=self.__client_configuration.request_timeout)
- res = future.result()
- route = self.__handle_topic_route_res(res, topic)
- return route
- except Exception as e:
- logger.error(f"update topic route error, topic:{topic}, {e}")
- raise e
+ event = threading.Event()
+ callback = functools.partial(
+ self.__query_topic_route_async_callback, topic=topic, event=event
+ )
+ future = self.__rpc_client.query_topic_route_async(
+ self.__client_configuration.rpc_endpoints,
+ self.__topic_route_req(topic),
+ metadata=self._sign(),
+ timeout=self.__client_configuration.request_timeout,
+ )
+ future.add_done_callback(callback)
+ event.wait()
+ return self.__topic_route_cache.get(topic)
def __update_topic_route_async(self, topic):
- callback = functools.partial(self.__query_topic_route_async_callback,
topic=topic)
- future =
self.__rpc_client.query_topic_route_async(self.__client_configuration.rpc_endpoints,
-
self.__topic_route_req(topic),
-
metadata=self._sign(),
-
timeout=self.__client_configuration.request_timeout)
+ callback = functools.partial(
+ self.__query_topic_route_async_callback, topic=topic
+ )
+ future = self.__rpc_client.query_topic_route_async(
+ self.__client_configuration.rpc_endpoints,
+ self.__topic_route_req(topic),
+ metadata=self._sign(),
+ timeout=self.__client_configuration.request_timeout,
+ )
future.add_done_callback(callback)
- def __query_topic_route_async_callback(self, future, topic):
+ def __query_topic_route_async_callback(self, future, topic, event=None):
try:
res = future.result()
self.__handle_topic_route_res(res, topic)
except Exception as e:
raise e
+ finally:
+ if event is not None:
+ event.set()
def __topic_route_req(self, topic):
req = QueryRouteRequest()
@@ -344,7 +390,9 @@ class Client:
MessagingResultChecker.check(res.status)
if res.status.code == Code.OK:
topic_route = TopicRouteData(res.message_queues)
- logger.debug(f"{self.__str__()} update topic:{topic} route,
route info: {topic_route.__str__()}")
+ logger.info(
+ f"{self.__str__()} update topic:{topic} route, route info:
{topic_route.__str__()}"
+ )
# if topic route has new endpoint, connect
self.__check_topic_route_endpoints_changed(topic, topic_route)
self.__topic_route_cache.put(topic, topic_route)
@@ -359,27 +407,50 @@ class Client:
def __heartbeat_async(self, endpoints):
req = self._heartbeat_req()
callback = functools.partial(self.__heartbeat_callback,
endpoints=endpoints)
- future = self.__rpc_client.heartbeat_async(endpoints, req,
metadata=self._sign(),
-
timeout=self.__client_configuration.request_timeout)
+ future = self.__rpc_client.heartbeat_async(
+ endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.__client_configuration.request_timeout,
+ )
future.add_done_callback(callback)
def __heartbeat_callback(self, future, endpoints):
try:
res = future.result()
if res is not None and res.status.code == Code.OK:
- logger.info(f"{self.__str__()} send heartbeat to
{endpoints.__str__()} success.")
+ logger.info(
+ f"{self.__str__()} send heartbeat to {endpoints.__str__()}
success."
+ )
else:
if res is not None:
logger.error(
- f"{self.__str__()} send heartbeat to
{endpoints.__str__()} error, code:{res.status.code},
message:{res.status.message}.")
+ f"{self.__str__()} send heartbeat to
{endpoints.__str__()} error, code:{res.status.code},
message:{res.status.message}."
+ )
else:
- logger.error(f"{self.__str__()} send heartbeat to
{endpoints.__str__()} error, response is none.")
+ logger.error(
+ f"{self.__str__()} send heartbeat to
{endpoints.__str__()} error, response is none."
+ )
except Exception as e:
- logger.error(f"{self.__str__()} send heartbeat to
{endpoints.__str__()} exception, e: {e}")
+ logger.error(
+ f"{self.__str__()} send heartbeat to {endpoints.__str__()}
exception, e: {e}"
+ )
raise e
# sync settings #
+ def __retrieve_telemetry_stream_stream_call(self, endpoints,
rebuild=False):
+ try:
+ self.__rpc_client.telemetry_stream(
+ endpoints, self, self._sign(), rebuild, timeout=60 * 60 * 24 *
365
+ )
+ except Exception as e:
+ logger.error(
+ f"{self.__str__()} rebuild stream_steam_call to
{endpoints.__str__()} exception: {e}"
+ if rebuild
+ else f"{self.__str__()} create stream_steam_call to
{endpoints.__str__()} exception: {e}"
+ )
+
def __setting_write(self, endpoints):
req = self._sync_setting_req(endpoints)
callback = functools.partial(self.__setting_write_callback,
endpoints=endpoints)
@@ -387,26 +458,26 @@ class Client:
logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()},
{req}")
future.add_done_callback(callback)
- def __retrieve_telemetry_stream_stream_call(self, endpoints,
rebuild=False):
- try:
- self.__rpc_client.telemetry_stream(endpoints, self,
metadata=self._sign(), timeout=60 * 60 * 24 * 365,
- rebuild=rebuild)
- except Exception as e:
- logger.error(
- f"{self.__str__()} rebuild stream_steam_call to
{endpoints.__str__()} exception: {e}" if rebuild else f"{self.__str__()} create
stream_steam_call to {endpoints.__str__()} exception: {e}")
-
def __setting_write_callback(self, future, endpoints):
try:
future.result()
- logger.debug(f"{self.__str__()} send setting to
{endpoints.__str__()} success.")
+ logger.info(
+ f"{self.__str__()} send setting to {endpoints.__str__()}
success."
+ )
except InvalidStateError as e:
- logger.warn(f"{self.__str__()} send setting to
{endpoints.__str__()} occurred InvalidStateError: {e}")
+ logger.warn(
+ f"{self.__str__()} send setting to {endpoints.__str__()}
occurred InvalidStateError: {e}"
+ )
self.__retrieve_telemetry_stream_stream_call(endpoints,
rebuild=True)
except AioRpcError as e:
- logger.warn(f"{self.__str__()} send setting to
{endpoints.__str__()} occurred AioRpcError: {e}")
+ logger.warn(
+ f"{self.__str__()} send setting to {endpoints.__str__()}
occurred AioRpcError: {e}"
+ )
self.__retrieve_telemetry_stream_stream_call(endpoints,
rebuild=True)
except Exception as e:
- logger.error(f"{self.__str__()} send setting to
{endpoints.__str__()} exception: {e}")
+ logger.error(
+ f"{self.__str__()} send setting to {endpoints.__str__()}
exception: {e}"
+ )
self.__retrieve_telemetry_stream_stream_call(endpoints,
rebuild=True)
# metrics #
@@ -418,8 +489,12 @@ class Client:
def __client_termination(self, endpoints):
req = self._notify_client_termination_req()
- future = self.__rpc_client.notify_client_termination(endpoints, req,
metadata=self._sign(),
-
timeout=self.__client_configuration.request_timeout)
+ future = self.__rpc_client.notify_client_termination(
+ endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.__client_configuration.request_timeout,
+ )
future.result()
# others ##
@@ -435,11 +510,15 @@ class Client:
old_route = self.__topic_route_cache.get(topic)
if old_route is None or old_route != route:
logger.info(
- f"topic:{topic} route changed for {self.__str__()}. old route
is {old_route}, new route is {route}")
+ f"topic:{topic} route changed for {self.__str__()}. old route
is {old_route}, new route is {route}"
+ )
all_endpoints = self.__get_all_endpoints() # the existing endpoints
- topic_route_endpoints = route.all_endpoints() # the latest endpoints
for topic route
+ topic_route_endpoints = (
+ route.all_endpoints()
+ ) # the latest endpoints for topic route
diff = set(topic_route_endpoints.keys()).difference(
- set(all_endpoints.keys())) # the diff between existing and latest
+ set(all_endpoints.keys())
+ ) # the diff between existing and latest
# create grpc channel, stream_stream_call for new endpoints, send
setting to new endpoints
for address in diff:
endpoints = topic_route_endpoints[address]
@@ -477,7 +556,9 @@ class Client:
self.__clear_idle_rpc_channels_scheduler.join()
for i in range(Client.CALLBACK_THREADS_NUM):
-
self._set_future_callback_result(CallbackResult.end_callback_thread_result())
+ self._set_future_callback_result(
+ CallbackResult.end_callback_thread_result()
+ )
for i in range(Client.CALLBACK_THREADS_NUM):
self.__callback_threads[i].join()
diff --git a/python/rocketmq/v5/client/client_configuration.py
b/python/rocketmq/v5/client/client_configuration.py
index 7c9c67f9..922fe6be 100644
--- a/python/rocketmq/v5/client/client_configuration.py
+++ b/python/rocketmq/v5/client/client_configuration.py
@@ -37,8 +37,12 @@ class Credentials:
class ClientConfiguration:
- def __init__(self, endpoints: str, credentials: Credentials, namespace="",
request_timeout=3):
- self.__rpc_endpoints =
RpcEndpoints(ClientConfiguration.__parse_endpoints(endpoints))
+ def __init__(
+ self, endpoints: str, credentials: Credentials, namespace="",
request_timeout=3
+ ):
+ self.__rpc_endpoints = RpcEndpoints(
+ ClientConfiguration.__parse_endpoints(endpoints)
+ )
self.__credentials = credentials
self.__request_timeout = request_timeout # seconds
self.__namespace = namespace
@@ -52,7 +56,10 @@ class ClientConfiguration:
endpoints = Endpoints()
addresses = endpoints_str.split(";")
endpoints.scheme =
ClientConfiguration.__parse_endpoints_scheme_type(
-
ClientConfiguration.__parse_endpoints_prefix(addresses[0].split(":")[0]))
+ ClientConfiguration.__parse_endpoints_prefix(
+ addresses[0].split(":")[0]
+ )
+ )
for address in addresses:
if len(address) == 0:
continue
@@ -62,7 +69,9 @@ class ClientConfiguration:
ad.port = int(address.split(":")[1])
return endpoints
except Exception as e:
- logger.error(f"client configuration parse {endpoints_str}
exception: {e}")
+ logger.error(
+ f"client configuration parse {endpoints_str} exception:
{e}"
+ )
return None
@staticmethod
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py
b/python/rocketmq/v5/client/connection/rpc_channel.py
index d79f5478..246e102b 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -60,9 +60,13 @@ class RpcEndpoints:
def __init__(self, endpoints: Endpoints):
self.__endpoints = endpoints
self.__scheme = endpoints.scheme
- self.__addresses = set(map(lambda address: RpcAddress(address),
endpoints.addresses))
+ self.__addresses = set(
+ map(lambda address: RpcAddress(address), endpoints.addresses)
+ )
if self.__scheme == AddressScheme.DOMAIN_NAME and
len(self.__addresses) > 1:
- raise UnsupportedException("Multiple addresses not allowed in
domain schema")
+ raise UnsupportedException(
+ "Multiple addresses not allowed in domain schema"
+ )
self.__facade, self.__endpoint_desc = self.__facade()
def __hash__(self) -> int:
@@ -79,8 +83,11 @@ class RpcEndpoints:
""" private """
def __facade(self):
- if self.__scheme is None or len(
- self.__addresses) == 0 or self.__scheme ==
AddressScheme.ADDRESS_SCHEME_UNSPECIFIED:
+ if (
+ self.__scheme is None
+ or len(self.__addresses) == 0
+ or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED
+ ):
return ""
prefix = "dns:"
@@ -94,7 +101,7 @@ class RpcEndpoints:
ret = ""
for address in sorted_list:
ret = ret + address.__str__() + ","
- return prefix + ret[0: len(ret) - 1], ret[0: len(ret) - 1]
+ return prefix + ret[0:len(ret) - 1], ret[0:len(ret) - 1]
""" property """
@@ -123,22 +130,33 @@ class RpcStreamStreamCall:
if res.HasField("settings"):
# read a response for send setting result
if res is not None and res.status.code == Code.OK:
- logger.debug(f"async setting success. response
status code: {res.status.code}")
- if res.settings is not None and
res.settings.metric is not None:
+ logger.debug(
+ f"{ self.__handler.__str__()} sync setting
success. response status code: {res.status.code}"
+ )
+ if (
+ res.settings is not None
+ and res.settings.metric is not None
+ ):
# reset metrics if needed
self.__handler.reset_metric(res.settings.metric)
elif res.HasField("recover_orphaned_transaction_command"):
# sever check for a transaction message
if self.__handler is not None:
- transaction_id =
res.recover_orphaned_transaction_command.transaction_id
+ transaction_id = (
+
res.recover_orphaned_transaction_command.transaction_id
+ )
message =
res.recover_orphaned_transaction_command.message
- await
self.__handler.on_recover_orphaned_transaction_command(self.__endpoints,
message,
-
transaction_id)
+ await
self.__handler.on_recover_orphaned_transaction_command(
+ self.__endpoints, message, transaction_id
+ )
except AioRpcError as e:
logger.warn(
- f"stream read from endpoints {self.__endpoints.__str__()}
occurred AioRpcError. code: {e.code()}, message: {e.details()}")
+ f"{ self.__handler.__str__()} read stream from endpoints
{self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message:
{e.details()}"
+ )
except Exception as e:
- logger.error(f"stream read from endpoints
{self.__endpoints.__str__()} exception, {e}")
+ logger.error(
+ f"{ self.__handler.__str__()} read stream from endpoints
{self.__endpoints.__str__()} exception, {e}"
+ )
async def stream_write(self, req):
if self.__stream_stream_call is not None:
@@ -164,6 +182,7 @@ class RpcChannel:
def create_channel(self, loop):
# create grpc channel with the given loop
+ # assert loop == RpcClient._io_loop
asyncio.set_event_loop(loop)
self.__create_aio_channel()
@@ -173,7 +192,9 @@ class RpcChannel:
if self.__telemetry_stream_stream_call is not None:
self.__telemetry_stream_stream_call.close()
self.__telemetry_stream_stream_call = None
- logger.info(f"channel[{self.__endpoints.__str__()}] close
stream_stream_call success.")
+ logger.info(
+ f"channel[{self.__endpoints.__str__()}] close
stream_stream_call success."
+ )
if self.channel_state() is not ChannelConnectivity.SHUTDOWN:
# close grpc channel
asyncio.run_coroutine_threadsafe(self.__async_channel.close(),
loop)
@@ -189,29 +210,43 @@ class RpcChannel:
def register_telemetry_stream_stream_call(self, stream_stream_call,
handler):
if self.__telemetry_stream_stream_call is not None:
self.__telemetry_stream_stream_call.close()
- self.__telemetry_stream_stream_call =
RpcStreamStreamCall(self.__endpoints, stream_stream_call, handler)
+ self.__telemetry_stream_stream_call = RpcStreamStreamCall(
+ self.__endpoints, stream_stream_call, handler
+ )
""" private """
def __create_aio_channel(self):
try:
if self.__endpoints is None:
- raise IllegalArgumentException("create_aio_channel exception,
endpoints is None")
+ raise IllegalArgumentException(
+ "create_aio_channel exception, endpoints is None"
+ )
else:
- options = [('grpc.enable_retries', 0),
("grpc.max_send_message_length", -1),
- ("grpc.max_receive_message_length", -1)]
+ options = [
+ ("grpc.enable_retries", 0),
+ ("grpc.max_send_message_length", -1),
+ ("grpc.max_receive_message_length", -1),
+ ]
if self.__tls_enabled:
- self.__async_channel =
aio.secure_channel(self.__endpoints.facade, grpc.ssl_channel_credentials(),
- options)
+ self.__async_channel = aio.secure_channel(
+ self.__endpoints.facade,
grpc.ssl_channel_credentials(), options
+ )
else:
- self.__async_channel =
aio.insecure_channel(self.__endpoints.facade, options)
+ self.__async_channel = aio.insecure_channel(
+ self.__endpoints.facade, options
+ )
self.__async_stub = MessagingServiceStub(self.__async_channel)
logger.debug(
- f"create_aio_channel to [{self.__endpoints.__str__()}]
success. channel state:{self.__async_channel.get_state()}")
+ f"create_aio_channel to [{self.__endpoints.__str__()}]
success. channel state:{self.__async_channel.get_state()}"
+ )
except Exception as e:
- logger.error(f"create_aio_channel to
[{self.__endpoints.__str__()}] exception: {e}")
+ logger.error(
+ f"create_aio_channel to [{self.__endpoints.__str__()}]
exception: {e}"
+ )
raise e
+ #
""" property """
@property
diff --git a/python/rocketmq/v5/client/connection/rpc_client.py
b/python/rocketmq/v5/client/connection/rpc_client.py
index 3bcd3e16..77e6473c 100644
--- a/python/rocketmq/v5/client/connection/rpc_client.py
+++ b/python/rocketmq/v5/client/connection/rpc_client.py
@@ -42,8 +42,11 @@ class RpcClient:
# start an event loop for async io
if RpcClient._io_loop is None:
initialized_event = threading.Event()
- RpcClient._io_loop_thread =
threading.Thread(target=RpcClient.__init_io_loop, args=(initialized_event,),
-
name="channel_io_loop_thread")
+ RpcClient._io_loop_thread = threading.Thread(
+ target=RpcClient.__init_io_loop,
+ args=(initialized_event,),
+ name="channel_io_loop_thread",
+ )
RpcClient._io_loop_thread.daemon = True
RpcClient._io_loop_thread.start()
# waiting for thread start success
@@ -57,18 +60,17 @@ class RpcClient:
def retrieve_or_create_channel(self, endpoints: RpcEndpoints):
if self.__enable_retrieve_channel is False:
raise Exception("RpcClient is not running.")
-
try:
# get or create a new grpc channel
- with RpcClient._channel_lock:
- channel = self.__get_channel(endpoints)
- if channel is not None:
- channel.update_time = int(time.time())
- else:
+ channel = self.__get_channel(endpoints)
+ if channel is not None:
+ channel.update_time = int(time.time())
+ else:
+ with RpcClient._channel_lock:
channel = RpcChannel(endpoints, self.__tls_enable)
channel.create_channel(RpcClient.get_channel_io_loop())
self.__put_channel(endpoints, channel)
- return channel
+ return channel
except Exception as e:
logger.error(f"retrieve or create channel exception: {e}")
raise e
@@ -80,11 +82,12 @@ class RpcClient:
for endpoints, channel in items:
if now - channel.update_time >
RpcClient.RPC_CLIENT_MAX_IDLE_SECONDS:
idle_endpoints.append(endpoints)
- with RpcClient._channel_lock:
- for endpoints in idle_endpoints:
- logger.info(f"remove idle channel {endpoints.__str__()}")
- self.__close_rpc_channel(endpoints)
- self.channels.remove(endpoints)
+ if len(idle_endpoints) > 0:
+ with RpcClient._channel_lock:
+ for endpoints in idle_endpoints:
+ logger.info(f"remove idle channel {endpoints.__str__()}")
+ self.__close_rpc_channel(endpoints)
+ self.channels.remove(endpoints)
def stop(self):
with RpcClient._channel_lock:
@@ -99,103 +102,190 @@ class RpcClient:
""" grpc MessageService """
- def query_topic_route_async(self, endpoints: RpcEndpoints, req:
QueryRouteRequest, metadata, timeout=3):
+ def query_topic_route_async(
+ self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata,
timeout=3
+ ):
return RpcClient.__run_message_service_async(
- self.__query_route_async_0(endpoints, req, metadata=metadata,
timeout=timeout))
-
- def send_message_async(self, endpoints: RpcEndpoints, req:
SendMessageRequest, metadata, timeout=3):
+ self.__query_route_async_0(
+ endpoints, req, metadata=metadata, timeout=timeout
+ )
+ )
+
+ def send_message_async(
+ self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata,
timeout=3
+ ):
return RpcClient.__run_message_service_async(
- self.__send_message_0(endpoints, req, metadata=metadata,
timeout=timeout))
+ self.__send_message_0(endpoints, req, metadata=metadata,
timeout=timeout)
+ )
- def receive_message_async(self, endpoints: RpcEndpoints, req:
ReceiveMessageRequest, metadata, timeout=3):
+ def receive_message_async(
+ self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata,
timeout=3
+ ):
return RpcClient.__run_message_service_async(
- self.__receive_message_0(endpoints, req, metadata=metadata,
timeout=timeout))
+ self.__receive_message_0(endpoints, req, metadata=metadata,
timeout=timeout)
+ )
- def ack_message_async(self, endpoints: RpcEndpoints, req:
AckMessageRequest, metadata, timeout=3):
+ def ack_message_async(
+ self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata,
timeout=3
+ ):
return RpcClient.__run_message_service_async(
- self.__ack_message_0(endpoints, req, metadata=metadata,
timeout=timeout))
-
- def change_invisible_duration_async(self, endpoints: RpcEndpoints, req:
ChangeInvisibleDurationRequest, metadata,
- timeout=3):
+ self.__ack_message_0(endpoints, req, metadata=metadata,
timeout=timeout)
+ )
+
+ def change_invisible_duration_async(
+ self,
+ endpoints: RpcEndpoints,
+ req: ChangeInvisibleDurationRequest,
+ metadata,
+ timeout=3,
+ ):
return RpcClient.__run_message_service_async(
- self.__change_invisible_duration_0(endpoints, req,
metadata=metadata, timeout=timeout))
-
- def heartbeat_async(self, endpoints: RpcEndpoints, req: HeartbeatRequest,
metadata, timeout=3):
+ self.__change_invisible_duration_0(
+ endpoints, req, metadata=metadata, timeout=timeout
+ )
+ )
+
+ def heartbeat_async(
+ self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata,
timeout=3
+ ):
return RpcClient.__run_message_service_async(
- self.__heartbeat_async_0(endpoints, req, metadata=metadata,
timeout=timeout))
+ self.__heartbeat_async_0(endpoints, req, metadata=metadata,
timeout=timeout)
+ )
def telemetry_write_async(self, endpoints: RpcEndpoints, req:
TelemetryCommand):
return RpcClient.__run_message_service_async(
-
self.retrieve_or_create_channel(endpoints).telemetry_stream_stream_call.stream_write(req))
-
- def end_transaction_async(self, endpoints: RpcEndpoints, req:
EndTransactionRequest, metadata, timeout=3):
+ self.retrieve_or_create_channel(
+ endpoints
+ ).telemetry_stream_stream_call.stream_write(req)
+ )
+
+ def end_transaction_async(
+ self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata,
timeout=3
+ ):
return RpcClient.__run_message_service_async(
- self.__end_transaction_0(endpoints, req, metadata=metadata,
timeout=timeout))
-
- def notify_client_termination(self, endpoints: RpcEndpoints, req:
NotifyClientTerminationRequest, metadata,
- timeout=3):
+ self.__end_transaction_0(endpoints, req, metadata=metadata,
timeout=timeout)
+ )
+
+ def notify_client_termination(
+ self,
+ endpoints: RpcEndpoints,
+ req: NotifyClientTerminationRequest,
+ metadata,
+ timeout=3,
+ ):
return RpcClient.__run_message_service_async(
- self.__notify_client_termination_0(endpoints, req,
metadata=metadata, timeout=timeout))
-
- def telemetry_stream(self, endpoints: RpcEndpoints, client, metadata,
timeout=3000, rebuild=False):
+ self.__notify_client_termination_0(
+ endpoints, req, metadata=metadata, timeout=timeout
+ )
+ )
+
+ def end_transaction_for_server_check(
+ self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata,
timeout=3
+ ):
+ # assert asyncio.get_running_loop() == RpcClient._io_loop
try:
- channel = self.retrieve_or_create_channel(endpoints)
- if channel.telemetry_stream_stream_call is None or rebuild is True:
- stream = channel.async_stub.Telemetry(metadata=metadata,
timeout=timeout)
- channel.register_telemetry_stream_stream_call(stream, client)
-
asyncio.run_coroutine_threadsafe(channel.telemetry_stream_stream_call.start_stream_read(),
-
RpcClient.get_channel_io_loop())
- logger.info(
- f"{client.__str__()} rebuild stream_steam_call to
{endpoints.__str__()} success." if rebuild else f"{client.__str__()} create
stream_steam_call to {endpoints.__str__()} success.")
+ return self.__end_transaction_0(
+ endpoints, req, metadata=metadata, timeout=timeout
+ )
except Exception as e:
+ logger.error(
+ f"end transaction exception, topic:{req.topic.name},
message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}"
+ )
raise e
- def end_transaction_for_server_check(self, endpoints: RpcEndpoints, req:
EndTransactionRequest, metadata,
- timeout=3):
+ """ build stream_stream_call """
+
+ def telemetry_stream(
+ self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000
+ ):
+ # assert asyncio.get_running_loop() == RpcClient._io_loop
try:
- return self.__end_transaction_0(endpoints, req, metadata=metadata,
timeout=timeout)
+ channel = self.retrieve_or_create_channel(endpoints)
+ stream = channel.async_stub.Telemetry(
+ metadata=metadata, timeout=timeout, wait_for_ready=True
+ )
+ channel.register_telemetry_stream_stream_call(stream, client)
+ asyncio.run_coroutine_threadsafe(
+ channel.telemetry_stream_stream_call.start_stream_read(),
+ RpcClient.get_channel_io_loop(),
+ )
+ logger.info(
+ f"{client.__str__()} rebuild stream_steam_call to
{endpoints.__str__()}."
+ if rebuild
+ else f"{client.__str__()} create stream_steam_call to
{endpoints.__str__()}."
+ )
+ return channel
except Exception as e:
- logger.error(
- f"end transaction exception, topic:{req.topic.name},
message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}")
raise e
""" MessageService.stub impl """
- async def __query_route_async_0(self, endpoints: RpcEndpoints, req:
QueryRouteRequest, metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute(req,
metadata=metadata,
-
timeout=timeout)
-
- async def __send_message_0(self, endpoints: RpcEndpoints, req:
SendMessageRequest, metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.SendMessage(req,
metadata=metadata,
-
timeout=timeout)
-
- async def __receive_message_0(self, endpoints: RpcEndpoints, req:
ReceiveMessageRequest, metadata, timeout=3):
- return
self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage(req,
metadata=metadata,
-
timeout=timeout)
-
- async def __ack_message_0(self, endpoints: RpcEndpoints, req:
AckMessageRequest, metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.AckMessage(req,
metadata=metadata,
-
timeout=timeout)
-
- async def __heartbeat_async_0(self, endpoints: RpcEndpoints, req:
HeartbeatRequest, metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat(req,
metadata=metadata,
-
timeout=timeout)
-
- async def __change_invisible_duration_0(self, endpoints: RpcEndpoints,
req: ChangeInvisibleDurationRequest,
- metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.ChangeInvisibleDuration(req,
-
metadata=metadata,
-
timeout=timeout)
-
- async def __end_transaction_0(self, endpoints: RpcEndpoints, req:
EndTransactionRequest, metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.EndTransaction(req,
metadata=metadata,
-
timeout=timeout)
-
- async def __notify_client_termination_0(self, endpoints: RpcEndpoints,
req: NotifyClientTerminationRequest,
- metadata, timeout=3):
- return await
self.retrieve_or_create_channel(endpoints).async_stub.NotifyClientTermination(req,
-
metadata=metadata,
-
timeout=timeout)
+ async def __query_route_async_0(
+ self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata,
timeout=3
+ ):
+ return await
self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute(
+ req, metadata=metadata, timeout=timeout
+ )
+
+ async def __send_message_0(
+ self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata,
timeout=3
+ ):
+ return await
self.retrieve_or_create_channel(endpoints).async_stub.SendMessage(
+ req, metadata=metadata, timeout=timeout
+ )
+
+ async def __receive_message_0(
+ self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata,
timeout=3
+ ):
+ return
self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage(
+ req, metadata=metadata, timeout=timeout
+ )
+
+ async def __ack_message_0(
+ self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata,
timeout=3
+ ):
+ return await
self.retrieve_or_create_channel(endpoints).async_stub.AckMessage(
+ req, metadata=metadata, timeout=timeout
+ )
+
+ async def __heartbeat_async_0(
+ self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata,
timeout=3
+ ):
+ return await
self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat(
+ req, metadata=metadata, timeout=timeout
+ )
+
+ async def __change_invisible_duration_0(
+ self,
+ endpoints: RpcEndpoints,
+ req: ChangeInvisibleDurationRequest,
+ metadata,
+ timeout=3,
+ ):
+ return await self.retrieve_or_create_channel(
+ endpoints
+ ).async_stub.ChangeInvisibleDuration(req, metadata=metadata,
timeout=timeout)
+
+ async def __end_transaction_0(
+ self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata,
timeout=3
+ ):
+ return await self.retrieve_or_create_channel(
+ endpoints
+ ).async_stub.EndTransaction(req, metadata=metadata, timeout=timeout)
+
+ async def __notify_client_termination_0(
+ self,
+ endpoints: RpcEndpoints,
+ req: NotifyClientTerminationRequest,
+ metadata,
+ timeout=3,
+ ):
+ return await self.retrieve_or_create_channel(
+ endpoints
+ ).async_stub.NotifyClientTermination(req, metadata=metadata,
timeout=timeout)
+
+ async def __create_channel_async(self, endpoints: RpcEndpoints):
+ return self.retrieve_or_create_channel(endpoints)
""" private """
@@ -207,7 +297,10 @@ class RpcClient:
def __close_rpc_channel(self, endpoints: RpcEndpoints):
channel = self.__get_channel(endpoints)
- if channel is not None and channel.channel_state() is not
ChannelConnectivity.SHUTDOWN:
+ if (
+ channel is not None
+ and channel.channel_state() is not ChannelConnectivity.SHUTDOWN
+ ):
try:
channel.close_channel(RpcClient.get_channel_io_loop())
self.channels.remove(endpoints)
@@ -234,7 +327,9 @@ class RpcClient:
def __run_message_service_async(func):
try:
# execute grpc call in RpcClient._io_loop
- return asyncio.run_coroutine_threadsafe(func,
RpcClient.get_channel_io_loop())
+ return asyncio.run_coroutine_threadsafe(
+ func, RpcClient.get_channel_io_loop()
+ )
except Exception as e:
future = Future()
future.set_exception(e)
diff --git a/python/rocketmq/v5/client/metrics/client_metrics.py
b/python/rocketmq/v5/client/metrics/client_metrics.py
index 7aa94433..210631ee 100644
--- a/python/rocketmq/v5/client/metrics/client_metrics.py
+++ b/python/rocketmq/v5/client/metrics/client_metrics.py
@@ -76,16 +76,21 @@ class ClientMetrics:
def send_after(self, send_context: MetricContext, success: bool):
if send_context is None:
- logger.warn("metrics do send after exception. send_context must
not be none.")
+ logger.warn(
+ "metrics do send after exception. send_context must not be
none."
+ )
return
if send_context.metric_type != MessageMetricType.SEND:
logger.warn(
- f"metric type must be MessageMetricType.SEND. current
send_context type is {send_context.metric_type}")
+ f"metric type must be MessageMetricType.SEND. current
send_context type is {send_context.metric_type}"
+ )
return
if send_context.get_attr("send_stopwatch") is None:
- logger.warn("metrics do send after exception. send_stopwatch must
not be none.")
+ logger.warn(
+ "metrics do send after exception. send_stopwatch must not be
none."
+ )
return
if send_context.get_attr("topic") is None:
@@ -108,7 +113,11 @@ class ClientMetrics:
if metric.endpoints is None:
return True
# if metrics endpoints changed, return False
- if self.__enabled and metric.on and self.__endpoints ==
RpcEndpoints(metric.endpoints):
+ if (
+ self.__enabled
+ and metric.on
+ and self.__endpoints == RpcEndpoints(metric.endpoints)
+ ):
return True
return not self.__enabled and not metric.on
@@ -122,36 +131,51 @@ class ClientMetrics:
def __meter_provider_start(self):
if self.__endpoints is None:
- logger.warn(f"client:{self.__client_id} can't create meter
provider, because endpoints is none.")
+ logger.warn(
+ f"client:{self.__client_id} can't create meter provider,
because endpoints is none."
+ )
return
try:
# setup OTLP exporter
- exporter = OTLPMetricExporter(endpoint=self.__endpoints.__str__(),
insecure=True,
-
timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT)
+ exporter = OTLPMetricExporter(
+ endpoint=self.__endpoints.__str__(),
+ insecure=True,
+ timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT,
+ )
# create a metric reader and set the export interval
- reader = PeriodicExportingMetricReader(exporter,
-
export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL)
+ reader = PeriodicExportingMetricReader(
+ exporter,
export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL
+ )
# create an empty resource
resource = Resource.get_empty()
# create view
- send_cost_time_view = View(instrument_type=Histogram,
-
instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name,
-
aggregation=ExplicitBucketHistogramAggregation(
-
HistogramEnum.SEND_COST_TIME.buckets))
+ send_cost_time_view = View(
+ instrument_type=Histogram,
+ instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name,
+ aggregation=ExplicitBucketHistogramAggregation(
+ HistogramEnum.SEND_COST_TIME.buckets
+ ),
+ )
# create MeterProvider
- self.__meter_provider = MeterProvider(metric_readers=[reader],
resource=resource,
- views=[send_cost_time_view])
+ self.__meter_provider = MeterProvider(
+ metric_readers=[reader], resource=resource,
views=[send_cost_time_view]
+ )
# define the histogram instruments
self.__send_success_cost_time_instrument =
self.__meter_provider.get_meter(
-
ClientMetrics.METRIC_INSTRUMENTATION_NAME).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name)
+ ClientMetrics.METRIC_INSTRUMENTATION_NAME
+ ).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name)
except Exception as e:
- logger.error(f"client:{self.__client_id} start meter provider
exception: {e}")
+ logger.error(
+ f"client:{self.__client_id} start meter provider exception:
{e}"
+ )
def __record_send_success_cost_time(self, context, amount):
if self.__enabled:
try:
# record send message cost time and result
- self.__send_success_cost_time_instrument.record(amount,
context.attributes)
+ self.__send_success_cost_time_instrument.record(
+ amount, context.attributes
+ )
except Exception as e:
logger.error(f"record send message cost time exception, e:{e}")
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py
b/python/rocketmq/v5/consumer/simple_consumer.py
index 11a09d1b..6b1483db 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -36,18 +36,27 @@ from rocketmq.v5.util import (AtomicInteger, ConcurrentMap,
class SimpleConsumer(Client):
- def __init__(self, client_configuration: ClientConfiguration,
consumer_group, subscription: dict = None,
- await_duration=20):
- if consumer_group is None or consumer_group.strip() == '':
+ def __init__(
+ self,
+ client_configuration: ClientConfiguration,
+ consumer_group,
+ subscription: dict = None,
+ await_duration=20,
+ ):
+ if consumer_group is None or consumer_group.strip() == "":
raise IllegalArgumentException("consumerGroup should not be null")
if Misc.is_valid_consumer_group(consumer_group) is False:
raise IllegalArgumentException(
- f"consumerGroup does not match the regex
[regex={Misc.CONSUMER_GROUP_PATTERN}]")
+ f"consumerGroup does not match the regex
[regex={Misc.CONSUMER_GROUP_PATTERN}]"
+ )
if await_duration is None:
raise IllegalArgumentException("awaitDuration should not be null")
- super().__init__(client_configuration, None if subscription is None
else subscription.keys(),
- ClientType.SIMPLE_CONSUMER)
+ super().__init__(
+ client_configuration,
+ None if subscription is None else subscription.keys(),
+ ClientType.SIMPLE_CONSUMER,
+ )
self.__consumer_group = consumer_group
self.__await_duration = await_duration # long polling timeout, seconds
# <String /* topic */, FilterExpression>
@@ -64,19 +73,30 @@ class SimpleConsumer(Client):
def subscribe(self, topic, filter_expression: FilterExpression = None):
if self.is_running is False:
- raise IllegalStateException("unable to add subscription because
simple consumer is not running")
+ raise IllegalStateException(
+ "unable to add subscription because simple consumer is not
running"
+ )
try:
if not self.__subscriptions.contains(topic):
self._retrieve_topic_route_data(topic)
- self.__subscriptions.put(topic, filter_expression if
filter_expression is not None else FilterExpression())
+ self.__subscriptions.put(
+ topic,
+ (
+ filter_expression
+ if filter_expression is not None
+ else FilterExpression()
+ ),
+ )
except Exception as e:
- logger.error(f"subscribe exception: {e}")
+ logger.error(f"subscribe raise exception: {e}")
raise e
def unsubscribe(self, topic):
if self.is_running is False:
- raise IllegalStateException("unable to remove subscription because
simple consumer is not running")
+ raise IllegalStateException(
+ "unable to remove subscription because simple consumer is not
running"
+ )
if topic in self.__subscriptions:
self.__subscriptions.remove(topic)
@@ -84,43 +104,57 @@ class SimpleConsumer(Client):
def receive(self, max_message_num, invisible_duration):
if self.is_running is False:
- raise IllegalStateException("unable to receive messages because
simple consumer is not running")
+ raise IllegalStateException(
+ "unable to receive messages because simple consumer is not
running"
+ )
return self.__receive(max_message_num, invisible_duration)
def receive_async(self, max_message_num, invisible_duration):
if self.is_running is False:
- raise IllegalStateException("unable to receive messages because
simple consumer is not running")
+ raise IllegalStateException(
+ "unable to receive messages because simple consumer is not
running"
+ )
return self.__receive_async(max_message_num, invisible_duration)
def ack(self, message: Message):
if self.is_running is False:
- raise IllegalStateException("unable to ack message because simple
consumer is not running")
+ raise IllegalStateException(
+ "unable to ack message because simple consumer is not running"
+ )
queue = self.__select_topic_queue(message.topic)
return self.__ack(message, queue)
def ack_async(self, message: Message):
if self.is_running is False:
- raise IllegalStateException("unable to ack message because simple
consumer is not running")
+ raise IllegalStateException(
+ "unable to ack message because simple consumer is not running"
+ )
queue = self.__select_topic_queue(message.topic)
return self.__ack_async(message, queue)
def change_invisible_duration(self, message: Message, invisible_duration):
if self.is_running is False:
- raise IllegalStateException("unable to change invisible duration
because simple consumer is not running")
+ raise IllegalStateException(
+ "unable to change invisible duration because simple consumer
is not running"
+ )
queue = self.__select_topic_queue(message.topic)
return self.__change_invisible_duration(message, queue,
invisible_duration)
def change_invisible_duration_async(self, message: Message,
invisible_duration):
if self.is_running is False:
- raise IllegalStateException("unable to change invisible duration
because simple consumer is not running")
+ raise IllegalStateException(
+ "unable to change invisible duration because simple consumer
is not running"
+ )
queue = self.__select_topic_queue(message.topic)
- return self.__change_invisible_duration_async(message, queue,
invisible_duration)
+ return self.__change_invisible_duration_async(
+ message, queue, invisible_duration
+ )
""" override """
@@ -190,21 +224,25 @@ class SimpleConsumer(Client):
def __select_topic_for_receive(self):
try:
# select the next topic for receive
- mod_index = self.__topic_index.get_and_increment() %
len(self.__subscriptions.keys())
+ mod_index = self.__topic_index.get_and_increment() % len(
+ self.__subscriptions.keys()
+ )
return list(self.__subscriptions.keys())[mod_index]
except Exception as e:
- logger.error(f"simple consumer select topic for receive message
exception: {e}")
+ logger.error(
+ f"simple consumer select topic for receive message exception:
{e}"
+ )
raise e
def __select_topic_queue(self, topic):
try:
route = self._retrieve_topic_route_data(topic)
- queue_selector =
self.__receive_queue_selectors.put_if_absent(topic,
-
QueueSelector.simple_consumer_queue_selector(
-
route))
+ queue_selector = self.__receive_queue_selectors.put_if_absent(
+ topic, QueueSelector.simple_consumer_queue_selector(route)
+ )
return queue_selector.select_next_queue()
except Exception as e:
- logger.error(f"simple consumer select topic queue for receive
message exception: {e}")
+ logger.error(f"simple consumer select topic queue raise exception:
{e}")
raise e
def __receive(self, max_message_num, invisible_duration):
@@ -213,9 +251,13 @@ class SimpleConsumer(Client):
queue = self.__select_topic_queue(topic)
req = self.__receive_req(topic, queue, max_message_num,
invisible_duration)
timeout = self.client_configuration.request_timeout +
self.__await_duration
- future = self.rpc_client.receive_message_async(queue.endpoints, req,
metadata=self._sign(), timeout=timeout)
- read_future =
asyncio.run_coroutine_threadsafe(self.__receive_message_response(future.result()),
-
self._rpc_channel_io_loop())
+ future = self.rpc_client.receive_message_async(
+ queue.endpoints, req, metadata=self._sign(), timeout=timeout
+ )
+ read_future = asyncio.run_coroutine_threadsafe(
+ self.__receive_message_response(future.result()),
+ self._rpc_channel_io_loop(),
+ )
return self.__handle_receive_message_response(read_future.result())
def __receive_async(self, max_message_num, invisible_duration):
@@ -225,11 +267,17 @@ class SimpleConsumer(Client):
queue = self.__select_topic_queue(topic)
req = self.__receive_req(topic, queue, max_message_num,
invisible_duration)
timeout = self.client_configuration.request_timeout +
self.__await_duration
- future = self.rpc_client.receive_message_async(queue.endpoints,
req, metadata=self._sign(), timeout=timeout)
- read_future =
asyncio.run_coroutine_threadsafe(self.__receive_message_response(future.result()),
-
self._rpc_channel_io_loop())
+ future = self.rpc_client.receive_message_async(
+ queue.endpoints, req, metadata=self._sign(), timeout=timeout
+ )
+ read_future = asyncio.run_coroutine_threadsafe(
+ self.__receive_message_response(future.result()),
+ self._rpc_channel_io_loop(),
+ )
ret_future = Future()
- handle_send_receipt_callback =
functools.partial(self.__receive_message_callback, ret_future=ret_future)
+ handle_send_receipt_callback = functools.partial(
+ self.__receive_message_callback, ret_future=ret_future
+ )
read_future.add_done_callback(handle_send_receipt_callback)
return ret_future
except Exception as e:
@@ -261,20 +309,28 @@ class SimpleConsumer(Client):
try:
responses = future.result()
messages = self.__handle_receive_message_response(responses)
-
self._set_future_callback_result(CallbackResult.async_receive_callback_result(ret_future,
messages))
+ self._set_future_callback_result(
+ CallbackResult.async_receive_callback_result(ret_future,
messages)
+ )
except Exception as e:
-
self._set_future_callback_result(CallbackResult.async_receive_callback_result(ret_future,
e, False))
+ self._set_future_callback_result(
+ CallbackResult.async_receive_callback_result(ret_future, e,
False)
+ )
async def __receive_message_response(self, unary_stream_call):
try:
responses = list()
async for res in unary_stream_call:
if res.HasField("message") or res.HasField("status"):
- logger.debug(f"consumer:{self.__consumer_group} receive
response: {res}")
+ logger.debug(
+ f"consumer:{self.__consumer_group} receive response:
{res}"
+ )
responses.append(res)
return responses
except Exception as e:
- logger.error(f"consumer:{self.__consumer_group} receive message
exception: {e}")
+ logger.error(
+ f"consumer:{self.__consumer_group} receive message exception:
{e}"
+ )
raise e
def __handle_receive_message_response(self, responses):
@@ -284,7 +340,8 @@ class SimpleConsumer(Client):
for res in responses:
if res.HasField("status"):
logger.debug(
- f"simple_consumer[{self.__consumer_group}]
receive_message, code:{res.status.code}, message:{res.status.message}.")
+ f"simple_consumer[{self.__consumer_group}]
receive_message, code:{res.status.code}, message:{res.status.message}."
+ )
status = res.status
elif res.HasField("message"):
messages.append(Message().fromProtobuf(res.message))
@@ -300,7 +357,12 @@ class SimpleConsumer(Client):
try:
req = self.__ack_req(message)
- future = self.rpc_client.ack_message_async(queue.endpoints, req,
metadata=self._sign())
+ future = self.rpc_client.ack_message_async(
+ queue.endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
self.__handle_ack_result(future)
except Exception as e:
raise e
@@ -311,9 +373,16 @@ class SimpleConsumer(Client):
try:
req = self.__ack_req(message)
- future = self.rpc_client.ack_message_async(queue.endpoints, req,
metadata=self._sign())
+ future = self.rpc_client.ack_message_async(
+ queue.endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
ret_future = Future()
- ack_callback = functools.partial(self.__handle_ack_result,
ret_future=ret_future)
+ ack_callback = functools.partial(
+ self.__handle_ack_result, ret_future=ret_future
+ )
future.add_done_callback(ack_callback)
return ret_future
except Exception as e:
@@ -335,15 +404,21 @@ class SimpleConsumer(Client):
def __handle_ack_result(self, future, ret_future=None):
try:
res = future.result()
- logger.debug(f"consumer[{self.__consumer_group}] ack response,
{res.status}")
+ logger.debug(
+ f"consumer[{self.__consumer_group}] ack response, {res.status}"
+ )
MessagingResultChecker.check(res.status)
if ret_future is not None:
-
self._set_future_callback_result(CallbackResult.async_ack_callback_result(ret_future,
None))
+ self._set_future_callback_result(
+ CallbackResult.async_ack_callback_result(ret_future, None)
+ )
except Exception as e:
if ret_future is None:
raise e
else:
-
self._set_future_callback_result(CallbackResult.async_ack_callback_result(ret_future,
e, False))
+ self._set_future_callback_result(
+ CallbackResult.async_ack_callback_result(ret_future, e,
False)
+ )
# change_invisible
@@ -353,20 +428,34 @@ class SimpleConsumer(Client):
try:
req = self.__change_invisible_req(message, invisible_duration)
- future =
self.rpc_client.change_invisible_duration_async(queue.endpoints, req,
metadata=self._sign())
+ future = self.rpc_client.change_invisible_duration_async(
+ queue.endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
self.__handle_change_invisible_result(future)
except Exception as e:
raise e
- def __change_invisible_duration_async(self, message: Message, queue,
invisible_duration):
+ def __change_invisible_duration_async(
+ self, message: Message, queue, invisible_duration
+ ):
if self.is_running is False:
raise IllegalArgumentException("consumer is not running now.")
try:
req = self.__change_invisible_req(message, invisible_duration)
- future =
self.rpc_client.change_invisible_duration_async(queue.endpoints, req,
metadata=self._sign())
+ future = self.rpc_client.change_invisible_duration_async(
+ queue.endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
ret_future = Future()
- change_invisible_callback =
functools.partial(self.__handle_change_invisible_result, ret_future=ret_future)
+ change_invisible_callback = functools.partial(
+ self.__handle_change_invisible_result, ret_future=ret_future
+ )
future.add_done_callback(change_invisible_callback)
return ret_future
except Exception as e:
@@ -386,17 +475,25 @@ class SimpleConsumer(Client):
def __handle_change_invisible_result(self, future, ret_future=None):
try:
res = future.result()
- logger.debug(f"consumer[{self.__consumer_group}] change invisible
response, {res.status}")
+ logger.debug(
+ f"consumer[{self.__consumer_group}] change invisible response,
{res.status}"
+ )
MessagingResultChecker.check(res.status)
if ret_future is not None:
self._set_future_callback_result(
-
CallbackResult.async_change_invisible_duration_callback_result(ret_future,
None))
+
CallbackResult.async_change_invisible_duration_callback_result(
+ ret_future, None
+ )
+ )
except Exception as e:
if ret_future is None:
raise e
else:
self._set_future_callback_result(
-
CallbackResult.async_change_invisible_duration_callback_result(ret_future, e,
False))
+
CallbackResult.async_change_invisible_duration_callback_result(
+ ret_future, e, False
+ )
+ )
""" property """
diff --git a/python/rocketmq/v5/exception/client_exception.py
b/python/rocketmq/v5/exception/client_exception.py
index d4239645..8e9b4e44 100644
--- a/python/rocketmq/v5/exception/client_exception.py
+++ b/python/rocketmq/v5/exception/client_exception.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+
class ClientException(Exception):
def __init__(self, message, code=None):
diff --git a/python/rocketmq/v5/log/log_config.py
b/python/rocketmq/v5/log/log_config.py
index 1de3240e..a363c650 100644
--- a/python/rocketmq/v5/log/log_config.py
+++ b/python/rocketmq/v5/log/log_config.py
@@ -19,35 +19,33 @@ import os
__DIR = f'{os.path.expanduser("~/logs/rocketmq_python/")}'
__LOG_CONFIG = {
- 'version': 1.0,
- 'disable_existing_loggers': False,
- 'formatters': {
- 'standard': {
- 'format': '%(asctime)s [%(levelname)s] %(message)s'
- },
+ "version": 1.0,
+ "disable_existing_loggers": False,
+ "formatters": {
+ "standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"},
},
- 'handlers': {
+ "handlers": {
# 'console': {
# 'level': 'DEBUG',
# 'class': 'logging.StreamHandler',
# 'formatter': 'standard'
# },
- 'file': {
- 'class': 'logging.handlers.RotatingFileHandler',
- 'level': 'INFO',
- 'formatter': 'standard',
- 'filename': f'{__DIR}/rocketmq_client.log',
- 'maxBytes': 1024 * 1024 * 100, # 100MB
- 'backupCount': 10,
+ "file": {
+ "class": "logging.handlers.RotatingFileHandler",
+ "level": "INFO",
+ "formatter": "standard",
+ "filename": f"{__DIR}/rocketmq_client.log",
+ "maxBytes": 1024 * 1024 * 100, # 100MB
+ "backupCount": 10,
},
},
- 'loggers': {
- 'rocketmq-python-client': {
- 'handlers': ['file'],
- 'level': 'INFO',
- 'propagate': False
+ "loggers": {
+ "rocketmq-python-client": {
+ "handlers": ["file"],
+ "level": "INFO",
+ "propagate": False,
},
- }
+ },
}
if not os.path.exists(__DIR):
diff --git a/python/rocketmq/v5/model/callback_result.py
b/python/rocketmq/v5/model/callback_result.py
index 568d6c43..ad967973 100644
--- a/python/rocketmq/v5/model/callback_result.py
+++ b/python/rocketmq/v5/model/callback_result.py
@@ -47,25 +47,41 @@ class CallbackResult:
@staticmethod
def async_send_callback_result(future, result, success=True):
callback_result = CallbackResult.callback_result(future, result,
success)
- callback_result.__result_type =
CallbackResultType.ASYNC_SEND_CALLBACK_RESULT if success else
CallbackResultType.ASYNC_SEND_CALLBACK_EXCEPTION
+ callback_result.__result_type = (
+ CallbackResultType.ASYNC_SEND_CALLBACK_RESULT
+ if success
+ else CallbackResultType.ASYNC_SEND_CALLBACK_EXCEPTION
+ )
return callback_result
@staticmethod
def async_receive_callback_result(future, result, success=True):
callback_result = CallbackResult.callback_result(future, result,
success)
- callback_result.__result_type =
CallbackResultType.ASYNC_ACK_CALLBACK_RESULT if success else
CallbackResultType.ASYNC_ACK_CALLBACK_EXCEPTION
+ callback_result.__result_type = (
+ CallbackResultType.ASYNC_ACK_CALLBACK_RESULT
+ if success
+ else CallbackResultType.ASYNC_ACK_CALLBACK_EXCEPTION
+ )
return callback_result
@staticmethod
def async_ack_callback_result(future, result, success=True):
callback_result = CallbackResult.callback_result(future, result,
success)
- callback_result.__result_type =
CallbackResultType.ASYNC_RECEIVE_CALLBACK_RESULT if success else
CallbackResultType.ASYNC_RECEIVE_CALLBACK_EXCEPTION
+ callback_result.__result_type = (
+ CallbackResultType.ASYNC_RECEIVE_CALLBACK_RESULT
+ if success
+ else CallbackResultType.ASYNC_RECEIVE_CALLBACK_EXCEPTION
+ )
return callback_result
@staticmethod
def async_change_invisible_duration_callback_result(future, result,
success=True):
callback_result = CallbackResult.callback_result(future, result,
success)
- callback_result.__result_type =
CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_RESULT if success else
CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_EXCEPTION
+ callback_result.__result_type = (
+ CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_RESULT
+ if success
+ else CallbackResultType.ASYNC_CHANGE_INVISIBLE_DURATION_EXCEPTION
+ )
return callback_result
@staticmethod
diff --git a/python/rocketmq/v5/model/filter_expression.py
b/python/rocketmq/v5/model/filter_expression.py
index 98e23da4..9cb0666e 100644
--- a/python/rocketmq/v5/model/filter_expression.py
+++ b/python/rocketmq/v5/model/filter_expression.py
@@ -19,7 +19,11 @@ from rocketmq.grpc_protocol import FilterType
class FilterExpression:
TAG_EXPRESSION_SUB_ALL = "*"
- def __init__(self, expression=TAG_EXPRESSION_SUB_ALL, filter_type:
FilterType = FilterType.TAG):
+ def __init__(
+ self,
+ expression=TAG_EXPRESSION_SUB_ALL,
+ filter_type: FilterType = FilterType.TAG,
+ ):
self.__expression = expression
self.__filter_type = filter_type
diff --git a/python/rocketmq/v5/model/message.py
b/python/rocketmq/v5/model/message.py
index efd32085..ead7d795 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -37,15 +37,19 @@ class Message:
self.__message_type = None
def __str__(self) -> str:
- return f"topic:{self.__topic}, tag:{self.__tag},
messageGroup:{self.__message_group}, " \
- f"deliveryTimestamp:{self.__delivery_timestamp},
keys:{self.__keys}, properties:{self.__properties}"
+ return (
+ f"topic:{self.__topic}, tag:{self.__tag},
messageGroup:{self.__message_group}, "
+ f"deliveryTimestamp:{self.__delivery_timestamp},
keys:{self.__keys}, properties:{self.__properties}"
+ )
def fromProtobuf(self, message: definition_pb2.Message): # noqa
try:
self.__message_body_check_sum(message)
self.__topic = message.topic.name
self.__namespace = message.topic.resource_namespace
- self.__message_id =
MessageIdCodec.decode(message.system_properties.message_id)
+ self.__message_id = MessageIdCodec.decode(
+ message.system_properties.message_id
+ )
self.__body = self.__uncompress_body(message)
self.__tag = message.system_properties.tag
self.__message_group = message.system_properties.message_group
@@ -71,18 +75,26 @@ class Message:
if message.system_properties.body_digest.type == DigestType.CRC32:
crc32_sum = Misc.crc32_checksum(message.body)
if message.system_properties.body_digest.checksum != crc32_sum:
- raise Exception(f"(body_check_sum exception,
{message.digest.checksum} != crc32_sum {crc32_sum}")
+ raise Exception(
+ f"(body_check_sum exception, {message.digest.checksum} !=
crc32_sum {crc32_sum}"
+ )
elif message.system_properties.body_digest.type == DigestType.MD5:
md5_sum = Misc.md5_checksum(message.body)
if message.system_properties.body_digest.checksum != md5_sum:
- raise Exception(f"(body_check_sum exception,
{message.digest.checksum} != crc32_sum {md5_sum}")
+ raise Exception(
+ f"(body_check_sum exception, {message.digest.checksum} !=
crc32_sum {md5_sum}"
+ )
elif message.system_properties.body_digest.type == DigestType.SHA1:
sha1_sum = Misc.sha1_checksum(message.body)
if message.system_properties.body_digest.checksum != sha1_sum:
- raise Exception(f"(body_check_sum exception,
{message.digest.checksum} != crc32_sum {sha1_sum}")
+ raise Exception(
+ f"(body_check_sum exception, {message.digest.checksum} !=
crc32_sum {sha1_sum}"
+ )
else:
- raise Exception(f"unsupported message body digest algorithm,
{message.system_properties.body_digest.type},"
- f" {message.topic},
{message.system_properties.message_id}")
+ raise Exception(
+ f"unsupported message body digest algorithm,
{message.system_properties.body_digest.type},"
+ f" {message.topic}, {message.system_properties.message_id}"
+ )
@staticmethod
def __uncompress_body(message):
@@ -92,7 +104,8 @@ class Message:
return message.body
else:
raise Exception(
- f"unsupported message encoding algorithm,
{message.system_properties.body_encoding}, {message.topic},
{message.system_properties.message_id}")
+ f"unsupported message encoding algorithm,
{message.system_properties.body_encoding}, {message.topic},
{message.system_properties.message_id}"
+ )
""" property """
@@ -154,18 +167,20 @@ class Message:
@body.setter
def body(self, body):
- if body is None or body.strip() == '':
+ if body is None or body.strip() == "":
raise IllegalArgumentException("body should not be blank")
self.__body = body
@topic.setter
def topic(self, topic):
- if topic is None or topic.strip() == '':
+ if topic is None or topic.strip() == "":
raise IllegalArgumentException("topic has not been set yet")
if Misc.is_valid_topic(topic):
self.__topic = topic
else:
- raise IllegalArgumentException(f"topic does not match the regex
[regex={Misc.TOPIC_PATTERN}]")
+ raise IllegalArgumentException(
+ f"topic does not match the regex [regex={Misc.TOPIC_PATTERN}]"
+ )
@message_id.setter
def message_id(self, message_id):
@@ -173,16 +188,18 @@ class Message:
@tag.setter
def tag(self, tag):
- if tag is None or tag.strip() == '':
+ if tag is None or tag.strip() == "":
raise IllegalArgumentException("tag should not be blank")
if "|" in tag:
- raise IllegalArgumentException("tag should not contain \"|\"")
+ raise IllegalArgumentException('tag should not contain "|"')
self.__tag = tag
@message_group.setter
def message_group(self, message_group):
if self.__delivery_timestamp is not None:
- raise IllegalArgumentException("deliveryTimestamp and messageGroup
should not be set at same time")
+ raise IllegalArgumentException(
+ "deliveryTimestamp and messageGroup should not be set at same
time"
+ )
if message_group is None or len(message_group) == 0:
raise IllegalArgumentException("messageGroup should not be blank")
self.__message_group = message_group
@@ -190,13 +207,15 @@ class Message:
@delivery_timestamp.setter
def delivery_timestamp(self, delivery_timestamp):
if self.__message_group is not None:
- raise IllegalArgumentException("deliveryTimestamp and messageGroup
should not be set at same time")
+ raise IllegalArgumentException(
+ "deliveryTimestamp and messageGroup should not be set at same
time"
+ )
self.__delivery_timestamp = delivery_timestamp
@keys.setter
def keys(self, *keys):
for key in keys:
- if not key or key.strip() == '':
+ if not key or key.strip() == "":
raise IllegalArgumentException("key should not be blank")
self.__keys.update(set(keys))
@@ -205,8 +224,8 @@ class Message:
self.__message_type = message_type
def add_property(self, key, value):
- if key is None or key.strip() == '':
+ if key is None or key.strip() == "":
raise IllegalArgumentException("key should not be blank")
- if value is None or value.strip() == '':
+ if value is None or value.strip() == "":
raise IllegalArgumentException("value should not be blank")
self.__properties[key] = value
diff --git a/python/rocketmq/v5/model/metrics.py
b/python/rocketmq/v5/model/metrics.py
index 1106ae4c..31e84a4e 100644
--- a/python/rocketmq/v5/model/metrics.py
+++ b/python/rocketmq/v5/model/metrics.py
@@ -54,9 +54,15 @@ class MetricContext:
class HistogramEnum(Enum):
# a histogram that records the cost time of successful api calls of
message publishing.
- SEND_COST_TIME = ("rocketmq_send_cost_time", [1.0, 5.0, 10.0, 20.0, 50.0,
200.0, 500.0])
+ SEND_COST_TIME = (
+ "rocketmq_send_cost_time",
+ [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0],
+ )
# a histogram that records the latency of message delivery from remote.
- DELIVERY_LATENCY = ("rocketmq_delivery_latency", [1.0, 5.0, 10.0, 20.0,
50.0, 200.0, 500.0])
+ DELIVERY_LATENCY = (
+ "rocketmq_delivery_latency",
+ [1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0],
+ )
def __init__(self, histogram_name, buckets):
self.__histogram_name = histogram_name
diff --git a/python/rocketmq/v5/model/send_receipt.py
b/python/rocketmq/v5/model/send_receipt.py
index 90d576f5..f67997a1 100644
--- a/python/rocketmq/v5/model/send_receipt.py
+++ b/python/rocketmq/v5/model/send_receipt.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+
class SendReceipt:
def __init__(self, message_id, transaction_id, message_queue, offset):
diff --git a/python/rocketmq/v5/model/topic_route.py
b/python/rocketmq/v5/model/topic_route.py
index e62b6d6f..b93728e2 100644
--- a/python/rocketmq/v5/model/topic_route.py
+++ b/python/rocketmq/v5/model/topic_route.py
@@ -31,10 +31,16 @@ class MessageQueue:
self.__accept_message_types = set(queue.accept_message_types)
def is_readable(self):
- return self.__permission == Permission.READ or self.__permission ==
Permission.READ_WRITE
+ return (
+ self.__permission == Permission.READ
+ or self.__permission == Permission.READ_WRITE
+ )
def is_writable(self):
- return self.__permission == Permission.WRITE or self.__permission ==
Permission.READ_WRITE
+ return (
+ self.__permission == Permission.WRITE
+ or self.__permission == Permission.READ_WRITE
+ )
def is_master_broker(self):
return self.__broker_id == MessageQueue.MASTER_BROKER_ID
@@ -42,7 +48,17 @@ class MessageQueue:
def __eq__(self, other: object) -> bool:
if not isinstance(other, MessageQueue):
return False
- ret = (self.__topic == other.__topic and self.__namespace ==
other.__namespace and self.__queue_id == other.__queue_id and self.__permission
== other.__permission and self.__broker_name == other.__broker_name and
self.__broker_id == other.__broker_id and self.__broker_endpoints ==
other.__broker_endpoints and sorted(self.__accept_message_types) ==
sorted(other.__accept_message_types))
+ ret = (
+ self.__topic == other.__topic
+ and self.__namespace == other.__namespace
+ and self.__queue_id == other.__queue_id
+ and self.__permission == other.__permission
+ and self.__broker_name == other.__broker_name
+ and self.__broker_id == other.__broker_id
+ and self.__broker_endpoints == other.__broker_endpoints
+ and sorted(self.__accept_message_types)
+ == sorted(other.__accept_message_types)
+ )
return ret
def __str__(self):
@@ -75,7 +91,9 @@ class MessageQueue:
class TopicRouteData:
def __init__(self, message_queues):
- self.__message_queues = list(map(lambda queue: MessageQueue(queue),
message_queues))
+ self.__message_queues = list(
+ map(lambda queue: MessageQueue(queue), message_queues)
+ )
def __eq__(self, other):
if self is other:
@@ -88,7 +106,11 @@ class TopicRouteData:
return hash(tuple(self.__message_queues))
def __str__(self):
- return "message_queues:(" + ', '.join(str(queue) for queue in
self.__message_queues) + ")"
+ return (
+ "message_queues:("
+ + ", ".join(str(queue) for queue in self.__message_queues)
+ + ")"
+ )
def all_endpoints(self):
endpoints_map = {}
diff --git a/python/rocketmq/v5/producer/producer.py
b/python/rocketmq/v5/producer/producer.py
index 9f4ae356..ba9e3d9b 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -48,21 +48,31 @@ class Transaction:
def add_half_message(self, message: Message):
with Transaction.__transaction_lock:
if message is None:
- raise IllegalArgumentException("add half message error,
message is none.")
+ raise IllegalArgumentException(
+ "add half message error, message is none."
+ )
if self.__message is None:
self.__message = message
else:
- raise IllegalArgumentException(f"message already existed in
transaction, topic:{message.topic}")
+ raise IllegalArgumentException(
+ f"message already existed in transaction,
topic:{message.topic}"
+ )
def add_send_receipt(self, send_receipt):
with Transaction.__transaction_lock:
if self.__message is None:
- raise IllegalArgumentException("add send receipt error, no
message in transaction.")
+ raise IllegalArgumentException(
+ "add send receipt error, no message in transaction."
+ )
if send_receipt is None:
- raise IllegalArgumentException("add send receipt error, send
receipt in none.")
+ raise IllegalArgumentException(
+ "add send receipt error, send receipt in none."
+ )
if self.__message.message_id != send_receipt.message_id:
- raise IllegalArgumentException("can't add another send receipt
to a half message.")
+ raise IllegalArgumentException(
+ "can't add another send receipt to a half message."
+ )
self.__send_receipt = send_receipt
@@ -76,20 +86,28 @@ class Transaction:
if self.__message is None:
raise IllegalArgumentException("no message in transaction.")
if self.__send_receipt is None or self.__send_receipt.transaction_id
is None:
- raise IllegalArgumentException("no transaction_id in transaction,
must send half message at first.")
+ raise IllegalArgumentException(
+ "no transaction_id in transaction, must send half message at
first."
+ )
try:
- res =
self.__producer.end_transaction(self.__send_receipt.message_queue.endpoints,
self.__message,
-
self.__send_receipt.transaction_id, result,
-
TransactionSource.SOURCE_CLIENT)
+ res = self.__producer.end_transaction(
+ self.__send_receipt.message_queue.endpoints,
+ self.__message,
+ self.__send_receipt.transaction_id,
+ result,
+ TransactionSource.SOURCE_CLIENT,
+ )
if res.status.code != Code.OK:
logger.error(
- f"transaction commit or rollback error.
topic:{self.__message.topic}, message_id:{self.__message.message_id},
transaction_id:{self.__send_receipt.transaction_id},
transactionResolution:{result}")
+ f"transaction commit or rollback error.
topic:{self.__message.topic}, message_id:{self.__message.message_id},
transaction_id:{self.__send_receipt.transaction_id},
transactionResolution:{result}"
+ )
raise ClientException(res.status.message, res.status.code)
return res
except Exception as e:
logger.error(
- f"end transaction error, topic:{self.__message.topic},
message_id:{self.__send_receipt.message_id},
transaction_id:{self.__send_receipt.transaction_id},
transactionResolution:{result}: {e}")
+ f"end transaction error, topic:{self.__message.topic},
message_id:{self.__send_receipt.message_id},
transaction_id:{self.__send_receipt.transaction_id},
transactionResolution:{result}: {e}"
+ )
raise e
""" property """
@@ -109,11 +127,15 @@ class TransactionChecker(metaclass=abc.ABCMeta):
class Producer(Client):
MAX_SEND_ATTEMPTS = 3 # max retry times when send failed
- def __init__(self, client_configuration, topics=None, checker=None,
tls_enable=False):
+ def __init__(
+ self, client_configuration, topics=None, checker=None, tls_enable=False
+ ):
super().__init__(client_configuration, topics, ClientType.PRODUCER,
tls_enable)
# {topic, QueueSelector}
self.__send_queue_selectors = ConcurrentMap()
- self.__checker = checker # checker for transaction message, handle
checking from server
+ self.__checker = (
+ checker # checker for transaction message, handle checking from
server
+ )
def __str__(self):
return f"{ClientType.Name(self.client_type)}
client_id:{self.client_id}"
@@ -128,7 +150,8 @@ class Producer(Client):
topic_queue = self.__select_send_queue(message.topic)
if message.message_type not in topic_queue.accept_message_types:
raise IllegalArgumentException(
- f"current message type not match with queue accept message
types, topic:{message.topic}, message_type:{message.message_type}, queue access
type:{topic_queue.accept_message_types}")
+ f"current message type not match with queue accept message
types, topic:{message.topic}, message_type:{message.message_type}, queue access
type:{topic_queue.accept_message_types}"
+ )
if transaction is None:
try:
@@ -146,7 +169,9 @@ class Producer(Client):
except IllegalArgumentException as e:
raise e
except Exception as e:
- logger.error(f"send transaction message exception, topic:
{message.topic}, e: {e}")
+ logger.error(
+ f"send transaction message exception, topic:
{message.topic}, e: {e}"
+ )
raise e
def send_async(self, message: Message):
@@ -156,9 +181,11 @@ class Producer(Client):
self.__wrap_sending_message(message, False)
topic_queue = self.__select_send_queue(message.topic)
if message.message_type not in topic_queue.accept_message_types:
- raise IllegalArgumentException(f"current message type not match
with queue accept message types, "
- f"topic:{message.topic},
message_type:{message.message_type}, "
- f"queue access
type:{topic_queue.accept_message_types}")
+ raise IllegalArgumentException(
+ f"current message type not match with queue accept message
types, "
+ f"topic:{message.topic}, message_type:{message.message_type}, "
+ f"queue access type:{topic_queue.accept_message_types}"
+ )
try:
return self.__send_async(message, topic_queue)
@@ -170,7 +197,9 @@ class Producer(Client):
def begin_transaction(self):
if self.is_running is False:
- raise IllegalStateException("unable to begin transaction because
producer is not running")
+ raise IllegalStateException(
+ "unable to begin transaction because producer is not running"
+ )
if self.__checker is None:
raise IllegalArgumentException("Transaction checker should not be
null.")
@@ -178,22 +207,31 @@ class Producer(Client):
def end_transaction(self, endpoints, message, transaction_id, result,
source):
if self.is_running is False:
- raise IllegalStateException("unable to end transaction because
producer is not running")
+ raise IllegalStateException(
+ "unable to end transaction because producer is not running"
+ )
if self.__checker is None:
raise IllegalArgumentException("Transaction checker should not be
null.")
req = self.__end_transaction_req(message, transaction_id, result,
source)
- future = self.rpc_client.end_transaction_async(endpoints, req,
metadata=self._sign(),
-
timeout=self.client_configuration.request_timeout)
+ future = self.rpc_client.end_transaction_async(
+ endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
return future.result()
- async def on_recover_orphaned_transaction_command(self, endpoints, msg,
transaction_id):
+ async def on_recover_orphaned_transaction_command(
+ self, endpoints, msg, transaction_id
+ ):
# call this function from server side stream, in RpcClient._io_loop
try:
if self.is_running is False:
raise IllegalStateException(
- "unable to recover orphaned transaction command because
producer is not running")
+ "unable to recover orphaned transaction command because
producer is not running"
+ )
if self.__checker is None:
raise IllegalArgumentException("No transaction checker
registered.")
@@ -201,15 +239,25 @@ class Producer(Client):
result = self.__checker.check(message)
if result == TransactionResolution.COMMIT:
- res = await self.__commit_for_server_check(endpoints, message,
transaction_id,
-
TransactionSource.SOURCE_SERVER_CHECK)
+ res = await self.__commit_for_server_check(
+ endpoints,
+ message,
+ transaction_id,
+ TransactionSource.SOURCE_SERVER_CHECK,
+ )
logger.debug(
- f"commit message. message_id: {message.message_id},
transaction_id: {transaction_id}, res: {res}")
+ f"commit message. message_id: {message.message_id},
transaction_id: {transaction_id}, res: {res}"
+ )
elif result == TransactionResolution.ROLLBACK:
- res = await self.__rollback_for_server_check(endpoints,
message, transaction_id,
-
TransactionSource.SOURCE_SERVER_CHECK)
+ res = await self.__rollback_for_server_check(
+ endpoints,
+ message,
+ transaction_id,
+ TransactionSource.SOURCE_SERVER_CHECK,
+ )
logger.debug(
- f"rollback message. message_id: {message.message_id},
transaction_id: {transaction_id}, res: {res}")
+ f"rollback message. message_id: {message.message_id},
transaction_id: {transaction_id}, res: {res}"
+ )
except Exception as e:
logger.error(f"on_recover_orphaned_transaction_command exception:
{e}")
@@ -273,17 +321,35 @@ class Producer(Client):
def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt:
req = self.__send_req(message)
send_context = self.client_metrics.send_before(message.topic)
- send_message_future =
self.rpc_client.send_message_async(topic_queue.endpoints, req, self._sign())
- return self.__handle_sync_send_receipt(send_message_future, message,
topic_queue, attempt, send_context)
-
- def __handle_sync_send_receipt(self, send_message_future, message,
topic_queue, attempt, send_metric_context=None):
+ send_message_future = self.rpc_client.send_message_async(
+ topic_queue.endpoints,
+ req,
+ self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
+ return self.__handle_sync_send_receipt(
+ send_message_future, message, topic_queue, attempt, send_context
+ )
+
+ def __handle_sync_send_receipt(
+ self,
+ send_message_future,
+ message,
+ topic_queue,
+ attempt,
+ send_metric_context=None,
+ ):
try:
- send_receipt =
self.__process_send_message_response(send_message_future, topic_queue)
+ send_receipt = self.__process_send_message_response(
+ send_message_future, topic_queue
+ )
self.client_metrics.send_after(send_metric_context, True)
return send_receipt
except Exception as e:
attempt += 1
- retry_exception_future =
self.__check_send_retry_condition(message, topic_queue, attempt, e)
+ retry_exception_future = self.__check_send_retry_condition(
+ message, topic_queue, attempt, e
+ )
if retry_exception_future is not None:
# end retry with exception
self.client_metrics.send_after(send_metric_context, False)
@@ -296,29 +362,55 @@ class Producer(Client):
def __send_async(self, message: Message, topic_queue, attempt=1,
ret_future=None):
req = self.__send_req(message)
send_context = self.client_metrics.send_before(message.topic)
- send_message_future =
self.rpc_client.send_message_async(topic_queue.endpoints, req, self._sign())
+ send_message_future = self.rpc_client.send_message_async(
+ topic_queue.endpoints,
+ req,
+ self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
if ret_future is None:
ret_future = Future()
- handle_send_receipt_callback =
functools.partial(self.__handle_async_send_receipt, message=message,
-
topic_queue=topic_queue, attempt=attempt,
-
ret_future=ret_future, send_metric_context=send_context)
+ handle_send_receipt_callback = functools.partial(
+ self.__handle_async_send_receipt,
+ message=message,
+ topic_queue=topic_queue,
+ attempt=attempt,
+ ret_future=ret_future,
+ send_metric_context=send_context,
+ )
send_message_future.add_done_callback(handle_send_receipt_callback)
return ret_future
- def __handle_async_send_receipt(self, send_message_future, message,
topic_queue, attempt, ret_future,
- send_metric_context=None):
+ def __handle_async_send_receipt(
+ self,
+ send_message_future,
+ message,
+ topic_queue,
+ attempt,
+ ret_future,
+ send_metric_context=None,
+ ):
try:
- send_receipt =
self.__process_send_message_response(send_message_future, topic_queue)
+ send_receipt = self.__process_send_message_response(
+ send_message_future, topic_queue
+ )
self.client_metrics.send_after(send_metric_context, True)
-
self._set_future_callback_result(CallbackResult.async_send_callback_result(ret_future,
send_receipt))
+ self._set_future_callback_result(
+ CallbackResult.async_send_callback_result(ret_future,
send_receipt)
+ )
except Exception as e:
attempt += 1
- retry_exception_future =
self.__check_send_retry_condition(message, topic_queue, attempt, e)
+ retry_exception_future = self.__check_send_retry_condition(
+ message, topic_queue, attempt, e
+ )
if retry_exception_future is not None:
# end retry with exception
self.client_metrics.send_after(send_metric_context, False)
self._set_future_callback_result(
- CallbackResult.async_send_callback_result(ret_future,
retry_exception_future.exception(), False))
+ CallbackResult.async_send_callback_result(
+ ret_future, retry_exception_future.exception(), False
+ )
+ )
return
# resend message
topic_queue = self.__select_send_queue(message.topic)
@@ -328,28 +420,34 @@ class Producer(Client):
res = send_message_future.result()
MessagingResultChecker.check(res.status)
entries = res.entries
- assert len(
- entries) == 1, f"entries size error, the send response entries
size is {len(entries)}, {self.__str__()}"
+ assert (
+ len(entries) == 1
+ ), f"entries size error, the send response entries size is
{len(entries)}, {self.__str__()}"
entry = entries[0]
- return SendReceipt(entry.message_id, entry.transaction_id,
topic_queue, entry.offset)
+ return SendReceipt(
+ entry.message_id, entry.transaction_id, topic_queue, entry.offset
+ )
def __check_send_retry_condition(self, message, topic_queue, attempt, e):
end_retry = False
if attempt > Producer.MAX_SEND_ATTEMPTS:
logger.error(
- f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, because of run out of attempt times,
topic:{message.topic}, message_id:{message.message_id},
message_type:{message.message_type}, attempt:{attempt}")
+ f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, because of run out of attempt times,
topic:{message.topic}, message_id:{message.message_id},
message_type:{message.message_type}, attempt:{attempt}"
+ )
end_retry = True
# no need more attempts for transactional message
if message.message_type == MessageType.TRANSACTION:
logger.error(
- f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, topic:{message.topic},
message_id:{message.message_id}, message_type:{message.message_type}
,attempt:{attempt}")
+ f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, topic:{message.topic},
message_id:{message.message_id}, message_type:{message.message_type}
,attempt:{attempt}"
+ )
end_retry = True
# end retry if system busy
if isinstance(e, TooManyRequestsException):
logger.error(
- f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, because of to too many requests,
topic:{message.topic}, message_type:{message.message_type},
message_id:{message.message_id}, attempt:{attempt}")
+ f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, because of to too many requests,
topic:{message.topic}, message_type:{message.message_type},
message_id:{message.message_id}, attempt:{attempt}"
+ )
end_retry = True
if end_retry:
@@ -374,7 +472,8 @@ class Producer(Client):
max_body_size = 4 * 1024 * 1024 # max body size is 4m
if len(message.body) > max_body_size:
raise IllegalArgumentException(
- f"Message body size exceeds the threshold, max
size={max_body_size} bytes")
+ f"Message body size exceeds the threshold, max
size={max_body_size} bytes"
+ )
msg.body = message.body
if message.tag is not None:
@@ -391,13 +490,19 @@ class Producer(Client):
if message.message_group is not None:
msg.system_properties.message_group = message.message_group
if message.delivery_timestamp is not None:
- msg.system_properties.delivery_timestamp.seconds =
message.delivery_timestamp
+ msg.system_properties.delivery_timestamp.seconds = (
+ message.delivery_timestamp
+ )
return req
except Exception as e:
raise e
def __send_message_type(self, message: Message, is_transaction=False):
- if message.message_group is None and message.delivery_timestamp is
None and is_transaction is False:
+ if (
+ message.message_group is None
+ and message.delivery_timestamp is None
+ and is_transaction is False
+ ):
return MessageType.NORMAL
if message.message_group is not None and is_transaction is False:
@@ -406,21 +511,30 @@ class Producer(Client):
if message.delivery_timestamp is not None and is_transaction is False:
return MessageType.DELAY
- if message.message_group is None and message.delivery_timestamp is
None and is_transaction is True:
+ if (
+ message.message_group is None
+ and message.delivery_timestamp is None
+ and is_transaction is True
+ ):
return MessageType.TRANSACTION
# transaction semantics is conflicted with fifo/delay.
- logger.error(f"{self.__str__()} set send message type exception,
message: {str(message)}")
- raise IllegalArgumentException("transactional message should not set
messageGroup or deliveryTimestamp")
+ logger.error(
+ f"{self.__str__()} set send message type exception, message:
{str(message)}"
+ )
+ raise IllegalArgumentException(
+ "transactional message should not set messageGroup or
deliveryTimestamp"
+ )
def __select_send_queue(self, topic):
try:
route = self._retrieve_topic_route_data(topic)
- queue_selector = self.__send_queue_selectors.put_if_absent(topic,
-
QueueSelector.producer_queue_selector(route))
+ queue_selector = self.__send_queue_selectors.put_if_absent(
+ topic, QueueSelector.producer_queue_selector(route)
+ )
return queue_selector.select_next_queue()
except Exception as e:
- logger.error(f"producer select topic:{topic} queue index
exception, {e}")
+ logger.error(f"producer select topic:{topic} queue raise
exception, {e}")
raise e
def __end_transaction_req(self, message: Message, transaction_id, result,
source):
@@ -433,15 +547,27 @@ class Producer(Client):
req.source = source
return req
- def __commit_for_server_check(self, endpoints, message: Message,
transaction_id, source):
- return self.__end_transaction_for_server_check(endpoints, message,
transaction_id, TransactionResolution.COMMIT,
- source)
-
- def __rollback_for_server_check(self, endpoints, message: Message,
transaction_id, source):
- return self.__end_transaction_for_server_check(endpoints, message,
transaction_id,
-
TransactionResolution.ROLLBACK, source)
-
- def __end_transaction_for_server_check(self, endpoints, message: Message,
transaction_id, result, source):
+ def __commit_for_server_check(
+ self, endpoints, message: Message, transaction_id, source
+ ):
+ return self.__end_transaction_for_server_check(
+ endpoints, message, transaction_id, TransactionResolution.COMMIT,
source
+ )
+
+ def __rollback_for_server_check(
+ self, endpoints, message: Message, transaction_id, source
+ ):
+ return self.__end_transaction_for_server_check(
+ endpoints, message, transaction_id,
TransactionResolution.ROLLBACK, source
+ )
+
+ def __end_transaction_for_server_check(
+ self, endpoints, message: Message, transaction_id, result, source
+ ):
req = self.__end_transaction_req(message, transaction_id, result,
source)
- return self.rpc_client.end_transaction_for_server_check(endpoints,
req, metadata=self._sign(),
-
timeout=self.client_configuration.request_timeout)
+ return self.rpc_client.end_transaction_for_server_check(
+ endpoints,
+ req,
+ metadata=self._sign(),
+ timeout=self.client_configuration.request_timeout,
+ )
diff --git a/python/rocketmq/v5/test/test_base.py
b/python/rocketmq/v5/test/test_base.py
index aa49fafc..fd315eee 100644
--- a/python/rocketmq/v5/test/test_base.py
+++ b/python/rocketmq/v5/test/test_base.py
@@ -33,7 +33,7 @@ class TestBase:
FAKE_CLIENT_ID = ClientId()
FAKE_TOPIC_0 = "foo-bar-topic-0"
FAKE_TOPIC_1 = "foo-bar-topic-1"
- FAKE_MESSAGE_BODY = "foobar".encode('utf-8')
+ FAKE_MESSAGE_BODY = "foobar".encode("utf-8")
FAKE_TAG_0 = "foo-bar-tag-0"
FAKE_BROKER_NAME_0 = "foo-bar-broker-name-0"
FAKE_BROKER_NAME_1 = "foo-bar-broker-name-1"
@@ -49,7 +49,9 @@ class TestBase:
@staticmethod
def fake_client_config():
credentials = Credentials(TestBase.FAKE_AK, TestBase.FAKE_SK)
- config = ClientConfiguration(TestBase.FAKE_ENDPOINTS, credentials,
TestBase.FAKE_NAMESPACE)
+ config = ClientConfiguration(
+ TestBase.FAKE_ENDPOINTS, credentials, TestBase.FAKE_NAMESPACE
+ )
return config
@staticmethod
@@ -75,7 +77,9 @@ class TestBase:
msg.body = TestBase.FAKE_MESSAGE_BODY
msg.system_properties.born_host = TestBase.FAKE_HOST_0
msg.system_properties.born_timestamp.seconds = int(time.time() * 1000)
- msg.system_properties.delivery_timestamp.seconds =
msg.system_properties.born_timestamp.seconds - 10
+ msg.system_properties.delivery_timestamp.seconds = (
+ msg.system_properties.born_timestamp.seconds - 10
+ )
msg.system_properties.message_type = 1
msg.system_properties.body_encoding = 1
return msg
@@ -103,7 +107,13 @@ class TestBase:
fake_queue.broker.CopyFrom(fake_broker)
fake_queue.permission = Permission.READ_WRITE
fake_queue.accept_message_types.extend(
- (MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY,
MessageType.TRANSACTION))
+ (
+ MessageType.NORMAL,
+ MessageType.FIFO,
+ MessageType.DELAY,
+ MessageType.TRANSACTION,
+ )
+ )
return MessageQueue(fake_queue)
@staticmethod
@@ -135,7 +145,12 @@ class TestBase:
fake_response = TestBase.fake_send_success_response()
fake_message_queue = TestBase.fake_queue(topic)
fake_entry = fake_response.entries[0]
- return SendReceipt(fake_entry.message_id, fake_entry.transaction_id,
fake_message_queue, fake_entry.offset)
+ return SendReceipt(
+ fake_entry.message_id,
+ fake_entry.transaction_id,
+ fake_message_queue,
+ fake_entry.offset,
+ )
@staticmethod
def fake_receive_receipt():
diff --git a/python/rocketmq/v5/test/test_consumer.py
b/python/rocketmq/v5/test/test_consumer.py
index 47db493a..8bd107cb 100644
--- a/python/rocketmq/v5/test/test_consumer.py
+++ b/python/rocketmq/v5/test/test_consumer.py
@@ -26,24 +26,40 @@ from rocketmq.v5.test import TestBase
class TestNormalConsumer(unittest.TestCase):
- @patch.object(Message, '_Message__message_body_check_sum')
- @patch.object(SimpleConsumer, '_SimpleConsumer__receive_message_response')
- @patch.object(RpcClient, 'receive_message_async')
- @patch.object(SimpleConsumer, '_SimpleConsumer__select_topic_queue',
- return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0))
- @patch.object(SimpleConsumer, '_SimpleConsumer__select_topic_for_receive',
return_value=TestBase.FAKE_TOPIC_0)
- @patch.object(Client, '_Client__start_scheduler', return_value=None)
- @patch.object(Client, '_Client__update_topic_route', return_value=None)
- def test_receive(self, mock_update_topic_route, mock_start_scheduler,
mock_select_topic_for_receive,
- mock_select_topic_queue, mock_receive_message_async,
mock_receive_message_response,
- mock_message_body_check_sum):
+ @patch.object(Message, "_Message__message_body_check_sum")
+ @patch.object(SimpleConsumer, "_SimpleConsumer__receive_message_response")
+ @patch.object(RpcClient, "receive_message_async")
+ @patch.object(
+ SimpleConsumer,
+ "_SimpleConsumer__select_topic_queue",
+ return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0),
+ )
+ @patch.object(
+ SimpleConsumer,
+ "_SimpleConsumer__select_topic_for_receive",
+ return_value=TestBase.FAKE_TOPIC_0,
+ )
+ @patch.object(Client, "_Client__start_scheduler", return_value=None)
+ @patch.object(Client, "_Client__update_topic_route", return_value=None)
+ def test_receive(
+ self,
+ mock_update_topic_route,
+ mock_start_scheduler,
+ mock_select_topic_for_receive,
+ mock_select_topic_queue,
+ mock_receive_message_async,
+ mock_receive_message_response,
+ mock_message_body_check_sum,
+ ):
future = Future()
future.set_result(list())
mock_receive_message_async.return_value = future
mock_receive_message_response.return_value =
TestBase.fake_receive_receipt()
subs = {TestBase.FAKE_TOPIC_0: FilterExpression()}
- consumer = SimpleConsumer(TestBase.fake_client_config(),
TestBase.FAKE_CONSUMER_GROUP_0, subs)
+ consumer = SimpleConsumer(
+ TestBase.fake_client_config(), TestBase.FAKE_CONSUMER_GROUP_0, subs
+ )
consumer.startup()
messages = consumer.receive(32, 10)
self.assertIsInstance(messages[0], Message)
diff --git a/python/rocketmq/v5/test/test_producer.py
b/python/rocketmq/v5/test/test_producer.py
index 4dc73d29..e85864c1 100644
--- a/python/rocketmq/v5/test/test_producer.py
+++ b/python/rocketmq/v5/test/test_producer.py
@@ -26,10 +26,16 @@ from rocketmq.v5.test import TestBase
class TestNormalProducer(unittest.TestCase):
- @patch.object(Producer, '_Producer__select_send_queue',
return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0))
- @patch.object(RpcClient, 'send_message_async')
- @patch.object(Client, '_Client__start_scheduler', return_value=None)
- def test_send(self, mock_start_scheduler, mock_send_message_async,
mock_select_send_queue):
+ @patch.object(
+ Producer,
+ "_Producer__select_send_queue",
+ return_value=TestBase.fake_queue(TestBase.FAKE_TOPIC_0),
+ )
+ @patch.object(RpcClient, "send_message_async")
+ @patch.object(Client, "_Client__start_scheduler", return_value=None)
+ def test_send(
+ self, mock_start_scheduler, mock_send_message_async,
mock_select_send_queue
+ ):
# mock send_message_async return future
future = Future()
future.set_result(TestBase.fake_send_success_response())
diff --git a/python/rocketmq/v5/util/client_id.py
b/python/rocketmq/v5/util/client_id.py
index 44ef02f0..04054b2c 100644
--- a/python/rocketmq/v5/util/client_id.py
+++ b/python/rocketmq/v5/util/client_id.py
@@ -30,7 +30,15 @@ class ClientId:
host_name = gethostname()
process_id = getpid()
base36_time = Misc.to_base36(time_ns())
- self.__client_id = (host_name + ClientId.CLIENT_ID_SEPARATOR +
str(process_id) + ClientId.CLIENT_ID_SEPARATOR + str(self.__client_index) +
ClientId.CLIENT_ID_SEPARATOR + base36_time)
+ self.__client_id = (
+ host_name
+ + ClientId.CLIENT_ID_SEPARATOR
+ + str(process_id)
+ + ClientId.CLIENT_ID_SEPARATOR
+ + str(self.__client_index)
+ + ClientId.CLIENT_ID_SEPARATOR
+ + base36_time
+ )
@property
def client_id(self):
diff --git a/python/rocketmq/v5/util/message_id_codec.py
b/python/rocketmq/v5/util/message_id_codec.py
index eec4f94a..a0b83b4c 100644
--- a/python/rocketmq/v5/util/message_id_codec.py
+++ b/python/rocketmq/v5/util/message_id_codec.py
@@ -74,17 +74,21 @@ class MessageIdCodec:
def __init__(self):
with MessageIdCodec._instance_lock:
- if not hasattr(self, 'initialized'):
+ if not hasattr(self, "initialized"):
buffer = bytearray(8)
- mac = getnode().to_bytes(6, byteorder='big')
+ mac = getnode().to_bytes(6, byteorder="big")
buffer[0:6] = mac
pid = getpid()
pid_buffer = bytearray(4)
- pid_buffer[0:4] = pid.to_bytes(4, byteorder='big')
+ pid_buffer[0:4] = pid.to_bytes(4, byteorder="big")
buffer[6:8] = pid_buffer[2:4]
self.process_fixed_string_v1 = buffer.hex().upper()
self.seconds_since_custom_epoch = int(
- (datetime.now(timezone.utc) - datetime(2021, 1, 1, 0, 0,
0, tzinfo=timezone.utc)).total_seconds())
+ (
+ datetime.now(timezone.utc)
+ - datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
+ ).total_seconds()
+ )
self.seconds_start_timestamp = int(time())
self.seconds = self.__delta_time()
self.sequence = None
@@ -95,9 +99,13 @@ class MessageIdCodec:
if self.seconds != delta_seconds:
self.seconds = delta_seconds
buffer = bytearray(8)
- buffer[0:4] = self.seconds.to_bytes(8, byteorder='big')[4:8]
- buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder='big')
- return MessageIdCodec.MESSAGE_ID_VERSION_V1 +
self.process_fixed_string_v1 + buffer.hex().upper()
+ buffer[0:4] = self.seconds.to_bytes(8, byteorder="big")[4:8]
+ buffer[4:8] = self.__sequence_id().to_bytes(4, byteorder="big")
+ return (
+ MessageIdCodec.MESSAGE_ID_VERSION_V1
+ + self.process_fixed_string_v1
+ + buffer.hex().upper()
+ )
@staticmethod
def decode(message_id):
@@ -106,7 +114,9 @@ class MessageIdCodec:
""" private """
def __delta_time(self):
- return int(time()) - self.seconds_start_timestamp +
self.seconds_since_custom_epoch
+ return (
+ int(time()) - self.seconds_start_timestamp +
self.seconds_since_custom_epoch
+ )
def __sequence_id(self):
self.sequence = MessageIdCodec.__index.get_and_increment()
diff --git a/python/rocketmq/v5/util/messaging_result_checker.py
b/python/rocketmq/v5/util/messaging_result_checker.py
index f8bc1ee0..4091397a 100644
--- a/python/rocketmq/v5/util/messaging_result_checker.py
+++ b/python/rocketmq/v5/util/messaging_result_checker.py
@@ -34,12 +34,27 @@ class MessagingResultChecker:
if code == Code.OK or code == Code.MULTIPLE_RESULTS:
return
- elif code == Code.BAD_REQUEST or code == Code.ILLEGAL_ACCESS_POINT or
code == Code.ILLEGAL_TOPIC \
- or code == Code.ILLEGAL_CONSUMER_GROUP or code ==
Code.ILLEGAL_MESSAGE_TAG or code == Code.ILLEGAL_MESSAGE_KEY \
- or code == Code.ILLEGAL_MESSAGE_GROUP or code ==
Code.ILLEGAL_MESSAGE_PROPERTY_KEY or code == Code.INVALID_TRANSACTION_ID \
- or code == Code.ILLEGAL_MESSAGE_ID or code ==
Code.ILLEGAL_FILTER_EXPRESSION or code == Code.ILLEGAL_INVISIBLE_TIME \
- or code == Code.ILLEGAL_DELIVERY_TIME or code ==
Code.INVALID_RECEIPT_HANDLE or code == Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE
\
- or code == Code.UNRECOGNIZED_CLIENT_TYPE or code ==
Code.MESSAGE_CORRUPTED or code == Code.CLIENT_ID_REQUIRED or code ==
Code.ILLEGAL_POLLING_TIME:
+ elif (
+ code == Code.BAD_REQUEST
+ or code == Code.ILLEGAL_ACCESS_POINT
+ or code == Code.ILLEGAL_TOPIC
+ or code == Code.ILLEGAL_CONSUMER_GROUP
+ or code == Code.ILLEGAL_MESSAGE_TAG
+ or code == Code.ILLEGAL_MESSAGE_KEY
+ or code == Code.ILLEGAL_MESSAGE_GROUP
+ or code == Code.ILLEGAL_MESSAGE_PROPERTY_KEY
+ or code == Code.INVALID_TRANSACTION_ID
+ or code == Code.ILLEGAL_MESSAGE_ID
+ or code == Code.ILLEGAL_FILTER_EXPRESSION
+ or code == Code.ILLEGAL_INVISIBLE_TIME
+ or code == Code.ILLEGAL_DELIVERY_TIME
+ or code == Code.INVALID_RECEIPT_HANDLE
+ or code == Code.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE
+ or code == Code.UNRECOGNIZED_CLIENT_TYPE
+ or code == Code.MESSAGE_CORRUPTED
+ or code == Code.CLIENT_ID_REQUIRED
+ or code == Code.ILLEGAL_POLLING_TIME
+ ):
raise BadRequestException(message, code)
elif code == Code.UNAUTHORIZED:
raise UnauthorizedException(message, code)
@@ -49,19 +64,38 @@ class MessagingResultChecker:
raise ForbiddenException(message, code)
elif code == Code.MESSAGE_NOT_FOUND:
return
- elif code == Code.NOT_FOUND or code == Code.TOPIC_NOT_FOUND or code ==
Code.CONSUMER_GROUP_NOT_FOUND:
+ elif (
+ code == Code.NOT_FOUND
+ or code == Code.TOPIC_NOT_FOUND
+ or code == Code.CONSUMER_GROUP_NOT_FOUND
+ ):
raise NotFoundException(message, code)
elif code == Code.PAYLOAD_TOO_LARGE or code ==
Code.MESSAGE_BODY_TOO_LARGE:
raise PayloadTooLargeException(message, code)
elif code == Code.TOO_MANY_REQUESTS:
raise TooManyRequestsException(message, code)
- elif code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE or code ==
Code.MESSAGE_PROPERTIES_TOO_LARGE:
+ elif (
+ code == Code.REQUEST_HEADER_FIELDS_TOO_LARGE
+ or code == Code.MESSAGE_PROPERTIES_TOO_LARGE
+ ):
raise RequestHeaderFieldsTooLargeException(message, code)
- elif code == Code.INTERNAL_ERROR or code == Code.INTERNAL_SERVER_ERROR
or code == Code.HA_NOT_AVAILABLE:
+ elif (
+ code == Code.INTERNAL_ERROR
+ or code == Code.INTERNAL_SERVER_ERROR
+ or code == Code.HA_NOT_AVAILABLE
+ ):
raise InternalErrorException(message, code)
- elif code == Code.PROXY_TIMEOUT or code ==
Code.MASTER_PERSISTENCE_TIMEOUT or code == Code.SLAVE_PERSISTENCE_TIMEOUT:
+ elif (
+ code == Code.PROXY_TIMEOUT
+ or code == Code.MASTER_PERSISTENCE_TIMEOUT
+ or code == Code.SLAVE_PERSISTENCE_TIMEOUT
+ ):
raise ProxyTimeoutException(message, code)
- elif code == Code.UNSUPPORTED or code == Code.VERSION_UNSUPPORTED or
code == Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED:
+ elif (
+ code == Code.UNSUPPORTED
+ or code == Code.VERSION_UNSUPPORTED
+ or code == Code.VERIFY_FIFO_MESSAGE_UNSUPPORTED
+ ):
raise UnsupportedException(message, code)
else:
logger.warn(f"unrecognized status code:{code}, message:{message}")
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index d371625f..b021b690 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -27,9 +27,9 @@ from rocketmq.v5.log import logger
class Misc:
__LOCAL_IP = None
__OS_NAME = None
- TOPIC_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$')
- CONSUMER_GROUP_PATTERN = compile(r'^[%a-zA-Z0-9_-]+$')
- SDK_VERSION = "5.0.2"
+ TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
+ CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
+ SDK_VERSION = "5.0.3"
@staticmethod
def sdk_language():
@@ -41,33 +41,33 @@ class Misc:
@staticmethod
def to_base36(n):
- chars = '0123456789abcdefghijklmnopqrstuvwxyz'
+ chars = "0123456789abcdefghijklmnopqrstuvwxyz"
result = []
if n == 0:
- return '0'
+ return "0"
while n > 0:
n, r = divmod(n, 36)
result.append(chars[r])
- return ''.join(reversed(result))
+ return "".join(reversed(result))
@staticmethod
def get_local_ip():
if Misc.__LOCAL_IP is None:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
- s.connect(('8.8.8.8', 80))
+ s.connect(("8.8.8.8", 80))
Misc.__LOCAL_IP = s.getsockname()[0]
except Exception as e:
logger.error(f"get local ip exception: {e}")
- return '127.0.0.1'
+ return "127.0.0.1"
finally:
s.close()
return Misc.__LOCAL_IP
@staticmethod
def crc32_checksum(array):
- crc32_value = zlib.crc32(array) & 0xffffffff
- return format(crc32_value, '08X')
+ crc32_value = zlib.crc32(array) & 0xFFFFFFFF
+ return format(crc32_value, "08X")
@staticmethod
def md5_checksum(array):
@@ -83,7 +83,7 @@ class Misc:
@staticmethod
def uncompress_bytes_gzip(body):
- if body and body[:2] == b'\x1f\x8b':
+ if body and body[:2] == b"\x1f\x8b":
body = gzip.decompress(body) # Standard Gzip format
else:
body = zlib.decompress(body) # deflate zip
diff --git a/python/rocketmq/v5/util/signature.py
b/python/rocketmq/v5/util/signature.py
index a23dd26b..f5983da7 100644
--- a/python/rocketmq/v5/util/signature.py
+++ b/python/rocketmq/v5/util/signature.py
@@ -30,19 +30,21 @@ class Signature:
formatted_date_time = now.strftime("%Y%m%dT%H%M%SZ")
request_id = str(uuid4())
sign = Signature.sign(config.credentials.sk, formatted_date_time)
- authorization = "MQv2-HMAC-SHA1" \
- + " " \
- + "Credential" \
- + "=" \
- + config.credentials.ak \
- + ", " \
- + "SignedHeaders" \
- + "=" \
- + "x-mq-date-time" \
- + ", " \
- + "Signature" \
- + "=" \
- + sign
+ authorization = (
+ "MQv2-HMAC-SHA1"
+ + " "
+ + "Credential"
+ + "="
+ + config.credentials.ak
+ + ", "
+ + "SignedHeaders"
+ + "="
+ + "x-mq-date-time"
+ + ", "
+ + "Signature"
+ + "="
+ + sign
+ )
metadata = [
("x-mq-language", "PYTHON"),
("x-mq-protocol", "GRPC_V2"),
@@ -51,12 +53,12 @@ class Signature:
("x-mq-request-id", request_id),
("x-mq-client-id", client_id),
("x-mq-namespace", config.namespace),
- ("authorization", authorization)
+ ("authorization", authorization),
]
return metadata
@staticmethod
def sign(access_secret, date_time):
- signing_key = access_secret.encode('utf-8')
- mac = new(signing_key, date_time.encode('utf-8'), sha1)
- return hexlify(mac.digest()).decode('utf-8')
+ signing_key = access_secret.encode("utf-8")
+ mac = new(signing_key, date_time.encode("utf-8"), sha1)
+ return hexlify(mac.digest()).decode("utf-8")
diff --git a/python/setup.py b/python/setup.py
index 6e8453f7..3881d6d7 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -17,7 +17,7 @@ from setuptools import find_packages, setup
setup(
name='rocketmq-python-client',
- version='5.0.2',
+ version='5.0.3',
packages=find_packages(),
install_requires=[
"grpcio>=1.5.0",