This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1dd1d2b  [chore] Introduce IT infra and add IT for database operations 
in admin (#28)
1dd1d2b is described below

commit 1dd1d2b1e42edda7ea953a2bc5f1b182a6498fdf
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Oct 15 18:02:18 2025 +0800

    [chore] Introduce IT infra and add IT for database operations in admin (#28)
---
 .github/workflows/ci.yml                        |   6 +-
 crates/fluss/Cargo.toml                         |   4 +
 crates/fluss/src/client/admin.rs                |   5 +-
 crates/fluss/src/metadata/database.rs           |  19 ++-
 crates/fluss/src/metadata/table.rs              |   4 +-
 crates/fluss/tests/integration/admin.rs         | 131 ++++++++++++++++
 crates/fluss/tests/integration/client/mod.rs    |  21 ---
 crates/fluss/tests/integration/fluss_cluster.rs | 192 ++++++++++++++++++++++++
 crates/fluss/tests/test_fluss.rs                |   4 +-
 9 files changed, 347 insertions(+), 39 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 2661629..73e2b3f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -88,7 +88,11 @@ jobs:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
       - name: Integration Test
-        run: cargo test --features integration_tests --all-targets --workspace
+        # only run IT in linux since no docker in macos by default
+        run: |
+          if [ "$RUNNER_OS" == "Linux" ]; then
+            cargo test --features integration_tests --all-targets --workspace
+          fi
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
\ No newline at end of file
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index cc26014..a728bd7 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -46,6 +46,10 @@ parse-display = "0.10"
 ref-cast = "1.0"
 chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
 
+[dev-dependencies]
+testcontainers = "0.25.0"
+once_cell = "1.19"
+test-env-helpers = "0.2.2"
 
 [features]
 integration_tests = []
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 2584034..fd0f316 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -150,7 +150,7 @@ impl FlussAdmin {
         database_name: &str,
         ignore_if_not_exists: bool,
         cascade: bool,
-    ) -> Result<()> {
+    ) {
         let _response = self
             .admin_gateway
             .request(DropDatabaseRequest::new(
@@ -158,8 +158,7 @@ impl FlussAdmin {
                 ignore_if_not_exists,
                 cascade,
             ))
-            .await?;
-        Ok(())
+            .await;
     }
 
     /// List all databases
diff --git a/crates/fluss/src/metadata/database.rs 
b/crates/fluss/src/metadata/database.rs
index 2649421..8eaa4d3 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{Value, json};
 use std::collections::HashMap;
 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct DatabaseDescriptor {
     comment: Option<String>,
     custom_properties: HashMap<String, String>,
@@ -105,11 +105,11 @@ impl DatabaseDescriptorBuilder {
         self
     }
 
-    pub fn build(self) -> Result<DatabaseDescriptor> {
-        Ok(DatabaseDescriptor {
+    pub fn build(self) -> DatabaseDescriptor {
+        DatabaseDescriptor {
             comment: self.comment,
             custom_properties: self.custom_properties,
-        })
+        }
     }
 }
 
@@ -179,7 +179,7 @@ impl JsonSerde for DatabaseDescriptor {
         };
         builder = builder.custom_properties(custom_properties);
 
-        builder.build()
+        Ok(builder.build())
     }
 }
 
@@ -187,7 +187,7 @@ impl DatabaseDescriptor {
     /// Create DatabaseDescriptor from JSON bytes (equivalent to Java's 
fromJsonBytes)
     pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
         let json_value: Value = serde_json::from_slice(bytes)
-            .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: {}", 
e)))?;
+            .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: 
{e}")))?;
         Self::deserialize_json(&json_value)
     }
 
@@ -195,7 +195,7 @@ impl DatabaseDescriptor {
     pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
         let json_value = self.serialize_json()?;
         serde_json::to_vec(&json_value)
-            .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON: 
{}", e)))
+            .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON: 
{e}")))
     }
 }
 
