This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9b8f791 reuse docker container to save compute resources (#428)
9b8f791 is described below
commit 9b8f7913e8b3136f346ab9aea60ab5c2512c5f0d
Author: thexia <[email protected]>
AuthorDate: Tue Jul 9 22:39:13 2024 +0800
reuse docker container to save compute resources (#428)
* reuse docker container to save compute resources
* add lazy reuse of docker compose
* refactor test fixture: the docker compose init is reused
* use ctor and dtor to start docker compose and destroy docker compose
* fix cargo fmt check
* fix cargo clippy
* fix cargo fmt
* fix cargo sort
* add namespace for datafusion test
* add empty check for list glue catalog namespace
---------
Co-authored-by: thexiay <[email protected]>
---
Cargo.toml | 1 +
crates/catalog/glue/Cargo.toml | 1 +
crates/catalog/glue/tests/glue_catalog_test.rs | 221 +++++++++------------
crates/catalog/hms/Cargo.toml | 1 +
crates/catalog/hms/tests/hms_catalog_test.rs | 201 +++++++++----------
crates/catalog/rest/Cargo.toml | 1 +
crates/catalog/rest/tests/rest_catalog_test.rs | 170 ++++++++--------
crates/integrations/datafusion/Cargo.toml | 1 +
.../tests/integration_datafusion_hms_test.rs | 98 ++++++---
9 files changed, 344 insertions(+), 351 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index db441ca..c964a6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,7 @@ bimap = "0.6"
bitvec = "1.0.1"
bytes = "1.5"
chrono = "0.4.34"
+ctor = "0.2.8"
derive_builder = "0.20.0"
either = "1"
env_logger = "0.11.0"
diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml
index 0508378..0d2e1f9 100644
--- a/crates/catalog/glue/Cargo.toml
+++ b/crates/catalog/glue/Cargo.toml
@@ -41,5 +41,6 @@ typed-builder = { workspace = true }
uuid = { workspace = true }
[dev-dependencies]
+ctor = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs
b/crates/catalog/glue/tests/glue_catalog_test.rs
index eb0cd96..3edd8cd 100644
--- a/crates/catalog/glue/tests/glue_catalog_test.rs
+++ b/crates/catalog/glue/tests/glue_catalog_test.rs
@@ -18,7 +18,9 @@
//! Integration tests for glue catalog.
use std::collections::HashMap;
+use std::sync::RwLock;
+use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation,
TableIdent};
@@ -32,26 +34,36 @@ use tokio::time::sleep;
const GLUE_CATALOG_PORT: u16 = 5000;
const MINIO_PORT: u16 = 9000;
+static DOCKER_COMPOSE_ENV: RwLock<Option<DockerCompose>> = RwLock::new(None);
-#[derive(Debug)]
-struct TestFixture {
- _docker_compose: DockerCompose,
- glue_catalog: GlueCatalog,
-}
-
-async fn set_test_fixture(func: &str) -> TestFixture {
- set_up();
-
+#[ctor]
+fn before_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
let docker_compose = DockerCompose::new(
- normalize_test_name(format!("{}_{func}", module_path!())),
+ normalize_test_name(module_path!()),
format!("{}/testdata/glue_catalog", env!("CARGO_MANIFEST_DIR")),
);
-
docker_compose.run();
+ guard.replace(docker_compose);
+}
+
+#[dtor]
+fn after_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
+ guard.take();
+}
- let glue_catalog_ip = docker_compose.get_container_ip("moto");
- let minio_ip = docker_compose.get_container_ip("minio");
+async fn get_catalog() -> GlueCatalog {
+ set_up();
+ let (glue_catalog_ip, minio_ip) = {
+ let guard = DOCKER_COMPOSE_ENV.read().unwrap();
+ let docker_compose = guard.as_ref().unwrap();
+ (
+ docker_compose.get_container_ip("moto"),
+ docker_compose.get_container_ip("minio"),
+ )
+ };
let read_port = format!("{}:{}", glue_catalog_ip, GLUE_CATALOG_PORT);
loop {
if !scan_port_addr(&read_port) {
@@ -84,21 +96,12 @@ async fn set_test_fixture(func: &str) -> TestFixture {
.props(props.clone())
.build();
- let glue_catalog = GlueCatalog::new(config).await.unwrap();
-
- TestFixture {
- _docker_compose: docker_compose,
- glue_catalog,
- }
+ GlueCatalog::new(config).await.unwrap()
}
-async fn set_test_namespace(fixture: &TestFixture, namespace: &NamespaceIdent)
-> Result<()> {
+async fn set_test_namespace(catalog: &GlueCatalog, namespace: &NamespaceIdent)
-> Result<()> {
let properties = HashMap::new();
-
- fixture
- .glue_catalog
- .create_namespace(namespace, properties)
- .await?;
+ catalog.create_namespace(namespace, properties).await?;
Ok(())
}
@@ -124,33 +127,26 @@ fn set_table_creation(location: impl ToString, name: impl
ToString) -> Result<Ta
#[tokio::test]
async fn test_rename_table() -> Result<()> {
- let fixture = set_test_fixture("test_rename_table").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_rename_table".into()));
- fixture
- .glue_catalog
+ catalog
.create_namespace(namespace.name(), HashMap::new())
.await?;
- let table = fixture
- .glue_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let table = catalog.create_table(namespace.name(), creation).await?;
let dest = TableIdent::new(namespace.name().clone(),
"my_table_rename".to_string());
- fixture
- .glue_catalog
- .rename_table(table.identifier(), &dest)
- .await?;
+ catalog.rename_table(table.identifier(), &dest).await?;
- let table = fixture.glue_catalog.load_table(&dest).await?;
+ let table = catalog.load_table(&dest).await?;
assert_eq!(table.identifier(), &dest);
let src = TableIdent::new(namespace.name().clone(),
"my_table".to_string());
- let src_table_exists = fixture.glue_catalog.table_exists(&src).await?;
+ let src_table_exists = catalog.table_exists(&src).await?;
assert!(!src_table_exists);
Ok(())
@@ -158,29 +154,22 @@ async fn test_rename_table() -> Result<()> {
#[tokio::test]
async fn test_table_exists() -> Result<()> {
- let fixture = set_test_fixture("test_table_exists").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_table_exists".into()));
- fixture
- .glue_catalog
+ catalog
.create_namespace(namespace.name(), HashMap::new())
.await?;
let ident = TableIdent::new(namespace.name().clone(),
"my_table".to_string());
- let exists = fixture.glue_catalog.table_exists(&ident).await?;
+ let exists = catalog.table_exists(&ident).await?;
assert!(!exists);
- let table = fixture
- .glue_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let table = catalog.create_table(namespace.name(), creation).await?;
- let exists = fixture
- .glue_catalog
- .table_exists(table.identifier())
- .await?;
+ let exists = catalog.table_exists(table.identifier()).await?;
assert!(exists);
@@ -189,26 +178,19 @@ async fn test_table_exists() -> Result<()> {
#[tokio::test]
async fn test_drop_table() -> Result<()> {
- let fixture = set_test_fixture("test_drop_table").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_drop_table".into()));
- fixture
- .glue_catalog
+ catalog
.create_namespace(namespace.name(), HashMap::new())
.await?;
- let table = fixture
- .glue_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let table = catalog.create_table(namespace.name(), creation).await?;
- fixture.glue_catalog.drop_table(table.identifier()).await?;
+ catalog.drop_table(table.identifier()).await?;
- let result = fixture
- .glue_catalog
- .table_exists(table.identifier())
- .await?;
+ let result = catalog.table_exists(table.identifier()).await?;
assert!(!result);
@@ -217,22 +199,17 @@ async fn test_drop_table() -> Result<()> {
#[tokio::test]
async fn test_load_table() -> Result<()> {
- let fixture = set_test_fixture("test_load_table").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_load_table".into()));
- fixture
- .glue_catalog
+ catalog
.create_namespace(namespace.name(), HashMap::new())
.await?;
- let expected = fixture
- .glue_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let expected = catalog.create_table(namespace.name(), creation).await?;
- let result = fixture
- .glue_catalog
+ let result = catalog
.load_table(&TableIdent::new(
namespace.name().clone(),
"my_table".to_string(),
@@ -248,23 +225,19 @@ async fn test_load_table() -> Result<()> {
#[tokio::test]
async fn test_create_table() -> Result<()> {
- let fixture = set_test_fixture("test_create_table").await;
- let namespace = NamespaceIdent::new("my_database".to_string());
- set_test_namespace(&fixture, &namespace).await?;
+ let catalog = get_catalog().await;
+ let namespace = NamespaceIdent::new("test_create_table".to_string());
+ set_test_namespace(&catalog, &namespace).await?;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let result = fixture
- .glue_catalog
- .create_table(&namespace, creation)
- .await?;
+ let result = catalog.create_table(&namespace, creation).await?;
assert_eq!(result.identifier().name(), "my_table");
assert!(result
.metadata_location()
.is_some_and(|location|
location.starts_with("s3a://warehouse/hive/metadata/00000-")));
assert!(
- fixture
- .glue_catalog
+ catalog
.file_io()
.is_exist("s3a://warehouse/hive/metadata/")
.await?
@@ -275,12 +248,12 @@ async fn test_create_table() -> Result<()> {
#[tokio::test]
async fn test_list_tables() -> Result<()> {
- let fixture = set_test_fixture("test_list_tables").await;
- let namespace = NamespaceIdent::new("my_database".to_string());
- set_test_namespace(&fixture, &namespace).await?;
+ let catalog = get_catalog().await;
+ let namespace = NamespaceIdent::new("test_list_tables".to_string());
+ set_test_namespace(&catalog, &namespace).await?;
let expected = vec![];
- let result = fixture.glue_catalog.list_tables(&namespace).await?;
+ let result = catalog.list_tables(&namespace).await?;
assert_eq!(result, expected);
@@ -289,16 +262,16 @@ async fn test_list_tables() -> Result<()> {
#[tokio::test]
async fn test_drop_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_drop_namespace").await;
- let namespace = NamespaceIdent::new("my_database".to_string());
- set_test_namespace(&fixture, &namespace).await?;
+ let catalog = get_catalog().await;
+ let namespace = NamespaceIdent::new("test_drop_namespace".to_string());
+ set_test_namespace(&catalog, &namespace).await?;
- let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ let exists = catalog.namespace_exists(&namespace).await?;
assert!(exists);
- fixture.glue_catalog.drop_namespace(&namespace).await?;
+ catalog.drop_namespace(&namespace).await?;
- let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ let exists = catalog.namespace_exists(&namespace).await?;
assert!(!exists);
Ok(())
@@ -306,23 +279,20 @@ async fn test_drop_namespace() -> Result<()> {
#[tokio::test]
async fn test_update_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_update_namespace").await;
- let namespace = NamespaceIdent::new("my_database".into());
- set_test_namespace(&fixture, &namespace).await?;
+ let catalog = get_catalog().await;
+ let namespace = NamespaceIdent::new("test_update_namespace".into());
+ set_test_namespace(&catalog, &namespace).await?;
- let before_update = fixture.glue_catalog.get_namespace(&namespace).await?;
+ let before_update = catalog.get_namespace(&namespace).await?;
let before_update = before_update.properties().get("description");
assert_eq!(before_update, None);
let properties = HashMap::from([("description".to_string(),
"my_update".to_string())]);
- fixture
- .glue_catalog
- .update_namespace(&namespace, properties)
- .await?;
+ catalog.update_namespace(&namespace, properties).await?;
- let after_update = fixture.glue_catalog.get_namespace(&namespace).await?;
+ let after_update = catalog.get_namespace(&namespace).await?;
let after_update = after_update.properties().get("description");
assert_eq!(after_update, Some("my_update".to_string()).as_ref());
@@ -332,16 +302,16 @@ async fn test_update_namespace() -> Result<()> {
#[tokio::test]
async fn test_namespace_exists() -> Result<()> {
- let fixture = set_test_fixture("test_namespace_exists").await;
+ let catalog = get_catalog().await;
- let namespace = NamespaceIdent::new("my_database".into());
+ let namespace = NamespaceIdent::new("test_namespace_exists".into());
- let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ let exists = catalog.namespace_exists(&namespace).await?;
assert!(!exists);
- set_test_namespace(&fixture, &namespace).await?;
+ set_test_namespace(&catalog, &namespace).await?;
- let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ let exists = catalog.namespace_exists(&namespace).await?;
assert!(exists);
Ok(())
@@ -349,16 +319,16 @@ async fn test_namespace_exists() -> Result<()> {
#[tokio::test]
async fn test_get_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_get_namespace").await;
+ let catalog = get_catalog().await;
- let namespace = NamespaceIdent::new("my_database".into());
+ let namespace = NamespaceIdent::new("test_get_namespace".into());
- let does_not_exist = fixture.glue_catalog.get_namespace(&namespace).await;
+ let does_not_exist = catalog.get_namespace(&namespace).await;
assert!(does_not_exist.is_err());
- set_test_namespace(&fixture, &namespace).await?;
+ set_test_namespace(&catalog, &namespace).await?;
- let result = fixture.glue_catalog.get_namespace(&namespace).await?;
+ let result = catalog.get_namespace(&namespace).await?;
let expected = Namespace::new(namespace);
assert_eq!(result, expected);
@@ -368,17 +338,14 @@ async fn test_get_namespace() -> Result<()> {
#[tokio::test]
async fn test_create_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_create_namespace").await;
+ let catalog = get_catalog().await;
let properties = HashMap::new();
- let namespace = NamespaceIdent::new("my_database".into());
+ let namespace = NamespaceIdent::new("test_create_namespace".into());
let expected = Namespace::new(namespace.clone());
- let result = fixture
- .glue_catalog
- .create_namespace(&namespace, properties)
- .await?;
+ let result = catalog.create_namespace(&namespace, properties).await?;
assert_eq!(result, expected);
@@ -387,18 +354,16 @@ async fn test_create_namespace() -> Result<()> {
#[tokio::test]
async fn test_list_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_list_namespace").await;
+ let catalog = get_catalog().await;
- let expected = vec![];
- let result = fixture.glue_catalog.list_namespaces(None).await?;
- assert_eq!(result, expected);
+ let namespace = NamespaceIdent::new("test_list_namespace".to_string());
+ set_test_namespace(&catalog, &namespace).await?;
- let namespace = NamespaceIdent::new("my_database".to_string());
- set_test_namespace(&fixture, &namespace).await?;
+ let result = catalog.list_namespaces(None).await?;
+ assert!(result.contains(&namespace));
- let expected = vec![namespace];
- let result = fixture.glue_catalog.list_namespaces(None).await?;
- assert_eq!(result, expected);
+ let empty_result = catalog.list_namespaces(Some(&namespace)).await?;
+ assert!(empty_result.is_empty());
Ok(())
}
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 5a03221..e7d4ec2 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -43,5 +43,6 @@ uuid = { workspace = true }
volo-thrift = { workspace = true }
[dev-dependencies]
+ctor = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs
b/crates/catalog/hms/tests/hms_catalog_test.rs
index 3dd2c7d..a109757 100644
--- a/crates/catalog/hms/tests/hms_catalog_test.rs
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -18,7 +18,9 @@
//! Integration tests for hms catalog.
use std::collections::HashMap;
+use std::sync::RwLock;
+use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
@@ -30,29 +32,42 @@ use tokio::time::sleep;
const HMS_CATALOG_PORT: u16 = 9083;
const MINIO_PORT: u16 = 9000;
+static DOCKER_COMPOSE_ENV: RwLock<Option<DockerCompose>> = RwLock::new(None);
type Result<T> = std::result::Result<T, iceberg::Error>;
-struct TestFixture {
- _docker_compose: DockerCompose,
- hms_catalog: HmsCatalog,
-}
-
-async fn set_test_fixture(func: &str) -> TestFixture {
- set_up();
-
+#[ctor]
+fn before_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
let docker_compose = DockerCompose::new(
- normalize_test_name(format!("{}_{func}", module_path!())),
+ normalize_test_name(module_path!()),
format!("{}/testdata/hms_catalog", env!("CARGO_MANIFEST_DIR")),
);
-
docker_compose.run();
+ guard.replace(docker_compose);
+}
+
+#[dtor]
+fn after_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
+ guard.take();
+}
- let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
- let minio_ip = docker_compose.get_container_ip("minio");
+async fn get_catalog() -> HmsCatalog {
+ set_up();
+
+ let (hms_catalog_ip, minio_ip) = {
+ let guard = DOCKER_COMPOSE_ENV.read().unwrap();
+ let docker_compose = guard.as_ref().unwrap();
+ (
+ docker_compose.get_container_ip("hive-metastore"),
+ docker_compose.get_container_ip("minio"),
+ )
+ };
let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
loop {
if !scan_port_addr(&read_port) {
+ log::info!("scan read_port {} check", read_port);
log::info!("Waiting for 1s hms catalog to ready...");
sleep(std::time::Duration::from_millis(1000)).await;
} else {
@@ -77,12 +92,15 @@ async fn set_test_fixture(func: &str) -> TestFixture {
.props(props)
.build();
- let hms_catalog = HmsCatalog::new(config).unwrap();
+ HmsCatalog::new(config).unwrap()
+}
+
+async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent)
-> Result<()> {
+ let properties = HashMap::new();
- TestFixture {
- _docker_compose: docker_compose,
- hms_catalog,
- }
+ catalog.create_namespace(namespace, properties).await?;
+
+ Ok(())
}
fn set_table_creation(location: impl ToString, name: impl ToString) ->
Result<TableCreation> {
@@ -106,23 +124,18 @@ fn set_table_creation(location: impl ToString, name: impl
ToString) -> Result<Ta
#[tokio::test]
async fn test_rename_table() -> Result<()> {
- let fixture = set_test_fixture("test_rename_table").await;
- let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+ let catalog = get_catalog().await;
+ let creation: TableCreation = set_table_creation("s3a://warehouse/hive",
"my_table")?;
+ let namespace =
Namespace::new(NamespaceIdent::new("test_rename_table".into()));
+ set_test_namespace(&catalog, namespace.name()).await?;
- let table = fixture
- .hms_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let table: iceberg::table::Table = catalog.create_table(namespace.name(),
creation).await?;
let dest = TableIdent::new(namespace.name().clone(),
"my_table_rename".to_string());
- fixture
- .hms_catalog
- .rename_table(table.identifier(), &dest)
- .await?;
+ catalog.rename_table(table.identifier(), &dest).await?;
- let result = fixture.hms_catalog.table_exists(&dest).await?;
+ let result = catalog.table_exists(&dest).await?;
assert!(result);
@@ -131,16 +144,14 @@ async fn test_rename_table() -> Result<()> {
#[tokio::test]
async fn test_table_exists() -> Result<()> {
- let fixture = set_test_fixture("test_table_exists").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_table_exists".into()));
+ set_test_namespace(&catalog, namespace.name()).await?;
- let table = fixture
- .hms_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let table = catalog.create_table(namespace.name(), creation).await?;
- let result = fixture.hms_catalog.table_exists(table.identifier()).await?;
+ let result = catalog.table_exists(table.identifier()).await?;
assert!(result);
@@ -149,18 +160,16 @@ async fn test_table_exists() -> Result<()> {
#[tokio::test]
async fn test_drop_table() -> Result<()> {
- let fixture = set_test_fixture("test_drop_table").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_drop_table".into()));
+ set_test_namespace(&catalog, namespace.name()).await?;
- let table = fixture
- .hms_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let table = catalog.create_table(namespace.name(), creation).await?;
- fixture.hms_catalog.drop_table(table.identifier()).await?;
+ catalog.drop_table(table.identifier()).await?;
- let result = fixture.hms_catalog.table_exists(table.identifier()).await?;
+ let result = catalog.table_exists(table.identifier()).await?;
assert!(!result);
@@ -169,17 +178,14 @@ async fn test_drop_table() -> Result<()> {
#[tokio::test]
async fn test_load_table() -> Result<()> {
- let fixture = set_test_fixture("test_load_table").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_load_table".into()));
+ set_test_namespace(&catalog, namespace.name()).await?;
- let expected = fixture
- .hms_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let expected = catalog.create_table(namespace.name(), creation).await?;
- let result = fixture
- .hms_catalog
+ let result = catalog
.load_table(&TableIdent::new(
namespace.name().clone(),
"my_table".to_string(),
@@ -195,22 +201,19 @@ async fn test_load_table() -> Result<()> {
#[tokio::test]
async fn test_create_table() -> Result<()> {
- let fixture = set_test_fixture("test_create_table").await;
+ let catalog = get_catalog().await;
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+ let namespace =
Namespace::new(NamespaceIdent::new("test_create_table".into()));
+ set_test_namespace(&catalog, namespace.name()).await?;
- let result = fixture
- .hms_catalog
- .create_table(namespace.name(), creation)
- .await?;
+ let result = catalog.create_table(namespace.name(), creation).await?;
assert_eq!(result.identifier().name(), "my_table");
assert!(result
.metadata_location()
.is_some_and(|location|
location.starts_with("s3a://warehouse/hive/metadata/00000-")));
assert!(
- fixture
- .hms_catalog
+ catalog
.file_io()
.is_exist("s3a://warehouse/hive/metadata/")
.await?
@@ -221,18 +224,16 @@ async fn test_create_table() -> Result<()> {
#[tokio::test]
async fn test_list_tables() -> Result<()> {
- let fixture = set_test_fixture("test_list_tables").await;
- let ns = Namespace::new(NamespaceIdent::new("default".into()));
- let result = fixture.hms_catalog.list_tables(ns.name()).await?;
+ let catalog = get_catalog().await;
+ let ns = Namespace::new(NamespaceIdent::new("test_list_tables".into()));
+ let result = catalog.list_tables(ns.name()).await?;
+ set_test_namespace(&catalog, ns.name()).await?;
assert_eq!(result, vec![]);
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- fixture
- .hms_catalog
- .create_table(ns.name(), creation)
- .await?;
- let result = fixture.hms_catalog.list_tables(ns.name()).await?;
+ catalog.create_table(ns.name(), creation).await?;
+ let result = catalog.list_tables(ns.name()).await?;
assert_eq!(
result,
@@ -244,17 +245,15 @@ async fn test_list_tables() -> Result<()> {
#[tokio::test]
async fn test_list_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_list_namespace").await;
+ let catalog = get_catalog().await;
- let expected_no_parent = vec![NamespaceIdent::new("default".into())];
- let result_no_parent = fixture.hms_catalog.list_namespaces(None).await?;
+ let result_no_parent = catalog.list_namespaces(None).await?;
- let result_with_parent = fixture
- .hms_catalog
+ let result_with_parent = catalog
.list_namespaces(Some(&NamespaceIdent::new("parent".into())))
.await?;
- assert_eq!(expected_no_parent, result_no_parent);
+ assert!(result_no_parent.contains(&NamespaceIdent::new("default".into())));
assert!(result_with_parent.is_empty());
Ok(())
@@ -262,7 +261,7 @@ async fn test_list_namespace() -> Result<()> {
#[tokio::test]
async fn test_create_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_create_namespace").await;
+ let catalog = get_catalog().await;
let properties = HashMap::from([
("comment".to_string(), "my_description".to_string()),
@@ -279,14 +278,11 @@ async fn test_create_namespace() -> Result<()> {
]);
let ns = Namespace::with_properties(
- NamespaceIdent::new("my_namespace".into()),
+ NamespaceIdent::new("test_create_namespace".into()),
properties.clone(),
);
- let result = fixture
- .hms_catalog
- .create_namespace(ns.name(), properties)
- .await?;
+ let result = catalog.create_namespace(ns.name(), properties).await?;
assert_eq!(result, ns);
@@ -294,8 +290,8 @@ async fn test_create_namespace() -> Result<()> {
}
#[tokio::test]
-async fn test_get_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_get_namespace").await;
+async fn test_get_default_namespace() -> Result<()> {
+ let catalog = get_catalog().await;
let ns = Namespace::new(NamespaceIdent::new("default".into()));
let properties = HashMap::from([
@@ -313,7 +309,7 @@ async fn test_get_namespace() -> Result<()> {
let expected =
Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
- let result = fixture.hms_catalog.get_namespace(ns.name()).await?;
+ let result = catalog.get_namespace(ns.name()).await?;
assert_eq!(expected, result);
@@ -322,19 +318,13 @@ async fn test_get_namespace() -> Result<()> {
#[tokio::test]
async fn test_namespace_exists() -> Result<()> {
- let fixture = set_test_fixture("test_namespace_exists").await;
+ let catalog = get_catalog().await;
let ns_exists = Namespace::new(NamespaceIdent::new("default".into()));
- let ns_not_exists = Namespace::new(NamespaceIdent::new("not_here".into()));
+ let ns_not_exists =
Namespace::new(NamespaceIdent::new("test_namespace_exists".into()));
- let result_exists = fixture
- .hms_catalog
- .namespace_exists(ns_exists.name())
- .await?;
- let result_not_exists = fixture
- .hms_catalog
- .namespace_exists(ns_not_exists.name())
- .await?;
+ let result_exists = catalog.namespace_exists(ns_exists.name()).await?;
+ let result_not_exists =
catalog.namespace_exists(ns_not_exists.name()).await?;
assert!(result_exists);
assert!(!result_not_exists);
@@ -344,17 +334,15 @@ async fn test_namespace_exists() -> Result<()> {
#[tokio::test]
async fn test_update_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_update_namespace").await;
+ let catalog = get_catalog().await;
- let ns = Namespace::new(NamespaceIdent::new("default".into()));
+ let ns = NamespaceIdent::new("test_update_namespace".into());
+ set_test_namespace(&catalog, &ns).await?;
let properties = HashMap::from([("comment".to_string(),
"my_update".to_string())]);
- fixture
- .hms_catalog
- .update_namespace(ns.name(), properties)
- .await?;
+ catalog.update_namespace(&ns, properties).await?;
- let db = fixture.hms_catalog.get_namespace(ns.name()).await?;
+ let db = catalog.get_namespace(&ns).await?;
assert_eq!(
db.properties().get("comment"),
@@ -366,21 +354,18 @@ async fn test_update_namespace() -> Result<()> {
#[tokio::test]
async fn test_drop_namespace() -> Result<()> {
- let fixture = set_test_fixture("test_drop_namespace").await;
+ let catalog = get_catalog().await;
let ns = Namespace::new(NamespaceIdent::new("delete_me".into()));
- fixture
- .hms_catalog
- .create_namespace(ns.name(), HashMap::new())
- .await?;
+ catalog.create_namespace(ns.name(), HashMap::new()).await?;
- let result = fixture.hms_catalog.namespace_exists(ns.name()).await?;
+ let result = catalog.namespace_exists(ns.name()).await?;
assert!(result);
- fixture.hms_catalog.drop_namespace(ns.name()).await?;
+ catalog.drop_namespace(ns.name()).await?;
- let result = fixture.hms_catalog.namespace_exists(ns.name()).await?;
+ let result = catalog.namespace_exists(ns.name()).await?;
assert!(!result);
Ok(())
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 07943b8..add5718 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -45,6 +45,7 @@ typed-builder = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
+ctor = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
port_scanner = { workspace = true }
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs
b/crates/catalog/rest/tests/rest_catalog_test.rs
index 52f243d..621536a 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -17,6 +17,7 @@
//! Integration tests for rest catalog.
+use ctor::{ctor, dtor};
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::Transaction;
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
@@ -25,26 +26,37 @@ use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
use port_scanner::scan_port_addr;
use std::collections::HashMap;
+use std::sync::RwLock;
use tokio::time::sleep;
const REST_CATALOG_PORT: u16 = 8181;
+static DOCKER_COMPOSE_ENV: RwLock<Option<DockerCompose>> = RwLock::new(None);
-struct TestFixture {
- _docker_compose: DockerCompose,
- rest_catalog: RestCatalog,
-}
-
-async fn set_test_fixture(func: &str) -> TestFixture {
- set_up();
+#[ctor]
+fn before_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
let docker_compose = DockerCompose::new(
- normalize_test_name(format!("{}_{func}", module_path!())),
+ normalize_test_name(module_path!()),
format!("{}/testdata/rest_catalog", env!("CARGO_MANIFEST_DIR")),
);
-
- // Start docker compose
docker_compose.run();
+ guard.replace(docker_compose);
+}
+
+#[dtor]
+fn after_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
+ guard.take();
+}
+
+async fn get_catalog() -> RestCatalog {
+ set_up();
- let rest_catalog_ip = docker_compose.get_container_ip("rest");
+ let rest_catalog_ip = {
+ let guard = DOCKER_COMPOSE_ENV.read().unwrap();
+ let docker_compose = guard.as_ref().unwrap();
+ docker_compose.get_container_ip("rest")
+ };
let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
loop {
@@ -59,21 +71,15 @@ async fn set_test_fixture(func: &str) -> TestFixture {
let config = RestCatalogConfig::builder()
.uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
.build();
- let rest_catalog = RestCatalog::new(config);
-
- TestFixture {
- _docker_compose: docker_compose,
- rest_catalog,
- }
+ RestCatalog::new(config)
}
#[tokio::test]
async fn test_get_non_exist_namespace() {
- let fixture = set_test_fixture("test_get_non_exist_namespace").await;
+ let catalog = get_catalog().await;
- let result = fixture
- .rest_catalog
- .get_namespace(&NamespaceIdent::from_strs(["demo"]).unwrap())
+ let result = catalog
+
.get_namespace(&NamespaceIdent::from_strs(["test_get_non_exist_namespace"]).unwrap())
.await;
assert!(result.is_err());
@@ -85,7 +91,7 @@ async fn test_get_non_exist_namespace() {
#[tokio::test]
async fn test_get_namespace() {
- let fixture = set_test_fixture("test_get_namespace").await;
+ let catalog = get_catalog().await;
let ns = Namespace::with_properties(
NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
@@ -96,11 +102,10 @@ async fn test_get_namespace() {
);
// Verify that namespace doesn't exist
- assert!(fixture.rest_catalog.get_namespace(ns.name()).await.is_err());
+ assert!(catalog.get_namespace(ns.name()).await.is_err());
// Create this namespace
- let created_ns = fixture
- .rest_catalog
+ let created_ns = catalog
.create_namespace(ns.name(), ns.properties().clone())
.await
.unwrap();
@@ -109,17 +114,17 @@ async fn test_get_namespace() {
assert_map_contains(ns.properties(), created_ns.properties());
// Check that this namespace already exists
- let get_ns = fixture.rest_catalog.get_namespace(ns.name()).await.unwrap();
+ let get_ns = catalog.get_namespace(ns.name()).await.unwrap();
assert_eq!(ns.name(), get_ns.name());
assert_map_contains(ns.properties(), created_ns.properties());
}
#[tokio::test]
async fn test_list_namespace() {
- let fixture = set_test_fixture("test_list_namespace").await;
+ let catalog = get_catalog().await;
let ns1 = Namespace::with_properties(
- NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+ NamespaceIdent::from_strs(["test_list_namespace", "ios"]).unwrap(),
HashMap::from([
("owner".to_string(), "ray".to_string()),
("community".to_string(), "apache".to_string()),
@@ -127,7 +132,7 @@ async fn test_list_namespace() {
);
let ns2 = Namespace::with_properties(
- NamespaceIdent::from_strs(["apple", "macos"]).unwrap(),
+ NamespaceIdent::from_strs(["test_list_namespace", "macos"]).unwrap(),
HashMap::from([
("owner".to_string(), "xuanwo".to_string()),
("community".to_string(), "apache".to_string()),
@@ -135,42 +140,41 @@ async fn test_list_namespace() {
);
// Currently this namespace doesn't exist, so it should return error.
- assert!(fixture
- .rest_catalog
- .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+ assert!(catalog
+ .list_namespaces(Some(
+ &NamespaceIdent::from_strs(["test_list_namespace"]).unwrap()
+ ))
.await
.is_err());
// Create namespaces
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns1.name(), ns1.properties().clone())
.await
.unwrap();
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns2.name(), ns1.properties().clone())
.await
.unwrap();
// List namespace
- let mut nss = fixture
- .rest_catalog
- .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+ let nss = catalog
+ .list_namespaces(Some(
+ &NamespaceIdent::from_strs(["test_list_namespace"]).unwrap(),
+ ))
.await
.unwrap();
- nss.sort();
- assert_eq!(&nss[0], ns1.name());
- assert_eq!(&nss[1], ns2.name());
+ assert!(nss.contains(ns1.name()));
+ assert!(nss.contains(ns2.name()));
}
#[tokio::test]
async fn test_list_empty_namespace() {
- let fixture = set_test_fixture("test_list_empty_namespace").await;
+ let catalog = get_catalog().await;
let ns_apple = Namespace::with_properties(
- NamespaceIdent::from_strs(["apple"]).unwrap(),
+ NamespaceIdent::from_strs(["test_list_empty_namespace",
"apple"]).unwrap(),
HashMap::from([
("owner".to_string(), "ray".to_string()),
("community".to_string(), "apache".to_string()),
@@ -178,23 +182,20 @@ async fn test_list_empty_namespace() {
);
// Currently this namespace doesn't exist, so it should return error.
- assert!(fixture
- .rest_catalog
+ assert!(catalog
.list_namespaces(Some(ns_apple.name()))
.await
.is_err());
// Create namespaces
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns_apple.name(), ns_apple.properties().clone())
.await
.unwrap();
// List namespace
- let nss = fixture
- .rest_catalog
- .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+ let nss = catalog
+ .list_namespaces(Some(ns_apple.name()))
.await
.unwrap();
assert!(nss.is_empty());
@@ -202,10 +203,10 @@ async fn test_list_empty_namespace() {
#[tokio::test]
async fn test_list_root_namespace() {
- let fixture = set_test_fixture("test_list_root_namespace").await;
+ let catalog = get_catalog().await;
let ns1 = Namespace::with_properties(
- NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+ NamespaceIdent::from_strs(["test_list_root_namespace", "apple",
"ios"]).unwrap(),
HashMap::from([
("owner".to_string(), "ray".to_string()),
("community".to_string(), "apache".to_string()),
@@ -213,7 +214,7 @@ async fn test_list_root_namespace() {
);
let ns2 = Namespace::with_properties(
- NamespaceIdent::from_strs(["google", "android"]).unwrap(),
+ NamespaceIdent::from_strs(["test_list_root_namespace", "google",
"android"]).unwrap(),
HashMap::from([
("owner".to_string(), "xuanwo".to_string()),
("community".to_string(), "apache".to_string()),
@@ -221,38 +222,34 @@ async fn test_list_root_namespace() {
);
// Currently this namespace doesn't exist, so it should return error.
- assert!(fixture
- .rest_catalog
- .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+ assert!(catalog
+ .list_namespaces(Some(
+ &NamespaceIdent::from_strs(["test_list_root_namespace"]).unwrap()
+ ))
.await
.is_err());
// Create namespaces
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns1.name(), ns1.properties().clone())
.await
.unwrap();
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns2.name(), ns1.properties().clone())
.await
.unwrap();
// List namespace
- let mut nss = fixture.rest_catalog.list_namespaces(None).await.unwrap();
- nss.sort();
-
- assert_eq!(&nss[0], &NamespaceIdent::from_strs(["apple"]).unwrap());
- assert_eq!(&nss[1], &NamespaceIdent::from_strs(["google"]).unwrap());
+ let nss = catalog.list_namespaces(None).await.unwrap();
+
assert!(nss.contains(&NamespaceIdent::from_strs(["test_list_root_namespace"]).unwrap()));
}
#[tokio::test]
async fn test_create_table() {
- let fixture = set_test_fixture("test_create_table").await;
+ let catalog = get_catalog().await;
let ns = Namespace::with_properties(
- NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+ NamespaceIdent::from_strs(["test_create_table", "apple",
"ios"]).unwrap(),
HashMap::from([
("owner".to_string(), "ray".to_string()),
("community".to_string(), "apache".to_string()),
@@ -260,8 +257,7 @@ async fn test_create_table() {
);
// Create namespaces
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns.name(), ns.properties().clone())
.await
.unwrap();
@@ -282,8 +278,7 @@ async fn test_create_table() {
.schema(schema.clone())
.build();
- let table = fixture
- .rest_catalog
+ let table = catalog
.create_table(ns.name(), table_creation)
.await
.unwrap();
@@ -310,10 +305,10 @@ async fn test_create_table() {
#[tokio::test]
async fn test_update_table() {
- let fixture = set_test_fixture("test_update_table").await;
+ let catalog = get_catalog().await;
let ns = Namespace::with_properties(
- NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+ NamespaceIdent::from_strs(["test_update_table", "apple",
"ios"]).unwrap(),
HashMap::from([
("owner".to_string(), "ray".to_string()),
("community".to_string(), "apache".to_string()),
@@ -321,8 +316,7 @@ async fn test_update_table() {
);
// Create namespaces
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns.name(), ns.properties().clone())
.await
.unwrap();
@@ -344,8 +338,7 @@ async fn test_update_table() {
.schema(schema.clone())
.build();
- let table = fixture
- .rest_catalog
+ let table = catalog
.create_table(ns.name(), table_creation)
.await
.unwrap();
@@ -359,7 +352,7 @@ async fn test_update_table() {
let table2 = Transaction::new(&table)
.set_properties(HashMap::from([("prop1".to_string(),
"v1".to_string())]))
.unwrap()
- .commit(&fixture.rest_catalog)
+ .commit(&catalog)
.await
.unwrap();
@@ -378,10 +371,11 @@ fn assert_map_contains(map1: &HashMap<String, String>,
map2: &HashMap<String, St
#[tokio::test]
async fn test_list_empty_multi_level_namespace() {
- let fixture =
set_test_fixture("test_list_empty_multi_level_namespace").await;
+ let catalog = get_catalog().await;
let ns_apple = Namespace::with_properties(
- NamespaceIdent::from_strs(["a_a", "apple"]).unwrap(),
+ NamespaceIdent::from_strs(["test_list_empty_multi_level_namespace",
"a_a", "apple"])
+ .unwrap(),
HashMap::from([
("owner".to_string(), "ray".to_string()),
("community".to_string(), "apache".to_string()),
@@ -389,23 +383,23 @@ async fn test_list_empty_multi_level_namespace() {
);
// Currently this namespace doesn't exist, so it should return error.
- assert!(fixture
- .rest_catalog
+ assert!(catalog
.list_namespaces(Some(ns_apple.name()))
.await
.is_err());
// Create namespaces
- fixture
- .rest_catalog
+ catalog
.create_namespace(ns_apple.name(), ns_apple.properties().clone())
.await
.unwrap();
// List namespace
- let nss = fixture
- .rest_catalog
- .list_namespaces(Some(&NamespaceIdent::from_strs(["a_a",
"apple"]).unwrap()))
+ let nss = catalog
+ .list_namespaces(Some(
+
&NamespaceIdent::from_strs(["test_list_empty_multi_level_namespace", "a_a",
"apple"])
+ .unwrap(),
+ ))
.await
.unwrap();
assert!(nss.is_empty());
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index 1b5e4ec..7e12d73 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -38,6 +38,7 @@ log = { workspace = true }
tokio = { workspace = true }
[dev-dependencies]
+ctor = { workspace = true }
iceberg-catalog-hms = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
diff --git
a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
index 20c5cc8..9ad1d40 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
@@ -18,8 +18,9 @@
//! Integration tests for Iceberg Datafusion with Hive Metastore.
use std::collections::HashMap;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
+use ctor::{ctor, dtor};
use datafusion::arrow::datatypes::DataType;
use datafusion::execution::context::SessionContext;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
@@ -34,24 +35,55 @@ use tokio::time::sleep;
const HMS_CATALOG_PORT: u16 = 9083;
const MINIO_PORT: u16 = 9000;
+static DOCKER_COMPOSE_ENV: RwLock<Option<DockerCompose>> = RwLock::new(None);
struct TestFixture {
- _docker_compose: DockerCompose,
hms_catalog: HmsCatalog,
+ props: HashMap<String, String>,
+ hms_catalog_ip: String,
}
-async fn set_test_fixture(func: &str) -> TestFixture {
- set_up();
-
+#[ctor]
+fn before_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
let docker_compose = DockerCompose::new(
- normalize_test_name(format!("{}_{func}", module_path!())),
+ normalize_test_name(module_path!()),
format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
);
-
docker_compose.run();
+ guard.replace(docker_compose);
+}
+
+#[dtor]
+fn after_all() {
+ let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
+ guard.take();
+}
+
+impl TestFixture {
+ fn get_catalog(&self) -> HmsCatalog {
+ let config = HmsCatalogConfig::builder()
+ .address(format!("{}:{}", self.hms_catalog_ip, HMS_CATALOG_PORT))
+ .thrift_transport(HmsThriftTransport::Buffered)
+ .warehouse("s3a://warehouse/hive".to_string())
+ .props(self.props.clone())
+ .build();
+
+ HmsCatalog::new(config).unwrap()
+ }
+}
+
+async fn get_test_fixture() -> TestFixture {
+ set_up();
- let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
- let minio_ip = docker_compose.get_container_ip("minio");
+ let (hms_catalog_ip, minio_ip) = {
+ let guard = DOCKER_COMPOSE_ENV.read().unwrap();
+ let docker_compose = guard.as_ref().unwrap();
+ (
+ docker_compose.get_container_ip("hive-metastore"),
+ docker_compose.get_container_ip("minio"),
+ )
+ };
let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
loop {
@@ -77,17 +109,26 @@ async fn set_test_fixture(func: &str) -> TestFixture {
.address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
.thrift_transport(HmsThriftTransport::Buffered)
.warehouse("s3a://warehouse/hive".to_string())
- .props(props)
+ .props(props.clone())
.build();
let hms_catalog = HmsCatalog::new(config).unwrap();
TestFixture {
- _docker_compose: docker_compose,
hms_catalog,
+ props,
+ hms_catalog_ip,
}
}
+async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent)
-> Result<()> {
+ let properties = HashMap::new();
+
+ catalog.create_namespace(namespace, properties).await?;
+
+ Ok(())
+}
+
fn set_table_creation(location: impl ToString, name: impl ToString) ->
Result<TableCreation> {
let schema = Schema::builder()
.with_schema_id(0)
@@ -109,24 +150,24 @@ fn set_table_creation(location: impl ToString, name: impl
ToString) -> Result<Ta
#[tokio::test]
async fn test_provider_get_table_schema() -> Result<()> {
- let fixture = set_test_fixture("test_provider_get_table_schema").await;
+ let fixture = get_test_fixture().await;
+ let namespace =
NamespaceIdent::new("test_provider_get_table_schema".to_string());
+ set_test_namespace(&fixture.hms_catalog, &namespace).await?;
- let namespace = NamespaceIdent::new("default".to_string());
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
-
fixture
.hms_catalog
.create_table(&namespace, creation)
.await?;
- let client = Arc::new(fixture.hms_catalog);
+ let client = Arc::new(fixture.get_catalog());
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
let ctx = SessionContext::new();
ctx.register_catalog("hive", catalog);
let provider = ctx.catalog("hive").unwrap();
- let schema = provider.schema("default").unwrap();
+ let schema = provider.schema("test_provider_get_table_schema").unwrap();
let table = schema.table("my_table").await.unwrap().unwrap();
let table_schema = table.schema();
@@ -144,24 +185,24 @@ async fn test_provider_get_table_schema() -> Result<()> {
#[tokio::test]
async fn test_provider_list_table_names() -> Result<()> {
- let fixture = set_test_fixture("test_provider_list_table_names").await;
+ let fixture = get_test_fixture().await;
+ let namespace =
NamespaceIdent::new("test_provider_list_table_names".to_string());
+ set_test_namespace(&fixture.hms_catalog, &namespace).await?;
- let namespace = NamespaceIdent::new("default".to_string());
let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
-
fixture
.hms_catalog
.create_table(&namespace, creation)
.await?;
- let client = Arc::new(fixture.hms_catalog);
+ let client = Arc::new(fixture.get_catalog());
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
let ctx = SessionContext::new();
ctx.register_catalog("hive", catalog);
let provider = ctx.catalog("hive").unwrap();
- let schema = provider.schema("default").unwrap();
+ let schema = provider.schema("test_provider_list_table_names").unwrap();
let expected = vec!["my_table"];
let result = schema.table_names();
@@ -173,10 +214,12 @@ async fn test_provider_list_table_names() -> Result<()> {
#[tokio::test]
async fn test_provider_list_schema_names() -> Result<()> {
- let fixture = set_test_fixture("test_provider_list_schema_names").await;
- set_table_creation("default", "my_table")?;
+ let fixture = get_test_fixture().await;
+ let namespace =
NamespaceIdent::new("test_provider_list_schema_names".to_string());
+ set_test_namespace(&fixture.hms_catalog, &namespace).await?;
- let client = Arc::new(fixture.hms_catalog);
+ set_table_creation("test_provider_list_schema_names", "my_table")?;
+ let client = Arc::new(fixture.get_catalog());
let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
let ctx = SessionContext::new();
@@ -184,10 +227,11 @@ async fn test_provider_list_schema_names() -> Result<()> {
let provider = ctx.catalog("hive").unwrap();
- let expected = vec!["default"];
+ let expected = ["default", "test_provider_list_schema_names"];
let result = provider.schema_names();
- assert_eq!(result, expected);
-
+ assert!(expected
+ .iter()
+ .all(|item| result.contains(&item.to_string())));
Ok(())
}