This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 54724846 [Python] Fix PushConsumer not receiving messages (#1169)
54724846 is described below

commit 54724846e46e9bcef358b473317f27ebc6fc05a1
Author: Marandi269 <[email protected]>
AuthorDate: Thu Jan 8 10:19:54 2026 +0800

    [Python] Fix PushConsumer not receiving messages (#1169)
    
    Move reset_setting() before reset_metric() to ensure consumer
    initialization completes even if reset_metric() throws an exception.
    
    Root cause: reset_setting() was nested inside the metric check condition,
    and reset_metric() could throw exceptions (e.g., when client_metrics is
    not initialized), causing reset_setting() to never be called. This left
    PushConsumer.__consumption as None, making __scan_assignment() return
    early and never fetch messages.
    
    Fixes #1168
---
 python/rocketmq/v5/client/connection/rpc_channel.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/python/rocketmq/v5/client/connection/rpc_channel.py b/python/rocketmq/v5/client/connection/rpc_channel.py
index 6ca598e6..b7de54dd 100644
--- a/python/rocketmq/v5/client/connection/rpc_channel.py
+++ b/python/rocketmq/v5/client/connection/rpc_channel.py
@@ -133,11 +133,12 @@ class RpcStreamStreamCall:
                             logger.debug(
                                 f"{ self.__handler} sync setting success. response status code: {res.status.code}"
                             )
-                            if res.settings and res.settings.metric:
-                                # reset metrics if needed
-                                self.__handler.reset_metric(res.settings.metric)
-                                # sync setting
+                            if res.settings:
+                                # sync setting first to ensure consumer initialization
                                 self.__handler.reset_setting(res.settings)
+                                if res.settings.metric:
+                                    # reset metrics if needed
+                                    self.__handler.reset_metric(res.settings.metric)
                     elif res.HasField("recover_orphaned_transaction_command"):
                         # sever check for a transaction message
                         if self.__handler:
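
The ordering change described in the commit message can be illustrated with a minimal, self-contained sketch. It does not use the real RocketMQ client classes: FakeHandler, sync_old, sync_new, and run are hypothetical names invented for this illustration, and the sketch assumes the surrounding code catches the exception raised by reset_metric().

```python
from types import SimpleNamespace


class FakeHandler:
    """Hypothetical stand-in for the client handler, not the real RocketMQ class."""

    def __init__(self):
        # Plays the role of PushConsumer.__consumption in the commit message.
        self.consumption = None

    def reset_metric(self, metric):
        # Simulates the failure mode named in the commit message.
        raise RuntimeError("client_metrics is not initialized")

    def reset_setting(self, settings):
        self.consumption = "ready"


def sync_old(handler, settings):
    # Pre-fix ordering: reset_setting() runs only if metrics are present
    # and reset_metric() returns without raising.
    if settings and settings.metric:
        handler.reset_metric(settings.metric)
        handler.reset_setting(settings)


def sync_new(handler, settings):
    # Post-fix ordering: settings are applied before metrics are touched.
    if settings:
        handler.reset_setting(settings)
        if settings.metric:
            handler.reset_metric(settings.metric)


def run(sync):
    handler = FakeHandler()
    settings = SimpleNamespace(metric=SimpleNamespace(on=True))
    try:
        sync(handler, settings)
    except RuntimeError:
        # Assumption: the caller logs or swallows the exception.
        pass
    return handler.consumption
```

Under these assumptions, run(sync_old) leaves the consumption state unset, while run(sync_new) initializes it even though reset_metric() still fails.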
