This is an automated email from the ASF dual-hosted git repository.

bashirbekov pushed a commit to branch feat/add-background-send
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit ccc2153b02852131b9ff02e0f791598ce1bc0b22
Author: haze518 <[email protected]>
AuthorDate: Tue Jun 10 07:20:11 2025 +0600

    leave 3 tests
---
 core/integration/tests/sdk/producer/background.rs | 220 +++++++++++-----------
 1 file changed, 111 insertions(+), 109 deletions(-)

diff --git a/core/integration/tests/sdk/producer/background.rs 
b/core/integration/tests/sdk/producer/background.rs
index bbec4b28..be22af03 100644
--- a/core/integration/tests/sdk/producer/background.rs
+++ b/core/integration/tests/sdk/producer/background.rs
@@ -19,13 +19,15 @@
 use crate::sdk::producer::{
     PARTITION_ID, STREAM_ID, TOPIC_ID, cleanup, create_message_payload, 
init_system,
 };
+use bytes::Bytes;
+use iggy::clients::producer_config::BackpressureMode;
 use iggy::prelude::*;
 use iggy::{clients::client::IggyClient, prelude::TcpClient};
 use iggy_common::TcpClientConfig;
 use integration::test_server::{TestServer, login_root};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::time::sleep;
+use tokio::time::{Instant, sleep};
 
 #[tokio::test]
 async fn background_send_receive_ok() {
@@ -199,135 +201,135 @@ async fn background_send_receive_ok() {
 //     cleanup(&client).await;
 // }
 
-// #[tokio::test]
-// async fn background_block_waits_then_succeeds() {
-//     let mut test_server = TestServer::default();
-//     test_server.start();
+#[tokio::test]
+async fn background_block_waits_then_succeeds() {
+    let mut test_server = TestServer::default();
+    test_server.start();
 
-//     let tcp_client_config = TcpClientConfig {
-//         server_address: test_server.get_raw_tcp_addr().unwrap(),
-//         ..TcpClientConfig::default()
-//     };
-//     let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
-//     let client = IggyClient::create(client, None, None);
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
 
-//     client.connect().await.unwrap();
-//     login_root(&client).await;
-//     init_system(&client).await;
+    client.connect().await.unwrap();
+    login_root(&client).await;
+    init_system(&client).await;
 
-//     let big_msg = IggyMessage::builder()
-//         .id(1)
-//         .payload(Bytes::from(vec![0u8; 2048]))
-//         .build()
-//         .unwrap();
+    let big_msg = IggyMessage::builder()
+        .id(1)
+        .payload(Bytes::from(vec![0u8; 2048]))
+        .build()
+        .unwrap();
 
-//     let cfg = BackgroundConfig::builder()
-//         .max_buffer_size(big_msg.get_size_bytes() + 100.into())
-//         .max_in_flight(1)
-//         .batch_length(0)
-//         .batch_size(0)
-//         .linger_time(IggyDuration::from(300_000))
-//         .failure_mode(BackpressureMode::Block)
-//         .build();
+    let cfg = BackgroundConfig::builder()
+        .max_buffer_size(big_msg.get_size_bytes() + 100.into())
+        .max_in_flight(1)
+        .batch_length(0)
+        .batch_size(0)
+        .linger_time(IggyDuration::from(300_000))
+        .failure_mode(BackpressureMode::Block)
+        .build();
 
-//     let producer = client
-//         .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string())
-//         .unwrap()
-//         .background(cfg)
-//         .build();
+    let producer = client
+        .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string())
+        .unwrap()
+        .background(cfg)
+        .build();
 
-//     producer.send(vec![big_msg]).await.unwrap();
+    producer.send(vec![big_msg]).await.unwrap();
 
-//     let small_msg = IggyMessage::builder()
-//         .id(2)
-//         .payload(Bytes::from_static(b"x"))
-//         .build()
-//         .unwrap();
+    let small_msg = IggyMessage::builder()
+        .id(2)
+        .payload(Bytes::from_static(b"x"))
+        .build()
+        .unwrap();
 
-//     let start = Instant::now();
-//     let res = producer.send(vec![small_msg]).await;
-//     let elapsed = start.elapsed();
+    let start = Instant::now();
+    let res = producer.send(vec![small_msg]).await;
+    let elapsed = start.elapsed();
 
-//     assert!(res.is_ok());
-//     assert!(elapsed >= Duration::from_millis(300));
+    assert!(res.is_ok());
+    assert!(elapsed >= Duration::from_millis(300));
 
-//     cleanup(&client).await;
-// }
+    cleanup(&client).await;
+}
 
-// #[tokio::test]
-// async fn background_graceful_shutdown() {
-//     let mut test_server = TestServer::default();
-//     test_server.start();
+#[tokio::test]
+async fn background_graceful_shutdown() {
+    let mut test_server = TestServer::default();
+    test_server.start();
 
-//     let tcp_client_config = TcpClientConfig {
-//         server_address: test_server.get_raw_tcp_addr().unwrap(),
-//         ..TcpClientConfig::default()
-//     };
-//     let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
-//     let client = IggyClient::create(client, None, None);
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
Box::new(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
 
-//     client.connect().await.unwrap();
-//     assert!(client.ping().await.is_ok(), "Failed to ping server");
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
 
-//     login_root(&client).await;
-//     init_system(&client).await;
+    login_root(&client).await;
+    init_system(&client).await;
 
-//     client.connect().await.unwrap();
-//     assert!(client.ping().await.is_ok(), "Failed to ping server");
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
 
-//     let cfg = BackgroundConfig::builder()
-//         .max_in_flight(1)
-//         .batch_length(0)
-//         .batch_size(0)
-//         .linger_time(IggyDuration::from(2_000_000)) // 2s – long enough not 
to flush automatically
-//         .build();
-//     let producer = client
-//         .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string())
-//         .unwrap()
-//         .background(cfg)
-//         .build();
+    let cfg = BackgroundConfig::builder()
+        .max_in_flight(1)
+        .batch_length(0)
+        .batch_size(0)
+        .linger_time(IggyDuration::from(2_000_000)) // 2s – long enough not to 
flush automatically
+        .build();
+    let producer = client
+        .producer(&STREAM_ID.to_string(), &TOPIC_ID.to_string())
+        .unwrap()
+        .background(cfg)
+        .build();
 
-//     let msg = IggyMessage::builder()
-//         .id(1)
-//         .payload(Bytes::from(vec![0u8; 512]))
-//         .build()
-//         .unwrap();
-//     producer.send(vec![msg]).await.unwrap();
+    let msg = IggyMessage::builder()
+        .id(1)
+        .payload(Bytes::from(vec![0u8; 512]))
+        .build()
+        .unwrap();
+    producer.send(vec![msg]).await.unwrap();
 
-//     sleep(Duration::from_millis(1000)).await;
+    sleep(Duration::from_millis(1000)).await;
 
-//     let consumer = Consumer::default();
-//     let polled_messages = client
-//         .poll_messages(
-//             &Identifier::numeric(STREAM_ID).unwrap(),
-//             &Identifier::numeric(TOPIC_ID).unwrap(),
-//             Some(PARTITION_ID),
-//             &consumer,
-//             &PollingStrategy::offset(0),
-//             1,
-//             false,
-//         )
-//         .await
-//         .unwrap();
-//     assert_eq!(polled_messages.messages.len() as u32, 0);
+    let consumer = Consumer::default();
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::numeric(STREAM_ID).unwrap(),
+            &Identifier::numeric(TOPIC_ID).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+    assert_eq!(polled_messages.messages.len() as u32, 0);
 
-//     producer.shutdown().await;
-//     let polled_messages = client
-//         .poll_messages(
-//             &Identifier::numeric(STREAM_ID).unwrap(),
-//             &Identifier::numeric(TOPIC_ID).unwrap(),
-//             Some(PARTITION_ID),
-//             &consumer,
-//             &PollingStrategy::offset(0),
-//             1,
-//             false,
-//         )
-//         .await
-//         .unwrap();
-//     assert_eq!(polled_messages.messages.len() as u32, 1);
+    producer.shutdown().await;
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::numeric(STREAM_ID).unwrap(),
+            &Identifier::numeric(TOPIC_ID).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+    assert_eq!(polled_messages.messages.len() as u32, 1);
 
-//     cleanup(&client).await;
-// }
+    cleanup(&client).await;
+}
 
 // #[tokio::test]
 // async fn background_many_parallel_producers() {

Reply via email to