Marandi269 opened a new issue, #1168:
URL: https://github.com/apache/rocketmq-clients/issues/1168

   ## Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question
   - [x] I have searched the GitHub Issues and GitHub Discussions and believe 
that this is not a duplicate
   - [x] I have confirmed that this bug belongs to the current repository
   
   ## Programming Language of the Client
   
   Python
   
   ## Runtime Platform Environment
   
   - OS: Ubuntu 22.04 / Debian 12
   - Python: 3.11
   
   ## RocketMQ Version of the Client/Server
   
   - Client: rocketmq-python-client 5.0.9 (also verified in latest source code 
5.1.0)
   - Server: RocketMQ 5.1.4
   
   ## Describe the Bug
   
   `PushConsumer` starts successfully but never receives any messages: the 
`MessageListener.consume()` callback is never invoked. Meanwhile, 
`SimpleConsumer` works correctly with the same configuration.
   
   After investigating the source code, I noticed that in 
`rpc_channel.py:136-140`, `reset_setting()` appears to be called only when 
`res.settings.metric` is truthy:
   
   ```python
   if res.settings and res.settings.metric:
       self.__handler.reset_metric(res.settings.metric)
       self.__handler.reset_setting(res.settings)
   ```
   
   If the server returns settings without metric data, `reset_setting()` would 
be skipped. This could cause `PushConsumer.__consumption` to remain `None`, so 
`__scan_assignment()` would return early at lines 167-168:
   
   ```python
   def __scan_assignment(self):
       if not self.__consumption:
           return
   ```
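
To illustrate the suspected failure path, here is a minimal sketch using a stand-in class. `FakePushConsumer` and its attribute names are hypothetical and only mirror the behaviour described above; this is not the real client code, and it assumes that `reset_setting()` is what populates the consumption state.

```python
class FakePushConsumer:
    """Hypothetical stand-in for PushConsumer; not the real client class."""

    def __init__(self):
        self.consumption = None  # mirrors PushConsumer.__consumption
        self.scans_done = 0

    def reset_setting(self, settings):
        # Assumption: settings from the server populate the consumption state.
        self.consumption = settings

    def scan_assignment(self):
        if not self.consumption:
            return False  # early return: no assignments are queried
        self.scans_done += 1
        return True


consumer = FakePushConsumer()

# reset_setting() has not been called yet, so every scan is a no-op.
skipped = [consumer.scan_assignment() for _ in range(3)]

# Once settings are applied, the scan proceeds.
consumer.reset_setting({"push": True})
after = consumer.scan_assignment()

print(skipped, after)
```

If the real client behaves like this stand-in, the scan would keep returning early for as long as `reset_setting()` is never reached, which would match the silent `PushConsumer` in the output below.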
   
   ## Steps to Reproduce
   
   Save as `test_pushconsumer_bug.py` and run:
   
   ```python
   #!/usr/bin/env python3
   """PushConsumer Bug Reproduction Test"""
   
   import json
   import time
   import uuid
   
   from rocketmq.v5.client import ClientConfiguration
   from rocketmq.v5.client.client_configuration import Credentials
   from rocketmq.v5.producer import Producer
   from rocketmq.v5.consumer import PushConsumer, SimpleConsumer, ConsumeResult, MessageListener
   from rocketmq.v5.model import FilterExpression, Message
   
   ENDPOINTS = '127.0.0.1:18081'  # RocketMQ Proxy gRPC port
   TOPIC = 'TestTopic'
   TAG = 'test-tag'
   
   config = ClientConfiguration(endpoints=ENDPOINTS, credentials=Credentials('', ''))
   push_received = []
   
   
   class TestListener(MessageListener):
       def consume(self, message) -> ConsumeResult:
           push_received.append(message.message_id)
           print(f'  [PushConsumer] Received: {message.message_id[:8]}')
           return ConsumeResult.SUCCESS
   
   
   def send_messages(count: int):
       producer = Producer(config, (TOPIC,))
       producer.startup()
       for i in range(count):
           msg = Message()
           msg.topic = TOPIC
           msg.body = json.dumps({'index': i}).encode()
           msg.tag = TAG
           producer.send(msg)
       producer.shutdown()
       print(f'  Sent {count} messages')
   
   
   def test_simple_consumer(timeout: int) -> int:
       received = []
       consumer = SimpleConsumer(config, f'simple-{uuid.uuid4().hex[:8]}', await_duration=5)
       consumer.startup()
       consumer.subscribe(TOPIC, FilterExpression(TAG))
   
       end = time.time() + timeout
       while time.time() < end:
           try:
               for msg in consumer.receive(max_message_num=32, invisible_duration=30):
                   received.append(msg.message_id)
                   print(f'  [SimpleConsumer] Received: {msg.message_id[:8]}')
                   consumer.ack(msg)
           except Exception:
               # Best-effort polling: ignore receive errors and retry
               pass
           time.sleep(0.1)
   
       consumer.shutdown()
       return len(received)
   
   
   def main():
       print('=' * 60)
       print('PushConsumer Bug Reproduction Test')
       print(f'Endpoints: {ENDPOINTS}, Topic: {TOPIC}')
       print('=' * 60)
   
       # Test PushConsumer
       print('\n[1/4] Starting PushConsumer...')
       push_consumer = PushConsumer(
           config,
           f'push-{uuid.uuid4().hex[:8]}',
           TestListener(),
           {TOPIC: FilterExpression(TAG)}
       )
       push_consumer.startup()
       time.sleep(2)
   
       print('\n[2/4] Sending 5 messages...')
       send_messages(5)
   
       print('\n[3/4] Waiting for PushConsumer (15 seconds)...')
       time.sleep(15)
       push_consumer.shutdown()
   
       # Test SimpleConsumer
       print('\n[4/4] Testing SimpleConsumer...')
       send_messages(5)
       simple_count = test_simple_consumer(10)
   
       # Results
       print('\n' + '=' * 60)
       print(f'PushConsumer:   {len(push_received)} messages')
       print(f'SimpleConsumer: {simple_count} messages')
       print('=' * 60)
   
       if len(push_received) == 0 and simple_count > 0:
           print('\n[ISSUE] PushConsumer receives nothing, SimpleConsumer works')
           return 1
       return 0
   
   
   if __name__ == '__main__':
       exit(main())
   ```
   
   ## What Did You Expect to See?
   
   Both `PushConsumer` and `SimpleConsumer` should receive messages 
successfully.
   
   ## What Did You See Instead?
   
   ```
   ============================================================
   PushConsumer Bug Reproduction Test
   Endpoints: 127.0.0.1:18081, Topic: TestTopic
   ============================================================
   
   [1/4] Starting PushConsumer...
   
   [2/4] Sending 5 messages...
     Sent 5 messages
   
   [3/4] Waiting for PushConsumer (15 seconds)...
   
   [4/4] Testing SimpleConsumer...
     Sent 5 messages
     [SimpleConsumer] Received: 0148FE91
     [SimpleConsumer] Received: 0148FE91
     [SimpleConsumer] Received: 0148FE91
     [SimpleConsumer] Received: 0148FE91
     [SimpleConsumer] Received: 0148FE91
   
   ============================================================
   PushConsumer:   0 messages
   SimpleConsumer: 5 messages
   ============================================================
   
   [ISSUE] PushConsumer receives nothing, SimpleConsumer works
   ```
   
   ## Additional Context
   
   I would be happy to submit a PR to fix this if the analysis above is 
correct. The potential fix might be:
   
   ```python
   # rpc_channel.py:136-140
   if res.settings:
       if res.settings.metric:
           self.__handler.reset_metric(res.settings.metric)
       self.__handler.reset_setting(res.settings)
   ```
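
As a sanity check of the idea, the sketch below compares the current guard with the proposed one using stub objects. `RecordingHandler`, `current_guard`, `proposed_guard`, and `run` are hypothetical names introduced only for illustration, and modelling "settings without metric data" as `metric=None` is an assumption; the real response object may behave differently.

```python
from types import SimpleNamespace


class RecordingHandler:
    """Hypothetical stub that records which reset methods were called."""

    def __init__(self):
        self.calls = []

    def reset_metric(self, metric):
        self.calls.append("reset_metric")

    def reset_setting(self, settings):
        self.calls.append("reset_setting")


def current_guard(handler, res):
    # Condition as quoted from rpc_channel.py in this report.
    if res.settings and res.settings.metric:
        handler.reset_metric(res.settings.metric)
        handler.reset_setting(res.settings)


def proposed_guard(handler, res):
    # Proposed change: always apply settings, apply metric only if present.
    if res.settings:
        if res.settings.metric:
            handler.reset_metric(res.settings.metric)
        handler.reset_setting(res.settings)


def run(guard, metric):
    handler = RecordingHandler()
    res = SimpleNamespace(settings=SimpleNamespace(metric=metric))
    guard(handler, res)
    return handler.calls


print(run(current_guard, None))    # settings without metric, current guard
print(run(proposed_guard, None))   # settings without metric, proposed guard
print(run(proposed_guard, "on"))   # settings with metric, proposed guard
```

Under these assumptions, only the proposed guard reaches `reset_setting()` when metric data is absent, while both guards behave the same when metric data is present.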


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