@@ -212,8 +212,7 @@ mod tests {
         let descriptor = DatabaseDescriptor::builder()
             .comment("Test database")
             .custom_properties(custom_props)
-            .build()
-            .unwrap();
+            .build();
 
         // Test serialization
         let json_bytes = descriptor.to_json_bytes().unwrap();
@@ -226,7 +225,7 @@ mod tests {
 
     #[test]
     fn test_empty_database_descriptor() {
-        let descriptor = DatabaseDescriptor::builder().build().unwrap();
+        let descriptor = DatabaseDescriptor::builder().build();
         let json_bytes = descriptor.to_json_bytes().unwrap();
         let deserialized = 
DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
         assert_eq!(descriptor, deserialized);
diff --git a/crates/fluss/src/metadata/table.rs 
b/crates/fluss/src/metadata/table.rs
index 90e3573..2b48ec6 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -589,7 +589,7 @@ impl LogFormat {
         match s.to_uppercase().as_str() {
             "ARROW" => Ok(LogFormat::ARROW),
             "INDEXED" => Ok(LogFormat::INDEXED),
-            _ => Err(InvalidTableError(format!("Unknown log format: {}", s))),
+            _ => Err(InvalidTableError(format!("Unknown log format: {s}"))),
         }
     }
 }
@@ -615,7 +615,7 @@ impl KvFormat {
         match s.to_uppercase().as_str() {
             "INDEXED" => Ok(KvFormat::INDEXED),
             "COMPACTED" => Ok(KvFormat::COMPACTED),
-            _ => Err(InvalidTableError(format!("Unknown kv format: {}", s))),
+            _ => Err(InvalidTableError(format!("Unknown kv format: {s}"))),
         }
     }
 }
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
new file mode 100644
index 0000000..73f52db
--- /dev/null
+++ b/crates/fluss/tests/integration/admin.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+use once_cell::sync::Lazy;
+use parking_lot::RwLock;
+use std::sync::Arc;
+
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
+    Lazy::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod admin_test {
+    use super::SHARED_FLUSS_CLUSTER;
+    use crate::integration::fluss_cluster::{FlussTestingCluster, 
FlussTestingClusterBuilder};
+    use fluss::metadata::DatabaseDescriptorBuilder;
+    use std::sync::Arc;
+
+    fn before_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+            rt.block_on(async {
+                let cluster = FlussTestingClusterBuilder::new().build().await;
+                let mut guard = cluster_guard.write();
+                *guard = Some(cluster);
+            });
+        })
+        .join()
+        .expect("Failed to create cluster");
+    }
+
+    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
+        if cluster_guard.is_none() {
+            panic!("Fluss cluster not initialized. Make sure before_all() was 
called.");
+        }
+        Arc::new(cluster_guard.as_ref().unwrap().clone())
+    }
+
+    fn after_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        std::thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create 
runtime");
+            rt.block_on(async {
+                let mut guard = cluster_guard.write();
+                if let Some(cluster) = guard.take() {
+                    cluster.stop().await;
+                }
+            });
+        })
+        .join()
+        .expect("Failed to cleanup cluster");
+    }
+
+    #[tokio::test]
+    async fn test_create_database() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("should get admin");
+
+        let db_descriptor = DatabaseDescriptorBuilder::default()
+            .comment("test_db")
+            .custom_properties(
+                [
+                    ("k1".to_string(), "v1".to_string()),
+                    ("k2".to_string(), "v2".to_string()),
+                ]
+                .into(),
+            )
+            .build();
+
+        let db_name = "test_create_database";
+
+        assert_eq!(admin.database_exists(db_name).await.unwrap(), false);
+
+        // create database
+        admin
+            .create_database(db_name, false, Some(&db_descriptor))
+            .await
+            .expect("should create database");
+
+        // database should exist
+        assert_eq!(admin.database_exists(db_name).await.unwrap(), true);
+
+        // get database
+        let db_info = admin
+            .get_database_info(db_name)
+            .await
+            .expect("should get database info");
+
+        assert_eq!(db_info.database_name(), db_name);
+        assert_eq!(db_info.database_descriptor(), &db_descriptor);
+
+        // drop database
+        admin.drop_database(db_name, false, true).await;
+
+        // database shouldn't exist now
+        assert_eq!(admin.database_exists(db_name).await.unwrap(), false);
+
+        // Note: We don't stop the shared cluster here as it's used by other 
tests
+    }
+
+    #[tokio::test]
+    async fn test_create_table() {
+        // todo
+    }
+}
diff --git a/crates/fluss/tests/integration/client/mod.rs 
b/crates/fluss/tests/integration/client/mod.rs
deleted file mode 100644
index 567c358..0000000
--- a/crates/fluss/tests/integration/client/mod.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[test]
-fn test() {
-    println!("Running integration tests");
-}
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs 
b/crates/fluss/tests/integration/fluss_cluster.rs
new file mode 100644
index 0000000..83a4795
--- /dev/null
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use std::collections::HashMap;
+use std::string::ToString;
+use std::sync::Arc;
+use std::time::Duration;
+use testcontainers::core::ContainerPort;
+use testcontainers::runners::AsyncRunner;
+use testcontainers::{ContainerAsync, GenericImage, ImageExt};
+
+const FLUSS_VERSION: &str = "0.7.0";
+
+pub struct FlussTestingClusterBuilder {
+    number_of_tablet_servers: usize,
+    network: &'static str,
+    cluster_conf: HashMap<String, String>,
+}
+
+impl FlussTestingClusterBuilder {
+    pub fn new() -> Self {
+        // reduce testing resources
+        let mut cluster_conf = HashMap::new();
+        cluster_conf.insert(
+            "netty.server.num-network-threads".to_string(),
+            "1".to_string(),
+        );
+        cluster_conf.insert(
+            "netty.server.num-worker-threads".to_string(),
+            "3".to_string(),
+        );
+
+        FlussTestingClusterBuilder {
+            number_of_tablet_servers: 1,
+            cluster_conf,
+            network: "fluss-cluster-network",
+        }
+    }
+
+    pub async fn build(&mut self) -> FlussTestingCluster {
+        let zookeeper = Arc::new(
+            GenericImage::new("zookeeper", "3.9.2")
+                .with_network(self.network)
+                .with_container_name("zookeeper")
+                .start()
+                .await
+                .unwrap(),
+        );
+
+        let coordinator_server = 
Arc::new(self.start_coordinator_server().await);
+
+        let mut tablet_servers = HashMap::new();
+        for server_id in 0..self.number_of_tablet_servers {
+            tablet_servers.insert(
+                server_id,
+                Arc::new(self.start_tablet_server(server_id).await),
+            );
+        }
+
+        FlussTestingCluster {
+            zookeeper,
+            coordinator_server,
+            tablet_servers,
+            bootstrap_servers: "127.0.0.1:9123".to_string(),
+        }
+    }
+
+    async fn start_coordinator_server(&mut self) -> 
ContainerAsync<GenericImage> {
+        let mut coordinator_confs = HashMap::new();
+        coordinator_confs.insert("zookeeper.address", "zookeeper:2181");
+        coordinator_confs.insert(
+            "bind.listeners",
+            "INTERNAL://coordinator-server:0, 
CLIENT://coordinator-server:9123",
+        );
+        coordinator_confs.insert("advertised.listeners", 
"CLIENT://localhost:9123");
+        coordinator_confs.insert("internal.listener.name", "INTERNAL");
+        GenericImage::new("fluss/fluss", FLUSS_VERSION)
+            .with_container_name("coordinator-server")
+            .with_mapped_port(9123, ContainerPort::Tcp(9123))
+            .with_network(self.network)
+            .with_cmd(vec!["coordinatorServer"])
+            .with_env_var(
+                "FLUSS_PROPERTIES",
+                self.to_fluss_properties_with(coordinator_confs),
+            )
+            .start()
+            .await
+            .unwrap()
+    }
+
+    async fn start_tablet_server(&self, server_id: usize) -> 
ContainerAsync<GenericImage> {
+        let mut tablet_server_confs = HashMap::new();
+        let bind_listeners = format!(
+            "INTERNAL://tablet-server-{}:0, CLIENT://tablet-server-{}:9123",
+            server_id, server_id
+        );
+        let expose_host_port = 9124 + server_id;
+        let advertised_listeners = format!("CLIENT://localhost:{}", 
expose_host_port);
+        let tablet_server_id = format!("{}", server_id);
+        tablet_server_confs.insert("zookeeper.address", "zookeeper:2181");
+        tablet_server_confs.insert("bind.listeners", bind_listeners.as_str());
+        tablet_server_confs.insert("advertised.listeners", 
advertised_listeners.as_str());
+        tablet_server_confs.insert("internal.listener.name", "INTERNAL");
+        tablet_server_confs.insert("tablet-server.id", 
tablet_server_id.as_str());
+
+        GenericImage::new("fluss/fluss", FLUSS_VERSION)
+            .with_cmd(vec!["tabletServer"])
+            .with_mapped_port(expose_host_port as u16, 
ContainerPort::Tcp(9123))
+            .with_network(self.network)
+            .with_container_name(format!("tablet-server-{}", server_id))
+            .with_env_var(
+                "FLUSS_PROPERTIES",
+                self.to_fluss_properties_with(tablet_server_confs),
+            )
+            .start()
+            .await
+            .unwrap()
+    }
+
+    fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, &str>) 
-> String {
+        let mut fluss_properties = Vec::new();
+        for (k, v) in self.cluster_conf.iter() {
+            fluss_properties.push(format!("{}: {}", k, v));
+        }
+        for (k, v) in extra_properties.iter() {
+            fluss_properties.push(format!("{}: {}", k, v));
+        }
+        fluss_properties.join("\n")
+    }
+}
+
+/// Provides an easy way to launch a Fluss cluster with coordinator and tablet 
servers.
+#[derive(Clone)]
+pub struct FlussTestingCluster {
+    zookeeper: Arc<ContainerAsync<GenericImage>>,
+    coordinator_server: Arc<ContainerAsync<GenericImage>>,
+    tablet_servers: HashMap<usize, Arc<ContainerAsync<GenericImage>>>,
+    bootstrap_servers: String,
+}
+
+impl FlussTestingCluster {
+    pub async fn stop(&self) {
+        for tablet_server in self.tablet_servers.values() {
+            tablet_server.stop().await.unwrap()
+        }
+        self.coordinator_server.stop().await.unwrap();
+        self.zookeeper.stop().await.unwrap();
+    }
+
+    pub async fn get_fluss_connection(&self) -> FlussConnection {
+        let mut config = Config::default();
+        config.bootstrap_server = Some(self.bootstrap_servers.clone());
+
+        // Retry mechanism: retry for up to 1 minute
+        let max_retries = 60; // 60 retry attempts
+        let retry_interval = Duration::from_secs(1); // 1 second interval 
between retries
+
+        for attempt in 1..=max_retries {
+            match FlussConnection::new(config.clone()).await {
+                Ok(connection) => {
+                    return connection;
+                }
+                Err(e) => {
+                    if attempt == max_retries {
+                        panic!(
+                            "Failed to connect to Fluss cluster after {} 
attempts: {}",
+                            max_retries, e
+                        );
+                    }
+                    tokio::time::sleep(retry_interval).await;
+                }
+            }
+        }
+        unreachable!()
+    }
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 7840638..28b9bef 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -20,6 +20,6 @@ extern crate fluss;
 
 #[cfg(feature = "integration_tests")]
 mod integration {
-
-    mod client;
+    mod admin;
+    mod fluss_cluster;
 }

Reply via email to