This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d76eb28  test: Kv Table Integration tests (#192)
d76eb28 is described below

commit d76eb288efbdb8f6ca943463be23f07f9da43f6c
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 23 07:39:34 2026 +0000

    test: Kv Table Integration tests (#192)
---
 crates/fluss/tests/integration/admin.rs            |  40 +-
 crates/fluss/tests/integration/kv_table.rs         | 444 +++++++++++++++++++++
 crates/fluss/tests/integration/table.rs            |  44 +-
 .../fluss/tests/integration/table_remote_scan.rs   |  34 +-
 crates/fluss/tests/integration/utils.rs            |  74 ++++
 crates/fluss/tests/test_fluss.rs                   |   1 +
 6 files changed, 538 insertions(+), 99 deletions(-)

diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index ccb7172..fbdb295 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -33,55 +33,25 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 #[after_all]
 mod admin_test {
     use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
+    use crate::integration::fluss_cluster::FlussTestingCluster;
+    use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
     use fluss::error::FlussError;
     use fluss::metadata::{
         DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, 
TableDescriptor,
         TablePath,
     };
     use std::sync::Arc;
-    use std::thread;
 
     fn before_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-            rt.block_on(async {
-                let cluster = 
FlussTestingClusterBuilder::new("test-admin").build().await;
-                let mut guard = cluster_guard.write();
-                *guard = Some(cluster);
-            });
-        })
-        .join()
-        .expect("Failed to create cluster");
-        // wait for 20 seconds to avoid the error like
-        // CoordinatorEventProcessor is not initialized yet
-        thread::sleep(std::time::Duration::from_secs(20));
+        start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone());
     }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
