This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new a189711 chore: add async example (#291)
a189711 is described below
commit a189711033f3a0a7e7b43e9fa77b57b7d153cc50
Author: Nikolas Achatz <[email protected]>
AuthorDate: Mon Feb 9 00:25:27 2026 -0500
chore: add async example (#291)
---
examples/async.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 71 insertions(+)
diff --git a/examples/async.py b/examples/async.py
new file mode 100644
index 0000000..372e549
--- /dev/null
+++ b/examples/async.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# NOTE: requires version 3.9.0+ of pulsar-client library
+import asyncio
+from pulsar.asyncio import Client, Producer, Consumer
+from pulsar import BatchingType
+
+NUM_MESSAGES = 100
+TOPIC_NAME = 'my-async-topic'
+SUBSCRIPTION_NAME = 'my-async-subscription'
+SERVICE_URL = 'pulsar://localhost:6650'
+
+async def produce(producer: Producer, id: int) -> None:
+ await producer.send((f'hello-{id}').encode('utf-8'), None)
+ await producer.flush()
+
+async def consume(consumer: Consumer) -> None:
+ msg = await consumer.receive()
+ print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'),
msg.message_id()))
+ await consumer.acknowledge(msg)
+
+async def main() -> None:
+ client: Client = Client(SERVICE_URL)
+ consumer = await client.subscribe(TOPIC_NAME, SUBSCRIPTION_NAME,
+ properties={
+ "consumer-name": "test-consumer-name",
+ "consumer-id": "test-consumer-id"
+ })
+
+ producer = await client.create_producer(
+ TOPIC_NAME,
+ block_if_queue_full=True,
+ batching_enabled=True,
+ batching_max_publish_delay_ms=10,
+ properties={
+ "producer-name": "test-producer-name",
+ "producer-id": "test-producer-id"
+ },
+ batching_type=BatchingType.KeyBased
+ )
+
+ tasks = []
+ for id in range(NUM_MESSAGES):
+ tasks.append(asyncio.create_task(produce(producer, id)))
+ tasks.append(asyncio.create_task(consume(consumer)))
+ await asyncio.gather(*tasks)
+
+ await producer.close()
+ await consumer.close()
+ await client.close()
+
+if __name__ == '__main__':
+ asyncio.run(main())