become-nice opened a new issue #7726:
URL: https://github.com/apache/pulsar/issues/7726


   **Describe the bug**
   I created a topic with 4 partitions. When I consume messages in Key_Shared
mode, I sometimes can't receive any messages from specific partitions. In this
scenario I create two consumers: one subscribes to the whole topic (C1), and
the other subscribes to two partitions of the same topic (C2). I start C1
first, then C2. At first, C1 receives all messages and C2 receives none. When I
close C1, C2 receives messages from its partitions. But when I start C1 again,
neither consumer receives messages from the partitions that C2 subscribed to.
   
   The code of the producer:
   ```python
   import pulsar
   import time
   from multiprocessing import Process
   
   
   def produce_test(partition, key, message):
       client = pulsar.Client('pulsar://localhost:6650')
       producer = client.create_producer(partition)
       print(client.get_topic_partitions("test10"))
       for i in range(240):
           time.sleep(2)
           message_mix = str(int(i)) + message
           print(message_mix)
           producer.send((message_mix).encode('utf-8'), partition_key=key)
       time.sleep(1000)
       client.close()
   
   
   if __name__ == "__main__":
       p1 = Process(target=produce_test, args=("test10", "10000", "aaaa",))
       p2 = Process(target=produce_test, args=("test10", "20000", "bbbb",))
       p3 = Process(target=produce_test, args=("test10", "30000", "cccc",))
       p4 = Process(target=produce_test, args=("test10", "40000", "dddd",))
       p1.start()
       p2.start()
       p3.start()
       p4.start()
   ```
   
   The code of C1:
   ```python
   import pulsar
   import time
   from _pulsar import ConsumerType
   from multiprocessing import Process
   
   
   def consumer_data(topic, process_index):
       client = pulsar.Client('pulsar://localhost:6650')
       print(len(client._consumers))
       consumer = client.subscribe(topic, 'my-subscription',
                                   consumer_type=ConsumerType.KeyShared)
       flag = True
       start_time = int(time.time() * 1000)
       while True:
           msg = consumer.receive()
           print(
               "Process {} Received message '{}' id='{}' partition={}".format(
                   process_index, msg.data(), msg.message_id(),
                   msg.topic_name()))
           consumer.acknowledge(msg)
   
   
   if __name__ == "__main__":
       p1 = Process(target=consumer_data, args=("test10", "p1", ))
       p1.start()
   ```
   
   The code of C2:
   ```python
   import pulsar
   import time
   from _pulsar import ConsumerType
   from multiprocessing import Process
   
   
   def consumer_data(topic, process_index,):
       client = pulsar.Client('pulsar://localhost:6650')
       print(len(client._consumers))
       consumer = client.subscribe(topic, 'my-subscription',
                                   consumer_type=ConsumerType.KeyShared)
       flag = True
       start_time = int(time.time() * 1000)
       while True:
           msg = consumer.receive()
           print(
               "Process {} Received message '{}' id='{}' partition={}".format(
                   process_index, msg.data(), msg.message_id(),
                   msg.topic_name()))
           consumer.acknowledge(msg)
   
   
   if __name__ == "__main__":
       p2 = Process(target=consumer_data,
                    args=(["test10-partition-2", "test10-partition-3"], "2"))
       p2.start()
   ```
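
The partition names passed to C2 follow the `<topic>-partition-<index>` pattern that also appears in the log output. As a minimal sketch, the list could be built with a small helper; `partition_topics` is a hypothetical name used only for illustration and is not part of the Pulsar client:

```python
def partition_topics(topic, indexes):
    """Build per-partition topic names such as 'test10-partition-2'.

    Hypothetical helper: it only formats strings and does not check
    that the partitions exist on the broker.
    """
    return ["{}-partition-{}".format(topic, i) for i in indexes]


if __name__ == "__main__":
    # The two partitions C2 subscribes to in this report.
    print(partition_topics("test10", [2, 3]))
```

The resulting list can be passed as the `topic` argument of `consumer_data` in the C2 script.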
   
   The output of C1:
   ```
   Process p1 Received message 'b'236dddd'' id='(15,236,0,-1)' 
partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'236cccc'' id='(17,472,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'236aaaa'' id='(17,473,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'237dddd'' id='(15,237,0,-1)' 
partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'237cccc'' id='(17,474,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'237aaaa'' id='(17,475,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'238dddd'' id='(15,238,0,-1)' 
partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'238cccc'' id='(17,476,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'238aaaa'' id='(17,477,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'239dddd'' id='(15,239,0,-1)' 
partition=persistent://public/default/test10-partition-0
   Process p1 Received message 'b'239cccc'' id='(17,478,1,-1)' 
partition=persistent://public/default/test10-partition-1
   Process p1 Received message 'b'239aaaa'' id='(17,479,1,-1)' 
partition=persistent://public/default/test10-partition-1
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | 
Consumer [persistent://public/default/test10-partition-0, my-subscription, 0] , 
ConsumerStatsImpl (numBytesRecieved_ = 1535, totalNumBytesRecieved_ = 1535, 
receivedMsgMap_ = {[Key: Ok, Value: 233], }, ackedMsgMap_ = {[Key: {Result: Ok, 
ackType: 0}, Value: 233], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 233], }, 
totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], })
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | 
Consumer [persistent://public/default/test10-partition-1, my-subscription, 1] , 
ConsumerStatsImpl (numBytesRecieved_ = 3070, totalNumBytesRecieved_ = 3070, 
receivedMsgMap_ = {[Key: Ok, Value: 466], }, ackedMsgMap_ = {[Key: {Result: Ok, 
ackType: 0}, Value: 466], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 466], }, 
totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 466], })
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | 
Consumer [persistent://public/default/test10-partition-2, my-subscription, 2] , 
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, 
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, 
totalAckedMsgMap_ = {})
   2020-08-03 15:54:37.531 INFO  [140699652921088] ConsumerStatsImpl:65 | 
Consumer [persistent://public/default/test10-partition-3, my-subscription, 3] , 
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, 
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, 
totalAckedMsgMap_ = {})
   ```
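
The stats lines are easier to compare once they are reduced to one count per partition. The following is a rough sketch that assumes the `ConsumerStatsImpl` log format shown in this report; `received_per_partition` and `SAMPLE` are hypothetical names, and the regular expressions are not guaranteed to match other client versions:

```python
import re

# Matches one stats entry: the topic inside "Consumer [...]" and the body of
# totalReceivedMsgMap_. \s* tolerates the line wrapping seen in the logs.
STATS_RE = re.compile(
    r"Consumer \[(?P<topic>[^,\]]+),[^\]]*\].*?"
    r"totalReceivedMsgMap_\s*=\s*\{(?P<received>[^}]*)\}",
    re.DOTALL,
)
VALUE_RE = re.compile(r"Value:\s*(\d+)")


def received_per_partition(log_text):
    """Return {topic: total messages received} for each stats entry."""
    counts = {}
    for match in STATS_RE.finditer(log_text):
        values = VALUE_RE.findall(match.group("received"))
        counts[match.group("topic")] = sum(int(v) for v in values)
    return counts


# Two entries copied from the C1 output in this report.
SAMPLE = """
Consumer [persistent://public/default/test10-partition-0, my-subscription, 0] ,
ConsumerStatsImpl (numBytesRecieved_ = 1535, totalNumBytesRecieved_ = 1535,
receivedMsgMap_ = {[Key: Ok, Value: 233], }, ackedMsgMap_ = {[Key: {Result: Ok,
ackType: 0}, Value: 233], }, totalReceivedMsgMap_ = {[Key: Ok, Value: 233], },
totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 233], })
Consumer [persistent://public/default/test10-partition-2, my-subscription, 2] ,
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0,
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {},
totalAckedMsgMap_ = {})
"""

if __name__ == "__main__":
    for topic, count in received_per_partition(SAMPLE).items():
        print(topic, count)
```

Run against the full C1 output, this kind of summary should make it visible that partitions 2 and 3 report no received messages while partitions 0 and 1 do.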
   
   The output of C2:
   ```
   2020-08-03 15:49:46.326 INFO  [140382617937728] ConnectionPool:85 | Created 
connection for pulsar://localhost:6650
   2020-08-03 15:49:46.326 INFO  [140382567016192] ClientConnection:335 | 
[127.0.0.1:50638 -> 127.0.0.1:6650] Connected to broker
   2020-08-03 15:49:46.329 INFO  [140382567016192] AckGroupingTrackerEnabled:53 
| ACK grouping is enabled, grouping time 100ms, grouping max size 1000
   2020-08-03 15:49:46.329 INFO  [140382567016192] HandlerBase:53 | 
[persistent://public/default/test10-partition-2, my-subscription, 0] Getting 
connection from pool
   2020-08-03 15:49:46.329 INFO  [140382567016192] AckGroupingTrackerEnabled:53 
| ACK grouping is enabled, grouping time 100ms, grouping max size 1000
   2020-08-03 15:49:46.329 INFO  [140382567016192] HandlerBase:53 | 
[persistent://public/default/test10-partition-3, my-subscription, 1] Getting 
connection from pool
   2020-08-03 15:49:46.331 INFO  [140382567016192] ConsumerImpl:199 | 
[persistent://public/default/test10-partition-2, my-subscription, 0] Created 
consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650] 
   2020-08-03 15:49:46.331 INFO  [140382567016192] ConsumerImpl:199 | 
[persistent://public/default/test10-partition-3, my-subscription, 1] Created 
consumer on broker [127.0.0.1:50638 -> 127.0.0.1:6650] 
   2020-08-03 15:49:46.331 INFO  [140382567016192] MultiTopicsConsumerImpl:99 | 
Successfully Subscribed to Topics
   2020-08-03 15:59:46.330 INFO  [140382567016192] ConsumerStatsImpl:65 | 
Consumer [persistent://public/default/test10-partition-2, my-subscription, 0] , 
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, 
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, 
totalAckedMsgMap_ = {})
   2020-08-03 15:59:46.330 INFO  [140382567016192] ConsumerStatsImpl:65 | 
Consumer [persistent://public/default/test10-partition-3, my-subscription, 1] , 
ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, 
receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, 
totalAckedMsgMap_ = {})
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