-        if cluster_guard.is_none() {
-            panic!("Fluss cluster not initialized. Make sure before_all() was 
called.");
-        }
-        Arc::new(cluster_guard.as_ref().unwrap().clone())
+        get_cluster(&SHARED_FLUSS_CLUSTER)
     }
 
     fn after_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        std::thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-            rt.block_on(async {
-                let mut guard = cluster_guard.write();
-                if let Some(cluster) = guard.take() {
-                    cluster.stop().await;
-                }
-            });
-        })
-        .join()
-        .expect("Failed to cleanup cluster");
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
     }
 
     #[tokio::test]
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
new file mode 100644
index 0000000..efd7957
--- /dev/null
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::sync::LazyLock;
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: 
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
+    LazyLock::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod kv_table_test {
+    use super::SHARED_FLUSS_CLUSTER;
+    use crate::integration::fluss_cluster::FlussTestingCluster;
+    use crate::integration::utils::{create_table, get_cluster, start_cluster, 
stop_cluster};
+    use fluss::client::UpsertWriter;
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use fluss::row::{GenericRow, InternalRow};
+    use std::sync::Arc;
+
+    fn before_all() {
+        start_cluster("test_kv_table", SHARED_FLUSS_CLUSTER.clone());
+    }
+
+    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+        get_cluster(&SHARED_FLUSS_CLUSTER)
+    }
+
+    fn after_all() {
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
+    }
+
+    fn make_key(id: i32) -> GenericRow<'static> {
+        let mut row = GenericRow::new();
+        row.set_field(0, id);
+        row.set_field(1, "");
+        row.set_field(2, 0i64);
+        row
+    }
+
+    #[tokio::test]
+    async fn upsert_delete_and_lookup() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_upsert_and_lookup".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("age", DataTypes::bigint())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        let test_data = [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 
35)];
+
+        // Upsert rows
+        for (id, name, age) in &test_data {
+            let mut row = GenericRow::new();
+            row.set_field(0, *id);
+            row.set_field(1, *name);
+            row.set_field(2, *age);
+            upsert_writer
+                .upsert(&row)
+                .await
+                .expect("Failed to upsert row");
+        }
+
+        // Lookup records
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Verify lookup results
+        for (id, expected_name, expected_age) in &test_data {
+            let result = lookuper
+                .lookup(&make_key(*id))
+                .await
+                .expect("Failed to lookup");
+            let row = result
+                .get_single_row()
+                .expect("Failed to get row")
+                .expect("Row should exist");
+
+            assert_eq!(row.get_int(0), *id, "id mismatch");
+            assert_eq!(row.get_string(1), *expected_name, "name mismatch");
+            assert_eq!(row.get_long(2), *expected_age, "age mismatch");
+        }
+
+        // Update the record with new age
+        let mut updated_row = GenericRow::new();
+        updated_row.set_field(0, 1);
+        updated_row.set_field(1, "Verso");
+        updated_row.set_field(2, 33i64);
+        upsert_writer
+            .upsert(&updated_row)
+            .await
+            .expect("Failed to upsert updated row");
+
+        // Verify the update
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup after update");
+        let found_row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(
+            found_row.get_long(2),
+            updated_row.get_long(2),
+            "Age should be updated"
+        );
+        assert_eq!(
+            found_row.get_string(1),
+            updated_row.get_string(1),
+            "Name should remain unchanged"
+        );
+
+        // Delete record with id=1
+        let mut delete_row = GenericRow::new();
+        delete_row.set_field(0, 1);
+        delete_row.set_field(1, "");
+        delete_row.set_field(2, 0i64);
+        upsert_writer
+            .delete(&delete_row)
+            .await
+            .expect("Failed to delete");
+
+        // Verify deletion
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup deleted record");
+        assert!(
+            result
+                .get_single_row()
+                .expect("Failed to get row")
+                .is_none(),
+            "Record 1 should not exist after delete"
+        );
+
+        // Verify other records still exist
+        for i in [2, 3] {
+            let result = lookuper
+                .lookup(&make_key(i))
+                .await
+                .expect("Failed to lookup");
+            assert!(
+                result
+                    .get_single_row()
+                    .expect("Failed to get row")
+                    .is_some(),
+                "Record {} should still exist after deleting record 1",
+                i
+            );
+        }
+
+        // Lookup non-existent key
+        let result = lookuper
+            .lookup(&make_key(999))
+            .await
+            .expect("Failed to lookup non-existent key");
+        assert!(
+            result
+                .get_single_row()
+                .expect("Failed to get row")
+                .is_none(),
+            "Non-existent key should return None"
+        );
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    #[tokio::test]
+    async fn composite_primary_keys() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_composite_pk".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("region", DataTypes::string())
+                    .column("user_id", DataTypes::int())
+                    .column("score", DataTypes::bigint())
+                    .primary_key(vec!["region".to_string(), 
"user_id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Insert records with composite keys
+        let test_data = [
+            ("US", 1, 100i64),
+            ("US", 2, 200i64),
+            ("EU", 1, 150i64),
+            ("EU", 2, 250i64),
+        ];
+
+        for (region, user_id, score) in &test_data {
+            let mut row = GenericRow::new();
+            row.set_field(0, *region);
+            row.set_field(1, *user_id);
+            row.set_field(2, *score);
+            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+        }
+
+        // Lookup with composite key
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Lookup (US, 1) - should return score 100
+        let mut key = GenericRow::new();
+        key.set_field(0, "US");
+        key.set_field(1, 1);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
+
+        // Lookup (EU, 2) - should return score 250
+        let mut key = GenericRow::new();
+        key.set_field(0, "EU");
+        key.set_field(1, 2);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
+
+        // Update (US, 1) score
+        let mut update_row = GenericRow::new();
+        update_row.set_field(0, "US");
+        update_row.set_field(1, 1);
+        update_row.set_field(2, 500i64);
+        upsert_writer
+            .upsert(&update_row)
+            .await
+            .expect("Failed to update");
+
+        // Verify update
+        let mut key = GenericRow::new();
+        key.set_field(0, "US");
+        key.set_field(1, 1);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(
+            row.get_long(2),
+            update_row.get_long(2),
+            "Row score should be updated"
+        );
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    #[tokio::test]
+    async fn partial_update() {
+        use fluss::row::Datum;
+
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_partial_update".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("age", DataTypes::bigint())
+                    .column("score", DataTypes::bigint())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        // Insert initial record with all columns
+        let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        let mut row = GenericRow::new();
+        row.set_field(0, 1);
+        row.set_field(1, "Verso");
+        row.set_field(2, 32i64);
+        row.set_field(3, 6942i64);
+        upsert_writer
+            .upsert(&row)
+            .await
+            .expect("Failed to upsert initial row");
+
+        // Verify initial record
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup");
+        let found_row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+
+        assert_eq!(found_row.get_int(0), 1);
+        assert_eq!(found_row.get_string(1), "Verso");
+        assert_eq!(found_row.get_long(2), 32i64);
+        assert_eq!(found_row.get_long(3), 6942i64);
+
+        // Create partial update writer to update only score column
+        let partial_upsert = table_upsert
+            .partial_update_with_column_names(&["id", "score"])
+            .expect("Failed to create TableUpsert with partial update");
+        let mut partial_writer = partial_upsert
+            .create_writer()
+            .expect("Failed to create UpsertWriter with partial write");
+
+        // Update only the score column
+        let mut partial_row = GenericRow::new();
+        partial_row.set_field(0, 1);
+        partial_row.set_field(1, Datum::Null); // not in partial update column
+        partial_row.set_field(2, Datum::Null); // not in partial update column
+        partial_row.set_field(3, 420i64);
+        partial_writer
+            .upsert(&partial_row)
+            .await
+            .expect("Failed to upsert");
+
+        // Verify partial update - name and age should remain unchanged
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup after partial update");
+        let found_row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+
+        assert_eq!(found_row.get_int(0), 1, "id should remain 1");
+        assert_eq!(
+            found_row.get_string(1),
+            "Verso",
+            "name should remain unchanged"
+        );
+        assert_eq!(found_row.get_long(2), 32, "age should remain unchanged");
+        assert_eq!(found_row.get_long(3), 420, "score should be updated to 
420");
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+}
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 4cba469..ef73b56 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -33,8 +33,8 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 #[after_all]
 mod table_test {
     use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
-    use crate::integration::utils::create_table;
+    use crate::integration::fluss_cluster::FlussTestingCluster;
+    use crate::integration::utils::{create_table, get_cluster, start_cluster, 
stop_cluster};
     use arrow::array::record_batch;
     use fluss::client::{FlussTable, TableScan};
     use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, 
TablePath};
@@ -44,50 +44,18 @@ mod table_test {
     use jiff::Timestamp;
     use std::collections::HashMap;
     use std::sync::Arc;
-    use std::thread;
     use std::time::Duration;
 
     fn before_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-            rt.block_on(async {
-                let cluster = 
FlussTestingClusterBuilder::new("test_table").build().await;
-                let mut guard = cluster_guard.write();
-                *guard = Some(cluster);
-            });
-        })
-        .join()
-        .expect("Failed to create cluster");
-
-        // wait for 20 seconds to avoid the error like
-        // CoordinatorEventProcessor is not initialized yet
-        thread::sleep(std::time::Duration::from_secs(20));
+        start_cluster("test_table", SHARED_FLUSS_CLUSTER.clone());
     }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
-        if cluster_guard.is_none() {
-            panic!("Fluss cluster not initialized. Make sure before_all() was 
called.");
-        }
-        Arc::new(cluster_guard.as_ref().unwrap().clone())
+        get_cluster(&SHARED_FLUSS_CLUSTER)
     }
 
     fn after_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-            rt.block_on(async {
-                let mut guard = cluster_guard.write();
-                if let Some(cluster) = guard.take() {
-                    cluster.stop().await;
-                }
-            });
-        })
-        .join()
-        .expect("Failed to cleanup cluster");
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
     }
 
     #[tokio::test]
