This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4831808 [test] Add IT for table operation in admin (#32)
4831808 is described below
commit 4831808763cb59d2bd6aca1af8c73eddd529c100
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Oct 16 21:02:37 2025 +0800
[test] Add IT for table operation in admin (#32)
---------
Co-authored-by: 王俊博(wangjunbo) <[email protected]>
Co-authored-by: luoyuxia <[email protected]>
---
crates/fluss/src/metadata/table.rs | 6 +-
crates/fluss/tests/integration/admin.rs | 122 +++++++++++++++++++++++++++++++-
2 files changed, 123 insertions(+), 5 deletions(-)
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index 2b48ec6..751dd6d 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Column {
name: String,
data_type: DataType,
@@ -66,7 +66,7 @@ impl Column {
}
}
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PrimaryKey {
constraint_name: String,
column_names: Vec<String>,
@@ -90,7 +90,7 @@ impl PrimaryKey {
}
}
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Schema {
columns: Vec<Column>,
primary_key: Option<PrimaryKey>,
diff --git a/crates/fluss/tests/integration/admin.rs
b/crates/fluss/tests/integration/admin.rs
index 73f52db..0d958a5 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -33,7 +33,10 @@ static SHARED_FLUSS_CLUSTER:
Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
mod admin_test {
use super::SHARED_FLUSS_CLUSTER;
use crate::integration::fluss_cluster::{FlussTestingCluster,
FlussTestingClusterBuilder};
- use fluss::metadata::DatabaseDescriptorBuilder;
+ use fluss::metadata::{
+ DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema,
TableDescriptor,
+ TablePath,
+ };
use std::sync::Arc;
fn before_all() {
@@ -126,6 +129,121 @@ mod admin_test {
#[tokio::test]
async fn test_create_table() {
- // todo
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection
+ .get_admin()
+ .await
+ .expect("Failed to get admin client");
+
+ let test_db_name = "test_create_table_db";
+ let db_descriptor = DatabaseDescriptorBuilder::default()
+ .comment("Database for test_create_table")
+ .build();
+
+ assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
+ admin
+ .create_database(test_db_name, false, Some(&db_descriptor))
+ .await
+ .expect("Failed to create test database");
+
+ let test_table_name = "test_user_table";
+ let table_path = TablePath::new(test_db_name.to_string(),
test_table_name.to_string());
+
+ // build table schema
+ let table_schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .column("age", DataTypes::int())
+ .with_comment("User's age (optional)")
+ .column("email", DataTypes::string())
+ .primary_key(vec!["id".to_string()])
+ .build()
+ .expect("Failed to build table schema");
+
+ // build table descriptor
+ let table_descriptor = TableDescriptor::builder()
+ .schema(table_schema.clone())
+ .comment("Test table for user data (id, name, age, email)")
+ .distributed_by(Some(3), vec!["id".to_string()])
+ .property("table.replication.factor", "1")
+ .log_format(LogFormat::ARROW)
+ .kv_format(KvFormat::INDEXED)
+ .build()
+ .expect("Failed to build table descriptor");
+
+ // create test table
+ admin
+ .create_table(&table_path, &table_descriptor, false)
+ .await
+ .expect("Failed to create test table");
+
+ assert!(
+ admin.table_exists(&table_path).await.unwrap(),
+ "Table {:?} should exist after creation",
+ table_path
+ );
+
+ let tables = admin.list_tables(test_db_name).await.unwrap();
+ assert_eq!(
+ tables.len(),
+ 1,
+ "There should be exactly one table in the database"
+ );
+ assert!(
+ tables.contains(&test_table_name.to_string()),
+ "Table list should contain the created table"
+ );
+
+ let table_info = admin
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table info");
+
+ // verify table comment
+ assert_eq!(
+ table_info.get_comment(),
+ Some("Test table for user data (id, name, age, email)"),
+ "Table comment mismatch"
+ );
+
+ // verify schema columns
+ let actual_schema = table_info.get_schema();
+ assert_eq!(actual_schema, table_descriptor.schema(), "Schema
mismatch");
+
+ // verify primary key
+ assert_eq!(
+ table_info.get_primary_keys(),
+ &vec!["id".to_string()],
+ "Primary key columns mismatch"
+ );
+
+ // verify distribution and properties
+ assert_eq!(table_info.get_num_buckets(), 3, "Bucket count mismatch");
+ assert_eq!(
+ table_info.get_bucket_keys(),
+ &vec!["id".to_string()],
+ "Bucket keys mismatch"
+ );
+
+ assert_eq!(
+ table_info.get_properties(),
+ table_descriptor.properties(),
+ "Properties mismatch"
+ );
+
+ // drop table
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ // table shouldn't exist now
+ assert_eq!(admin.table_exists(&table_path).await.unwrap(), false);
+
+ // drop database
+ admin.drop_database(test_db_name, false, true).await;
+
+ // database shouldn't exist now
+ assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
}
}