Marandi269 opened a new issue, #1168:
URL: https://github.com/apache/rocketmq-clients/issues/1168
## Before Creating the Bug Report
- [x] I found a bug, not just asking a question
- [x] I have searched the GitHub Issues and GitHub Discussions and believe
that this is not a duplicate
- [x] I have confirmed that this bug belongs to the current repository
## Programming Language of the Client
Python
## Runtime Platform Environment
- OS: Ubuntu 22.04 / Debian 12
- Python: 3.11
## RocketMQ Version of the Client/Server
- Client: rocketmq-python-client 5.0.9 (also verified in latest source code
5.1.0)
- Server: RocketMQ 5.1.4
## Describe the Bug
`PushConsumer` starts successfully but never receives any messages: the
`MessageListener.consume()` callback is never invoked. Meanwhile,
`SimpleConsumer` works correctly with the same configuration.
After investigating the source code, I noticed that in
`rpc_channel.py:136-140`, `reset_setting()` appears to be called only when
both `res.settings` and `res.settings.metric` are truthy:
```python
if res.settings and res.settings.metric:
    self.__handler.reset_metric(res.settings.metric)
    self.__handler.reset_setting(res.settings)
```
If the server returns settings without metric data, `reset_setting()` would
be skipped. `PushConsumer.__consumption` would then remain `None`, and
`__scan_assignment()` would return early at lines 167-168:
```python
def __scan_assignment(self):
    if not self.__consumption:
        return
```
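To make the suspected failure path concrete, here is a minimal standalone sketch. It does not import the real client: `FakeSettings`, `FakePushConsumer`, and `on_settings_current` are hypothetical stand-ins that only model the two snippets quoted above, and the real classes may behave differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeSettings:
    """Hypothetical stand-in for the settings carried by a server response."""
    metric: Optional[dict] = None


class FakePushConsumer:
    """Hypothetical stand-in; models only the two code paths quoted above."""

    def __init__(self):
        self.consumption = None  # plays the role of PushConsumer.__consumption
        self.scans_run = 0

    def reset_metric(self, metric):
        pass  # metric handling is irrelevant to this sketch

    def reset_setting(self, settings):
        self.consumption = settings

    def scan_assignment(self):
        if not self.consumption:
            return  # early return, as in __scan_assignment()
        self.scans_run += 1


def on_settings_current(handler, settings):
    """Mirrors the condition quoted from rpc_channel.py."""
    if settings and settings.metric:
        handler.reset_metric(settings.metric)
        handler.reset_setting(settings)


consumer = FakePushConsumer()
on_settings_current(consumer, FakeSettings(metric=None))  # settings, no metric
consumer.scan_assignment()
print(consumer.consumption, consumer.scans_run)  # None 0
```

In this model, settings without metric data never reach `reset_setting()`, so every scan returns early, which would match the observed behavior.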
## Steps to Reproduce
Save as `test_pushconsumer_bug.py` and run:
```python
#!/usr/bin/env python3
"""PushConsumer Bug Reproduction Test"""
import json
import time
import uuid

from rocketmq.v5.client import ClientConfiguration
from rocketmq.v5.client.client_configuration import Credentials
from rocketmq.v5.producer import Producer
from rocketmq.v5.consumer import (PushConsumer, SimpleConsumer,
                                  ConsumeResult, MessageListener)
from rocketmq.v5.model import FilterExpression, Message

ENDPOINTS = '127.0.0.1:18081'  # RocketMQ Proxy gRPC port
TOPIC = 'TestTopic'
TAG = 'test-tag'

config = ClientConfiguration(endpoints=ENDPOINTS,
                             credentials=Credentials('', ''))
push_received = []


class TestListener(MessageListener):
    def consume(self, message) -> ConsumeResult:
        push_received.append(message.message_id)
        print(f' [PushConsumer] Received: {message.message_id[:8]}')
        return ConsumeResult.SUCCESS


def send_messages(count: int):
    producer = Producer(config, (TOPIC,))
    producer.startup()
    for i in range(count):
        msg = Message()
        msg.topic = TOPIC
        msg.body = json.dumps({'index': i}).encode()
        msg.tag = TAG
        producer.send(msg)
    producer.shutdown()
    print(f' Sent {count} messages')


def test_simple_consumer(timeout: int) -> int:
    received = []
    consumer = SimpleConsumer(config, f'simple-{uuid.uuid4().hex[:8]}',
                              await_duration=5)
    consumer.startup()
    consumer.subscribe(TOPIC, FilterExpression(TAG))
    end = time.time() + timeout
    while time.time() < end:
        try:
            for msg in consumer.receive(max_message_num=32,
                                        invisible_duration=30):
                received.append(msg.message_id)
                print(f' [SimpleConsumer] Received: {msg.message_id[:8]}')
                consumer.ack(msg)
        except Exception:
            pass
        time.sleep(0.1)
    consumer.shutdown()
    return len(received)


def main():
    print('=' * 60)
    print('PushConsumer Bug Reproduction Test')
    print(f'Endpoints: {ENDPOINTS}, Topic: {TOPIC}')
    print('=' * 60)

    # Test PushConsumer
    print('\n[1/4] Starting PushConsumer...')
    push_consumer = PushConsumer(
        config,
        f'push-{uuid.uuid4().hex[:8]}',
        TestListener(),
        {TOPIC: FilterExpression(TAG)}
    )
    push_consumer.startup()
    time.sleep(2)

    print('\n[2/4] Sending 5 messages...')
    send_messages(5)

    print('\n[3/4] Waiting for PushConsumer (15 seconds)...')
    time.sleep(15)
    push_consumer.shutdown()

    # Test SimpleConsumer
    print('\n[4/4] Testing SimpleConsumer...')
    send_messages(5)
    simple_count = test_simple_consumer(10)

    # Results
    print('\n' + '=' * 60)
    print(f'PushConsumer: {len(push_received)} messages')
    print(f'SimpleConsumer: {simple_count} messages')
    print('=' * 60)

    if len(push_received) == 0 and simple_count > 0:
        print('\n[ISSUE] PushConsumer receives nothing, SimpleConsumer works')
        return 1
    return 0


if __name__ == '__main__':
    exit(main())
```
## What Did You Expect to See?
Both `PushConsumer` and `SimpleConsumer` should receive messages
successfully.
## What Did You See Instead?
```
============================================================
PushConsumer Bug Reproduction Test
Endpoints: 127.0.0.1:18081, Topic: TestTopic
============================================================
[1/4] Starting PushConsumer...
[2/4] Sending 5 messages...
Sent 5 messages
[3/4] Waiting for PushConsumer (15 seconds)...
[4/4] Testing SimpleConsumer...
Sent 5 messages
[SimpleConsumer] Received: 0148FE91
[SimpleConsumer] Received: 0148FE91
[SimpleConsumer] Received: 0148FE91
[SimpleConsumer] Received: 0148FE91
[SimpleConsumer] Received: 0148FE91
============================================================
PushConsumer: 0 messages
SimpleConsumer: 5 messages
============================================================
[ISSUE] PushConsumer receives nothing, SimpleConsumer works
```
## Additional Context
I would be happy to submit a PR to fix this if the analysis above is
correct. The potential fix might be:
```python
# rpc_channel.py:136-140
if res.settings:
    if res.settings.metric:
        self.__handler.reset_metric(res.settings.metric)
    self.__handler.reset_setting(res.settings)
```
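As a sanity check of the proposed control flow, the sketch below runs the same nested condition against a recording handler. `RecordingHandler` and `apply_settings_fixed` are hypothetical names introduced for illustration only, and `SimpleNamespace` objects stand in for the real settings message, so this checks the branching logic rather than the client itself.

```python
from types import SimpleNamespace


class RecordingHandler:
    """Hypothetical handler that records which reset methods were called."""

    def __init__(self):
        self.calls = []

    def reset_metric(self, metric):
        self.calls.append("reset_metric")

    def reset_setting(self, settings):
        self.calls.append("reset_setting")


def apply_settings_fixed(handler, settings):
    """Same branching as the proposed fix: settings are always applied,
    metric is applied only when present."""
    if settings:
        if settings.metric:
            handler.reset_metric(settings.metric)
        handler.reset_setting(settings)
    return handler.calls


# Settings without metric data: reset_setting() is still reached.
print(apply_settings_fixed(RecordingHandler(), SimpleNamespace(metric=None)))
# Settings with metric data: both methods are called, metric first.
print(apply_settings_fixed(RecordingHandler(), SimpleNamespace(metric={"on": True})))
# No settings at all: nothing is called.
print(apply_settings_fixed(RecordingHandler(), None))
```

With this ordering, the case with metric data behaves as before, and only the case without metric data changes.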