This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new c9a2eb80f feat(python): add AsyncIterator interface to IggyConsumer 
(#2552)
c9a2eb80f is described below

commit c9a2eb80fa1d45c5efe7433e61fc75e17f581034
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Tue Jan 20 21:16:31 2026 +0900

    feat(python): add AsyncIterator interface to IggyConsumer (#2552)
    
    This is continuation of #2169.
---
 foreign/python/Cargo.toml             |  1 +
 foreign/python/apache_iggy.pyi        | 23 ++++++++++++--
 foreign/python/src/client.rs          | 28 +++++++++++------
 foreign/python/src/consumer.rs        | 29 +++++++++++------
 foreign/python/src/iterator.rs        | 59 +++++++++++++++++++++++++++++++++++
 foreign/python/src/lib.rs             |  3 ++
 foreign/python/src/receive_message.rs | 15 ++++-----
 foreign/python/tests/test_iggy_sdk.py | 47 ++++++++++++++++++++++++++--
 8 files changed, 172 insertions(+), 33 deletions(-)

diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 5d1b1b4d7..1b7a5c6dc 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -30,6 +30,7 @@ repository = "https://github.com/apache/iggy";
 
 [dependencies]
 bytes = "1.10.1"
+futures = "0.3.31"
 iggy = { path = "../../core/sdk", version = "0.8.1-edge.1" }
 pyo3 = "0.26.0"
 pyo3-async-runtimes = { version = "0.26.0", features = [
diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi
index a5141681b..0e319c5ac 100644
--- a/foreign/python/apache_iggy.pyi
+++ b/foreign/python/apache_iggy.pyi
@@ -265,6 +265,7 @@ class IggyClient:
         name: builtins.str,
         partitions_count: builtins.int,
         compression_algorithm: typing.Optional[builtins.str] = None,
+        topic_id: typing.Optional[builtins.int] = None,
         replication_factor: typing.Optional[builtins.int] = None,
     ) -> collections.abc.Awaitable[None]:
         r"""
@@ -324,7 +325,7 @@ class IggyClient:
         init_retries: typing.Optional[builtins.int] = None,
         init_retry_interval: typing.Optional[datetime.timedelta] = None,
         allow_replay: builtins.bool = False,
-    ) -> IggyConsumer:
+    ) -> collections.abc.Awaitable[IggyConsumer]:
         r"""
         Creates a new consumer group consumer.
 
@@ -385,6 +386,18 @@ class IggyConsumer:
         Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
         if the operation fails.
         """
+    def iter_messages(self) -> collections.abc.AsyncIterator[ReceiveMessage]:
+        r"""
+        Asynchronously iterate over `ReceiveMessage`s.
+
+        Returns an async iterator that raises `StopAsyncIteration` when no 
more messages are available
+        or a `PyRuntimeError` on failure.
+
+        Note: This method does not currently support `AutoCommit.After`.
+        For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`,
+        only the interval part is applied; the `after` mode is ignored.
+        Use `consume_messages()` if you need commit-after-processing semantics.
+        """
     def consume_messages(
         self,
         callback: collections.abc.Callable[
@@ -467,6 +480,10 @@ class ReceiveMessage:
 
         The length represents the length of the payload.
         """
+    def partition_id(self) -> builtins.int:
+        r"""
+        Retrieves the partition this message belongs to.
+        """
 
 class SendMessage:
     r"""
@@ -477,10 +494,10 @@ class SendMessage:
     """
     def __new__(cls, data: builtins.str | bytes) -> SendMessage:
         r"""
-        Constructs a new `SendMessage` instance from a string.
+        Constructs a new `SendMessage` instance from a string or bytes.
 
         This method allows for the creation of a `SendMessage` instance
-        directly from Python using the provided string data.
+        directly from Python using the provided string or bytes data.
         """
 
 class StreamDetails:
diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs
index f30000d9f..b0db32fc3 100644
--- a/foreign/python/src/client.rs
+++ b/foreign/python/src/client.rs
@@ -16,9 +16,6 @@
  * under the License.
  */
 
-use std::str::FromStr;
-use std::sync::Arc;
-
 use iggy::prelude::{
     Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as 
RustMessage,
     PollingStrategy as RustPollingStrategy, *,
@@ -28,6 +25,8 @@ use pyo3::types::{PyDelta, PyList, PyType};
 use pyo3_async_runtimes::tokio::future_into_py;
 use pyo3_stub_gen::define_stub_info_gatherer;
 use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
+use std::str::FromStr;
+use std::sync::Arc;
 
 use crate::consumer::{py_delta_to_iggy_duration, AutoCommit, IggyConsumer};
 use crate::identifier::PyIdentifier;
@@ -314,7 +313,10 @@ impl IggyClient {
             let messages = polled_messages
                 .messages
                 .into_iter()
-                .map(ReceiveMessage::from_rust_message)
+                .map(|m| ReceiveMessage {
+                    inner: m,
+                    partition_id,
+                })
                 .collect::<Vec<_>>();
             Ok(messages)
         })
@@ -340,8 +342,10 @@ impl IggyClient {
         init_retry_interval=None,
         allow_replay=false,
     ))]
-    fn consumer_group(
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[IggyConsumer]",
 imports=("collections.abc")))]
+    fn consumer_group<'a>(
         &self,
+        py: Python<'a>,
         name: &str,
         stream: &str,
         topic: &str,
@@ -356,7 +360,7 @@ impl IggyClient {
         init_retries: Option<u32>,
         init_retry_interval: Option<Py<PyDelta>>,
         allow_replay: bool,
-    ) -> PyResult<IggyConsumer> {
+    ) -> PyResult<Bound<'a, PyAny>> {
         let mut builder = self
             .inner
             .consumer_group(name, stream, topic)
@@ -412,10 +416,16 @@ impl IggyClient {
         if allow_replay {
             builder = builder.allow_replay()
         }
-        let consumer = builder.build();
+        let mut consumer = builder.build();
 
-        Ok(IggyConsumer {
-            inner: Arc::new(Mutex::new(consumer)),
+        future_into_py(py, async move {
+            consumer
+                .init()
+                .await
+                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, 
_>(format!("{e:?}")))?;
+            Ok(IggyConsumer {
+                inner: Arc::new(Mutex::new(consumer)),
+            })
         })
     }
 }
diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs
index 8a830aeb2..b2ed74f76 100644
--- a/foreign/python/src/consumer.rs
+++ b/foreign/python/src/consumer.rs
@@ -36,6 +36,7 @@ use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
 
 use crate::identifier::PyIdentifier;
+use crate::iterator::ReceiveMessageIterator;
 use crate::receive_message::ReceiveMessage;
 
 /// A Python class representing the Iggy consumer.
@@ -129,6 +130,21 @@ impl IggyConsumer {
         })
     }
 
+    /// Asynchronously iterate over `ReceiveMessage`s.
+    ///
+    /// Returns an async iterator that raises `StopAsyncIteration` when no 
more messages are available
+    /// or a `PyRuntimeError` on failure.
+    ///
+    /// Note: This method does not currently support `AutoCommit.After`.
+    /// For `AutoCommit.IntervalOrAfter(datetime.timedelta, AutoCommitAfter)`,
+    /// only the interval part is applied; the `after` mode is ignored.
+    /// Use `consume_messages()` if you need commit-after-processing semantics.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]",
 imports=("collections.abc")))]
+    fn iter_messages<'a>(&self) -> ReceiveMessageIterator {
+        let inner = self.inner.clone();
+        ReceiveMessageIterator { inner }
+    }
+
     /// Consumes messages continuously using a callback function and an 
optional `asyncio.Event` for signaling shutdown.
     ///
     /// Returns an awaitable that completes when shutdown is signaled or a 
PyRuntimeError on failure.
@@ -148,14 +164,6 @@ impl IggyConsumer {
         future_into_py(py, async {
             let (shutdown_tx, shutdown_rx) = 
tokio::sync::oneshot::channel::<()>();
 
-            let inner_init = inner.clone();
-            let mut inner_init = inner_init.lock().await;
-            inner_init
-                .init()
-                .await
-                .map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, 
_>(format!("{e:?}")))?;
-            drop(inner_init);
-
             let task_locals = 
Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
             let handle_consume = get_runtime().spawn(scope(task_locals, async 
move {
                 let task_locals =
@@ -219,7 +227,10 @@ impl MessageConsumer for PyCallbackConsumer {
         let callback = self.callback.clone();
         let task_locals = self.task_locals.clone().lock_owned().await;
         let task_locals = Python::with_gil(|py| task_locals.clone_ref(py));
-        let message = ReceiveMessage::from_rust_message(received.message);
+        let message = ReceiveMessage {
+            inner: received.message,
+            partition_id: received.partition_id,
+        };
         get_runtime()
             .spawn(scope(task_locals, async move {
                 Python::with_gil(|py| {
diff --git a/foreign/python/src/iterator.rs b/foreign/python/src/iterator.rs
new file mode 100644
index 000000000..3131d28c4
--- /dev/null
+++ b/foreign/python/src/iterator.rs
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use futures::StreamExt;
+use iggy::prelude::IggyConsumer as RustIggyConsumer;
+use pyo3::exceptions::PyStopIteration;
+
+use crate::receive_message::ReceiveMessage;
+use pyo3::prelude::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use tokio::sync::Mutex;
+
+#[pyclass]
+pub struct ReceiveMessageIterator {
+    pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
+}
+
+#[pymethods]
+impl ReceiveMessageIterator {
+    pub fn __anext__<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
+        let inner = self.inner.clone();
+        future_into_py(py, async move {
+            let mut inner = inner.lock().await;
+            if let Some(message) = inner.next().await {
+                Ok(message
+                    .map(|m| ReceiveMessage {
+                        inner: m.message,
+                        partition_id: m.partition_id,
+                    })
+                    .map_err(|e| {
+                        PyErr::new::<pyo3::exceptions::PyRuntimeError, 
_>(format!("{e:?}"))
+                    })?)
+            } else {
+                Err(PyStopIteration::new_err("No more messages"))
+            }
+        })
+    }
+
+    pub fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+}
diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs
index efc12cab5..63afe785a 100644
--- a/foreign/python/src/lib.rs
+++ b/foreign/python/src/lib.rs
@@ -19,6 +19,7 @@
 pub mod client;
 mod consumer;
 mod identifier;
+mod iterator;
 mod receive_message;
 mod send_message;
 mod stream;
@@ -26,6 +27,7 @@ mod topic;
 
 use client::IggyClient;
 use consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer};
+use iterator::ReceiveMessageIterator;
 use pyo3::prelude::*;
 use receive_message::{PollingStrategy, ReceiveMessage};
 use send_message::SendMessage;
@@ -45,5 +47,6 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> 
PyResult<()> {
     m.add_class::<AutoCommit>()?;
     m.add_class::<AutoCommitAfter>()?;
     m.add_class::<AutoCommitWhen>()?;
+    m.add_class::<ReceiveMessageIterator>()?;
     Ok(())
 }
diff --git a/foreign/python/src/receive_message.rs 
b/foreign/python/src/receive_message.rs
index cd3be2978..078e59045 100644
--- a/foreign/python/src/receive_message.rs
+++ b/foreign/python/src/receive_message.rs
@@ -28,15 +28,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, 
gen_stub_pyclass_complex_enum, gen
 #[gen_stub_pyclass]
 pub struct ReceiveMessage {
     pub(crate) inner: RustReceiveMessage,
-}
-
-impl ReceiveMessage {
-    /// Converts a Rust message into its corresponding Python representation.
-    ///
-    /// This is an internal utility function, not exposed to Python.
-    pub(crate) fn from_rust_message(message: RustReceiveMessage) -> Self {
-        Self { inner: message }
-    }
+    pub(crate) partition_id: u32,
 }
 
 #[gen_stub_pymethods]
@@ -83,6 +75,11 @@ impl ReceiveMessage {
     pub fn length(&self) -> u32 {
         self.inner.header.payload_length
     }
+
+    /// Retrieves the partition this message belongs to.
+    pub fn partition_id(&self) -> u32 {
+        self.partition_id
+    }
 }
 
 #[derive(Clone, Copy)]
diff --git a/foreign/python/tests/test_iggy_sdk.py 
b/foreign/python/tests/test_iggy_sdk.py
index 142e9ce62..e82620530 100644
--- a/foreign/python/tests/test_iggy_sdk.py
+++ b/foreign/python/tests/test_iggy_sdk.py
@@ -445,7 +445,7 @@ class TestConsumerGroup:
         await iggy_client.create_topic(
             stream=stream_name, name=topic_name, partitions_count=1
         )
-        consumer = iggy_client.consumer_group(
+        consumer = await iggy_client.consumer_group(
             consumer_name,
             stream_name,
             topic_name,
@@ -482,7 +482,7 @@ class TestConsumerGroup:
             stream=stream_name, name=topic_name, partitions_count=1
         )
 
-        consumer = iggy_client.consumer_group(
+        consumer = await iggy_client.consumer_group(
             consumer_name,
             stream_name,
             topic_name,
@@ -510,6 +510,47 @@ class TestConsumerGroup:
 
         assert received_messages == test_messages
 
+    @pytest.mark.asyncio
+    async def test_iter_messages(self, iggy_client: IggyClient, 
consumer_group_setup):
+        """Test that the consumer group can consume messages."""
+        consumer_name = consumer_group_setup["consumer"]
+        stream_name = consumer_group_setup["stream"]
+        topic_name = consumer_group_setup["topic"]
+        partition_id = consumer_group_setup["partition_id"]
+        test_messages = consumer_group_setup["messages"]
+
+        # Setup
+        received_messages = []
+        await iggy_client.create_stream(stream_name)
+        await iggy_client.create_topic(
+            stream=stream_name, name=topic_name, partitions_count=1
+        )
+
+        consumer = await iggy_client.consumer_group(
+            consumer_name,
+            stream_name,
+            topic_name,
+            partition_id,
+            PollingStrategy.Next(),
+            10,
+            auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
+            poll_interval=timedelta(seconds=1),
+        )
+
+        await iggy_client.send_messages(
+            stream_name,
+            topic_name,
+            partition_id,
+            [Message(m) for m in test_messages],
+        )
+
+        async for message in consumer.iter_messages():
+            received_messages.append(message.payload().decode())
+            if len(received_messages) == 5:
+                break
+
+        assert received_messages == test_messages
+
     @pytest.mark.asyncio
     async def test_shutdown(self, iggy_client: IggyClient, 
consumer_group_setup):
         """Test that the consumer group can be signaled to shutdown."""
@@ -525,7 +566,7 @@ class TestConsumerGroup:
             stream=stream_name, name=topic_name, partitions_count=1
         )
 
-        consumer = iggy_client.consumer_group(
+        consumer = await iggy_client.consumer_group(
             consumer_name,
             stream_name,
             topic_name,

Reply via email to