deku0818 opened a new issue, #1191: URL: https://github.com/apache/rocketmq-clients/issues/1191
### Before Creating the Bug Report

- [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
- [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate.
- [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Programming Language of the Client

Python

### Runtime Platform Environment

OS: Ubuntu 22.04 (Linux 6.14.0)

### RocketMQ Version of the Client/Server

- Client: rocketmq-python-client 5.1.0
- Server: Apache RocketMQ 5.x

### Run or Compiler Version

- Python: 3.11
- protobuf: 6.33.5
- grpcio: 1.76.0

### Describe the Bug

`PushConsumer.startup()` hangs indefinitely when the broker responds with `metric.on=False` in the telemetry settings.

The root cause is that `PushConsumer.reset_metric()` throws an unhandled `AttributeError` (because `ClientMetrics.__meter_provider` is `None` when metrics are disabled). This exception kills the telemetry stream read loop in `RpcStreamStreamCall.start_stream_read()`, preventing `reset_setting()` from ever being called. Since `reset_setting()` is the only method that calls `_init_settings_event.set()`, `startup()` blocks forever on `_init_settings_event.wait()`.

### Steps to Reproduce

1. Deploy a RocketMQ 5.x broker with metric collection disabled (this is the default).
2. Run the following code:

```python
from rocketmq import (
    ClientConfiguration,
    Credentials,
    FilterExpression,
    PushConsumer,
    MessageListener,
    ConsumeResult,
    Message,
)


class MyListener(MessageListener):
    def consume(self, message: Message) -> ConsumeResult:
        return ConsumeResult.SUCCESS


config = ClientConfiguration("broker-ip:port", Credentials())
consumer = PushConsumer(
    config,
    "my-consumer-group",
    MyListener(),
    {"my-topic": FilterExpression()},
)
consumer.startup()  # Hangs forever
```

### What Did You Expect to See?

`startup()` completes successfully and the consumer begins receiving messages.

### What Did You See Instead?

`startup()` blocks forever. Thread dump shows it is stuck on `_init_settings_event.wait()` in `client.py:79`.

### Additional Context

### Detailed call chain analysis

The bug involves three files:

**1. `rpc_channel.py`: `RpcStreamStreamCall.start_stream_read()` (line 136-140)**

```python
if res.settings and res.settings.metric:
    self.__handler.reset_metric(res.settings.metric)  # ← crashes here
    self.__handler.reset_setting(res.settings)        # ← never reached
```

`res.settings.metric` is a protobuf message object, which is **always truthy** in Python regardless of its field values. So this block is always entered.

`reset_metric()` is called **before** `reset_setting()`. When `reset_metric()` throws an exception, the outer `except Exception` clause (line 155) catches it and the stream read loop terminates. `reset_setting()` is never called.

**2. `push_consumer.py`: `PushConsumer.reset_metric()`**

```python
def reset_metric(self, metric):
    super().reset_metric(metric)              # returns early when metric.on=False
    self.__register_process_queues_gauges()   # ← crashes here
```

**3. `client_metrics.py`: `ClientMetrics.create_push_consumer_process_queue_observable_gauge()`**

```python
def create_push_consumer_process_queue_observable_gauge(self, name, callback_func):
    meter = self.__meter_provider.get_meter(ClientMetrics.METRIC_NAME)
    #       ^^^^^^^^^^^^^^^^^^^^ __meter_provider is None!
    # → AttributeError: 'NoneType' object has no attribute 'get_meter'
```

`__meter_provider` is `None` because `ClientMetrics.reset_metrics()` returned early: `__satisfy()` returns `True` when `not self.__enabled and not metric.on`, so `__meter_provider_start()` was never called.

**Full chain of events:**

```
Broker responds with metric.on=False
  → start_stream_read() enters "if res.settings.metric" block (always truthy)
    → PushConsumer.reset_metric(metric)
      → ClientMetrics.reset_metrics(metric)
        → __satisfy(metric) returns True (metrics disabled)
        → returns early, __meter_provider remains None
      → __register_process_queues_gauges()
        → create_push_consumer_process_queue_observable_gauge()
          → self.__meter_provider.get_meter() → AttributeError!
    → Exception propagates to start_stream_read()
    → Caught by "except Exception", stream read loop ends
    → reset_setting() is NEVER called
    → _init_settings_event is NEVER set
    → startup() blocks forever on _init_settings_event.wait()
```

### Suggested Fix

Two issues should be addressed:

**Fix 1: Guard `__meter_provider` access in `client_metrics.py`**

`create_push_consumer_process_queue_observable_gauge()` should return early when metrics are not enabled:

```python
def create_push_consumer_process_queue_observable_gauge(self, name, callback_func):
    if not self.__meter_provider:
        return
    # ... rest of method unchanged
```

**Fix 2: Ensure `reset_setting()` always executes in `rpc_channel.py`**

`reset_setting()` must not depend on `reset_metric()` succeeding.
It should be called first, and `reset_metric()` should be wrapped in try/except:

```python
if res.settings:
    self.__handler.reset_setting(res.settings)
    if res.settings.HasField("metric"):
        try:
            self.__handler.reset_metric(res.settings.metric)
        except Exception:
            logger.warning("reset_metric failed, ignored")
```

### Workaround

Subclass `PushConsumer` at the application level:

```python
class SafePushConsumer(PushConsumer):
    def reset_metric(self, metric):
        try:
            super().reset_metric(metric)
        except AttributeError:
            pass
```
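For readers without a broker at hand, the ordering problem described in this issue can be imitated with the standard library alone. The sketch below is not the client's code: `StubMetric`, `StubSettings`, `StubHandler`, and the two `read_settings_*` functions are hypothetical stand-ins. They assume only what the report states, namely that `reset_metric()` raises `AttributeError` while metrics are disabled and that `reset_setting()` is the only call that sets the event `startup()` waits on.

```python
import logging
import threading

logger = logging.getLogger(__name__)


class StubMetric:
    """Hypothetical stand-in for the protobuf metric message."""

    def __init__(self, on):
        self.on = on


class StubSettings:
    """Hypothetical stand-in for the telemetry settings message."""

    def __init__(self, metric):
        self.metric = metric


class StubHandler:
    """Hypothetical handler imitating the behaviour described in the report."""

    def __init__(self):
        self.init_settings_event = threading.Event()
        # Stays None here, as in the metrics-disabled case.
        self.meter_provider = None

    def reset_metric(self, metric):
        # Unguarded access: raises AttributeError because meter_provider is None.
        self.meter_provider.get_meter("stub")

    def reset_setting(self, settings):
        # The only place that unblocks the waiting startup call.
        self.init_settings_event.set()


def read_settings_original_order(handler, settings):
    """reset_metric() runs first; any exception ends the simulated read loop."""
    try:
        # Plain objects without __bool__/__len__ are always truthy.
        if settings and settings.metric:
            handler.reset_metric(settings.metric)
            handler.reset_setting(settings)
    except Exception:
        logger.debug("simulated stream read loop terminated")


def read_settings_fixed_order(handler, settings):
    """reset_setting() runs first; reset_metric() failures are logged and ignored."""
    if settings:
        handler.reset_setting(settings)
        try:
            handler.reset_metric(settings.metric)
        except Exception:
            logger.warning("reset_metric failed, ignored")


settings = StubSettings(StubMetric(on=False))

buggy = StubHandler()
read_settings_original_order(buggy, settings)
# A bounded wait stands in for startup(); the real call waits without a timeout.
buggy_ready = buggy.init_settings_event.wait(timeout=0.1)

fixed = StubHandler()
read_settings_fixed_order(fixed, settings)
fixed_ready = fixed.init_settings_event.wait(timeout=0.1)

print(buggy_ready, fixed_ready)
```

With the original ordering the event is never set, so the bounded wait times out and returns `False`. With `reset_setting()` moved ahead of `reset_metric()`, it returns `True` even though `reset_metric()` still fails. The script therefore prints `False True`.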
