This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f24a11a2 fix(python): add proper typings for async method args and 
return types (#2071)
f24a11a2 is described below

commit f24a11a29cbd91d77f517f39b182ab2c8d8fb02c
Author: Albin Skott <cstr...@users.noreply.github.com>
AuthorDate: Sun Aug 3 14:50:26 2025 +0200

    fix(python): add proper typings for async method args and return types 
(#2071)
    
    Update [`pyo3-stub-gen`](https://github.com/Jij-Inc/pyo3-stub-gen) to
    `0.12.0` (with crates.io patch) to allowing us to override Python types
    in generated stub file. This fixes use of unsafe impls and provides
    types where we previously generated `typing.Any`.
---
 foreign/python/Cargo.toml      |   5 ++-
 foreign/python/apache_iggy.pyi |  24 +++++-----
 foreign/python/src/client.rs   |  11 ++++-
 foreign/python/src/consumer.rs | 100 ++++++-----------------------------------
 4 files changed, 39 insertions(+), 101 deletions(-)

diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index 90209eac..ce2056ec 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -32,9 +32,12 @@ pyo3-async-runtimes = { version = "0.25.0", features = [
     "attributes",
     "tokio-runtime",
 ] }
-pyo3-stub-gen = "0.11.1"
+pyo3-stub-gen = "0.12.0"
 tokio = "1.40.0"
 
+[patch.crates-io]
+pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git";, rev = 
"1870f637f700605395a666e3bdc0276aece73b5f" }
+
 [lib]
 name = "apache_iggy"
 crate-type = ["cdylib", "rlib"]
diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi
index f2459bce..f5b8c204 100644
--- a/foreign/python/apache_iggy.pyi
+++ b/foreign/python/apache_iggy.pyi
@@ -190,56 +190,56 @@ class IggyClient:
         This initializes a new runtime for asynchronous operations.
         Future versions might utilize asyncio for more Pythonic async.
         """
-    def ping(self) -> typing.Any:
+    def ping(self) -> collections.abc.Awaitable[None]:
         r"""
         Sends a ping request to the server to check connectivity.
         
         Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
         if the connection fails.
         """
-    def login_user(self, username:builtins.str, password:builtins.str) -> 
typing.Any:
+    def login_user(self, username:builtins.str, password:builtins.str) -> 
collections.abc.Awaitable[None]:
         r"""
         Logs in the user with the given credentials.
         
         Returns `Ok(())` on success, or a PyRuntimeError on failure.
         """
-    def connect(self) -> typing.Any:
+    def connect(self) -> collections.abc.Awaitable[None]:
         r"""
         Connects the IggyClient to its service.
         
         Returns Ok(()) on successful connection or a PyRuntimeError on failure.
         """
-    def create_stream(self, name:builtins.str, 
stream_id:typing.Optional[builtins.int]=None) -> typing.Any:
+    def create_stream(self, name:builtins.str, 
stream_id:typing.Optional[builtins.int]=None) -> 
collections.abc.Awaitable[None]:
         r"""
         Creates a new stream with the provided ID and name.
         
         Returns Ok(()) on successful stream creation or a PyRuntimeError on 
failure.
         """
-    def get_stream(self, stream_id:builtins.str | builtins.int) -> typing.Any:
+    def get_stream(self, stream_id:builtins.str | builtins.int) -> 
collections.abc.Awaitable[typing.Optional[StreamDetails]]:
         r"""
         Gets stream by id.
         
         Returns Option of stream details or a PyRuntimeError on failure.
         """
-    def create_topic(self, stream:builtins.str | builtins.int, 
name:builtins.str, partitions_count:builtins.int, 
compression_algorithm:typing.Optional[builtins.str]=None, 
topic_id:typing.Optional[builtins.int]=None, 
replication_factor:typing.Optional[builtins.int]=None) -> typing.Any:
+    def create_topic(self, stream:builtins.str | builtins.int, 
name:builtins.str, partitions_count:builtins.int, 
compression_algorithm:typing.Optional[builtins.str]=None, 
topic_id:typing.Optional[builtins.int]=None, 
replication_factor:typing.Optional[builtins.int]=None) -> 
collections.abc.Awaitable[None]:
         r"""
         Creates a new topic with the given parameters.
         
         Returns Ok(()) on successful topic creation or a PyRuntimeError on 
failure.
         """
-    def get_topic(self, stream_id:builtins.str | builtins.int, 
topic_id:builtins.str | builtins.int) -> typing.Any:
+    def get_topic(self, stream_id:builtins.str | builtins.int, 
topic_id:builtins.str | builtins.int) -> 
collections.abc.Awaitable[typing.Optional[TopicDetails]]:
         r"""
         Gets topic by stream and id.
         
         Returns Option of topic details or a PyRuntimeError on failure.
         """
-    def send_messages(self, stream:builtins.str | builtins.int, 
topic:builtins.str | builtins.int, partitioning:builtins.int, messages:list) -> 
typing.Any:
+    def send_messages(self, stream:builtins.str | builtins.int, 
topic:builtins.str | builtins.int, partitioning:builtins.int, 
messages:list[SendMessage]) -> collections.abc.Awaitable[None]:
         r"""
         Sends a list of messages to the specified topic.
         
         Returns Ok(()) on successful sending or a PyRuntimeError on failure.
         """
-    def poll_messages(self, stream:builtins.str | builtins.int, 
topic:builtins.str | builtins.int, partition_id:builtins.int, 
polling_strategy:PollingStrategy, count:builtins.int, 
auto_commit:builtins.bool) -> typing.Any:
+    def poll_messages(self, stream:builtins.str | builtins.int, 
topic:builtins.str | builtins.int, partition_id:builtins.int, 
polling_strategy:PollingStrategy, count:builtins.int, 
auto_commit:builtins.bool) -> collections.abc.Awaitable[list[ReceiveMessage]]:
         r"""
         Polls for messages from the specified topic and partition.
         
@@ -282,7 +282,7 @@ class IggyConsumer:
         r"""
         Gets the name of the topic this consumer group is configured for.
         """
-    def store_offset(self, offset:builtins.int, 
partition_id:typing.Optional[builtins.int]) -> typing.Any:
+    def store_offset(self, offset:builtins.int, 
partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]:
         r"""
         Stores the provided offset for the provided partition id or if none is 
specified
         uses the current partition id for the consumer group.
@@ -290,7 +290,7 @@ class IggyConsumer:
         Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
         if the operation fails.
         """
-    def delete_offset(self, partition_id:typing.Optional[builtins.int]) -> 
typing.Any:
+    def delete_offset(self, partition_id:typing.Optional[builtins.int]) -> 
collections.abc.Awaitable[None]:
         r"""
         Deletes the offset for the provided partition id or if none is 
specified
         uses the current partition id for the consumer group.
@@ -298,7 +298,7 @@ class IggyConsumer:
         Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
         if the operation fails.
         """
-    def consume_messages(self, callback:collections.abc.Callable[[str]], 
shutdown_event:typing.Optional[asyncio.Event]) -> typing.Any:
+    def consume_messages(self, callback:collections.abc.Callable[[str]], 
shutdown_event:typing.Optional[asyncio.Event]) -> 
collections.abc.Awaitable[None]:
         r"""
         Consumes messages continuously using a callback function and an 
optional `asyncio.Event` for signaling shutdown.
         
diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs
index ca6735a5..74fe95a4 100644
--- a/foreign/python/src/client.rs
+++ b/foreign/python/src/client.rs
@@ -70,6 +70,7 @@ impl IggyClient {
     ///
     /// Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
     /// if the connection fails.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn ping<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
         let inner = self.inner.clone();
         future_into_py(py, async move {
@@ -83,6 +84,7 @@ impl IggyClient {
     /// Logs in the user with the given credentials.
     ///
     /// Returns `Ok(())` on success, or a PyRuntimeError on failure.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn login_user<'a>(
         &self,
         py: Python<'a>,
@@ -102,6 +104,7 @@ impl IggyClient {
     /// Connects the IggyClient to its service.
     ///
     /// Returns Ok(()) on successful connection or a PyRuntimeError on failure.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn connect<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
         let inner = self.inner.clone();
         future_into_py(py, async move {
@@ -117,6 +120,7 @@ impl IggyClient {
     ///
     /// Returns Ok(()) on successful stream creation or a PyRuntimeError on 
failure.
     #[pyo3(signature = (name, stream_id = None))]
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn create_stream<'a>(
         &self,
         py: Python<'a>,
@@ -136,6 +140,7 @@ impl IggyClient {
     /// Gets stream by id.
     ///
     /// Returns Option of stream details or a PyRuntimeError on failure.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[typing.Optional[StreamDetails]]",
 imports=("collections.abc")))]
     fn get_stream<'a>(
         &self,
         py: Python<'a>,
@@ -160,6 +165,7 @@ impl IggyClient {
         signature = (stream, name, partitions_count, compression_algorithm = 
None, topic_id = None, replication_factor = None)
     )]
     #[allow(clippy::too_many_arguments)]
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn create_topic<'a>(
         &self,
         py: Python<'a>,
@@ -200,6 +206,7 @@ impl IggyClient {
     /// Gets topic by stream and id.
     ///
     /// Returns Option of topic details or a PyRuntimeError on failure.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[typing.Optional[TopicDetails]]",
 imports=("collections.abc")))]
     fn get_topic<'a>(
         &self,
         py: Python<'a>,
@@ -222,13 +229,14 @@ impl IggyClient {
     /// Sends a list of messages to the specified topic.
     ///
     /// Returns Ok(()) on successful sending or a PyRuntimeError on failure.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn send_messages<'a>(
         &self,
         py: Python<'a>,
         stream: PyIdentifier,
         topic: PyIdentifier,
         partitioning: u32,
-        messages: &Bound<'_, PyList>,
+        #[gen_stub(override_type(type_repr = "list[SendMessage]"))] messages: 
&Bound<'_, PyList>,
     ) -> PyResult<Bound<'a, PyAny>> {
         let messages: Vec<SendMessage> = messages
             .iter()
@@ -257,6 +265,7 @@ impl IggyClient {
     ///
     /// Returns a list of received messages or a PyRuntimeError on failure.
     #[allow(clippy::too_many_arguments)]
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[list[ReceiveMessage]]",
 imports=("collections.abc")))]
     fn poll_messages<'a>(
         &self,
         py: Python<'a>,
diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs
index 15677bd2..c7464dcf 100644
--- a/foreign/python/src/consumer.rs
+++ b/foreign/python/src/consumer.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -26,13 +25,12 @@ use iggy::prelude::{
     AutoCommitWhen as RustAutoCommitWhen, *,
 };
 use iggy::prelude::{IggyConsumer as RustIggyConsumer, IggyError, 
ReceivedMessage};
-use pyo3::types::{PyDelta, PyDeltaAccess, PyFunction};
+use pyo3::types::{PyDelta, PyDeltaAccess};
 
-use pyo3::{prelude::*, type_object};
+use pyo3::prelude::*;
 use pyo3_async_runtimes::tokio::{future_into_py, get_runtime, into_future, 
scope};
 use pyo3_async_runtimes::TaskLocals;
 use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, 
gen_stub_pymethods};
-use pyo3_stub_gen::PyStubType;
 use tokio::sync::oneshot::Sender;
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
@@ -91,6 +89,7 @@ impl IggyConsumer {
     ///
     /// Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
     /// if the operation fails.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn store_offset<'a>(
         &self,
         py: Python<'a>,
@@ -113,6 +112,7 @@ impl IggyConsumer {
     ///
     /// Returns `Ok(())` if the server responds successfully, or a 
`PyRuntimeError`
     /// if the operation fails.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn delete_offset<'a>(
         &self,
         py: Python<'a>,
@@ -132,16 +132,18 @@ impl IggyConsumer {
     /// Consumes messages continuously using a callback function and an 
optional `asyncio.Event` for signaling shutdown.
     ///
     /// Returns an awaitable that completes when shutdown is signaled or a 
PyRuntimeError on failure.
+    
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", 
imports=("collections.abc")))]
     fn consume_messages<'a>(
         &self,
         py: Python<'a>,
-        callback: Bound<'a, PyMessageCallback>,
-        shutdown_event: Option<Bound<'a, PyAsyncioEvent>>,
+        #[gen_stub(override_type(type_repr="collections.abc.Callable[[str]]", 
imports=("collections.abc")))]
+        callback: Bound<'a, PyAny>,
+        #[gen_stub(override_type(type_repr="typing.Optional[asyncio.Event]", 
imports=("asyncio")))]
+        shutdown_event: Option<Bound<'a, PyAny>>,
     ) -> PyResult<Bound<'a, PyAny>> {
         let inner = self.inner.clone();
-        let callback: Py<PyMessageCallback> = callback.unbind();
-        let shutdown_event: Option<Py<PyAsyncioEvent>> =
-            shutdown_event.map(|e| e.unbind());
+        let callback: Py<PyAny> = callback.unbind();
+        let shutdown_event: Option<Py<PyAny>> = shutdown_event.map(|e| 
e.unbind());
 
         future_into_py(py, async {
             let (shutdown_tx, shutdown_rx) = 
tokio::sync::oneshot::channel::<()>();
@@ -170,7 +172,7 @@ impl IggyConsumer {
             if let Some(shutdown_event) = shutdown_event {
                 let task_locals = 
Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
                 async fn shutdown_impl(
-                    shutdown_event: Py<PyAsyncioEvent>,
+                    shutdown_event: Py<PyAny>,
                     shutdown_tx: Sender<()>,
                 ) -> PyResult<()> {
                     Python::with_gil(|py| {
@@ -208,7 +210,7 @@ impl IggyConsumer {
 }
 
 struct PyCallbackConsumer {
-    callback: Arc<Py<PyMessageCallback>>,
+    callback: Arc<Py<PyAny>>,
     task_locals: Arc<Mutex<TaskLocals>>,
 }
 
@@ -235,82 +237,6 @@ impl MessageConsumer for PyCallbackConsumer {
     }
 }
 
-#[repr(transparent)]
-pub struct PyMessageCallback(PyFunction);
-
-unsafe impl type_object::PyTypeInfo for PyMessageCallback {
-    const NAME: &'static str = stringify!(PyMessageCallback);
-    const MODULE: ::std::option::Option<&'static str> = None;
-
-    #[inline]
-    #[allow(clippy::redundant_closure_call)]
-    fn type_object_raw(py: Python<'_>) -> *mut pyo3::ffi::PyTypeObject {
-        (|_py| {
-            #[allow(unused_unsafe)] // 
https://github.com/rust-lang/rust/pull/125834
-            unsafe {
-                ::std::ptr::addr_of_mut!(pyo3::ffi::PyFunction_Type)
-            }
-        })(py)
-    }
-
-    #[inline]
-    fn is_type_of(obj: &Bound<'_, PyAny>) -> bool {
-        #[allow(unused_unsafe)]
-        unsafe {
-            pyo3::ffi::PyFunction_Check(obj.as_ptr()) > 0
-        }
-    }
-}
-
-impl PyStubType for PyMessageCallback {
-    fn type_output() -> pyo3_stub_gen::TypeInfo {
-        pyo3_stub_gen::TypeInfo {
-            name: String::from("collections.abc.Callable[[str]]"),
-            import: HashSet::from(["collections.abc".into()]),
-        }
-    }
-}
-
-#[repr(transparent)]
-pub struct PyAsyncioEvent(PyAny);
-
-unsafe impl type_object::PyTypeInfo for PyAsyncioEvent {
-    const NAME: &'static str = stringify!(PyAsyncioEvent);
-    const MODULE: ::std::option::Option<&'static str> = None;
-
-    #[inline]
-    #[allow(clippy::redundant_closure_call)]
-    fn type_object_raw(py: Python<'_>) -> *mut pyo3::ffi::PyTypeObject {
-        (|_py| {
-            #[allow(unused_unsafe)] // 
https://github.com/rust-lang/rust/pull/125834
-            unsafe {
-                ::std::ptr::addr_of_mut!(pyo3::ffi::PyBaseObject_Type)
-            }
-        })(py)
-    }
-
-    #[inline]
-    fn is_type_of(obj: &Bound<'_, PyAny>) -> bool {
-        (|| {
-            let ty = obj.get_type();
-            Ok::<bool, PyErr>(
-                ty.name()?.extract::<&str>()? == "Event"
-                    && ty.module()?.extract::<&str>()? == "asyncio.locks",
-            )
-        })()
-        .unwrap_or(false)
-    }
-}
-
-impl PyStubType for PyAsyncioEvent {
-    fn type_output() -> pyo3_stub_gen::TypeInfo {
-        pyo3_stub_gen::TypeInfo {
-            name: String::from("asyncio.Event"),
-            import: HashSet::from(["asyncio".into()]),
-        }
-    }
-}
-
 /// The auto-commit configuration for storing the offset on the server.
 // #[derive(Debug, PartialEq, Copy, Clone)]
 #[gen_stub_pyclass_complex_enum]

Reply via email to