This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new c9a2eb80f feat(python): add AsyncIterator interface to IggyConsumer
(#2552)
c9a2eb80f is described below
commit c9a2eb80fa1d45c5efe7433e61fc75e17f581034
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Tue Jan 20 21:16:31 2026 +0900
feat(python): add AsyncIterator interface to IggyConsumer (#2552)
This is a continuation of #2169.
---
foreign/python/Cargo.toml | 1 +
foreign/python/apache_iggy.pyi | 23 ++++++++++++--
foreign/python/src/client.rs | 28 +++++++++++------
foreign/python/src/consumer.rs | 29 +++++++++++------
foreign/python/src/iterator.rs | 59 +++++++++++++++++++++++++++++++++++
foreign/python/src/lib.rs | 3 ++
foreign/python/src/receive_message.rs | 15 ++++-----
foreign/python/tests/test_iggy_sdk.py | 47 ++++++++++++++++++++++++++--
8 files changed, 172 insertions(+), 33 deletions(-)
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 5d1b1b4d7..1b7a5c6dc 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -30,6 +30,7 @@ repository = "https://github.com/apache/iggy"
[dependencies]
bytes = "1.10.1"
+futures = "0.3.31"
iggy = { path = "../../core/sdk", version = "0.8.1-edge.1" }
pyo3 = "0.26.0"
pyo3-async-runtimes = { version = "0.26.0", features = [
diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi
index a5141681b..0e319c5ac 100644
--- a/foreign/python/apache_iggy.pyi
+++ b/foreign/python/apache_iggy.pyi
@@ -265,6 +265,7 @@ class IggyClient:
name: builtins.str,
partitions_count: builtins.int,
compression_algorithm: typing.Optional[builtins.str] = None,
+ topic_id: typing.Optional[builtins.int] = None,
replication_factor: typing.Optional[builtins.int] = None,
) -> collections.abc.Awaitable[None]:
r"""
@@ -324,7 +325,7 @@ class IggyClient:
init_retries: typing.Optional[builtins.int] = None,
init_retry_interval: typing.Optional[datetime.timedelta] = None,
allow_replay: builtins.bool = False,
- ) -> IggyConsumer:
+ ) -> collections.abc.Awaitable[IggyConsumer]:
r"""
Creates a new consumer group consumer.
@@ -385,6 +386,18 @@ class IggyConsumer:
Returns `Ok(())` if the server responds successfully, or a
`PyRuntimeError`
if the operation fails.
"""
+ def iter_messages(self) -> collections.abc.AsyncIterator[ReceiveMessage]:
+ r"""
+ Asynchronously iterate over `ReceiveMessage`s.
+
+ Returns an async iterator that raises `StopAsyncIteration` when no
more messages are available
+ or a `PyRuntimeError` on failure.
+
+ Note: This method does not currently support `AutoCommit.After`.
+ For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`,
+ only the interval part is applied; the `after` mode is ignored.
+ Use `consume_messages()` if you need commit-after-processing semantics.
+ """
def consume_messages(
self,
callback: collections.abc.Callable[
@@ -467,6 +480,10 @@ class ReceiveMessage:
The length represents the length of the payload.
"""
+ def partition_id(self) -> builtins.int:
+ r"""
+ Retrieves the partition this message belongs to.
+ """
class SendMessage:
r"""
@@ -477,10 +494,10 @@ class SendMessage:
"""
def __new__(cls, data: builtins.str | bytes) -> SendMessage:
r"""
- Constructs a new `SendMessage` instance from a string.
+ Constructs a new `SendMessage` instance from a string or bytes.
This method allows for the creation of a `SendMessage` instance
- directly from Python using the provided string data.
+ directly from Python using the provided string or bytes data.
"""
class StreamDetails:
diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs
index f30000d9f..b0db32fc3 100644
--- a/foreign/python/src/client.rs
+++ b/foreign/python/src/client.rs
@@ -16,9 +16,6 @@
* under the License.
*/
-use std::str::FromStr;
-use std::sync::Arc;
-
use iggy::prelude::{
Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as
RustMessage,
PollingStrategy as RustPollingStrategy, *,
@@ -28,6 +25,8 @@ use pyo3::types::{PyDelta, PyList, PyType};
use pyo3_async_runtimes::tokio::future_into_py;
use pyo3_stub_gen::define_stub_info_gatherer;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
+use std::str::FromStr;
+use std::sync::Arc;
use crate::consumer::{py_delta_to_iggy_duration, AutoCommit, IggyConsumer};
use crate::identifier::PyIdentifier;
@@ -314,7 +313,10 @@ impl IggyClient {
let messages = polled_messages
.messages
.into_iter()
- .map(ReceiveMessage::from_rust_message)
+ .map(|m| ReceiveMessage {
+ inner: m,
+ partition_id,
+ })
.collect::<Vec<_>>();
Ok(messages)
})
@@ -340,8 +342,10 @@ impl IggyClient {
init_retry_interval=None,
allow_replay=false,
))]
- fn consumer_group(
+
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[IggyConsumer]",
imports=("collections.abc")))]
+ fn consumer_group<'a>(
&self,
+ py: Python<'a>,
name: &str,
stream: &str,
topic: &str,
@@ -356,7 +360,7 @@ impl IggyClient {
init_retries: Option<u32>,
init_retry_interval: Option<Py<PyDelta>>,
allow_replay: bool,
- ) -> PyResult<IggyConsumer> {
+ ) -> PyResult<Bound<'a, PyAny>> {
let mut builder = self
.inner
.consumer_group(name, stream, topic)
@@ -412,10 +416,16 @@ impl IggyClient {
if allow_replay {
builder = builder.allow_replay()
}
- let consumer = builder.build();
+ let mut consumer = builder.build();
- Ok(IggyConsumer {
- inner: Arc::new(Mutex::new(consumer)),
+ future_into_py(py, async move {
+ consumer
+ .init()
+ .await
+ .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError,
_>(format!("{e:?}")))?;
+ Ok(IggyConsumer {
+ inner: Arc::new(Mutex::new(consumer)),
+ })
})
}
}
diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs
index 8a830aeb2..b2ed74f76 100644
--- a/foreign/python/src/consumer.rs
+++ b/foreign/python/src/consumer.rs
@@ -36,6 +36,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::identifier::PyIdentifier;
+use crate::iterator::ReceiveMessageIterator;
use crate::receive_message::ReceiveMessage;
/// A Python class representing the Iggy consumer.
@@ -129,6 +130,21 @@ impl IggyConsumer {
})
}
+    /// Asynchronously iterate over `ReceiveMessage`s.
+    ///
+    /// Returns an async iterator that raises `StopAsyncIteration` when no more messages are available
+    /// or a `PyRuntimeError` on failure.
+    ///
+    /// Note: This method does not currently support `AutoCommit.After`.
+    /// For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`,
+    /// only the interval part is applied; the `after` mode is ignored.
+    /// Use `consume_messages()` if you need commit-after-processing semantics.
+    #[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))]
+    fn iter_messages(&self) -> ReceiveMessageIterator {
+        // Hand the iterator a clone of the shared consumer handle; no lifetime
+        // parameter is needed because nothing is borrowed from `self` or `py`.
+        ReceiveMessageIterator {
+            inner: self.inner.clone(),
+        }
+    }
+
/// Consumes messages continuously using a callback function and an
optional `asyncio.Event` for signaling shutdown.
///
/// Returns an awaitable that completes when shutdown is signaled or a
PyRuntimeError on failure.
@@ -148,14 +164,6 @@ impl IggyConsumer {
future_into_py(py, async {
let (shutdown_tx, shutdown_rx) =
tokio::sync::oneshot::channel::<()>();
- let inner_init = inner.clone();
- let mut inner_init = inner_init.lock().await;
- inner_init
- .init()
- .await
- .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError,
_>(format!("{e:?}")))?;
- drop(inner_init);
-
let task_locals =
Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
let handle_consume = get_runtime().spawn(scope(task_locals, async
move {
let task_locals =
@@ -219,7 +227,10 @@ impl MessageConsumer for PyCallbackConsumer {
let callback = self.callback.clone();
let task_locals = self.task_locals.clone().lock_owned().await;
let task_locals = Python::with_gil(|py| task_locals.clone_ref(py));
- let message = ReceiveMessage::from_rust_message(received.message);
+ let message = ReceiveMessage {
+ inner: received.message,
+ partition_id: received.partition_id,
+ };
get_runtime()
.spawn(scope(task_locals, async move {
Python::with_gil(|py| {
diff --git a/foreign/python/src/iterator.rs b/foreign/python/src/iterator.rs
new file mode 100644
index 000000000..3131d28c4
--- /dev/null
+++ b/foreign/python/src/iterator.rs
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use futures::StreamExt;
+use iggy::prelude::IggyConsumer as RustIggyConsumer;
+use pyo3::exceptions::PyStopAsyncIteration;
+
+use crate::receive_message::ReceiveMessage;
+use pyo3::prelude::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+/// Python async iterator that yields `ReceiveMessage`s from a shared consumer.
+///
+/// Instances are created by `IggyConsumer.iter_messages()` and are meant to be
+/// driven with `async for`.
+#[pyclass]
+pub struct ReceiveMessageIterator {
+    /// Consumer handle shared with the owning `IggyConsumer`; it is locked for
+    /// the duration of each `__anext__` call.
+    pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
+}
+
+#[pymethods]
+impl ReceiveMessageIterator {
+    /// Returns an awaitable that resolves to the next `ReceiveMessage`.
+    ///
+    /// The awaitable raises `StopAsyncIteration` when the underlying stream
+    /// yields no further item, and `RuntimeError` when the consumer reports an
+    /// error for the polled message.
+    pub fn __anext__<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+        let inner = self.inner.clone();
+        future_into_py(py, async move {
+            let mut inner = inner.lock().await;
+            match inner.next().await {
+                Some(Ok(received)) => Ok(ReceiveMessage {
+                    inner: received.message,
+                    partition_id: received.partition_id,
+                }),
+                Some(Err(e)) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
+                    format!("{e:?}"),
+                )),
+                // `async for` only terminates on `StopAsyncIteration`;
+                // `StopIteration` is the synchronous-iterator signal and is not
+                // a valid way to end an `__anext__` awaitable.
+                None => Err(PyStopAsyncIteration::new_err("No more messages")),
+            }
+        })
+    }
+
+    /// Returns the iterator itself, as required by the async-iterator protocol.
+    pub fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+}
diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs
index efc12cab5..63afe785a 100644
--- a/foreign/python/src/lib.rs
+++ b/foreign/python/src/lib.rs
@@ -19,6 +19,7 @@
pub mod client;
mod consumer;
mod identifier;
+mod iterator;
mod receive_message;
mod send_message;
mod stream;
@@ -26,6 +27,7 @@ mod topic;
use client::IggyClient;
use consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer};
+use iterator::ReceiveMessageIterator;
use pyo3::prelude::*;
use receive_message::{PollingStrategy, ReceiveMessage};
use send_message::SendMessage;
@@ -45,5 +47,6 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) ->
PyResult<()> {
m.add_class::<AutoCommit>()?;
m.add_class::<AutoCommitAfter>()?;
m.add_class::<AutoCommitWhen>()?;
+ m.add_class::<ReceiveMessageIterator>()?;
Ok(())
}
diff --git a/foreign/python/src/receive_message.rs
b/foreign/python/src/receive_message.rs
index cd3be2978..078e59045 100644
--- a/foreign/python/src/receive_message.rs
+++ b/foreign/python/src/receive_message.rs
@@ -28,15 +28,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass,
gen_stub_pyclass_complex_enum, gen
#[gen_stub_pyclass]
pub struct ReceiveMessage {
pub(crate) inner: RustReceiveMessage,
-}
-
-impl ReceiveMessage {
- /// Converts a Rust message into its corresponding Python representation.
- ///
- /// This is an internal utility function, not exposed to Python.
- pub(crate) fn from_rust_message(message: RustReceiveMessage) -> Self {
- Self { inner: message }
- }
+ pub(crate) partition_id: u32,
}
#[gen_stub_pymethods]
@@ -83,6 +75,11 @@ impl ReceiveMessage {
pub fn length(&self) -> u32 {
self.inner.header.payload_length
}
+
+ /// Retrieves the partition this message belongs to.
+ pub fn partition_id(&self) -> u32 {
+ self.partition_id
+ }
}
#[derive(Clone, Copy)]
diff --git a/foreign/python/tests/test_iggy_sdk.py
b/foreign/python/tests/test_iggy_sdk.py
index 142e9ce62..e82620530 100644
--- a/foreign/python/tests/test_iggy_sdk.py
+++ b/foreign/python/tests/test_iggy_sdk.py
@@ -445,7 +445,7 @@ class TestConsumerGroup:
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
- consumer = iggy_client.consumer_group(
+ consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
@@ -482,7 +482,7 @@ class TestConsumerGroup:
stream=stream_name, name=topic_name, partitions_count=1
)
- consumer = iggy_client.consumer_group(
+ consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
@@ -510,6 +510,47 @@ class TestConsumerGroup:
assert received_messages == test_messages
+    @pytest.mark.asyncio
+    async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup):
+        """Test that messages can be consumed through `iter_messages()`."""
+        consumer_name = consumer_group_setup["consumer"]
+        stream_name = consumer_group_setup["stream"]
+        topic_name = consumer_group_setup["topic"]
+        partition_id = consumer_group_setup["partition_id"]
+        test_messages = consumer_group_setup["messages"]
+
+        # Setup
+        received_messages = []
+        await iggy_client.create_stream(stream_name)
+        await iggy_client.create_topic(
+            stream=stream_name, name=topic_name, partitions_count=1
+        )
+
+        consumer = await iggy_client.consumer_group(
+            consumer_name,
+            stream_name,
+            topic_name,
+            partition_id,
+            PollingStrategy.Next(),
+            10,
+            auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
+            poll_interval=timedelta(seconds=1),
+        )
+
+        await iggy_client.send_messages(
+            stream_name,
+            topic_name,
+            partition_id,
+            [Message(m) for m in test_messages],
+        )
+
+        async for message in consumer.iter_messages():
+            received_messages.append(message.payload().decode())
+            # Stop once every sent message has arrived, rather than relying on
+            # a hard-coded count that must match the fixture's size.
+            if len(received_messages) >= len(test_messages):
+                break
+
+        assert received_messages == test_messages
+
@pytest.mark.asyncio
async def test_shutdown(self, iggy_client: IggyClient,
consumer_group_setup):
"""Test that the consumer group can be signaled to shutdown."""
@@ -525,7 +566,7 @@ class TestConsumerGroup:
stream=stream_name, name=topic_name, partitions_count=1
)
- consumer = iggy_client.consumer_group(
+ consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,