alphara commented on issue #7324:
URL: https://github.com/apache/pulsar/issues/7324#issuecomment-650343805
Here is a small example of code using asyncio.
```
from asyncio import get_event_loop, get_running_loop, sleep
from asyncio import CancelledError, Task
from signal import SIGINT, SIGTERM
from pulsar import Client, MessageId
async def start():
loop = get_running_loop()
executor = None
def sync_connect():
client = Client(service_url='pulsar://localhost:6650',
authentication=None)
return client
client = await loop.run_in_executor(executor, sync_connect)
def sync_subscribe():
topic = 'my-topic-1'
msg_id = MessageId.earliest
reader = client.create_reader(topic, msg_id)
return reader
reader = await loop.run_in_executor(executor, sync_subscribe)
def sync_receive():
msg = reader.read_next()
return msg
while True:
msg = await loop.run_in_executor(executor, sync_receive)
print("Received message '{}' id='{}'".format(msg.data(),
msg.message_id()))
client.close()
async def run():
''' Run in event loop '''
await start()
while True:
try:
await sleep(60)
except CancelledError:
break
get_running_loop().stop()
def signal_handler():
''' Signal handler '''
for task in Task.all_tasks():
task.cancel()
def main():
''' Program entry point '''
loop = get_event_loop()
loop.add_signal_handler(SIGINT, signal_handler)
loop.add_signal_handler(SIGTERM, signal_handler)
loop.run_until_complete(run())
loop.remove_signal_handler(SIGINT)
loop.remove_signal_handler(SIGTERM)
loop.close()
if __name__ == '__main__':
main()
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]