This is an automated email from the ASF dual-hosted git repository. bashirbekov pushed a commit to branch feat/add-background-send in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ccc2153b02852131b9ff02e0f791598ce1bc0b22 Author: haze518 <[email protected]> AuthorDate: Tue Jun 10 07:20:11 2025 +0600 leave 3 tests --- core/integration/tests/sdk/producer/background.rs | 220 +++++++++++----------- 1 file changed, 111 insertions(+), 109 deletions(-) diff --git a/core/integration/tests/sdk/producer/background.rs b/core/integration/tests/sdk/producer/background.rs index bbec4b28..be22af03 100644 --- a/core/integration/tests/sdk/producer/background.rs +++ b/core/integration/tests/sdk/producer/background.rs @@ -19,13 +19,15 @@ use crate::sdk::producer::{ PARTITION_ID, STREAM_ID, TOPIC_ID, cleanup, create_message_payload, init_system, }; +use bytes::Bytes; +use iggy::clients::producer_config::BackpressureMode; use iggy::prelude::*; use iggy::{clients::client::IggyClient, prelude::TcpClient}; use iggy_common::TcpClientConfig; use integration::test_server::{TestServer, login_root}; use std::sync::Arc; use std::time::Duration; -use tokio::time::sleep; +use tokio::time::{Instant, sleep}; #[tokio::test] async fn background_send_receive_ok() { @@ -199,135 +201,135 @@ async fn background_send_receive_ok() { // cleanup(&client).await; // } -// #[tokio::test] -// async fn background_block_waits_then_succeeds() { -// let mut test_server = TestServer::default(); -// test_server.start(); +#[tokio::test] +async fn background_block_waits_then_succeeds() { + let mut test_server = TestServer::default(); + test_server.start(); -// let tcp_client_config = TcpClientConfig { -// server_address: test_server.get_raw_tcp_addr().unwrap(), -// ..TcpClientConfig::default() -// }; -// let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); -// let client = IggyClient::create(client, None, None); + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); -// client.connect().await.unwrap(); -// login_root(&client).await; -// init_system(&client).await; + client.connect().await.unwrap(); + login_root(&client).await; + init_system(&client).await; -// let big_msg = IggyMessage::builder() -// .id(1) -// .payload(Bytes::from(vec![0u8; 2048])) -// .build() -// .unwrap(); + let big_msg = IggyMessage::builder() + .id(1) + .payload(Bytes::from(vec![0u8; 2048])) + .build() + .unwrap(); -// let cfg = BackgroundConfig::builder() -// .max_buffer_size(big_msg.get_size_bytes() + 100.into()) -// .max_in_flight(1) -// .batch_length(0) -// .batch_size(0) -// .linger_time(IggyDuration::from(300_000)) -// .failure_mode(BackpressureMode::Block) -// .build(); + let cfg = BackgroundConfig::builder() + .max_buffer_size(big_msg.get_size_bytes() + 100.into()) + .max_in_flight(1) + .batch_length(0) + .batch_size(0) + .linger_time(IggyDuration::from(300_000)) + .failure_mode(BackpressureMode::Block) + .build(); -// let producer = client -// .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string()) -// .unwrap() -// .background(cfg) -// .build(); + let producer = client + .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string()) + .unwrap() + .background(cfg) + .build(); -// producer.send(vec![big_msg]).await.unwrap(); + producer.send(vec![big_msg]).await.unwrap(); -// let small_msg = IggyMessage::builder() -// .id(2) -// .payload(Bytes::from_static(b"x")) -// .build() -// .unwrap(); + let small_msg = IggyMessage::builder() + .id(2) + .payload(Bytes::from_static(b"x")) + .build() + .unwrap(); -// let start = Instant::now(); -// let res = producer.send(vec![small_msg]).await; -// let elapsed = start.elapsed(); + let start = Instant::now(); + let res = producer.send(vec![small_msg]).await; + let elapsed = start.elapsed(); -// assert!(res.is_ok()); -// assert!(elapsed >= Duration::from_millis(300)); + assert!(res.is_ok()); + assert!(elapsed >= Duration::from_millis(300)); -// cleanup(&client).await; -// } + cleanup(&client).await; +} -// #[tokio::test] -// async fn background_graceful_shutdown() { -// let mut test_server = TestServer::default(); -// test_server.start(); +#[tokio::test] +async fn background_graceful_shutdown() { + let mut test_server = TestServer::default(); + test_server.start(); -// let tcp_client_config = TcpClientConfig { -// server_address: test_server.get_raw_tcp_addr().unwrap(), -// ..TcpClientConfig::default() -// }; -// let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); -// let client = IggyClient::create(client, None, None); + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); -// client.connect().await.unwrap(); -// assert!(client.ping().await.is_ok(), "Failed to ping server"); + client.connect().await.unwrap(); + assert!(client.ping().await.is_ok(), "Failed to ping server"); -// login_root(&client).await; -// init_system(&client).await; + login_root(&client).await; + init_system(&client).await; -// client.connect().await.unwrap(); -// assert!(client.ping().await.is_ok(), "Failed to ping server"); + client.connect().await.unwrap(); + assert!(client.ping().await.is_ok(), "Failed to ping server"); -// let cfg = BackgroundConfig::builder() -// .max_in_flight(1) -// .batch_length(0) -// .batch_size(0) -// .linger_time(IggyDuration::from(2_000_000)) // 2s – long enough not to flush automatically -// .build(); -// let producer = client -// .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string()) -// .unwrap() -// .background(cfg) -// .build(); + let cfg = BackgroundConfig::builder() + .max_in_flight(1) + .batch_length(0) + .batch_size(0) + .linger_time(IggyDuration::from(2_000_000)) // 2s – long enough not to flush automatically + .build(); + let producer = client + .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string()) + .unwrap() + .background(cfg) + .build(); -// let msg = IggyMessage::builder() -// .id(1) -// .payload(Bytes::from(vec![0u8; 512])) -// .build() -// .unwrap(); -// producer.send(vec![msg]).await.unwrap(); + let msg = IggyMessage::builder() + .id(1) + .payload(Bytes::from(vec![0u8; 512])) + .build() + .unwrap(); + producer.send(vec![msg]).await.unwrap(); -// sleep(Duration::from_millis(1000)).await; + sleep(Duration::from_millis(1000)).await; -// let consumer = Consumer::default(); -// let polled_messages = client -// .poll_messages( -// &Identifier::numeric(STREAM_ID).unwrap(), -// &Identifier::numeric(TOPIC_ID).unwrap(), -// Some(PARTITION_ID), -// &consumer, -// &PollingStrategy::offset(0), -// 1, -// false, -// ) -// .await -// .unwrap(); -// assert_eq!(polled_messages.messages.len() as u32, 0); + let consumer = Consumer::default(); + let polled_messages = client + .poll_messages( + &Identifier::numeric(STREAM_ID).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + assert_eq!(polled_messages.messages.len() as u32, 0); -// producer.shutdown().await; -// let polled_messages = client -// .poll_messages( -// &Identifier::numeric(STREAM_ID).unwrap(), -// &Identifier::numeric(TOPIC_ID).unwrap(), -// Some(PARTITION_ID), -// &consumer, -// &PollingStrategy::offset(0), -// 1, -// false, -// ) -// .await -// .unwrap(); -// assert_eq!(polled_messages.messages.len() as u32, 1); + producer.shutdown().await; + let polled_messages = client + .poll_messages( + &Identifier::numeric(STREAM_ID).unwrap(), + &Identifier::numeric(TOPIC_ID).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + assert_eq!(polled_messages.messages.len() as u32, 1); -// cleanup(&client).await; -// } + cleanup(&client).await; +} // #[tokio::test] // async fn background_many_parallel_producers() {
