This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f3c288e3 5.0.4 (#950) transaction_check max workers
f3c288e3 is described below
commit f3c288e3816a407a62f4bc12f794bc33d56b9657
Author: zhouli11 <[email protected]>
AuthorDate: Tue Mar 4 11:07:18 2025 +0800
5.0.4 (#950) transaction_check max workers
* transaction_check max workers
---
python/example/async_producer_example.py | 10 ++-
python/example/async_simple_consumer_example.py | 14 +++-
python/example/transaction_producer_example.py | 18 ++++-
python/rocketmq/v5/client/client.py | 76 ++++++------------
.../rocketmq/v5/client/connection/rpc_channel.py | 2 +-
python/rocketmq/v5/client/connection/rpc_client.py | 14 ----
python/rocketmq/v5/consumer/simple_consumer.py | 19 ++---
python/rocketmq/v5/log/log_config.py | 2 +-
python/rocketmq/v5/model/message.py | 4 +
python/rocketmq/v5/producer/producer.py | 91 +++++++++-------------
python/rocketmq/v5/util/misc.py | 2 +-
python/setup.py | 2 +-
12 files changed, 109 insertions(+), 145 deletions(-)
diff --git a/python/example/async_producer_example.py
b/python/example/async_producer_example.py
index 8f3c986e..fd5e5a08 100644
--- a/python/example/async_producer_example.py
+++ b/python/example/async_producer_example.py
@@ -18,10 +18,11 @@ from rocketmq import ClientConfiguration, Credentials,
Message, Producer
def handle_send_result(result_future):
try:
+ # don't write time-consuming code in the callback. if needed, use
other thread
res = result_future.result()
- print(f"async send message success, {res}")
+ print(f"send message success, {res}")
except Exception as exception:
- print(f"async send message failed, raise exception: {exception}")
+ print(f"send message failed, raise exception: {exception}")
if __name__ == '__main__':
@@ -46,8 +47,11 @@ if __name__ == '__main__':
send_result_future = producer.send_async(msg)
send_result_future.add_done_callback(handle_send_result)
except Exception as e:
- print(f"async producer{producer.__str__()} send message raise
exception: {e}")
+ print(f"producer{producer.__str__()} send message raise exception:
{e}")
producer.shutdown()
except Exception as e:
print(f"{producer.__str__()} startup raise exception: {e}")
producer.shutdown()
+
+ input("Please Enter to Stop the Application.")
+ producer.shutdown()
diff --git a/python/example/async_simple_consumer_example.py
b/python/example/async_simple_consumer_example.py
index eed0630d..5b341d5d 100644
--- a/python/example/async_simple_consumer_example.py
+++ b/python/example/async_simple_consumer_example.py
@@ -15,17 +15,27 @@
import functools
import time
+from concurrent.futures.thread import ThreadPoolExecutor
from rocketmq import ClientConfiguration, Credentials, SimpleConsumer
+consume_executor = ThreadPoolExecutor(max_workers=2,
thread_name_prefix="consume-message")
+
+
+def consume_message(consumer, message):
+ try:
+ consumer.ack(message)
+ print(f"ack message:{message.message_id}.")
+ except Exception as exception:
+ print(f"consume message raise exception: {exception}")
+
def receive_callback(receive_result_future, consumer):
messages = receive_result_future.result()
print(f"{consumer.__str__()} receive {len(messages)} messages.")
for msg in messages:
try:
- consumer.ack(msg)
- print(f"ack message:{msg.message_id}.")
+ consume_executor.submit(consume_message, consumer=consumer,
message=msg)
except Exception as exception:
print(f"receive message raise exception: {exception}")
diff --git a/python/example/transaction_producer_example.py
b/python/example/transaction_producer_example.py
index c922f2e7..fa0abdd3 100644
--- a/python/example/transaction_producer_example.py
+++ b/python/example/transaction_producer_example.py
@@ -20,7 +20,7 @@ from rocketmq import (ClientConfiguration, Credentials,
Message, Producer,
class TestChecker(TransactionChecker):
def check(self, message: Message) -> TransactionResolution:
- print(f"do TestChecker check. message_id: {message.message_id}, commit
message.")
+ print(f"do TestChecker check, topic:{message.topic}, message_id:
{message.message_id}, commit message.")
return TransactionResolution.COMMIT
@@ -31,7 +31,8 @@ if __name__ == '__main__':
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"
- producer = Producer(config, (topic,))
+ check_from_server = True # commit message from server check
+ producer = Producer(config, (topic,), checker=TestChecker())
try:
producer.startup()
@@ -47,6 +48,17 @@ if __name__ == '__main__':
msg.keys = "send_transaction"
msg.add_property("send", "transaction")
res = producer.send(msg, transaction)
- print(f"transaction producer{producer.__str__()} send half message
success. {res}")
+ print(f"send message: {res}")
+ if check_from_server:
+ input("Please Enter to Stop the Application.\r\n")
+ producer.shutdown()
+ else:
+ # producer directly commit or rollback
+ transaction.commit()
+ print(f"producer commit message:{transaction.message_id}")
+ # transaction.rollback()
+ # print(f"producer rollback message:{transaction.message_id}")
+ producer.shutdown()
except Exception as e:
print(f"transaction producer{producer.__str__()} example raise
exception: {e}")
+ producer.shutdown()
diff --git a/python/rocketmq/v5/client/client.py
b/python/rocketmq/v5/client/client.py
index b6c40f93..d039a9a0 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -15,9 +15,10 @@
import asyncio
import functools
+import os
import threading
from asyncio import InvalidStateError
-from queue import Queue
+from concurrent.futures import ThreadPoolExecutor
from grpc.aio import AioRpcError
from rocketmq.grpc_protocol import ClientType, Code, QueryRouteRequest
@@ -26,16 +27,13 @@ from rocketmq.v5.client.metrics import ClientMetrics
from rocketmq.v5.exception import (IllegalArgumentException,
IllegalStateException)
from rocketmq.v5.log import logger
-from rocketmq.v5.model import (CallbackResult, CallbackResultType,
- TopicRouteData)
+from rocketmq.v5.model import TopicRouteData
from rocketmq.v5.util import (ClientId, ConcurrentMap, MessagingResultChecker,
Misc, Signature)
class Client:
- CALLBACK_THREADS_NUM = 5
-
def __init__(
self, client_configuration, topics, client_type: ClientType,
tls_enable=False
):
@@ -62,8 +60,7 @@ class Client:
)
else:
self.__topics = set()
- self.__callback_result_queue = Queue()
- self.__callback_threads = []
+ self.__client_callback_executor = None
self.__is_running = False
self.__client_thread_task_enabled = False
self.__had_shutdown = False
@@ -268,45 +265,23 @@ class Client:
""" callback handler for async method """
def __start_async_rpc_callback_handler(self):
- # a thread to handle callback when using async method such as
send_async(), receive_async().
- # this handler switches user's callback thread from RpcClient's
_io_loop_thread to client's callback_handler_thread
+ # to handle callback when using async method such as send_async(),
receive_async().
+ # switches user's callback thread from RpcClient's _io_loop_thread to
client's client_callback_worker_thread
try:
- for i in range(Client.CALLBACK_THREADS_NUM):
- th = threading.Thread(
- name=f"callback_handler_thread-{i}",
target=self.__handle_callback
- )
- th.daemon = True
- self.__callback_threads.append(th)
- th.start()
- logger.info(
- f"{self.__str__()} start async rpc callback thread:{th}
success."
- )
+ workers = os.cpu_count()
+ self.__client_callback_executor =
ThreadPoolExecutor(max_workers=workers,
+
thread_name_prefix=f"client_callback_worker-{self.__client_id}")
+ logger.info(f"{self.__str__()} start callback executor success.
max_workers:{workers}")
except Exception as e:
print(f"{self.__str__()} start async rpc callback raise exception:
{e}")
raise e
- def __handle_callback(self):
- while True:
- if self.__client_thread_task_enabled is True:
- callback_result = self.__callback_result_queue.get()
- if (
- callback_result.result_type
- == CallbackResultType.END_CALLBACK_THREAD_RESULT
- ):
- # end infinite loop when client shutdown
- self.__callback_result_queue.task_done()
- break
- else:
- if callback_result.is_success:
-
callback_result.future.set_result(callback_result.result)
- else:
-
callback_result.future.set_exception(callback_result.result)
- self.__callback_result_queue.task_done()
- else:
- break
- logger.info(
- f"{self.__str__()} stop client callback result handler
thread:{threading.current_thread()} success."
- )
+ @staticmethod
+ def __handle_callback(callback_result):
+ if callback_result.is_success:
+ callback_result.future.set_result(callback_result.result)
+ else:
+ callback_result.future.set_exception(callback_result.result)
""" protect """
@@ -330,13 +305,12 @@ class Client:
def _sign(self):
return Signature.metadata(self.__client_configuration,
self.__client_id)
- def _set_future_callback_result(self, callback_result):
- if self.__callback_result_queue is not None:
- self.__callback_result_queue.put_nowait(callback_result)
-
def _rpc_channel_io_loop(self):
return self.__rpc_client.get_channel_io_loop()
+ def _submit_callback(self, callback_result):
+ self.__client_callback_executor.submit(Client.__handle_callback,
callback_result)
+
""" private """
# topic route #
@@ -555,13 +529,8 @@ class Client:
self.__clear_idle_rpc_channels_threading_event.set()
self.__clear_idle_rpc_channels_scheduler.join()
- for i in range(Client.CALLBACK_THREADS_NUM):
- self._set_future_callback_result(
- CallbackResult.end_callback_thread_result()
- )
-
- for i in range(Client.CALLBACK_THREADS_NUM):
- self.__callback_threads[i].join()
+ if self.__client_callback_executor is not None:
+ self.__client_callback_executor.shutdown()
self.__topic_route_scheduler = None
self.__topic_route_scheduler_threading_event = None
@@ -571,8 +540,7 @@ class Client:
self.__sync_setting_scheduler_threading_event = None
self.__clear_idle_rpc_channels_scheduler = None
self.__clear_idle_rpc_channels_threading_event = None
- self.__callback_result_queue = None
- self.__callback_threads = None
+ self.__client_callback_executor = None
""" property """
diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py
b/python/rocketmq/v5/client/connection/rpc_channel.py
index 246e102b..1783026e 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -146,7 +146,7 @@ class RpcStreamStreamCall:
res.recover_orphaned_transaction_command.transaction_id
)
message =
res.recover_orphaned_transaction_command.message
- await
self.__handler.on_recover_orphaned_transaction_command(
+
self.__handler.on_recover_orphaned_transaction_command(
self.__endpoints, message, transaction_id
)
except AioRpcError as e:
diff --git a/python/rocketmq/v5/client/connection/rpc_client.py
b/python/rocketmq/v5/client/connection/rpc_client.py
index 77e6473c..92b0fafb 100644
--- a/python/rocketmq/v5/client/connection/rpc_client.py
+++ b/python/rocketmq/v5/client/connection/rpc_client.py
@@ -179,20 +179,6 @@ class RpcClient:
)
)
- def end_transaction_for_server_check(
- self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata,
timeout=3
- ):
- # assert asyncio.get_running_loop() == RpcClient._io_loop
- try:
- return self.__end_transaction_0(
- endpoints, req, metadata=metadata, timeout=timeout
- )
- except Exception as e:
- logger.error(
- f"end transaction exception, topic:{req.topic.name},
message_id:{req.message_id}, transaction_id:{req.transaction_id}: {e}"
- )
- raise e
-
""" build stream_stream_call """
def telemetry_stream(
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py
b/python/rocketmq/v5/consumer/simple_consumer.py
index 6b1483db..638416d6 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -309,11 +309,11 @@ class SimpleConsumer(Client):
try:
responses = future.result()
messages = self.__handle_receive_message_response(responses)
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_receive_callback_result(ret_future,
messages)
)
except Exception as e:
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_receive_callback_result(ret_future, e,
False)
)
@@ -409,14 +409,14 @@ class SimpleConsumer(Client):
)
MessagingResultChecker.check(res.status)
if ret_future is not None:
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_ack_callback_result(ret_future, None)
)
except Exception as e:
if ret_future is None:
raise e
else:
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_ack_callback_result(ret_future, e,
False)
)
@@ -434,7 +434,7 @@ class SimpleConsumer(Client):
metadata=self._sign(),
timeout=self.client_configuration.request_timeout,
)
- self.__handle_change_invisible_result(future)
+ self.__handle_change_invisible_result(future, message)
except Exception as e:
raise e
@@ -454,7 +454,7 @@ class SimpleConsumer(Client):
)
ret_future = Future()
change_invisible_callback = functools.partial(
- self.__handle_change_invisible_result, ret_future=ret_future
+ self.__handle_change_invisible_result, message=message,
ret_future=ret_future
)
future.add_done_callback(change_invisible_callback)
return ret_future
@@ -472,15 +472,16 @@ class SimpleConsumer(Client):
req.message_id = message.message_id
return req
- def __handle_change_invisible_result(self, future, ret_future=None):
+ def __handle_change_invisible_result(self, future, message,
ret_future=None):
try:
res = future.result()
logger.debug(
f"consumer[{self.__consumer_group}] change invisible response,
{res.status}"
)
+ message.receipt_handle = res.receipt_handle
MessagingResultChecker.check(res.status)
if ret_future is not None:
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_change_invisible_duration_callback_result(
ret_future, None
)
@@ -489,7 +490,7 @@ class SimpleConsumer(Client):
if ret_future is None:
raise e
else:
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_change_invisible_duration_callback_result(
ret_future, e, False
)
diff --git a/python/rocketmq/v5/log/log_config.py
b/python/rocketmq/v5/log/log_config.py
index a363c650..f179a89f 100644
--- a/python/rocketmq/v5/log/log_config.py
+++ b/python/rocketmq/v5/log/log_config.py
@@ -32,7 +32,7 @@ __LOG_CONFIG = {
# },
"file": {
"class": "logging.handlers.RotatingFileHandler",
- "level": "INFO",
+ "level": "DEBUG",
"formatter": "standard",
"filename": f"{__DIR}/rocketmq_client.log",
"maxBytes": 1024 * 1024 * 100, # 100MB
diff --git a/python/rocketmq/v5/model/message.py
b/python/rocketmq/v5/model/message.py
index ead7d795..dee7dad4 100644
--- a/python/rocketmq/v5/model/message.py
+++ b/python/rocketmq/v5/model/message.py
@@ -219,6 +219,10 @@ class Message:
raise IllegalArgumentException("key should not be blank")
self.__keys.update(set(keys))
+ @receipt_handle.setter
+ def receipt_handle(self, receipt_handle):
+ self.__receipt_handle = receipt_handle
+
@message_type.setter
def message_type(self, message_type):
self.__message_type = message_type
diff --git a/python/rocketmq/v5/producer/producer.py
b/python/rocketmq/v5/producer/producer.py
index ba9e3d9b..2a9bba70 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -17,7 +17,7 @@ import abc
import functools
import threading
import time
-from concurrent.futures import Future
+from concurrent.futures import Future, ThreadPoolExecutor
from rocketmq.grpc_protocol import (ClientType, Code, Encoding,
EndTransactionRequest, HeartbeatRequest,
@@ -136,6 +136,7 @@ class Producer(Client):
self.__checker = (
checker # checker for transaction message, handle checking from
server
)
+ self.__transaction_check_executor = ThreadPoolExecutor(max_workers=1,
thread_name_prefix="transaction_check_worker")
def __str__(self):
return f"{ClientType.Name(self.client_type)}
client_id:{self.client_id}"
@@ -223,7 +224,7 @@ class Producer(Client):
)
return future.result()
- async def on_recover_orphaned_transaction_command(
+ def on_recover_orphaned_transaction_command(
self, endpoints, msg, transaction_id
):
# call this function from server side stream, in RpcClient._io_loop
@@ -236,28 +237,7 @@ class Producer(Client):
if self.__checker is None:
raise IllegalArgumentException("No transaction checker
registered.")
message = Message().fromProtobuf(msg)
- result = self.__checker.check(message)
-
- if result == TransactionResolution.COMMIT:
- res = await self.__commit_for_server_check(
- endpoints,
- message,
- transaction_id,
- TransactionSource.SOURCE_SERVER_CHECK,
- )
- logger.debug(
- f"commit message. message_id: {message.message_id},
transaction_id: {transaction_id}, res: {res}"
- )
- elif result == TransactionResolution.ROLLBACK:
- res = await self.__rollback_for_server_check(
- endpoints,
- message,
- transaction_id,
- TransactionSource.SOURCE_SERVER_CHECK,
- )
- logger.debug(
- f"rollback message. message_id: {message.message_id},
transaction_id: {transaction_id}, res: {res}"
- )
+
self.__transaction_check_executor.submit(self.__server_transaction_check,
endpoints, message, transaction_id)
except Exception as e:
logger.error(f"on_recover_orphaned_transaction_command exception:
{e}")
@@ -313,6 +293,8 @@ class Producer(Client):
def shutdown(self):
logger.info(f"begin to shutdown {self.__str__()}")
+ self.__transaction_check_executor.shutdown()
+ self.__transaction_check_executor = None
super().shutdown()
logger.info(f"shutdown {self.__str__()} success.")
@@ -395,7 +377,7 @@ class Producer(Client):
send_message_future, topic_queue
)
self.client_metrics.send_after(send_metric_context, True)
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_send_callback_result(ret_future,
send_receipt)
)
except Exception as e:
@@ -406,7 +388,7 @@ class Producer(Client):
if retry_exception_future is not None:
# end retry with exception
self.client_metrics.send_after(send_metric_context, False)
- self._set_future_callback_result(
+ self._submit_callback(
CallbackResult.async_send_callback_result(
ret_future, retry_exception_future.exception(), False
)
@@ -436,13 +418,6 @@ class Producer(Client):
)
end_retry = True
- # no need more attempts for transactional message
- if message.message_type == MessageType.TRANSACTION:
- logger.error(
- f"{self.__str__()} failed to send message to
{topic_queue.endpoints.__str__()}, topic:{message.topic},
message_id:{message.message_id}, message_type:{message.message_type}
,attempt:{attempt}"
- )
- end_retry = True
-
# end retry if system busy
if isinstance(e, TooManyRequestsException):
logger.error(
@@ -547,27 +522,31 @@ class Producer(Client):
req.source = source
return req
- def __commit_for_server_check(
- self, endpoints, message: Message, transaction_id, source
- ):
- return self.__end_transaction_for_server_check(
- endpoints, message, transaction_id, TransactionResolution.COMMIT,
source
- )
-
- def __rollback_for_server_check(
- self, endpoints, message: Message, transaction_id, source
- ):
- return self.__end_transaction_for_server_check(
- endpoints, message, transaction_id,
TransactionResolution.ROLLBACK, source
- )
+ def __server_transaction_check_callback(self, future, message,
transaction_id, result):
+ try:
+ res = future.result()
+ if res is not None and res.status.code == Code.OK:
+ if result == TransactionResolution.COMMIT:
+ logger.debug(
+ f"{self.__str__()} commit message. message_id:
{message.message_id}, transaction_id: {transaction_id}, res: {res}"
+ )
+ elif result == TransactionResolution.ROLLBACK:
+ logger.debug(
+ f"{self.__str__()} rollback message. message_id:
{message.message_id}, transaction_id: {transaction_id}, res: {res}"
+ )
+ else:
+ if result == TransactionResolution.COMMIT:
+ raise Exception(f"{self.__str__()} commit message:
{message.message_id} raise exception")
+ elif result == TransactionResolution.ROLLBACK:
+ raise Exception(f"{self.__str__()} rollback message:
{message.message_id} raise exception")
+ except Exception as e:
+ logger.error(f"server transaction check raise exception, {e}")
- def __end_transaction_for_server_check(
- self, endpoints, message: Message, transaction_id, result, source
- ):
- req = self.__end_transaction_req(message, transaction_id, result,
source)
- return self.rpc_client.end_transaction_for_server_check(
- endpoints,
- req,
- metadata=self._sign(),
- timeout=self.client_configuration.request_timeout,
- )
+ def __server_transaction_check(self, endpoints, message, transaction_id):
+ try:
+ result = self.__checker.check(message)
+ req = self.__end_transaction_req(message, transaction_id, result,
TransactionSource.SOURCE_SERVER_CHECK)
+ future = self.rpc_client.end_transaction_async(endpoints, req,
metadata=self._sign(), timeout=self.client_configuration.request_timeout)
+
future.add_done_callback(functools.partial(self.__server_transaction_check_callback,
message=message, transaction_id=transaction_id, result=result))
+ except Exception as e:
+ raise e
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index b021b690..c0c00092 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -29,7 +29,7 @@ class Misc:
__OS_NAME = None
TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
- SDK_VERSION = "5.0.3"
+ SDK_VERSION = "5.0.4"
@staticmethod
def sdk_language():
diff --git a/python/setup.py b/python/setup.py
index 3881d6d7..f869f7ee 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -17,7 +17,7 @@ from setuptools import find_packages, setup
setup(
name='rocketmq-python-client',
- version='5.0.3',
+ version='5.0.4',
packages=find_packages(),
install_requires=[
"grpcio>=1.5.0",