@@ -527,7 +495,7 @@ mod table_test {
 
         use arrow::array::Int32Array;
         let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
-        let mut all_ids: Vec<i32> = batches
+        let all_ids: Vec<i32> = batches
             .iter()
             .flat_map(|b| {
                 (0..b.num_rows()).map(|i| {
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index 43c89b5..e28a836 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -33,19 +33,20 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 mod table_remote_scan_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
-    use crate::integration::utils::create_table;
+    use crate::integration::utils::{
+        create_table, get_cluster, stop_cluster, wait_for_cluster_ready,
+    };
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::row::{GenericRow, InternalRow};
     use std::collections::HashMap;
     use std::sync::Arc;
     use std::thread;
-    use std::thread::sleep;
     use std::time::Duration;
     use uuid::Uuid;
 
     fn before_all() {
         // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
         thread::spawn(move || {
             let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
             rt.block_on(async {
@@ -94,32 +95,17 @@ mod table_remote_scan_test {
                 .with_remote_data_dir(temp_dir)
                 .build()
                 .await;
-                let mut guard = cluster_guard.write();
+                wait_for_cluster_ready(&cluster).await;
+                let mut guard = cluster_lock.write();
                 *guard = Some(cluster);
             });
         })
         .join()
         .expect("Failed to create cluster");
-
-        // wait for 20 seconds to avoid the error like
-        // CoordinatorEventProcessor is not initialized yet
-        sleep(Duration::from_secs(20));
     }
 
     fn after_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
-            rt.block_on(async {
-                let mut guard = cluster_guard.write();
-                if let Some(cluster) = guard.take() {
-                    cluster.stop().await;
-                }
-            });
-        })
-        .join()
-        .expect("Failed to cleanup cluster");
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
     }
 
     #[tokio::test]
@@ -215,10 +201,6 @@ mod table_remote_scan_test {
     }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
-        if cluster_guard.is_none() {
-            panic!("Fluss cluster not initialized. Make sure before_all() was 
called.");
-        }
-        Arc::new(cluster_guard.as_ref().unwrap().clone())
+        get_cluster(&SHARED_FLUSS_CLUSTER)
     }
 }
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index cd1f6cc..4d0c349 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -15,8 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
 use fluss::client::FlussAdmin;
 use fluss::metadata::{TableDescriptor, TablePath};
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Polls the cluster until CoordinatorEventProcessor is initialized and 
tablet server is available.
+/// Times out after 20 seconds.
+pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
+    let timeout = Duration::from_secs(20);
+    let poll_interval = Duration::from_millis(500);
+    let start = std::time::Instant::now();
+
+    loop {
+        let connection = cluster.get_fluss_connection().await;
+        if connection.get_admin().await.is_ok()
+            && connection
+                .get_metadata()
+                .get_cluster()
+                .get_one_available_server()
+                .is_some()
+        {
+            return;
+        }
+
+        if start.elapsed() >= timeout {
+            panic!(
+                "Server readiness check timed out after {} seconds. \
+                 CoordinatorEventProcessor may not be initialized or 
TabletServer may not be available.",
+                timeout.as_secs()
+            );
+        }
+
+        tokio::time::sleep(poll_interval).await;
+    }
+}
 
 pub async fn create_table(
     admin: &FlussAdmin,
@@ -28,3 +63,42 @@ pub async fn create_table(
         .await
         .expect("Failed to create table");
 }
+
+pub fn start_cluster(name: &str, cluster_lock: 
Arc<RwLock<Option<FlussTestingCluster>>>) {
+    let name = name.to_string();
+    std::thread::spawn(move || {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+        rt.block_on(async {
+            let cluster = FlussTestingClusterBuilder::new(&name).build().await;
+            wait_for_cluster_ready(&cluster).await;
+            let mut guard = cluster_lock.write();
+            *guard = Some(cluster);
+        });
+    })
+    .join()
+    .expect("Failed to create cluster");
+}
+
+pub fn stop_cluster(cluster_lock: Arc<RwLock<Option<FlussTestingCluster>>>) {
+    std::thread::spawn(move || {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+        rt.block_on(async {
+            let mut guard = cluster_lock.write();
+            if let Some(cluster) = guard.take() {
+                cluster.stop().await;
+            }
+        });
+    })
+    .join()
+    .expect("Failed to cleanup cluster");
+}
+
+pub fn get_cluster(cluster_lock: &RwLock<Option<FlussTestingCluster>>) -> 
Arc<FlussTestingCluster> {
+    let guard = cluster_lock.read();
+    Arc::new(
+        guard
+            .as_ref()
+            .expect("Fluss cluster not initialized. Make sure before_all() was 
called.")
+            .clone(),
+    )
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 65111af..f3987e6 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -22,6 +22,7 @@ extern crate fluss;
 mod integration {
     mod admin;
     mod fluss_cluster;
+    mod kv_table;
     mod table;
 
     mod utils;

Reply via email to