graceon opened a new issue #11837:
URL: https://github.com/apache/pulsar/issues/11837
server version 2.8.0
python client version 2.8.0
`
# /docker/pulsar/pulsar_test_prefetch.py
from pulsar import Client,Consumer,Producer
import pulsar
from multiprocessing import Queue,Process, process
import time
from pulsar.schema.schema import StringSchema
import sys
def PulsarConsumer(time_process:float):
    """Consume ``my_topic`` forever, sleeping ``time_process`` seconds per message.

    Subscribes to ``my_topic`` under the ``my_subscribe`` subscription as a
    Shared consumer with ``receiver_queue_size=1``. Each received message is
    counted and printed, then the function sleeps for ``time_process`` seconds
    to simulate processing before acknowledging it.

    Args:
        time_process: Seconds to sleep between receiving a message and
            acknowledging it.

    This function never returns; it loops until the process is terminated.
    """
    pulsar_client = Client('pulsar://localhost:6650')
    subscription = pulsar_client.subscribe(
        'my_topic',
        "my_subscribe",
        schema=StringSchema(),
        consumer_type=pulsar.ConsumerType.Shared,
        receiver_queue_size=1,
    )
    received = 0
    while True:
        try:
            message = subscription.receive()
            # Count only messages that were actually delivered.
            received += 1
            print("count:{0:03d} value{1}".format(received, message.value()))
            # Simulated processing time; the ack is sent only afterwards.
            time.sleep(time_process)
            subscription.acknowledge(message)
        except pulsar.Timeout:
            # Nothing to do on a receive timeout; just try again.
            continue
        except Exception as error:
            # Best-effort loop: report the error and keep consuming.
            print(error)
def PulsarProducer(count_send_message:int):
    """Publish ``count_send_message`` zero-padded counters to ``my_topic``.

    Sends the strings ``"000"``, ``"001"``, ... one at a time, pausing 50 ms
    before each send, then flushes the producer and closes the client.

    Args:
        count_send_message: Number of messages to publish. The pasted script
            ignored this argument and always sent 50; it is now honoured
            (the reproduction command passes 50, so its behaviour is
            unchanged).
    """
    client = Client('pulsar://localhost:6650')
    try:
        producer = client.create_producer('my_topic', schema=StringSchema())
        for i in range(count_send_message):
            time.sleep(0.05)
            value = "{0:03d}".format(i)
            print("send", value, sep=" ")
            producer.send(value)
        # NOTE(review): the pasted script lost its indentation; the flush is
        # assumed to run once after the loop rather than per message.
        producer.flush()
        print("flush")
    finally:
        # Release the broker connection even if a send fails.
        client.close()
# Worker processes started by this invocation (at most one per run).
process_array = []

# Usage: <script> 0 <message_count>    -> run the producer
#        <script> 1 <seconds_per_msg>  -> run a consumer
#
# The guard is required by multiprocessing: under the "spawn" start method
# child processes re-import this module, and unguarded Process creation
# would then recurse.
if __name__ == "__main__":
    mode = int(sys.argv[1])
    # Debug output: True when running as producer. The pasted script compared
    # the raw string argument to the integer 0, which is always False.
    print(mode == 0)
    if mode == 0:
        process_array.append(Process(target=PulsarProducer,
                                     args=(int(sys.argv[2]),)))
    if mode == 1:
        process_array.append(Process(target=PulsarConsumer,
                                     args=(float(sys.argv[2]),)))
    for item in process_array:
        item.start()
    for item in process_array:
        item.join()
`
I ran this script in three separate shells:
python3 /docker/pulsar/pulsar_test_prefetch.py 0 50
python3 /docker/pulsar/pulsar_test_prefetch.py 1 0.01
python3 /docker/pulsar/pulsar_test_prefetch.py 1 20
the result of python3 /docker/pulsar/pulsar_test_prefetch.py 1 0.01 is
`count:001 value002
count:002 value003
count:003 value004
count:004 value005
count:005 value006
count:006 value007
count:007 value008
count:008 value009
count:009 value010
count:010 value011
count:011 value012
count:012 value013
count:013 value014
count:014 value015
count:015 value016
count:016 value017
count:017 value018
count:018 value019
count:019 value020
count:020 value021
count:021 value022
count:022 value023
count:023 value024
count:024 value025
count:025 value026
count:026 value027
count:027 value028
count:028 value029
count:029 value030
count:030 value031
count:031 value032
count:032 value033
count:033 value034
count:034 value035
count:035 value036
count:036 value037
count:037 value038
count:038 value039
count:039 value040
count:040 value041
count:041 value042
count:042 value043
count:043 value044
count:044 value045
count:045 value046
count:046 value047
count:047 value048
count:048 value049
`
the result of python3 /docker/pulsar/pulsar_test_prefetch.py 1 20 is
`
count:001 value000
count:002 value001
`
My expectation is that the second consumer does not prefetch, so that the first
consumer consumes 49 of the 50 messages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]