This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push: new f24a11a2 fix(python): add proper typings for async method args and return types (#2071) f24a11a2 is described below commit f24a11a29cbd91d77f517f39b182ab2c8d8fb02c Author: Albin Skott <cstr...@users.noreply.github.com> AuthorDate: Sun Aug 3 14:50:26 2025 +0200 fix(python): add proper typings for async method args and return types (#2071) Update [`pyo3-stub-gen`](https://github.com/Jij-Inc/pyo3-stub-gen) to `0.12.0` (with crates.io patch) to allow us to override Python types in the generated stub file. This fixes use of unsafe impls and provides types where we previously generated `typing.Any`. --- foreign/python/Cargo.toml | 5 ++- foreign/python/apache_iggy.pyi | 24 +++++----- foreign/python/src/client.rs | 11 ++++- foreign/python/src/consumer.rs | 100 ++++++----------------------------------- 4 files changed, 39 insertions(+), 101 deletions(-) diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index 90209eac..ce2056ec 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -32,9 +32,12 @@ pyo3-async-runtimes = { version = "0.25.0", features = [ "attributes", "tokio-runtime", ] } -pyo3-stub-gen = "0.11.1" +pyo3-stub-gen = "0.12.0" tokio = "1.40.0" +[patch.crates-io] +pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git", rev = "1870f637f700605395a666e3bdc0276aece73b5f" } + [lib] name = "apache_iggy" crate-type = ["cdylib", "rlib"] diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index f2459bce..f5b8c204 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -190,56 +190,56 @@ class IggyClient: This initializes a new runtime for asynchronous operations. Future versions might utilize asyncio for more Pythonic async. """ - def ping(self) -> typing.Any: + def ping(self) -> collections.abc.Awaitable[None]: r""" Sends a ping request to the server to check connectivity. 
Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the connection fails. """ - def login_user(self, username:builtins.str, password:builtins.str) -> typing.Any: + def login_user(self, username:builtins.str, password:builtins.str) -> collections.abc.Awaitable[None]: r""" Logs in the user with the given credentials. Returns `Ok(())` on success, or a PyRuntimeError on failure. """ - def connect(self) -> typing.Any: + def connect(self) -> collections.abc.Awaitable[None]: r""" Connects the IggyClient to its service. Returns Ok(()) on successful connection or a PyRuntimeError on failure. """ - def create_stream(self, name:builtins.str, stream_id:typing.Optional[builtins.int]=None) -> typing.Any: + def create_stream(self, name:builtins.str, stream_id:typing.Optional[builtins.int]=None) -> collections.abc.Awaitable[None]: r""" Creates a new stream with the provided ID and name. Returns Ok(()) on successful stream creation or a PyRuntimeError on failure. """ - def get_stream(self, stream_id:builtins.str | builtins.int) -> typing.Any: + def get_stream(self, stream_id:builtins.str | builtins.int) -> collections.abc.Awaitable[typing.Optional[StreamDetails]]: r""" Gets stream by id. Returns Option of stream details or a PyRuntimeError on failure. """ - def create_topic(self, stream:builtins.str | builtins.int, name:builtins.str, partitions_count:builtins.int, compression_algorithm:typing.Optional[builtins.str]=None, topic_id:typing.Optional[builtins.int]=None, replication_factor:typing.Optional[builtins.int]=None) -> typing.Any: + def create_topic(self, stream:builtins.str | builtins.int, name:builtins.str, partitions_count:builtins.int, compression_algorithm:typing.Optional[builtins.str]=None, topic_id:typing.Optional[builtins.int]=None, replication_factor:typing.Optional[builtins.int]=None) -> collections.abc.Awaitable[None]: r""" Creates a new topic with the given parameters. 
Returns Ok(()) on successful topic creation or a PyRuntimeError on failure. """ - def get_topic(self, stream_id:builtins.str | builtins.int, topic_id:builtins.str | builtins.int) -> typing.Any: + def get_topic(self, stream_id:builtins.str | builtins.int, topic_id:builtins.str | builtins.int) -> collections.abc.Awaitable[typing.Optional[TopicDetails]]: r""" Gets topic by stream and id. Returns Option of topic details or a PyRuntimeError on failure. """ - def send_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partitioning:builtins.int, messages:list) -> typing.Any: + def send_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partitioning:builtins.int, messages:list[SendMessage]) -> collections.abc.Awaitable[None]: r""" Sends a list of messages to the specified topic. Returns Ok(()) on successful sending or a PyRuntimeError on failure. """ - def poll_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partition_id:builtins.int, polling_strategy:PollingStrategy, count:builtins.int, auto_commit:builtins.bool) -> typing.Any: + def poll_messages(self, stream:builtins.str | builtins.int, topic:builtins.str | builtins.int, partition_id:builtins.int, polling_strategy:PollingStrategy, count:builtins.int, auto_commit:builtins.bool) -> collections.abc.Awaitable[list[ReceiveMessage]]: r""" Polls for messages from the specified topic and partition. @@ -282,7 +282,7 @@ class IggyConsumer: r""" Gets the name of the topic this consumer group is configured for. """ - def store_offset(self, offset:builtins.int, partition_id:typing.Optional[builtins.int]) -> typing.Any: + def store_offset(self, offset:builtins.int, partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]: r""" Stores the provided offset for the provided partition id or if none is specified uses the current partition id for the consumer group. 
@@ -290,7 +290,7 @@ class IggyConsumer: Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ - def delete_offset(self, partition_id:typing.Optional[builtins.int]) -> typing.Any: + def delete_offset(self, partition_id:typing.Optional[builtins.int]) -> collections.abc.Awaitable[None]: r""" Deletes the offset for the provided partition id or if none is specified uses the current partition id for the consumer group. @@ -298,7 +298,7 @@ class IggyConsumer: Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` if the operation fails. """ - def consume_messages(self, callback:collections.abc.Callable[[str]], shutdown_event:typing.Optional[asyncio.Event]) -> typing.Any: + def consume_messages(self, callback:collections.abc.Callable[[str]], shutdown_event:typing.Optional[asyncio.Event]) -> collections.abc.Awaitable[None]: r""" Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown. diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index ca6735a5..74fe95a4 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -70,6 +70,7 @@ impl IggyClient { /// /// Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` /// if the connection fails. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn ping<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> { let inner = self.inner.clone(); future_into_py(py, async move { @@ -83,6 +84,7 @@ impl IggyClient { /// Logs in the user with the given credentials. /// /// Returns `Ok(())` on success, or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn login_user<'a>( &self, py: Python<'a>, @@ -102,6 +104,7 @@ impl IggyClient { /// Connects the IggyClient to its service. 
/// /// Returns Ok(()) on successful connection or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn connect<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> { let inner = self.inner.clone(); future_into_py(py, async move { @@ -117,6 +120,7 @@ impl IggyClient { /// /// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure. #[pyo3(signature = (name, stream_id = None))] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn create_stream<'a>( &self, py: Python<'a>, @@ -136,6 +140,7 @@ impl IggyClient { /// Gets stream by id. /// /// Returns Option of stream details or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[typing.Optional[StreamDetails]]", imports=("collections.abc")))] fn get_stream<'a>( &self, py: Python<'a>, @@ -160,6 +165,7 @@ impl IggyClient { signature = (stream, name, partitions_count, compression_algorithm = None, topic_id = None, replication_factor = None) )] #[allow(clippy::too_many_arguments)] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn create_topic<'a>( &self, py: Python<'a>, @@ -200,6 +206,7 @@ impl IggyClient { /// Gets topic by stream and id. /// /// Returns Option of topic details or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[typing.Optional[TopicDetails]]", imports=("collections.abc")))] fn get_topic<'a>( &self, py: Python<'a>, @@ -222,13 +229,14 @@ impl IggyClient { /// Sends a list of messages to the specified topic. /// /// Returns Ok(()) on successful sending or a PyRuntimeError on failure. 
+ #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn send_messages<'a>( &self, py: Python<'a>, stream: PyIdentifier, topic: PyIdentifier, partitioning: u32, - messages: &Bound<'_, PyList>, + #[gen_stub(override_type(type_repr = "list[SendMessage]"))] messages: &Bound<'_, PyList>, ) -> PyResult<Bound<'a, PyAny>> { let messages: Vec<SendMessage> = messages .iter() @@ -257,6 +265,7 @@ impl IggyClient { /// /// Returns a list of received messages or a PyRuntimeError on failure. #[allow(clippy::too_many_arguments)] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[list[ReceiveMessage]]", imports=("collections.abc")))] fn poll_messages<'a>( &self, py: Python<'a>, diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs index 15677bd2..c7464dcf 100644 --- a/foreign/python/src/consumer.rs +++ b/foreign/python/src/consumer.rs @@ -16,7 +16,6 @@ * under the License. */ -use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -26,13 +25,12 @@ use iggy::prelude::{ AutoCommitWhen as RustAutoCommitWhen, *, }; use iggy::prelude::{IggyConsumer as RustIggyConsumer, IggyError, ReceivedMessage}; -use pyo3::types::{PyDelta, PyDeltaAccess, PyFunction}; +use pyo3::types::{PyDelta, PyDeltaAccess}; -use pyo3::{prelude::*, type_object}; +use pyo3::prelude::*; use pyo3_async_runtimes::tokio::{future_into_py, get_runtime, into_future, scope}; use pyo3_async_runtimes::TaskLocals; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen_stub_pymethods}; -use pyo3_stub_gen::PyStubType; use tokio::sync::oneshot::Sender; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -91,6 +89,7 @@ impl IggyConsumer { /// /// Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` /// if the operation fails. 
+ #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn store_offset<'a>( &self, py: Python<'a>, @@ -113,6 +112,7 @@ impl IggyConsumer { /// /// Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError` /// if the operation fails. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn delete_offset<'a>( &self, py: Python<'a>, @@ -132,16 +132,18 @@ impl IggyConsumer { /// Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown. /// /// Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn consume_messages<'a>( &self, py: Python<'a>, - callback: Bound<'a, PyMessageCallback>, - shutdown_event: Option<Bound<'a, PyAsyncioEvent>>, + #[gen_stub(override_type(type_repr="collections.abc.Callable[[str]]", imports=("collections.abc")))] + callback: Bound<'a, PyAny>, + #[gen_stub(override_type(type_repr="typing.Optional[asyncio.Event]", imports=("asyncio")))] + shutdown_event: Option<Bound<'a, PyAny>>, ) -> PyResult<Bound<'a, PyAny>> { let inner = self.inner.clone(); - let callback: Py<PyMessageCallback> = callback.unbind(); - let shutdown_event: Option<Py<PyAsyncioEvent>> = - shutdown_event.map(|e| e.unbind()); + let callback: Py<PyAny> = callback.unbind(); + let shutdown_event: Option<Py<PyAny>> = shutdown_event.map(|e| e.unbind()); future_into_py(py, async { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); @@ -170,7 +172,7 @@ impl IggyConsumer { if let Some(shutdown_event) = shutdown_event { let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?; async fn shutdown_impl( - shutdown_event: Py<PyAsyncioEvent>, + shutdown_event: Py<PyAny>, shutdown_tx: Sender<()>, ) -> PyResult<()> { 
Python::with_gil(|py| { @@ -208,7 +210,7 @@ impl IggyConsumer { } struct PyCallbackConsumer { - callback: Arc<Py<PyMessageCallback>>, + callback: Arc<Py<PyAny>>, task_locals: Arc<Mutex<TaskLocals>>, } @@ -235,82 +237,6 @@ impl MessageConsumer for PyCallbackConsumer { } } -#[repr(transparent)] -pub struct PyMessageCallback(PyFunction); - -unsafe impl type_object::PyTypeInfo for PyMessageCallback { - const NAME: &'static str = stringify!(PyMessageCallback); - const MODULE: ::std::option::Option<&'static str> = None; - - #[inline] - #[allow(clippy::redundant_closure_call)] - fn type_object_raw(py: Python<'_>) -> *mut pyo3::ffi::PyTypeObject { - (|_py| { - #[allow(unused_unsafe)] // https://github.com/rust-lang/rust/pull/125834 - unsafe { - ::std::ptr::addr_of_mut!(pyo3::ffi::PyFunction_Type) - } - })(py) - } - - #[inline] - fn is_type_of(obj: &Bound<'_, PyAny>) -> bool { - #[allow(unused_unsafe)] - unsafe { - pyo3::ffi::PyFunction_Check(obj.as_ptr()) > 0 - } - } -} - -impl PyStubType for PyMessageCallback { - fn type_output() -> pyo3_stub_gen::TypeInfo { - pyo3_stub_gen::TypeInfo { - name: String::from("collections.abc.Callable[[str]]"), - import: HashSet::from(["collections.abc".into()]), - } - } -} - -#[repr(transparent)] -pub struct PyAsyncioEvent(PyAny); - -unsafe impl type_object::PyTypeInfo for PyAsyncioEvent { - const NAME: &'static str = stringify!(PyAsyncioEvent); - const MODULE: ::std::option::Option<&'static str> = None; - - #[inline] - #[allow(clippy::redundant_closure_call)] - fn type_object_raw(py: Python<'_>) -> *mut pyo3::ffi::PyTypeObject { - (|_py| { - #[allow(unused_unsafe)] // https://github.com/rust-lang/rust/pull/125834 - unsafe { - ::std::ptr::addr_of_mut!(pyo3::ffi::PyBaseObject_Type) - } - })(py) - } - - #[inline] - fn is_type_of(obj: &Bound<'_, PyAny>) -> bool { - (|| { - let ty = obj.get_type(); - Ok::<bool, PyErr>( - ty.name()?.extract::<&str>()? == "Event" - && ty.module()?.extract::<&str>()? 
== "asyncio.locks", - ) - })() - .unwrap_or(false) - } -} - -impl PyStubType for PyAsyncioEvent { - fn type_output() -> pyo3_stub_gen::TypeInfo { - pyo3_stub_gen::TypeInfo { - name: String::from("asyncio.Event"), - import: HashSet::from(["asyncio".into()]), - } - } -} - /// The auto-commit configuration for storing the offset on the server. // #[derive(Debug, PartialEq, Copy, Clone)] #[gen_stub_pyclass_complex_enum]