This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch python-upgrade
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit e5714b53173e5857dcaff62fde4003dd3e0e4af5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 30 12:48:04 2026 +0100

    chore(python): upgrade pyo3 to 0.27 and fix deprecations
---
 foreign/python/Cargo.toml      |  8 ++++----
 foreign/python/src/client.rs   | 12 ++++++------
 foreign/python/src/consumer.rs | 16 ++++++++--------
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index eb8230fdc..ac1e9f819 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -29,16 +29,16 @@ documentation = "https://iggy.apache.org/docs/"
 repository = "https://github.com/apache/iggy"
 
 [dependencies]
-bytes = "1.10.1"
+bytes = "1.11.0"
 futures = "0.3.31"
 iggy = { path = "../../core/sdk", version = "0.8.2-edge.1" }
-pyo3 = "0.26.0"
-pyo3-async-runtimes = { version = "0.26.0", features = [
+pyo3 = "0.27.2"
+pyo3-async-runtimes = { version = "0.27.0", features = [
     "attributes",
     "tokio-runtime",
 ] }
 pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git", rev = "63e77533b55782799df28ea4b4676c42d203779e" }
-tokio = "1.47.1"
+tokio = "1.49.0"
 
 [lib]
 name = "apache_iggy"
diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs
index b0db32fc3..b903329e2 100644
--- a/foreign/python/src/client.rs
+++ b/foreign/python/src/client.rs
@@ -22,6 +22,7 @@ use iggy::prelude::{
 };
 use pyo3::prelude::*;
 use pyo3::types::{PyDelta, PyList, PyType};
+use pyo3::PyRef;
 use pyo3_async_runtimes::tokio::future_into_py;
 use pyo3_stub_gen::define_stub_info_gatherer;
 use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
@@ -138,11 +139,7 @@ impl IggyClient {
     /// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure.
     #[pyo3(signature = (name))]
     #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
-    fn create_stream<'a>(
-        &self,
-        py: Python<'a>,
-        name: String,
-    ) -> PyResult<Bound<'a, PyAny>> {
+    fn create_stream<'a>(&self, py: Python<'a>, name: String) -> PyResult<Bound<'a, PyAny>> {
         let inner = self.inner.clone();
         future_into_py(py, async move {
             inner
@@ -254,7 +251,10 @@ impl IggyClient {
     ) -> PyResult<Bound<'a, PyAny>> {
         let messages: Vec<SendMessage> = messages
             .iter()
-            .map(|item| item.extract::<SendMessage>())
+            .map(|item| {
+                let msg: PyRef<'_, SendMessage> = item.extract()?;
+                Ok::<_, PyErr>(msg.clone())
+            })
             .collect::<Result<Vec<_>, _>>()?;
         let mut messages: Vec<RustMessage> = messages
             .into_iter()
diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs
index b2ed74f76..6d2ad4faf 100644
--- a/foreign/python/src/consumer.rs
+++ b/foreign/python/src/consumer.rs
@@ -140,7 +140,7 @@ impl IggyConsumer {
     /// only the interval part is applied; the `after` mode is ignored.
     /// Use `consume_messages()` if you need commit-after-processing semantics.
     #[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))]
-    fn iter_messages<'a>(&self) -> ReceiveMessageIterator {
+    fn iter_messages(&self) -> ReceiveMessageIterator {
         let inner = self.inner.clone();
         ReceiveMessageIterator { inner }
     }
@@ -164,10 +164,10 @@ impl IggyConsumer {
         future_into_py(py, async {
             let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
 
-            let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
+            let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?;
             let handle_consume = get_runtime().spawn(scope(task_locals, async move {
                 let task_locals =
-                    Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals).unwrap();
+                    Python::attach(pyo3_async_runtimes::tokio::get_current_locals).unwrap();
                 let consumer = PyCallbackConsumer {
                     callback: Arc::new(callback),
                     task_locals: Arc::new(Mutex::new(task_locals)),
@@ -178,12 +178,12 @@ impl IggyConsumer {
             let consume_result;
 
             if let Some(shutdown_event) = shutdown_event {
-                let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
+                let task_locals = Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?;
                 async fn shutdown_impl(
                     shutdown_event: Py<PyAny>,
                     shutdown_tx: Sender<()>,
                 ) -> PyResult<()> {
-                    Python::with_gil(|py| {
+                    Python::attach(|py| {
                         into_future(
                             shutdown_event
                                 .bind(py)
@@ -226,14 +226,14 @@ impl MessageConsumer for PyCallbackConsumer {
     async fn consume(&self, received: ReceivedMessage) -> Result<(), IggyError> {
         let callback = self.callback.clone();
         let task_locals = self.task_locals.clone().lock_owned().await;
-        let task_locals = Python::with_gil(|py| task_locals.clone_ref(py));
+        let task_locals = task_locals.clone();
         let message = ReceiveMessage {
             inner: received.message,
             partition_id: received.partition_id,
         };
         get_runtime()
             .spawn(scope(task_locals, async move {
-                Python::with_gil(|py| {
+                Python::attach(|py| {
                     let callback = callback.bind(py);
                     let result = callback.as_any().call1((message,))?;
                     into_future(result)
@@ -344,7 +344,7 @@ impl From<&AutoCommitAfter> for RustAutoCommitAfter {
 }
 
 pub fn py_delta_to_iggy_duration(delta1: &Py<PyDelta>) -> IggyDuration {
-    Python::with_gil(|py| {
+    Python::attach(|py| {
         let delta = delta1.bind(py);
         let seconds = (delta.get_days() * 60 * 60 * 24 + delta.get_seconds()) as u64;
         let nanos = (delta.get_microseconds() * 1_000) as u32;

Reply via email to