This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d76eb28 test: Kv Table Integration tests (#192)
d76eb28 is described below
commit d76eb288efbdb8f6ca943463be23f07f9da43f6c
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 23 07:39:34 2026 +0000
test: Kv Table Integration tests (#192)
---
crates/fluss/tests/integration/admin.rs | 40 +-
crates/fluss/tests/integration/kv_table.rs | 444 +++++++++++++++++++++
crates/fluss/tests/integration/table.rs | 44 +-
.../fluss/tests/integration/table_remote_scan.rs | 34 +-
crates/fluss/tests/integration/utils.rs | 74 ++++
crates/fluss/tests/test_fluss.rs | 1 +
6 files changed, 538 insertions(+), 99 deletions(-)
diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index ccb7172..fbdb295 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -33,55 +33,25 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
#[after_all]
mod admin_test {
use super::SHARED_FLUSS_CLUSTER;
- use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
+ use crate::integration::fluss_cluster::FlussTestingCluster;
+ use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
use fluss::error::FlussError;
use fluss::metadata::{
DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema,
TableDescriptor,
TablePath,
};
use std::sync::Arc;
- use std::thread;
fn before_all() {
- // Create a new tokio runtime in a separate thread
- let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
- thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let cluster =
FlussTestingClusterBuilder::new("test-admin").build().await;
- let mut guard = cluster_guard.write();
- *guard = Some(cluster);
- });
- })
- .join()
- .expect("Failed to create cluster");
- // wait for 20 seconds to avoid the error like
- // CoordinatorEventProcessor is not initialized yet
- thread::sleep(std::time::Duration::from_secs(20));
+ start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone());
}
fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- let cluster_guard = SHARED_FLUSS_CLUSTER.read();
- if cluster_guard.is_none() {
- panic!("Fluss cluster not initialized. Make sure before_all() was
called.");
- }
- Arc::new(cluster_guard.as_ref().unwrap().clone())
+ get_cluster(&SHARED_FLUSS_CLUSTER)
}
fn after_all() {
- // Create a new tokio runtime in a separate thread
- let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
- std::thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let mut guard = cluster_guard.write();
- if let Some(cluster) = guard.take() {
- cluster.stop().await;
- }
- });
- })
- .join()
- .expect("Failed to cleanup cluster");
+ stop_cluster(SHARED_FLUSS_CLUSTER.clone());
}
#[tokio::test]
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
new file mode 100644
index 0000000..efd7957
--- /dev/null
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::sync::LazyLock;
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER:
LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
+ LazyLock::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod kv_table_test {
+ use super::SHARED_FLUSS_CLUSTER;
+ use crate::integration::fluss_cluster::FlussTestingCluster;
+ use crate::integration::utils::{create_table, get_cluster, start_cluster,
stop_cluster};
+ use fluss::client::UpsertWriter;
+ use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+ use fluss::row::{GenericRow, InternalRow};
+ use std::sync::Arc;
+
+ fn before_all() {
+ start_cluster("test_kv_table", SHARED_FLUSS_CLUSTER.clone());
+ }
+
+ fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+ get_cluster(&SHARED_FLUSS_CLUSTER)
+ }
+
+ fn after_all() {
+ stop_cluster(SHARED_FLUSS_CLUSTER.clone());
+ }
+
+ fn make_key(id: i32) -> GenericRow<'static> {
+ let mut row = GenericRow::new();
+ row.set_field(0, id);
+ row.set_field(1, "");
+ row.set_field(2, 0i64);
+ row
+ }
+
+ #[tokio::test]
+ async fn upsert_delete_and_lookup() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(),
"test_upsert_and_lookup".to_string());
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .column("age", DataTypes::bigint())
+ .primary_key(vec!["id".to_string()])
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let table_upsert = table.new_upsert().expect("Failed to create
upsert");
+ let mut upsert_writer = table_upsert
+ .create_writer()
+ .expect("Failed to create writer");
+
+ let test_data = [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie",
35)];
+
+ // Upsert rows
+ for (id, name, age) in &test_data {
+ let mut row = GenericRow::new();
+ row.set_field(0, *id);
+ row.set_field(1, *name);
+ row.set_field(2, *age);
+ upsert_writer
+ .upsert(&row)
+ .await
+ .expect("Failed to upsert row");
+ }
+
+ // Lookup records
+ let mut lookuper = table
+ .new_lookup()
+ .expect("Failed to create lookup")
+ .create_lookuper()
+ .expect("Failed to create lookuper");
+
+ // Verify lookup results
+ for (id, expected_name, expected_age) in &test_data {
+ let result = lookuper
+ .lookup(&make_key(*id))
+ .await
+ .expect("Failed to lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+
+ assert_eq!(row.get_int(0), *id, "id mismatch");
+ assert_eq!(row.get_string(1), *expected_name, "name mismatch");
+ assert_eq!(row.get_long(2), *expected_age, "age mismatch");
+ }
+
+ // Update the record with new age
+ let mut updated_row = GenericRow::new();
+ updated_row.set_field(0, 1);
+ updated_row.set_field(1, "Verso");
+ updated_row.set_field(2, 33i64);
+ upsert_writer
+ .upsert(&updated_row)
+ .await
+ .expect("Failed to upsert updated row");
+
+ // Verify the update
+ let result = lookuper
+ .lookup(&make_key(1))
+ .await
+ .expect("Failed to lookup after update");
+ let found_row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+ assert_eq!(
+ found_row.get_long(2),
+ updated_row.get_long(2),
+ "Age should be updated"
+ );
+ assert_eq!(
+ found_row.get_string(1),
+ updated_row.get_string(1),
+ "Name should remain unchanged"
+ );
+
+ // Delete record with id=1
+ let mut delete_row = GenericRow::new();
+ delete_row.set_field(0, 1);
+ delete_row.set_field(1, "");
+ delete_row.set_field(2, 0i64);
+ upsert_writer
+ .delete(&delete_row)
+ .await
+ .expect("Failed to delete");
+
+ // Verify deletion
+ let result = lookuper
+ .lookup(&make_key(1))
+ .await
+ .expect("Failed to lookup deleted record");
+ assert!(
+ result
+ .get_single_row()
+ .expect("Failed to get row")
+ .is_none(),
+ "Record 1 should not exist after delete"
+ );
+
+ // Verify other records still exist
+ for i in [2, 3] {
+ let result = lookuper
+ .lookup(&make_key(i))
+ .await
+ .expect("Failed to lookup");
+ assert!(
+ result
+ .get_single_row()
+ .expect("Failed to get row")
+ .is_some(),
+ "Record {} should still exist after deleting record 1",
+ i
+ );
+ }
+
+ // Lookup non-existent key
+ let result = lookuper
+ .lookup(&make_key(999))
+ .await
+ .expect("Failed to lookup non-existent key");
+ assert!(
+ result
+ .get_single_row()
+ .expect("Failed to get row")
+ .is_none(),
+ "Non-existent key should return None"
+ );
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
+
+ #[tokio::test]
+ async fn composite_primary_keys() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(),
"test_composite_pk".to_string());
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("region", DataTypes::string())
+ .column("user_id", DataTypes::int())
+ .column("score", DataTypes::bigint())
+ .primary_key(vec!["region".to_string(),
"user_id".to_string()])
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let table_upsert = table.new_upsert().expect("Failed to create
upsert");
+ let mut upsert_writer = table_upsert
+ .create_writer()
+ .expect("Failed to create writer");
+
+ // Insert records with composite keys
+ let test_data = [
+ ("US", 1, 100i64),
+ ("US", 2, 200i64),
+ ("EU", 1, 150i64),
+ ("EU", 2, 250i64),
+ ];
+
+ for (region, user_id, score) in &test_data {
+ let mut row = GenericRow::new();
+ row.set_field(0, *region);
+ row.set_field(1, *user_id);
+ row.set_field(2, *score);
+ upsert_writer.upsert(&row).await.expect("Failed to upsert");
+ }
+
+ // Lookup with composite key
+ let mut lookuper = table
+ .new_lookup()
+ .expect("Failed to create lookup")
+ .create_lookuper()
+ .expect("Failed to create lookuper");
+
+ // Lookup (US, 1) - should return score 100
+ let mut key = GenericRow::new();
+ key.set_field(0, "US");
+ key.set_field(1, 1);
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+ assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
+
+ // Lookup (EU, 2) - should return score 250
+ let mut key = GenericRow::new();
+ key.set_field(0, "EU");
+ key.set_field(1, 2);
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+ assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
+
+ // Update (US, 1) score
+ let mut update_row = GenericRow::new();
+ update_row.set_field(0, "US");
+ update_row.set_field(1, 1);
+ update_row.set_field(2, 500i64);
+ upsert_writer
+ .upsert(&update_row)
+ .await
+ .expect("Failed to update");
+
+ // Verify update
+ let mut key = GenericRow::new();
+ key.set_field(0, "US");
+ key.set_field(1, 1);
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+ assert_eq!(
+ row.get_long(2),
+ update_row.get_long(2),
+ "Row score should be updated"
+ );
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
+
+ #[tokio::test]
+ async fn partial_update() {
+ use fluss::row::Datum;
+
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(),
"test_partial_update".to_string());
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .column("age", DataTypes::bigint())
+ .column("score", DataTypes::bigint())
+ .primary_key(vec!["id".to_string()])
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ // Insert initial record with all columns
+ let table_upsert = table.new_upsert().expect("Failed to create
upsert");
+ let mut upsert_writer = table_upsert
+ .create_writer()
+ .expect("Failed to create writer");
+
+ let mut row = GenericRow::new();
+ row.set_field(0, 1);
+ row.set_field(1, "Verso");
+ row.set_field(2, 32i64);
+ row.set_field(3, 6942i64);
+ upsert_writer
+ .upsert(&row)
+ .await
+ .expect("Failed to upsert initial row");
+
+ // Verify initial record
+ let mut lookuper = table
+ .new_lookup()
+ .expect("Failed to create lookup")
+ .create_lookuper()
+ .expect("Failed to create lookuper");
+
+ let result = lookuper
+ .lookup(&make_key(1))
+ .await
+ .expect("Failed to lookup");
+ let found_row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+
+ assert_eq!(found_row.get_int(0), 1);
+ assert_eq!(found_row.get_string(1), "Verso");
+ assert_eq!(found_row.get_long(2), 32i64);
+ assert_eq!(found_row.get_long(3), 6942i64);
+
+ // Create partial update writer to update only score column
+ let partial_upsert = table_upsert
+ .partial_update_with_column_names(&["id", "score"])
+ .expect("Failed to create TableUpsert with partial update");
+ let mut partial_writer = partial_upsert
+ .create_writer()
+ .expect("Failed to create UpsertWriter with partial write");
+
+ // Update only the score column
+ let mut partial_row = GenericRow::new();
+ partial_row.set_field(0, 1);
+ partial_row.set_field(1, Datum::Null); // not in partial update column
+ partial_row.set_field(2, Datum::Null); // not in partial update column
+ partial_row.set_field(3, 420i64);
+ partial_writer
+ .upsert(&partial_row)
+ .await
+ .expect("Failed to upsert");
+
+ // Verify partial update - name and age should remain unchanged
+ let result = lookuper
+ .lookup(&make_key(1))
+ .await
+ .expect("Failed to lookup after partial update");
+ let found_row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+
+ assert_eq!(found_row.get_int(0), 1, "id should remain 1");
+ assert_eq!(
+ found_row.get_string(1),
+ "Verso",
+ "name should remain unchanged"
+ );
+ assert_eq!(found_row.get_long(2), 32, "age should remain unchanged");
+ assert_eq!(found_row.get_long(3), 420, "score should be updated to
420");
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
+}
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 4cba469..ef73b56 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -33,8 +33,8 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
#[after_all]
mod table_test {
use super::SHARED_FLUSS_CLUSTER;
- use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
- use crate::integration::utils::create_table;
+ use crate::integration::fluss_cluster::FlussTestingCluster;
+ use crate::integration::utils::{create_table, get_cluster, start_cluster,
stop_cluster};
use arrow::array::record_batch;
use fluss::client::{FlussTable, TableScan};
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor,
TablePath};
@@ -44,50 +44,18 @@ mod table_test {
use jiff::Timestamp;
use std::collections::HashMap;
use std::sync::Arc;
- use std::thread;
use std::time::Duration;
fn before_all() {
- // Create a new tokio runtime in a separate thread
- let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
- thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let cluster =
FlussTestingClusterBuilder::new("test_table").build().await;
- let mut guard = cluster_guard.write();
- *guard = Some(cluster);
- });
- })
- .join()
- .expect("Failed to create cluster");
-
- // wait for 20 seconds to avoid the error like
- // CoordinatorEventProcessor is not initialized yet
- thread::sleep(std::time::Duration::from_secs(20));
+ start_cluster("test_table", SHARED_FLUSS_CLUSTER.clone());
}
fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- let cluster_guard = SHARED_FLUSS_CLUSTER.read();
- if cluster_guard.is_none() {
- panic!("Fluss cluster not initialized. Make sure before_all() was
called.");
- }
- Arc::new(cluster_guard.as_ref().unwrap().clone())
+ get_cluster(&SHARED_FLUSS_CLUSTER)
}
fn after_all() {
- // Create a new tokio runtime in a separate thread
- let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
- thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let mut guard = cluster_guard.write();
- if let Some(cluster) = guard.take() {
- cluster.stop().await;
- }
- });
- })
- .join()
- .expect("Failed to cleanup cluster");
+ stop_cluster(SHARED_FLUSS_CLUSTER.clone());
}
#[tokio::test]
@@ -527,7 +495,7 @@ mod table_test {
use arrow::array::Int32Array;
let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
- let mut all_ids: Vec<i32> = batches
+ let all_ids: Vec<i32> = batches
.iter()
.flat_map(|b| {
(0..b.num_rows()).map(|i| {
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index 43c89b5..e28a836 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -33,19 +33,20 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
mod table_remote_scan_test {
use super::SHARED_FLUSS_CLUSTER;
use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
- use crate::integration::utils::create_table;
+ use crate::integration::utils::{
+ create_table, get_cluster, stop_cluster, wait_for_cluster_ready,
+ };
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
- use std::thread::sleep;
use std::time::Duration;
use uuid::Uuid;
fn before_all() {
// Create a new tokio runtime in a separate thread
- let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+ let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
rt.block_on(async {
@@ -94,32 +95,17 @@ mod table_remote_scan_test {
.with_remote_data_dir(temp_dir)
.build()
.await;
- let mut guard = cluster_guard.write();
+ wait_for_cluster_ready(&cluster).await;
+ let mut guard = cluster_lock.write();
*guard = Some(cluster);
});
})
.join()
.expect("Failed to create cluster");
-
- // wait for 20 seconds to avoid the error like
- // CoordinatorEventProcessor is not initialized yet
- sleep(Duration::from_secs(20));
}
fn after_all() {
- // Create a new tokio runtime in a separate thread
- let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
- thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
- rt.block_on(async {
- let mut guard = cluster_guard.write();
- if let Some(cluster) = guard.take() {
- cluster.stop().await;
- }
- });
- })
- .join()
- .expect("Failed to cleanup cluster");
+ stop_cluster(SHARED_FLUSS_CLUSTER.clone());
}
#[tokio::test]
@@ -215,10 +201,6 @@ mod table_remote_scan_test {
}
fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
- let cluster_guard = SHARED_FLUSS_CLUSTER.read();
- if cluster_guard.is_none() {
- panic!("Fluss cluster not initialized. Make sure before_all() was
called.");
- }
- Arc::new(cluster_guard.as_ref().unwrap().clone())
+ get_cluster(&SHARED_FLUSS_CLUSTER)
}
}
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index cd1f6cc..4d0c349 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -15,8 +15,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
use fluss::client::FlussAdmin;
use fluss::metadata::{TableDescriptor, TablePath};
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Polls the cluster until CoordinatorEventProcessor is initialized and tablet server is available.
+/// Times out after 20 seconds.
+pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
+ let timeout = Duration::from_secs(20);
+ let poll_interval = Duration::from_millis(500);
+ let start = std::time::Instant::now();
+
+ loop {
+ let connection = cluster.get_fluss_connection().await;
+ if connection.get_admin().await.is_ok()
+ && connection
+ .get_metadata()
+ .get_cluster()
+ .get_one_available_server()
+ .is_some()
+ {
+ return;
+ }
+
+ if start.elapsed() >= timeout {
+ panic!(
+ "Server readiness check timed out after {} seconds. \
+ CoordinatorEventProcessor may not be initialized or
TabletServer may not be available.",
+ timeout.as_secs()
+ );
+ }
+
+ tokio::time::sleep(poll_interval).await;
+ }
+}
pub async fn create_table(
admin: &FlussAdmin,
@@ -28,3 +63,42 @@ pub async fn create_table(
.await
.expect("Failed to create table");
}
+
+pub fn start_cluster(name: &str, cluster_lock:
Arc<RwLock<Option<FlussTestingCluster>>>) {
+ let name = name.to_string();
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let cluster = FlussTestingClusterBuilder::new(&name).build().await;
+ wait_for_cluster_ready(&cluster).await;
+ let mut guard = cluster_lock.write();
+ *guard = Some(cluster);
+ });
+ })
+ .join()
+ .expect("Failed to create cluster");
+}
+
+pub fn stop_cluster(cluster_lock: Arc<RwLock<Option<FlussTestingCluster>>>) {
+ std::thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().expect("Failed to create
runtime");
+ rt.block_on(async {
+ let mut guard = cluster_lock.write();
+ if let Some(cluster) = guard.take() {
+ cluster.stop().await;
+ }
+ });
+ })
+ .join()
+ .expect("Failed to cleanup cluster");
+}
+
+pub fn get_cluster(cluster_lock: &RwLock<Option<FlussTestingCluster>>) ->
Arc<FlussTestingCluster> {
+ let guard = cluster_lock.read();
+ Arc::new(
+ guard
+ .as_ref()
+ .expect("Fluss cluster not initialized. Make sure before_all() was
called.")
+ .clone(),
+ )
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 65111af..f3987e6 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -22,6 +22,7 @@ extern crate fluss;
mod integration {
mod admin;
mod fluss_cluster;
+ mod kv_table;
mod table;
mod utils;