graceon opened a new issue #11837:
URL: https://github.com/apache/pulsar/issues/11837


   server version 2.8.0
   python client version 2.8.0
   `
   # /docker/pulsar/pulsar_test_prefetch.py 
   from pulsar import Client,Consumer,Producer
   import pulsar
   from multiprocessing import Queue,Process, process
   import time
   
   from pulsar.schema.schema import StringSchema
   
   import sys
   
   
   
   def PulsarConsumer(time_process:float):
       """Consume 'my_topic' forever, sleeping ``time_process`` seconds per message.

       Subscribes as "my_subscribe" with a Shared consumer type and
       ``receiver_queue_size=1``. Each received message is counted and printed,
       then the function sleeps for ``time_process`` seconds before
       acknowledging it, so the acknowledgement is delayed by the simulated
       processing time.

       This function never returns; errors are printed and the loop continues.
       """
       client = Client('pulsar://localhost:6650')
       consumer = client.subscribe(
           'my_topic',
           "my_subscribe",
           schema=StringSchema(),
           consumer_type=pulsar.ConsumerType.Shared,
           receiver_queue_size=1,
       )
       received = 0
       while True:
           try:
               message = consumer.receive()
               received += 1
               print("count:{0:03d} value{1}".format(received, message.value()))
               # Simulated processing time; the ack is sent only afterwards.
               time.sleep(time_process)
               consumer.acknowledge(message)
           except pulsar.Timeout:
               # Nothing to do on a receive timeout; just poll again.
               continue
           except Exception as err:
               # Best effort: report the failure and keep consuming.
               print(err)
   def PulsarProducer(count_send_message:int):
       """Publish ``count_send_message`` messages to 'my_topic', then flush.

       The payloads are the zero-padded counters "000", "001", ... sent with a
       50 ms pause between messages. "send <value>" is printed before each
       send and "flush" once the producer has been flushed.

       Args:
           count_send_message: Number of messages to publish. Previously this
               argument was ignored and exactly 50 messages were always sent.
       """
       client = Client('pulsar://localhost:6650')
       try:
           producer = client.create_producer('my_topic',schema=StringSchema())
           for i in range(count_send_message):
               time.sleep(0.05)
               value = "{0:03d}".format(i)
               print("send",value,sep=" ")
               producer.send(value)
           producer.flush()
           print("flush")
       finally:
           # Release the broker connection even if sending fails part-way.
           client.close()
   
   
   process_array = []

   # The guard is required by multiprocessing: under the "spawn" start method
   # the child re-imports this module, and without the guard it would try to
   # start processes again during that import.
   if __name__ == "__main__":
       # argv[1] selects the role: 0 = producer, 1 = consumer.
       # argv[2] is the message count (producer) or the per-message
       # processing time in seconds (consumer).
       mode = int(sys.argv[1])
       # Compare as int; comparing the raw argv string with 0 is always False.
       print(mode == 0)
       if mode == 0:
           process_array.append(
               Process(target=PulsarProducer, args=(int(sys.argv[2]),)))
       if mode == 1:
           process_array.append(
               Process(target=PulsarConsumer, args=(float(sys.argv[2]),)))

       for item in process_array:
           item.start()
       for item in process_array:
           item.join()
   `
   I ran this in 3 separate shells:
   python3 /docker/pulsar/pulsar_test_prefetch.py 0 50
   python3 /docker/pulsar/pulsar_test_prefetch.py 1 0.01
   python3 /docker/pulsar/pulsar_test_prefetch.py 1 20
   the result of python3 /docker/pulsar/pulsar_test_prefetch.py 1 0.01 is
   `count:001 value002
   count:002 value003
   count:003 value004
   count:004 value005
   count:005 value006
   count:006 value007
   count:007 value008
   count:008 value009
   count:009 value010
   count:010 value011
   count:011 value012
   count:012 value013
   count:013 value014
   count:014 value015
   count:015 value016
   count:016 value017
   count:017 value018
   count:018 value019
   count:019 value020
   count:020 value021
   count:021 value022
   count:022 value023
   count:023 value024
   count:024 value025
   count:025 value026
   count:026 value027
   count:027 value028
   count:028 value029
   count:029 value030
   count:030 value031
   count:031 value032
   count:032 value033
   count:033 value034
   count:034 value035
   count:035 value036
   count:036 value037
   count:037 value038
   count:038 value039
   count:039 value040
   count:040 value041
   count:041 value042
   count:042 value043
   count:043 value044
   count:044 value045
   count:045 value046
   count:046 value047
   count:047 value048
   count:048 value049
   `
   the result of python3 /docker/pulsar/pulsar_test_prefetch.py 1 20 is
   `
   count:001 value000
   count:002 value001
   `
   
   My expectation is that the second consumer does not prefetch, so that the first consumer 
  consumes 49 of the 50 messages.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to