CTTY commented on code in PR #2131:
URL: https://github.com/apache/iceberg-rust/pull/2131#discussion_r2790032899


##########
crates/iceberg/src/catalog/memory/catalog.rs:
##########
@@ -305,8 +305,8 @@ impl Catalog for MemoryCatalog {
     async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
         let mut root_namespace_state = self.root_namespace_state.lock().await;
 
-        let metadata_location = 
root_namespace_state.remove_existing_table(table_ident)?;
-        self.file_io.delete(&metadata_location).await
+        root_namespace_state.remove_existing_table(table_ident)?;
+        Ok(())

Review Comment:
   I just realized we don't have a catalog API to purge a table in iceberg-rust.
   
   We probably need to add this API to better define the expected behavior.



##########
crates/catalog/glue/tests/glue_catalog_test.rs:
##########
@@ -116,353 +113,6 @@ fn set_table_creation(location: Option<String>, name: 
impl ToString) -> Result<T
     Ok(builder.build())
 }
 
-#[tokio::test]
-async fn test_rename_table() -> Result<()> {
-    let catalog = get_catalog().await;
-    let creation = set_table_creation(None, "my_table")?;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
Namespace::new(NamespaceIdent::new(normalize_test_name_with_parts!(
-        "test_rename_table"
-    )));
-    cleanup_namespace(&catalog, namespace.name()).await;
-
-    catalog
-        .create_namespace(namespace.name(), HashMap::new())
-        .await?;
-
-    let table = catalog.create_table(namespace.name(), creation).await?;
-
-    let dest = TableIdent::new(namespace.name().clone(), 
"my_table_rename".to_string());
-
-    catalog.rename_table(table.identifier(), &dest).await?;
-
-    let table = catalog.load_table(&dest).await?;
-    assert_eq!(table.identifier(), &dest);
-
-    let src = TableIdent::new(namespace.name().clone(), 
"my_table".to_string());
-
-    let src_table_exists = catalog.table_exists(&src).await?;
-    assert!(!src_table_exists);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_table_exists() -> Result<()> {
-    let catalog = get_catalog().await;
-    let creation = set_table_creation(None, "my_table")?;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
Namespace::new(NamespaceIdent::new(normalize_test_name_with_parts!(
-        "test_table_exists"
-    )));
-    cleanup_namespace(&catalog, namespace.name()).await;
-
-    catalog
-        .create_namespace(namespace.name(), HashMap::new())
-        .await?;
-
-    let ident = TableIdent::new(namespace.name().clone(), 
"my_table".to_string());
-
-    let exists = catalog.table_exists(&ident).await?;
-    assert!(!exists);
-
-    let table = catalog.create_table(namespace.name(), creation).await?;
-
-    let exists = catalog.table_exists(table.identifier()).await?;
-
-    assert!(exists);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_drop_table() -> Result<()> {
-    let catalog = get_catalog().await;
-    let creation = set_table_creation(None, "my_table")?;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
Namespace::new(NamespaceIdent::new(normalize_test_name_with_parts!(
-        "test_drop_table"
-    )));
-    cleanup_namespace(&catalog, namespace.name()).await;
-
-    catalog
-        .create_namespace(namespace.name(), HashMap::new())
-        .await?;
-
-    let table = catalog.create_table(namespace.name(), creation).await?;
-
-    catalog.drop_table(table.identifier()).await?;
-
-    let result = catalog.table_exists(table.identifier()).await?;
-
-    assert!(!result);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_load_table() -> Result<()> {
-    let catalog = get_catalog().await;
-    let creation = set_table_creation(None, "my_table")?;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
Namespace::new(NamespaceIdent::new(normalize_test_name_with_parts!(
-        "test_load_table"
-    )));
-    cleanup_namespace(&catalog, namespace.name()).await;
-
-    catalog
-        .create_namespace(namespace.name(), HashMap::new())
-        .await?;
-
-    let expected = catalog.create_table(namespace.name(), creation).await?;
-
-    let result = catalog
-        .load_table(&TableIdent::new(
-            namespace.name().clone(),
-            "my_table".to_string(),
-        ))
-        .await?;
-
-    assert_eq!(result.identifier(), expected.identifier());
-    assert_eq!(result.metadata_location(), expected.metadata_location());
-    assert_eq!(result.metadata(), expected.metadata());
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_create_table() -> Result<()> {
-    let catalog = get_catalog().await;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_create_table"));
-    cleanup_namespace(&catalog, &namespace).await;
-    set_test_namespace(&catalog, &namespace).await?;
-    // inject custom location, ignore the namespace prefix
-    let creation = set_table_creation(Some("s3a://warehouse/hive".into()), 
"my_table")?;
-    let result = catalog.create_table(&namespace, creation).await?;
-
-    assert_eq!(result.identifier().name(), "my_table");
-    assert!(
-        result
-            .metadata_location()
-            .is_some_and(|location| 
location.starts_with("s3a://warehouse/hive/metadata/00000-"))
-    );
-    assert!(
-        catalog
-            .file_io()
-            .exists("s3a://warehouse/hive/metadata/")
-            .await?
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_list_tables() -> Result<()> {
-    let catalog = get_catalog().await;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_list_tables"));
-    cleanup_namespace(&catalog, &namespace).await;
-    set_test_namespace(&catalog, &namespace).await?;
-
-    let expected = vec![];
-    let result = catalog.list_tables(&namespace).await?;
-
-    assert_eq!(result, expected);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_drop_namespace() -> Result<()> {
-    let catalog = get_catalog().await;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_drop_namespace"));
-    cleanup_namespace(&catalog, &namespace).await;
-    set_test_namespace(&catalog, &namespace).await?;
-
-    let exists = catalog.namespace_exists(&namespace).await?;
-    assert!(exists);
-
-    catalog.drop_namespace(&namespace).await?;
-
-    let exists = catalog.namespace_exists(&namespace).await?;
-    assert!(!exists);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_update_namespace() -> Result<()> {
-    let catalog = get_catalog().await;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_update_namespace"));
-    cleanup_namespace(&catalog, &namespace).await;
-    set_test_namespace(&catalog, &namespace).await?;
-
-    let before_update = catalog.get_namespace(&namespace).await?;
-    let before_update = before_update.properties().get("description");
-
-    assert_eq!(before_update, None);
-
-    let properties = HashMap::from([("description".to_string(), 
"my_update".to_string())]);
-
-    catalog.update_namespace(&namespace, properties).await?;
-
-    let after_update = catalog.get_namespace(&namespace).await?;
-    let after_update = after_update.properties().get("description");
-
-    assert_eq!(after_update, Some("my_update".to_string()).as_ref());
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_namespace_exists() -> Result<()> {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_namespace_exists"));
-    cleanup_namespace(&catalog, &namespace).await;
-
-    let exists = catalog.namespace_exists(&namespace).await?;
-    assert!(!exists);
-
-    set_test_namespace(&catalog, &namespace).await?;
-
-    let exists = catalog.namespace_exists(&namespace).await?;
-    assert!(exists);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_get_namespace() -> Result<()> {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_get_namespace"));
-    cleanup_namespace(&catalog, &namespace).await;
-
-    let does_not_exist = catalog.get_namespace(&namespace).await;
-    assert!(does_not_exist.is_err());
-
-    set_test_namespace(&catalog, &namespace).await?;
-
-    let result = catalog.get_namespace(&namespace).await?;
-    let expected = Namespace::new(namespace);
-
-    assert_eq!(result, expected);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_create_namespace() -> Result<()> {
-    let catalog = get_catalog().await;
-
-    let properties = HashMap::new();
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_create_namespace"));
-    cleanup_namespace(&catalog, &namespace).await;
-
-    let expected = Namespace::new(namespace.clone());
-
-    let result = catalog.create_namespace(&namespace, properties).await?;
-
-    assert_eq!(result, expected);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_list_namespace() -> Result<()> {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let namespace = 
NamespaceIdent::new(normalize_test_name_with_parts!("test_list_namespace"));
-    cleanup_namespace(&catalog, &namespace).await;
-    set_test_namespace(&catalog, &namespace).await?;
-
-    let result = catalog.list_namespaces(None).await?;
-    assert!(result.contains(&namespace));
-
-    let empty_result = catalog.list_namespaces(Some(&namespace)).await?;
-    assert!(empty_result.is_empty());
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_update_table() -> Result<()> {
-    let catalog = get_catalog().await;
-    let creation = set_table_creation(None, "my_table")?;
-    // Use unique namespace to avoid conflicts
-    let namespace = 
Namespace::new(NamespaceIdent::new(normalize_test_name_with_parts!(
-        "test_update_table"
-    )));
-    cleanup_namespace(&catalog, namespace.name()).await;
-
-    catalog
-        .create_namespace(namespace.name(), HashMap::new())
-        .await?;
-
-    let expected = catalog.create_table(namespace.name(), creation).await?;
-
-    let table = catalog
-        .load_table(&TableIdent::new(
-            namespace.name().clone(),
-            "my_table".to_string(),
-        ))
-        .await?;
-
-    assert_eq!(table.identifier(), expected.identifier());
-    assert_eq!(table.metadata_location(), expected.metadata_location());
-    assert_eq!(table.metadata(), expected.metadata());
-
-    // Store the original metadata location for comparison
-    let original_metadata_location = table.metadata_location();
-
-    // Update table properties using the transaction
-    let tx = Transaction::new(&table);
-    let tx = tx
-        .update_table_properties()
-        .set("test_property".to_string(), "test_value".to_string())
-        .apply(tx)?;
-
-    // Commit the transaction to the catalog
-    let updated_table = tx.commit(&catalog).await?;
-
-    // Verify the update was successful
-    assert_eq!(
-        updated_table.metadata().properties().get("test_property"),
-        Some(&"test_value".to_string())
-    );
-
-    // Verify the metadata location has been updated
-    assert_ne!(
-        updated_table.metadata_location(),
-        original_metadata_location,
-        "Metadata location should be updated after commit"
-    );
-
-    // Load the table again from the catalog to verify changes were persisted
-    let reloaded_table = catalog.load_table(table.identifier()).await?;
-
-    // Verify the reloaded table matches the updated table
-    assert_eq!(
-        reloaded_table.metadata().properties().get("test_property"),
-        Some(&"test_value".to_string())
-    );
-    assert_eq!(
-        reloaded_table.metadata_location(),
-        updated_table.metadata_location(),
-        "Reloaded table should have the same metadata location as the updated 
table"
-    );
-
-    Ok(())
-}
-
 #[tokio::test]
 async fn test_register_table() -> Result<()> {

Review Comment:
   I think we can remove this? This is already covered by `table_register_suite`



##########
crates/catalog/rest/tests/rest_catalog_test.rs:
##########
@@ -70,389 +69,6 @@ async fn get_catalog() -> RestCatalog {
         .unwrap()
 }
 
-#[tokio::test]
-async fn test_get_non_exist_namespace() {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace name to ensure it doesn't exist
-    let ns_ident = NamespaceIdent::new(normalize_test_name_with_parts!(
-        "test_get_non_exist_namespace"
-    ));
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, &ns_ident).await;
-
-    let result = catalog.get_namespace(&ns_ident).await;
-
-    assert!(result.is_err());
-    assert!(result.unwrap_err().to_string().contains("does not exist"));
-}
-
-#[tokio::test]
-async fn test_get_namespace() {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts with other tests
-    let ns = Namespace::with_properties(
-        NamespaceIdent::from_strs([
-            "apple",
-            "ios",
-            &normalize_test_name_with_parts!("test_get_namespace"),
-        ])
-        .unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns.name()).await;
-
-    // Verify that namespace doesn't exist
-    assert!(catalog.get_namespace(ns.name()).await.is_err());
-
-    // Create this namespace
-    let created_ns = catalog
-        .create_namespace(ns.name(), ns.properties().clone())
-        .await
-        .unwrap();
-
-    assert_eq!(ns.name(), created_ns.name());
-    assert_map_contains(ns.properties(), created_ns.properties());
-
-    // Check that this namespace already exists
-    let get_ns = catalog.get_namespace(ns.name()).await.unwrap();
-    assert_eq!(ns.name(), get_ns.name());
-    assert_map_contains(ns.properties(), created_ns.properties());
-}
-
-#[tokio::test]
-async fn test_list_namespace() {
-    let catalog = get_catalog().await;
-
-    // Use unique parent namespace to avoid conflicts
-    let parent_ns_name = 
normalize_test_name_with_parts!("test_list_namespace");
-    let parent_ident = NamespaceIdent::from_strs([&parent_ns_name]).unwrap();
-
-    let ns1 = Namespace::with_properties(
-        NamespaceIdent::from_strs([&parent_ns_name, "ios"]).unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    let ns2 = Namespace::with_properties(
-        NamespaceIdent::from_strs([&parent_ns_name, "macos"]).unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "xuanwo".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns1.name()).await;
-    cleanup_namespace(&catalog, ns2.name()).await;
-    cleanup_namespace(&catalog, &parent_ident).await;
-
-    // Currently this namespace doesn't exist, so it should return error.
-    assert!(catalog.list_namespaces(Some(&parent_ident)).await.is_err());
-
-    // Create namespaces
-    catalog
-        .create_namespace(ns1.name(), ns1.properties().clone())
-        .await
-        .unwrap();
-    catalog
-        .create_namespace(ns2.name(), ns1.properties().clone())
-        .await
-        .unwrap();
-
-    // List namespace
-    let nss = catalog.list_namespaces(Some(&parent_ident)).await.unwrap();
-
-    assert!(nss.contains(ns1.name()));
-    assert!(nss.contains(ns2.name()));
-}
-
-#[tokio::test]
-async fn test_list_empty_namespace() {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let ns_apple = Namespace::with_properties(
-        NamespaceIdent::from_strs([
-            "list_empty",
-            "apple",
-            &normalize_test_name_with_parts!("test_list_empty_namespace"),
-        ])
-        .unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns_apple.name()).await;
-
-    // Currently this namespace doesn't exist, so it should return error.
-    assert!(
-        catalog
-            .list_namespaces(Some(ns_apple.name()))
-            .await
-            .is_err()
-    );
-
-    // Create namespaces
-    catalog
-        .create_namespace(ns_apple.name(), ns_apple.properties().clone())
-        .await
-        .unwrap();
-
-    // List namespace
-    let nss = catalog
-        .list_namespaces(Some(ns_apple.name()))
-        .await
-        .unwrap();
-    assert!(nss.is_empty());
-}
-
-#[tokio::test]
-async fn test_list_root_namespace() {
-    let catalog = get_catalog().await;
-
-    // Use unique root namespace to avoid conflicts
-    let root_ns_name = 
normalize_test_name_with_parts!("test_list_root_namespace");
-    let root_ident = NamespaceIdent::from_strs([&root_ns_name]).unwrap();
-
-    let ns1 = Namespace::with_properties(
-        NamespaceIdent::from_strs([&root_ns_name, "apple", "ios"]).unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    let ns2 = Namespace::with_properties(
-        NamespaceIdent::from_strs([&root_ns_name, "google", 
"android"]).unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "xuanwo".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns1.name()).await;
-    cleanup_namespace(&catalog, ns2.name()).await;
-    cleanup_namespace(&catalog, &root_ident).await;
-
-    // Currently this namespace doesn't exist, so it should return error.
-    assert!(catalog.list_namespaces(Some(&root_ident)).await.is_err());
-
-    // Create namespaces
-    catalog
-        .create_namespace(ns1.name(), ns1.properties().clone())
-        .await
-        .unwrap();
-    catalog
-        .create_namespace(ns2.name(), ns1.properties().clone())
-        .await
-        .unwrap();
-
-    // List namespace
-    let nss = catalog.list_namespaces(None).await.unwrap();
-    assert!(nss.contains(&root_ident));
-}
-
-#[tokio::test]
-async fn test_create_table() {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let ns = Namespace::with_properties(
-        NamespaceIdent::from_strs([
-            "create_table",
-            "apple",
-            "ios",
-            &normalize_test_name_with_parts!("test_create_table"),
-        ])
-        .unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns.name()).await;
-
-    // Create namespaces
-    catalog
-        .create_namespace(ns.name(), ns.properties().clone())
-        .await
-        .unwrap();
-
-    let schema = Schema::builder()
-        .with_schema_id(1)
-        .with_identifier_field_ids(vec![2])
-        .with_fields(vec![
-            NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
-            NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
-            NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
-        ])
-        .build()
-        .unwrap();
-
-    let table_creation = TableCreation::builder()
-        .name("t1".to_string())
-        .schema(schema.clone())
-        .build();
-
-    let table = catalog
-        .create_table(ns.name(), table_creation)
-        .await
-        .unwrap();
-
-    assert_eq!(
-        table.identifier(),
-        &TableIdent::new(ns.name().clone(), "t1".to_string())
-    );
-
-    assert_eq!(
-        table.metadata().current_schema().as_struct(),
-        schema.as_struct()
-    );
-    assert_eq!(table.metadata().format_version(), FormatVersion::V2);
-    assert!(table.metadata().current_snapshot().is_none());
-    assert!(table.metadata().history().is_empty());
-    assert!(table.metadata().default_sort_order().is_unsorted());
-    assert!(table.metadata().default_partition_spec().is_unpartitioned());
-}
-
-#[tokio::test]
-async fn test_update_table() {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let ns = Namespace::with_properties(
-        NamespaceIdent::from_strs([
-            "update_table",
-            "apple",
-            "ios",
-            &normalize_test_name_with_parts!("test_update_table"),
-        ])
-        .unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns.name()).await;
-
-    // Create namespaces
-    catalog
-        .create_namespace(ns.name(), ns.properties().clone())
-        .await
-        .unwrap();
-
-    let schema = Schema::builder()
-        .with_schema_id(1)
-        .with_identifier_field_ids(vec![2])
-        .with_fields(vec![
-            NestedField::optional(1, "foo", 
Type::Primitive(PrimitiveType::String)).into(),
-            NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::Int)).into(),
-            NestedField::optional(3, "baz", 
Type::Primitive(PrimitiveType::Boolean)).into(),
-        ])
-        .build()
-        .unwrap();
-
-    // Now we create a table
-    let table_creation = TableCreation::builder()
-        .name("t1".to_string())
-        .schema(schema.clone())
-        .build();
-
-    let table = catalog
-        .create_table(ns.name(), table_creation)
-        .await
-        .unwrap();
-
-    assert_eq!(
-        table.identifier(),
-        &TableIdent::new(ns.name().clone(), "t1".to_string())
-    );
-
-    let tx = Transaction::new(&table);
-    // Update table by committing transaction
-    let table2 = tx
-        .update_table_properties()
-        .set("prop1".to_string(), "v1".to_string())
-        .apply(tx)
-        .unwrap()
-        .commit(&catalog)
-        .await
-        .unwrap();
-
-    assert_map_contains(
-        &HashMap::from([("prop1".to_string(), "v1".to_string())]),
-        table2.metadata().properties(),
-    );
-}
-
-fn assert_map_contains(map1: &HashMap<String, String>, map2: &HashMap<String, 
String>) {
-    for (k, v) in map1 {
-        assert!(map2.contains_key(k));
-        assert_eq!(map2.get(k).unwrap(), v);
-    }
-}
-
-#[tokio::test]
-async fn test_list_empty_multi_level_namespace() {
-    let catalog = get_catalog().await;
-
-    // Use unique namespace to avoid conflicts
-    let ns_apple = Namespace::with_properties(
-        NamespaceIdent::from_strs([
-            "multi_level",
-            "a_a",
-            "apple",
-            
&normalize_test_name_with_parts!("test_list_empty_multi_level_namespace"),
-        ])
-        .unwrap(),
-        HashMap::from([
-            ("owner".to_string(), "ray".to_string()),
-            ("community".to_string(), "apache".to_string()),
-        ]),
-    );
-
-    // Clean up from any previous test runs
-    cleanup_namespace(&catalog, ns_apple.name()).await;
-
-    // Currently this namespace doesn't exist, so it should return error.
-    assert!(
-        catalog
-            .list_namespaces(Some(ns_apple.name()))
-            .await
-            .is_err()
-    );
-
-    // Create namespaces
-    catalog
-        .create_namespace(ns_apple.name(), ns_apple.properties().clone())
-        .await
-        .unwrap();
-
-    // List namespace
-    let nss = catalog
-        .list_namespaces(Some(ns_apple.name()))
-        .await
-        .unwrap();
-    assert!(nss.is_empty());
-}
-
 #[tokio::test]
 async fn test_register_table() {

Review Comment:
   Same here, this is already covered by `table_register_suite`



##########
crates/catalog/loader/tests/namespace_suite.rs:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common namespace behavior across catalogs.
+//!
+//! These tests assume Docker containers are started externally via `make 
docker-up`.
+
+mod common;
+
+use std::collections::HashMap;
+
+use common::{CatalogKind, assert_map_contains, cleanup_namespace_dyn, 
load_catalog};
+use iceberg::{ErrorKind, NamespaceIdent, Result};
+use iceberg_test_utils::normalize_test_name_with_parts;
+use rstest::rstest;
+
+// Common behavior: querying a missing namespace should error.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_namespace_missing_returns_error(#[case] kind: 
CatalogKind) -> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_namespace_missing_returns_error",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+
+    assert!(catalog.get_namespace(&namespace).await.is_err());
+
+    Ok(())
+}
+
+// Common behavior: namespace lifecycle CRUD.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_namespace_lifecycle(#[case] kind: CatalogKind) -> 
Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_namespace_lifecycle",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+
+    assert!(!catalog.namespace_exists(&namespace).await?);
+
+    let props = HashMap::from([
+        ("owner".to_string(), "rust".to_string()),
+        ("purpose".to_string(), "catalog_suite".to_string()),
+    ]);
+    let created = catalog.create_namespace(&namespace, props.clone()).await?;
+    assert_eq!(created.name(), &namespace);
+    assert_map_contains(&props, created.properties());
+
+    let fetched = catalog.get_namespace(&namespace).await?;
+    assert_eq!(fetched.name(), &namespace);
+    assert_map_contains(&props, fetched.properties());
+
+    let namespaces = catalog.list_namespaces(None).await?;
+    assert!(namespaces.contains(&namespace));
+
+    catalog.drop_namespace(&namespace).await?;
+    assert!(!catalog.namespace_exists(&namespace).await?);
+
+    Ok(())
+}
+
+// Common behavior: update_namespace persists changes when supported.
+#[rstest]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_update_namespace_supported(#[case] kind: CatalogKind) -> 
Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_update_namespace_supported",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+    catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+    let updated_props = HashMap::from([("owner".to_string(), 
"updated".to_string())]);
+    catalog
+        .update_namespace(&namespace, updated_props.clone())
+        .await?;
+
+    let updated = catalog.get_namespace(&namespace).await?;
+    assert_map_contains(&updated_props, updated.properties());
+
+    Ok(())
+}
+
+// Common behavior: update_namespace returns FeatureUnsupported when not 
implemented.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[tokio::test]
+async fn test_catalog_update_namespace_unsupported(#[case] kind: CatalogKind) 
-> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_update_namespace_unsupported",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+    catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+    let err = catalog
+        .update_namespace(
+            &namespace,
+            HashMap::from([("key".to_string(), "value".to_string())]),
+        )
+        .await
+        .unwrap_err();
+    assert_eq!(err.kind(), ErrorKind::FeatureUnsupported);
+
+    Ok(())
+}
+
+// Common behavior: listing namespaces under a parent returns its children.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_namespace_listing_with_parent(#[case] kind: CatalogKind) 
-> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let parent_name =
+        
normalize_test_name_with_parts!("catalog_namespace_listing_with_parent", 
harness.label);
+    let parent = NamespaceIdent::new(parent_name.clone());
+    let child1 = NamespaceIdent::from_strs([&parent_name, "child1"]).unwrap();
+    let child2 = NamespaceIdent::from_strs([&parent_name, "child2"]).unwrap();
+
+    cleanup_namespace_dyn(catalog.as_ref(), &child1).await;
+    cleanup_namespace_dyn(catalog.as_ref(), &child2).await;
+    cleanup_namespace_dyn(catalog.as_ref(), &parent).await;
+
+    catalog.create_namespace(&parent, HashMap::new()).await?;
+
+    catalog.create_namespace(&child1, HashMap::new()).await?;
+    catalog.create_namespace(&child2, HashMap::new()).await?;
+
+    let top_level = catalog.list_namespaces(None).await?;
+    assert!(top_level.contains(&parent));
+
+    let children = catalog.list_namespaces(Some(&parent)).await?;
+    assert!(children.contains(&child1));
+    assert!(children.contains(&child2));
+
+    Ok(())
+}
+
+// Common behavior: hierarchical namespaces are rejected when unsupported.
+#[rstest]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[tokio::test]
+async fn test_catalog_namespace_listing_with_parent_unsupported(
+    #[case] kind: CatalogKind,
+) -> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let parent_name = normalize_test_name_with_parts!(
+        "catalog_namespace_listing_with_parent_unsupported",
+        harness.label
+    );
+    let parent = NamespaceIdent::new(parent_name.clone());
+    let child = NamespaceIdent::from_strs([&parent_name, "child"]).unwrap();
+
+    cleanup_namespace_dyn(catalog.as_ref(), &child).await;
+    cleanup_namespace_dyn(catalog.as_ref(), &parent).await;
+
+    catalog.create_namespace(&parent, HashMap::new()).await?;
+
+    let err = catalog
+        .create_namespace(&child, HashMap::new())
+        .await
+        .unwrap_err();
+    assert_eq!(err.kind(), ErrorKind::DataInvalid);
+
+    Ok(())
+}
+
+// Common behavior: listing top-level namespaces includes created namespaces.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_list_namespaces_contains_created(#[case] kind: 
CatalogKind) -> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let ns_one = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_list_namespaces_contains_created",
+        harness.label,
+        "one"
+    ));
+    let ns_two = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_list_namespaces_contains_created",
+        harness.label,
+        "two"
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &ns_one).await;
+    cleanup_namespace_dyn(catalog.as_ref(), &ns_two).await;
+
+    catalog.create_namespace(&ns_one, HashMap::new()).await?;
+    catalog.create_namespace(&ns_two, HashMap::new()).await?;
+
+    let namespaces = catalog.list_namespaces(None).await?;
+    assert!(namespaces.contains(&ns_one));
+    assert!(namespaces.contains(&ns_two));
+
+    Ok(())
+}
+
+// Common behavior: creating an existing namespace should error.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_create_namespace_duplicate_fails(#[case] kind: 
CatalogKind) -> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_create_namespace_duplicate_fails",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+    catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+    assert!(
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .is_err()

Review Comment:
   Same here, we can assert the exact error type.



##########
crates/catalog/hms/src/catalog.rs:
##########
@@ -520,6 +545,15 @@ impl Catalog for HmsCatalog {
     /// - Any network or communication error occurs with the database backend.
     async fn drop_table(&self, table: &TableIdent) -> Result<()> {
         let db_name = validate_namespace(table.namespace())?;
+        if !self.namespace_exists(table.namespace()).await? {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,

Review Comment:
   Same here: a missing namespace should be `ErrorKind::NamespaceNotFound`.



##########
crates/catalog/hms/src/catalog.rs:
##########
@@ -372,6 +384,13 @@ impl Catalog for HmsCatalog {
     async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
         let name = validate_namespace(namespace)?;
 
+        if !self.namespace_exists(namespace).await? {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,

Review Comment:
   Same here: a missing namespace should be `ErrorKind::NamespaceNotFound`.



##########
crates/catalog/hms/src/catalog.rs:
##########
@@ -258,6 +258,12 @@ impl Catalog for HmsCatalog {
         namespace: &NamespaceIdent,
         properties: HashMap<String, String>,
     ) -> Result<Namespace> {
+        if self.namespace_exists(namespace).await? {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,

Review Comment:
   This should be `ErrorKind::NamespaceAlreadyExists`



##########
crates/catalog/hms/src/catalog.rs:
##########
@@ -392,6 +411,12 @@ impl Catalog for HmsCatalog {
     /// querying the database.
     async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
         let name = validate_namespace(namespace)?;
+        if !self.namespace_exists(namespace).await? {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,

Review Comment:
   Same here: a missing namespace should be `ErrorKind::NamespaceNotFound`.



##########
crates/catalog/hms/src/catalog.rs:
##########
@@ -341,6 +347,12 @@ impl Catalog for HmsCatalog {
         namespace: &NamespaceIdent,
         properties: HashMap<String, String>,
     ) -> Result<()> {
+        if !self.namespace_exists(namespace).await? {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,

Review Comment:
   This should be `ErrorKind::NamespaceNotFound`



##########
crates/catalog/hms/src/catalog.rs:
##########
@@ -520,6 +545,15 @@ impl Catalog for HmsCatalog {
     /// - Any network or communication error occurs with the database backend.
     async fn drop_table(&self, table: &TableIdent) -> Result<()> {
         let db_name = validate_namespace(table.namespace())?;
+        if !self.namespace_exists(table.namespace()).await? {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Namespace does not exist",
+            ));
+        }
+        if !self.table_exists(table).await? {
+            return Err(Error::new(ErrorKind::DataInvalid, "Table does not 
exist"));

Review Comment:
   This should be `ErrorKind::TableNotFound`



##########
crates/catalog/loader/tests/namespace_suite.rs:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common namespace behavior across catalogs.
+//!
+//! These tests assume Docker containers are started externally via `make 
docker-up`.
+
+mod common;
+
+use std::collections::HashMap;
+
+use common::{CatalogKind, assert_map_contains, cleanup_namespace_dyn, 
load_catalog};
+use iceberg::{ErrorKind, NamespaceIdent, Result};
+use iceberg_test_utils::normalize_test_name_with_parts;
+use rstest::rstest;
+
+// Common behavior: querying a missing namespace should error.
+#[rstest]
+#[case::rest_catalog(CatalogKind::Rest)]
+#[case::glue_catalog(CatalogKind::Glue)]
+#[case::hms_catalog(CatalogKind::Hms)]
+#[case::sql_catalog(CatalogKind::Sql)]
+#[case::s3tables_catalog(CatalogKind::S3Tables)]
+#[case::memory_catalog(CatalogKind::Memory)]
+#[tokio::test]
+async fn test_catalog_namespace_missing_returns_error(#[case] kind: 
CatalogKind) -> Result<()> {
+    let Some(harness) = load_catalog(kind).await else {
+        return Ok(());
+    };
+    let catalog = harness.catalog;
+    let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
+        "catalog_namespace_missing_returns_error",
+        harness.label
+    ));
+
+    cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
+
+    assert!(catalog.get_namespace(&namespace).await.is_err());

Review Comment:
   I think this is a good opportunity for us to enforce the correct error type 
across catalogs. Instead of only asserting `is_err()`, we can check the error kind, e.g. 
`assert_eq!(err.kind(), ErrorKind::xxx);`



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -1351,20 +1351,6 @@ mod tests {
         assert_eq!(catalog2.list_namespaces(None).await.unwrap(), vec![]);
     }
 
-    #[tokio::test]
-    async fn test_list_namespaces_returns_multiple_namespaces() {
-        let warehouse_loc = temp_path();
-        let catalog = new_sql_catalog(warehouse_loc, Some("iceberg")).await;
-        let namespace_ident_1 = NamespaceIdent::new("a".into());
-        let namespace_ident_2 = NamespaceIdent::new("b".into());
-        create_namespaces(&catalog, &vec![&namespace_ident_1, 
&namespace_ident_2]).await;
-
-        assert_eq!(
-            to_set(catalog.list_namespaces(None).await.unwrap()),
-            to_set(vec![namespace_ident_1, namespace_ident_2])
-        );
-    }
-
     #[tokio::test]
     async fn test_list_namespaces_returns_only_top_level_namespaces() {

Review Comment:
   Should we migrate these tests as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to