This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 29ab31855 chore(python): upgrade pyo3 to 0.27 and fix deprecations
(#2643)
29ab31855 is described below
commit 29ab31855e974de75610dd484542a60479ae2244
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 30 20:52:33 2026 +0100
chore(python): upgrade pyo3 to 0.27 and fix deprecations (#2643)
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
foreign/python/Cargo.toml | 8 ++++----
foreign/python/src/client.rs | 12 ++++++------
foreign/python/src/consumer.rs | 16 ++++++++--------
3 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml
index eb8230fdc..ac1e9f819 100644
--- a/foreign/python/Cargo.toml
+++ b/foreign/python/Cargo.toml
@@ -29,16 +29,16 @@ documentation = "https://iggy.apache.org/docs/"
repository = "https://github.com/apache/iggy"
[dependencies]
-bytes = "1.10.1"
+bytes = "1.11.0"
futures = "0.3.31"
iggy = { path = "../../core/sdk", version = "0.8.2-edge.1" }
-pyo3 = "0.26.0"
-pyo3-async-runtimes = { version = "0.26.0", features = [
+pyo3 = "0.27.2"
+pyo3-async-runtimes = { version = "0.27.0", features = [
"attributes",
"tokio-runtime",
] }
pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git", rev =
"63e77533b55782799df28ea4b4676c42d203779e" }
-tokio = "1.47.1"
+tokio = "1.49.0"
[lib]
name = "apache_iggy"
diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs
index b0db32fc3..b903329e2 100644
--- a/foreign/python/src/client.rs
+++ b/foreign/python/src/client.rs
@@ -22,6 +22,7 @@ use iggy::prelude::{
};
use pyo3::prelude::*;
use pyo3::types::{PyDelta, PyList, PyType};
+use pyo3::PyRef;
use pyo3_async_runtimes::tokio::future_into_py;
use pyo3_stub_gen::define_stub_info_gatherer;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
@@ -138,11 +139,7 @@ impl IggyClient {
/// Returns Ok(()) on successful stream creation or a PyRuntimeError on
failure.
#[pyo3(signature = (name))]
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]",
imports=("collections.abc")))]
- fn create_stream<'a>(
- &self,
- py: Python<'a>,
- name: String,
- ) -> PyResult<Bound<'a, PyAny>> {
+ fn create_stream<'a>(&self, py: Python<'a>, name: String) ->
PyResult<Bound<'a, PyAny>> {
let inner = self.inner.clone();
future_into_py(py, async move {
inner
@@ -254,7 +251,10 @@ impl IggyClient {
) -> PyResult<Bound<'a, PyAny>> {
let messages: Vec<SendMessage> = messages
.iter()
- .map(|item| item.extract::<SendMessage>())
+ .map(|item| {
+ let msg: PyRef<'_, SendMessage> = item.extract()?;
+ Ok::<_, PyErr>(msg.clone())
+ })
.collect::<Result<Vec<_>, _>>()?;
let mut messages: Vec<RustMessage> = messages
.into_iter()
diff --git a/foreign/python/src/consumer.rs b/foreign/python/src/consumer.rs
index b2ed74f76..6d2ad4faf 100644
--- a/foreign/python/src/consumer.rs
+++ b/foreign/python/src/consumer.rs
@@ -140,7 +140,7 @@ impl IggyConsumer {
/// only the interval part is applied; the `after` mode is ignored.
/// Use `consume_messages()` if you need commit-after-processing semantics.
#[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]",
imports=("collections.abc")))]
- fn iter_messages<'a>(&self) -> ReceiveMessageIterator {
+ fn iter_messages(&self) -> ReceiveMessageIterator {
let inner = self.inner.clone();
ReceiveMessageIterator { inner }
}
@@ -164,10 +164,10 @@ impl IggyConsumer {
future_into_py(py, async {
let (shutdown_tx, shutdown_rx) =
tokio::sync::oneshot::channel::<()>();
- let task_locals =
Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
+ let task_locals =
Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?;
let handle_consume = get_runtime().spawn(scope(task_locals, async
move {
let task_locals =
-
Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals).unwrap();
+
Python::attach(pyo3_async_runtimes::tokio::get_current_locals).unwrap();
let consumer = PyCallbackConsumer {
callback: Arc::new(callback),
task_locals: Arc::new(Mutex::new(task_locals)),
@@ -178,12 +178,12 @@ impl IggyConsumer {
let consume_result;
if let Some(shutdown_event) = shutdown_event {
- let task_locals =
Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
+ let task_locals =
Python::attach(pyo3_async_runtimes::tokio::get_current_locals)?;
async fn shutdown_impl(
shutdown_event: Py<PyAny>,
shutdown_tx: Sender<()>,
) -> PyResult<()> {
- Python::with_gil(|py| {
+ Python::attach(|py| {
into_future(
shutdown_event
.bind(py)
@@ -226,14 +226,14 @@ impl MessageConsumer for PyCallbackConsumer {
async fn consume(&self, received: ReceivedMessage) -> Result<(),
IggyError> {
let callback = self.callback.clone();
let task_locals = self.task_locals.clone().lock_owned().await;
- let task_locals = Python::with_gil(|py| task_locals.clone_ref(py));
+ let task_locals = task_locals.clone();
let message = ReceiveMessage {
inner: received.message,
partition_id: received.partition_id,
};
get_runtime()
.spawn(scope(task_locals, async move {
- Python::with_gil(|py| {
+ Python::attach(|py| {
let callback = callback.bind(py);
let result = callback.as_any().call1((message,))?;
into_future(result)
@@ -344,7 +344,7 @@ impl From<&AutoCommitAfter> for RustAutoCommitAfter {
}
pub fn py_delta_to_iggy_duration(delta1: &Py<PyDelta>) -> IggyDuration {
- Python::with_gil(|py| {
+ Python::attach(|py| {
let delta = delta1.bind(py);
let seconds = (delta.get_days() * 60 * 60 * 24 + delta.get_seconds())
as u64;
let nanos = (delta.get_microseconds() * 1_000) as u32;