This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch python-upgrade in repository https://gitbox.apache.org/repos/asf/iggy.git
commit e5714b53173e5857dcaff62fde4003dd3e0e4af5 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Jan 30 12:48:04 2026 +0100 chore(python): upgrade pyo3 to 0.27 and fix deprecations --- foreign/python/Cargo.toml | 8 ++++---- foreign/python/src/client.rs | 12 ++++++------ foreign/python/src/consumer.rs | 16 ++++++++-------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index eb8230fdc..ac1e9f819 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -29,16 +29,16 @@ documentation = "https://iggy.apache.org/docs/" repository = "https://github.com/apache/iggy" [dependencies] -bytes = "1.10.1" +bytes = "1.11.0" futures = "0.3.31" iggy = { path = "../../core/sdk", version = "0.8.2-edge.1" } -pyo3 = "0.26.0" -pyo3-async-runtimes = { version = "0.26.0", features = [ +pyo3 = "0.27.2" +pyo3-async-runtimes = { version = "0.27.0", features = [ "attributes", "tokio-runtime", ] } pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git", rev = "63e77533b55782799df28ea4b4676c42d203779e" } -tokio = "1.47.1" +tokio = "1.49.0" [lib] name = "apache_iggy" diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index b0db32fc3..b903329e2 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -22,6 +22,7 @@ use iggy::prelude::{ }; use pyo3::prelude::*; use pyo3::types::{PyDelta, PyList, PyType}; +use pyo3::PyRef; use pyo3_async_runtimes::tokio::future_into_py; use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; @@ -138,11 +139,7 @@ impl IggyClient { /// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure. #[pyo3(signature = (name))] #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] - fn create_stream<'a>( - &self, - py: Python<'a>, - name: String, - ) -> PyResult<Bound<'a, PyAny>> { + fn create_stream<'a>(&self, py: Python<'a>, name: String) -> PyResult<Bound<'a, PyAny>> { let inner = self.inner.clone(); future_into_py(py, async move { inner @@ -254,7 +251,10 @@ impl IggyClient { ) -> PyResult<Bound<'a, PyAny>> { let messages: Vec<SendMessage> = messages .iter() - .map(|item| item.extract::<SendMessage>()) + .map(|item| { + let msg: PyRef<'_, SendMessage> = item.extract()?; + Ok::<_, PyErr>(msg.clone()) + }) .collect::<Result<Vec<_>, _>>()?; let mut messages: Vec<RustMessage> = messages .into_iter() diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs index b2ed74f76..6d2ad4faf 100644 --- a/foreign/python/src/consumer.rs +++ b/foreign/python/src/consumer.rs @@ -140,7 +140,7 @@ impl IggyConsumer { /// only the interval part is applied; the `after` mode is ignored. /// Use `consume_messages()` if you need commit-after-processing semantics. #[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))] - fn iter_messages<'a>(&self) -> ReceiveMessageIterator { + fn iter_messages(&self) -> ReceiveMessageIterator { let inner = self.inner.clone(); ReceiveMessageIterator { inner } } @@ -164,10 +164,10 @@ impl IggyConsumer { future_into_py(py, async { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?; + let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?; let handle_consume = get_runtime().spawn(scope(task_locals, async move { let task_locals = - Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals).unwrap(); + Python::attach(pyo3_async_runtimes::tokio::get_current_locals).unwrap(); let consumer = PyCallbackConsumer { callback: Arc::new(callback), task_locals: Arc::new(Mutex::new(task_locals)), @@ -178,12 +178,12 @@ impl IggyConsumer { let consume_result; if let Some(shutdown_event) = shutdown_event { - let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?; + let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?; async fn shutdown_impl( shutdown_event: Py<PyAny>, shutdown_tx: Sender<()>, ) -> PyResult<()> { - Python::with_gil(|py| { + Python::attach(|py| { into_future( shutdown_event .bind(py) @@ -226,14 +226,14 @@ impl MessageConsumer for PyCallbackConsumer { async fn consume(&self, received: ReceivedMessage) -> Result<(), IggyError> { let callback = self.callback.clone(); let task_locals = self.task_locals.clone().lock_owned().await; - let task_locals = Python::with_gil(|py| task_locals.clone_ref(py)); + let task_locals = task_locals.clone(); let message = ReceiveMessage { inner: received.message, partition_id: received.partition_id, }; get_runtime() .spawn(scope(task_locals, async move { - Python::with_gil(|py| { + Python::attach(|py| { let callback = callback.bind(py); let result = callback.as_any().call1((message,))?; into_future(result) @@ -344,7 +344,7 @@ impl From<&AutoCommitAfter> for RustAutoCommitAfter { } pub fn py_delta_to_iggy_duration(delta1: &Py<PyDelta>) -> IggyDuration { - Python::with_gil(|py| { + Python::attach(|py| { let delta = delta1.bind(py); let seconds = (delta.get_days() * 60 * 60 * 24 + delta.get_seconds()) as u64; let nanos = (delta.get_microseconds() * 1_000) as u32;
