become-nice opened a new issue #7726:
URL: https://github.com/apache/pulsar/issues/7726
**Describe the bug**
I created a topic with 4 partitions. When I consume messages in Key_Shared
mode, I sometimes cannot receive any messages from specific partitions. In this
scenario I create two consumers: one subscribes to the whole topic (C1), and
the other subscribes to two partitions of the same topic (C2). I start C1
first, then C2. At first, C1 receives all messages and C2 receives none. When I
close C1, C2 receives messages from the partitions it subscribed to. But when I
start C1 again, no consumer receives messages from the partitions that C2
subscribed to.
The producer code:
```python
import pulsar
import time
from multiprocessing import Process


def produce_test(topic, key, message):
    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer(topic)
    print(client.get_topic_partitions("test10"))
    # Send 240 messages, one every 2 seconds, all with the same partition key.
    for i in range(240):
        time.sleep(2)
        message_mix = str(i) + message
        print(message_mix)
        producer.send(message_mix.encode('utf-8'), partition_key=key)
    # Keep the producer connected for a while before closing.
    time.sleep(1000)
    client.close()


if __name__ == "__main__":
    # Four producer processes on the same topic, each with its own key.
    p1 = Process(target=produce_test, args=("test10", "10000", "aaaa",))
    p2 = Process(target=produce_test, args=("test10", "20000", "bbbb",))
    p3 = Process(target=produce_test, args=("test10", "30000", "cccc",))
    p4 = Process(target=produce_test, args=("test10", "40000", "dddd",))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
```
The code of C1:
```python
import pulsar
import time
from _pulsar import ConsumerType
from multiprocessing import Process


def consumer_data(topic, process_index):
    client = pulsar.Client('pulsar://localhost:6650')
    print(len(client._consumers))
    # C1 subscribes to the whole partitioned topic in Key_Shared mode.
    consumer = client.subscribe(topic, 'my-subscription',
                                consumer_type=ConsumerType.KeyShared)
    flag = True
    start_time = int(time.time() * 1000)
    while True:
        msg = consumer.receive()
        print(
            "Process {} Received message '{}' id='{}' "
            "partition={}".format(process_index, msg.data(), msg.message_id(),
                                  msg.topic_name()))
        consumer.acknowledge(msg)


if __name__ == "__main__":
    p1 = Process(target=consumer_data, args=("test10", "p1", ))
    p1.start()
```
The code of C2:
```python
import pulsar
import time
from _pulsar import ConsumerType
from multiprocessing import Process


def consumer_data(topic, process_index):
    client = pulsar.Client('pulsar://localhost:6650')
    print(len(client._consumers))
    # C2 passes a list of partition names, using the same subscription as C1.
    consumer = client.subscribe(topic, 'my-subscription',
                                consumer_type=ConsumerType.KeyShared)
    flag = True
    start_time = int(time.time() * 1000)
    while True:
        msg = consumer.receive()
        print(
            "Process {} Received message '{}' id='{}' "
            "partition={}".format(process_index, msg.data(), msg.message_id(),
                                  msg.topic_name()))
        consumer.acknowledge(msg)


if __name__ == "__main__":
    p2 = Process(target=consumer_data, args=(["test10-partition-2",
                                              "test10-partition-3"], "2", ))
    p2.start()
```
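For reference, the partition names that C2 subscribes to follow the `<topic>-partition-<index>` pattern that also appears in the logs. A minimal sketch of how such a list could be built, assuming that naming pattern; `partition_topics` is a hypothetical helper and not part of the Pulsar client:

```python
def partition_topics(topic, indexes):
    """Build per-partition topic names (hypothetical helper, assumes the
    '<topic>-partition-<index>' naming seen in the logs)."""
    return ["{}-partition-{}".format(topic, i) for i in indexes]


# The two partitions that C2 subscribes to in this report.
c2_topics = partition_topics("test10", [2, 3])
print(c2_topics)
```

The resulting list can be passed as the `topic` argument of `consumer_data` in the C2 code.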
The output of C1:
```
Process p1 Received message 'b'236dddd'' id='(15,236,0,-1)'
partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'236cccc'' id='(17,472,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'236aaaa'' id='(17,473,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'237dddd'' id='(15,237,0,-1)'
partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'237cccc'' id='(17,474,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'237aaaa'' id='(17,475,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'238dddd'' id='(15,238,0,-1)'
partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'238cccc'' id='(17,476,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'238aaaa'' id='(17,477,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'239dddd'' id='(15,239,0,-1)'
partition=persistent://public/default/test10-partition-0
Process p1 Received message 'b'239cccc'' id='(17,478,1,-1)'
partition=persistent://public/default/test10-partition-1
Process p1 Received message 'b'239aaaa'' id='(17,479,1,-1)'
partition=persistent://public/default/test10-partition-1
2020-08-03 15:54:37.531 INFO [140699652921088] ConsumerStatsImpl:65 |
Consumer [persistent://public/default/test10-partition-0, my-subscription, 0] ,
ConsumerStatsImpl (numBytesRecieved_ = 1535, totalNumBytesRecieved_ = 1535,
receivedMsgMap_ = {[Key: Ok, Value: 233], }, ackedMsgMap_ = {[Key: {Result: Ok,
ackType: 0}, Value: 233], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 233], },
totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], })
2020-08-03 15:54:37.531 INFO [140699652921088] ConsumerStatsImpl:65 |
Consumer [persistent://public/default/test10-partition-1, my-subscription, 1] ,
ConsumerStatsImpl (numBytesRecieved_ = 3070, totalNumBytesRecieved_ = 3070,
receivedMsgMap_ = {[Key: Ok, Value: 466], }, ackedMsgMap_ = {[Key: {Result: Ok,
ackType: 0}, Value: 466], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 466], },
totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 466], })
2020-08-03 15:54:37.531 INFO [140699652921088] ConsumerStatsImpl:65 |
Consumer [persistent://public/default/test10-partition-2, my-subscription, 2] ,
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0,
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {},
totalAckedMsgMap_ = {})
2020-08-03 15:54:37.531 INFO [140699652921088] ConsumerStatsImpl:65 |
Consumer [persistent://public/default/test10-partition-3, my-subscription, 3] ,
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0,
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {},
totalAckedMsgMap_ = {})
```
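To see at a glance which partitions delivered messages to C1, the "Received message" lines can be tallied per partition. This is only a sketch: it assumes the print format used by the consumer code, with each record joined onto a single line, and `tally_by_partition` is a hypothetical helper rather than anything from the Pulsar client.

```python
import re
from collections import Counter

# Matches the lines printed by the consumers, assuming one record per line.
RECORD = re.compile(
    r"Received message 'b'(?P<payload>[^']*)''.*?partition=(?P<topic>\S+)"
)


def tally_by_partition(lines):
    """Count received messages per partition topic name."""
    counts = Counter()
    for line in lines:
        match = RECORD.search(line)
        if match:
            counts[match.group("topic")] += 1
    return counts


# Three records taken from the C1 output, joined onto single lines.
sample = [
    "Process p1 Received message 'b'236dddd'' id='(15,236,0,-1)' "
    "partition=persistent://public/default/test10-partition-0",
    "Process p1 Received message 'b'236cccc'' id='(17,472,1,-1)' "
    "partition=persistent://public/default/test10-partition-1",
    "Process p1 Received message 'b'236aaaa'' id='(17,473,1,-1)' "
    "partition=persistent://public/default/test10-partition-1",
]
counts = tally_by_partition(sample)
for topic, count in sorted(counts.items()):
    print(topic, count)
```

In the C1 output only partition-0 and partition-1 appear, which matches the `ConsumerStatsImpl` lines reporting zero bytes received for partition-2 and partition-3.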
The output of C2:
```
2020-08-03 15:49:46.326 INFO [140382617937728] ConnectionPool:85 | Created
connection for pulsar://localhost:6650
2020-08-03 15:49:46.326 INFO [140382567016192] ClientConnection:335 |
[127.0.0.1:50638 -> 127.0.0.1:6650] Connected to broker
2020-08-03 15:49:46.329 INFO [140382567016192] AckGroupingTrackerEnabled:53
| ACK grouping is enabled, grouping time 100ms, grouping max size 1000
2020-08-03 15:49:46.329 INFO [140382567016192] HandlerBase:53 |
[persistent://public/default/test10-partition-2, my-subscription, 0] Getting
connection from pool
2020-08-03 15:49:46.329 INFO [140382567016192] AckGroupingTrackerEnabled:53
| ACK grouping is enabled, grouping time 100ms, grouping max size 1000
2020-08-03 15:49:46.329 INFO [140382567016192] HandlerBase:53 |
[persistent://public/default/test10-partition-3, my-subscription, 1] Getting
connection from pool
2020-08-03 15:49:46.331 INFO [140382567016192] ConsumerImpl:199 |
[persistent://public/default/test10-partition-2, my-subscription, 0] Created
consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650]
2020-08-03 15:49:46.331 INFO [140382567016192] ConsumerImpl:199 |
[persistent://public/default/test10-partition-3, my-subscription, 1] Created
consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650]
2020-08-03 15:49:46.331 INFO [140382567016192] MultiTopicsConsumerImpl:99 |
Successfully Subscribed to Topics
2020-08-03 15:59:46.330 INFO [140382567016192] ConsumerStatsImpl:65 |
Consumer [persistent://public/default/test10-partition-2, my-subscription, 0] ,
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0,
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {},
totalAckedMsgMap_ = {})
2020-08-03 15:59:46.330 INFO [140382567016192] ConsumerStatsImpl:65 |
Consumer [persistent://public/default/test10-partition-3, my-subscription, 1] ,
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0,
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {},
totalAckedMsgMap_ = {})
```