This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ae6d0958 feat: SQL Catalog - Tables (#610)
ae6d0958 is described below

commit ae6d0958843e6d18be301547de31de1610a72304
Author: Callum Ryan <[email protected]>
AuthorDate: Sat Oct 12 10:17:07 2024 +0100

    feat: SQL Catalog - Tables (#610)
    
    * feat: add list/exist table + drop namespace
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * feat: create table
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * feat: add load table
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * feat: add the rest of table ops
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * fix: sort order on test
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * fix: sort Cargo.toml
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * fix: Adjust error message for existence of table
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * fix: update_table throws Unsupported, add catalog filter to drop_nsp
    
    Signed-off-by: callum-ryan <[email protected]>
    
    ---------
    
    Signed-off-by: callum-ryan <[email protected]>
---
 crates/catalog/sql/Cargo.toml     |   2 +
 crates/catalog/sql/src/catalog.rs | 836 ++++++++++++++++++++++++++++++++++++--
 crates/catalog/sql/src/error.rs   |  16 +-
 3 files changed, 824 insertions(+), 30 deletions(-)

diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index 4a88e75b..a5167165 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -31,8 +31,10 @@ keywords = ["iceberg", "sql", "catalog"]
 [dependencies]
 async-trait = { workspace = true }
 iceberg = { workspace = true }
+serde_json = { workspace = true }
 sqlx = { version = "0.8.1", features = ["any"], default-features = false }
 typed-builder = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
 
 [dev-dependencies]
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
diff --git a/crates/catalog/sql/src/catalog.rs 
b/crates/catalog/sql/src/catalog.rs
index c6a524ce..b7976d9d 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -20,15 +20,20 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
-    Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation, TableIdent,
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
 };
 use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, 
AnyRow};
 use sqlx::{Any, AnyPool, Row, Transaction};
 use typed_builder::TypedBuilder;
+use uuid::Uuid;
 
-use crate::error::{from_sqlx_error, no_such_namespace_err};
+use crate::error::{
+    from_sqlx_error, no_such_namespace_err, no_such_table_err, 
table_already_exists_err,
+};
 
 static CATALOG_TABLE_NAME: &str = "iceberg_tables";
 static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
@@ -37,12 +42,15 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str = 
"table_namespace";
 static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
 static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = 
"previous_metadata_location";
 static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
+static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";
 
 static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
 static NAMESPACE_FIELD_NAME: &str = "namespace";
 static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
 static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";
 
+static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
+
 static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if 
not provided
 static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per 
connection to 10s before it is closed
 static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each 
connection to enabled prior to returning
@@ -71,8 +79,8 @@ pub struct SqlCatalogConfig {
 pub struct SqlCatalog {
     name: String,
     connection: AnyPool,
-    _warehouse_location: String,
-    _fileio: FileIO,
+    warehouse_location: String,
+    fileio: FileIO,
     sql_bind_style: SqlBindStyle,
 }
 
@@ -142,8 +150,8 @@ impl SqlCatalog {
         Ok(SqlCatalog {
             name: config.name.to_owned(),
             connection: pool,
-            _warehouse_location: config.warehouse_location,
-            _fileio: config.file_io,
+            warehouse_location: config.warehouse_location,
+            fileio: config.file_io,
             sql_bind_style: config.sql_bind_style,
         })
     }
@@ -472,40 +480,298 @@ impl Catalog for SqlCatalog {
         }
     }
 
-    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
-        todo!()
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            // if there are tables in the namespace, don't allow drop.
+            let tables = self.list_tables(namespace).await?;
+            if !tables.is_empty() {
+                return Err(Error::new(
+                    iceberg::ErrorKind::Unexpected,
+                    format!(
+                        "Namespace {:?} is not empty. {} tables exist.",
+                        namespace,
+                        tables.len()
+                    ),
+                ));
+            }
+
+            self.execute(
+                &format!(
+                    "DELETE FROM {NAMESPACE_TABLE_NAME}
+                     WHERE {NAMESPACE_FIELD_NAME} = ?
+                      AND {CATALOG_FIELD_CATALOG_NAME} = ?"
+                ),
+                vec![Some(&namespace.join(".")), Some(&self.name)],
+                None,
+            )
+            .await?;
+
+            Ok(())
+        } else {
+            no_such_namespace_err(namespace)
+        }
     }
 
-    async fn list_tables(&self, _namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
-        todo!()
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let exists = self.namespace_exists(namespace).await?;
+        if exists {
+            let rows = self
+                .fetch_rows(
+                    &format!(
+                        "SELECT {CATALOG_FIELD_TABLE_NAME},
+                                {CATALOG_FIELD_TABLE_NAMESPACE}
+                         FROM {CATALOG_TABLE_NAME}
+                         WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                          AND {CATALOG_FIELD_CATALOG_NAME} = ?
+                          AND (
+                                {CATALOG_FIELD_RECORD_TYPE} = 
'{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
+                                OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                          )",
+                    ),
+                    vec![Some(&namespace.join(".")), Some(&self.name)],
+                )
+                .await?;
+
+            let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
+
+            for row in rows.iter() {
+                let tbl = row
+                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
+                    .map_err(from_sqlx_error)?;
+                let ns_strs = row
+                    .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
+                    .map_err(from_sqlx_error)?;
+                let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
+                tables.insert(TableIdent::new(ns, tbl));
+            }
+
+            Ok(tables.into_iter().collect::<Vec<TableIdent>>())
+        } else {
+            no_such_namespace_err(namespace)
+        }
     }
 
-    async fn table_exists(&self, _identifier: &TableIdent) -> Result<bool> {
-        todo!()
+    /// Reports whether `identifier` is registered as a table in this catalog.
+    ///
+    /// A row counts as a table when its record type is `TABLE` or when the
+    /// record type column is NULL.
+    ///
+    /// # Errors
+    /// Propagates any error returned by the underlying query.
+    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
+        let namespace = identifier.namespace().join(".");
+        let table_name = identifier.name();
+        // Bind order below must follow the `?` placeholders:
+        // namespace, catalog name, table name.
+        let matching_rows = self
+            .fetch_rows(
+                &format!(
+                    "SELECT 1
+                     FROM {CATALOG_TABLE_NAME}
+                     WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                      AND {CATALOG_FIELD_CATALOG_NAME} = ?
+                      AND {CATALOG_FIELD_TABLE_NAME} = ?
+                      AND (
+                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
+                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                      )"
+                ),
+                vec![Some(&namespace), Some(&self.name), Some(table_name)],
+            )
+            .await?;
+
+        Ok(!matching_rows.is_empty())
+    }
 
-    async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
-        todo!()
+    /// Removes the catalog record for `identifier`.
+    ///
+    /// Only the row in the catalog table is deleted by this method; it does
+    /// not touch the table's metadata or data files.
+    ///
+    /// # Errors
+    /// Returns a "no such table" error when the table is not registered in
+    /// this catalog, and propagates any error from the underlying statement.
+    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
+        if !self.table_exists(identifier).await? {
+            return no_such_table_err(identifier);
+        }
+
+        let namespace = identifier.namespace().join(".");
+
+        self.execute(
+            &format!(
+                "DELETE FROM {CATALOG_TABLE_NAME}
+                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                  AND {CATALOG_FIELD_TABLE_NAME} = ?
+                  AND (
+                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
+                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                  )"
+            ),
+            // Values are bound positionally, so they must follow the order of
+            // the `?` placeholders: catalog name, namespace, table name.
+            vec![
+                Some(&self.name),
+                Some(&namespace),
+                Some(identifier.name()),
+            ],
+            None,
+        )
+        .await?;
+
+        Ok(())
+    }
 
-    async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
-        todo!()
+    /// Loads the table registered under `identifier`.
+    ///
+    /// Looks up the table's metadata location in the catalog table, reads that
+    /// file through the catalog's `FileIO`, deserializes it as JSON into
+    /// `TableMetadata`, and builds a `Table` from the result.
+    ///
+    /// # Errors
+    /// Returns a "no such table" error when no matching catalog row exists,
+    /// and propagates query, file-read, and deserialization errors.
+    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
+        let namespace = identifier.namespace().join(".");
+
+        // A single query serves as both the existence check and the lookup:
+        // an empty result means the table is not registered.
+        let rows = self
+            .fetch_rows(
+                &format!(
+                    "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
+                     FROM {CATALOG_TABLE_NAME}
+                     WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                      AND {CATALOG_FIELD_TABLE_NAME} = ?
+                      AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                      AND (
+                        {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
+                        OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                      )"
+                ),
+                // Bind order follows the `?` placeholders:
+                // catalog name, table name, namespace.
+                vec![
+                    Some(&self.name),
+                    Some(identifier.name()),
+                    Some(&namespace),
+                ],
+            )
+            .await?;
+
+        let row = match rows.first() {
+            Some(row) => row,
+            None => return no_such_table_err(identifier),
+        };
+
+        let tbl_metadata_location = row
+            .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
+            .map_err(from_sqlx_error)?;
+
+        let file = self.fileio.new_input(&tbl_metadata_location)?;
+        let metadata_content = file.read().await?;
+        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+
+        Ok(Table::builder()
+            .file_io(self.fileio.clone())
+            .identifier(identifier.clone())
+            .metadata_location(tbl_metadata_location)
+            .metadata(metadata)
+            .build()?)
+    }
 
     async fn create_table(
         &self,
-        _namespace: &NamespaceIdent,
-        _creation: TableCreation,
+        namespace: &NamespaceIdent,
+        creation: TableCreation,
     ) -> Result<Table> {
-        todo!()
+        if !self.namespace_exists(namespace).await? {
+            return no_such_namespace_err(namespace);
+        }
+
+        let tbl_name = creation.name.clone();
+        let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
+
+        if self.table_exists(&tbl_ident).await? {
+            return table_already_exists_err(&tbl_ident);
+        }
+
+        let (tbl_creation, location) = match creation.location.clone() {
+            Some(location) => (creation, location),
+            None => {
+                // fall back to namespace-specific location
+                // and then to warehouse location
+                let nsp_properties = 
self.get_namespace(namespace).await?.properties().clone();
+                let nsp_location = match 
nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
+                    Some(location) => location.clone(),
+                    None => {
+                        format!(
+                            "{}/{}",
+                            self.warehouse_location.clone(),
+                            namespace.join("/")
+                        )
+                    }
+                };
+
+                let tbl_location = format!("{}/{}", nsp_location, 
tbl_ident.name());
+
+                (
+                    TableCreation {
+                        location: Some(tbl_location.clone()),
+                        ..creation
+                    },
+                    tbl_location,
+                )
+            }
+        };
+
+        let tbl_metadata = 
TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
+        let tbl_metadata_location = format!(
+            "{}/metadata/0-{}.metadata.json",
+            location.clone(),
+            Uuid::new_v4()
+        );
+
+        let file = self.fileio.new_output(&tbl_metadata_location)?;
+        file.write(serde_json::to_vec(&tbl_metadata)?.into())
+            .await?;
+
+        self.execute(&format!(
+            "INSERT INTO {CATALOG_TABLE_NAME}
+             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, 
{CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, 
{CATALOG_FIELD_RECORD_TYPE})
+             VALUES (?, ?, ?, ?, ?)
+            "), vec![Some(&self.name), Some(&namespace.join(".")), 
Some(&tbl_name.clone()), Some(&tbl_metadata_location), 
Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
+
+        Ok(Table::builder()
+            .file_io(self.fileio.clone())
+            .metadata_location(tbl_metadata_location)
+            .identifier(tbl_ident)
+            .metadata(tbl_metadata)
+            .build()?)
     }
 
-    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> 
Result<()> {
-        todo!()
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> 
Result<()> {
+        if src == dest {
+            return Ok(());
+        }
+
+        if !self.table_exists(src).await? {
+            return no_such_table_err(src);
+        }
+
+        if !self.namespace_exists(dest.namespace()).await? {
+            return no_such_namespace_err(dest.namespace());
+        }
+
+        if self.table_exists(dest).await? {
+            return table_already_exists_err(dest);
+        }
+
+        self.execute(
+            &format!(
+                "UPDATE {CATALOG_TABLE_NAME}
+                 SET {CATALOG_FIELD_TABLE_NAME} = ?, 
{CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                 WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                  AND {CATALOG_FIELD_TABLE_NAME} = ?
+                  AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                  AND (
+                    {CATALOG_FIELD_RECORD_TYPE} = 
'{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                )"
+            ),
+            vec![
+                Some(dest.name()),
+                Some(&dest.namespace().join(".")),
+                Some(&self.name),
+                Some(src.name()),
+                Some(&src.namespace().join(".")),
+            ],
+            None,
+        )
+        .await?;
+
+        Ok(())
     }
 
     async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        todo!()
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating a table is not supported yet",
+        ))
     }
 }
 
@@ -515,12 +781,19 @@ mod tests {
     use std::hash::Hash;
 
     use iceberg::io::FileIOBuilder;
-    use iceberg::{Catalog, Namespace, NamespaceIdent};
+    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, 
SortOrder, Type};
+    use iceberg::table::Table;
+    use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, 
TableIdent};
+    use itertools::Itertools;
+    use regex::Regex;
     use sqlx::migrate::MigrateDatabase;
     use tempfile::TempDir;
 
+    use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
     use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
 
+    const UUID_REGEX_STR: &str = 
"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
+
     fn temp_path() -> String {
         let temp_dir = TempDir::new().unwrap();
         temp_dir.path().to_str().unwrap().to_string()
@@ -562,6 +835,83 @@ mod tests {
         }
     }
 
+    fn simple_table_schema() -> Schema {
+        Schema::builder()
+            .with_fields(vec![NestedField::required(
+                1,
+                "foo",
+                Type::Primitive(PrimitiveType::Int),
+            )
+            .into()])
+            .build()
+            .unwrap()
+    }
+
+    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
+        let _ = catalog
+            .create_table(
+                &table_ident.namespace,
+                TableCreation::builder()
+                    .name(table_ident.name().into())
+                    .schema(simple_table_schema())
+                    .location(temp_path())
+                    .build(),
+            )
+            .await
+            .unwrap();
+    }
+
+    async fn create_tables<C: Catalog>(catalog: &C, table_idents: 
Vec<&TableIdent>) {
+        for table_ident in table_idents {
+            create_table(catalog, table_ident).await;
+        }
+    }
+
+    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, 
expected_schema: &Schema) {
+        assert_eq!(table.identifier(), expected_table_ident);
+
+        let metadata = table.metadata();
+
+        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
+
+        let expected_partition_spec = PartitionSpec::builder(expected_schema)
+            .with_spec_id(0)
+            .build()
+            .unwrap();
+
+        assert_eq!(
+            metadata
+                .partition_specs_iter()
+                .map(|p| p.as_ref())
+                .collect_vec(),
+            vec![&expected_partition_spec]
+        );
+
+        let expected_sorted_order = SortOrder::builder()
+            .with_order_id(0)
+            .with_fields(vec![])
+            .build(expected_schema)
+            .unwrap();
+
+        assert_eq!(
+            metadata
+                .sort_orders_iter()
+                .map(|s| s.as_ref())
+                .collect_vec(),
+            vec![&expected_sorted_order]
+        );
+
+        assert_eq!(metadata.properties(), &HashMap::new());
+
+        assert!(!table.readonly());
+    }
+
+    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
+        let actual = table.metadata_location().unwrap().to_string();
+        let regex = Regex::new(regex_str).unwrap();
+        assert!(regex.is_match(&actual))
+    }
+
     #[tokio::test]
     async fn test_initialized() {
         let warehouse_loc = temp_path();
@@ -810,7 +1160,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore = "drop_namespace not implemented"]
     async fn test_drop_namespace() {
         let warehouse_loc = temp_path();
         let catalog = new_sql_catalog(warehouse_loc).await;
@@ -823,7 +1172,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore = "drop_namespace not implemented"]
     async fn test_drop_nested_namespace() {
         let warehouse_loc = temp_path();
         let catalog = new_sql_catalog(warehouse_loc).await;
@@ -842,7 +1190,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore = "drop_namespace not implemented"]
     async fn test_drop_deeply_nested_namespace() {
         let warehouse_loc = temp_path();
         let catalog = new_sql_catalog(warehouse_loc).await;
@@ -875,7 +1222,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore = "drop_namespace not implemented"]
     async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
         let warehouse_loc = temp_path();
         let catalog = new_sql_catalog(warehouse_loc).await;
@@ -895,7 +1241,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore = "drop_namespace not implemented"]
     async fn 
test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
         let warehouse_loc = temp_path();
         let catalog = new_sql_catalog(warehouse_loc).await;
@@ -917,7 +1262,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore = "drop_namespace not implemented"]
     async fn 
test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
         let warehouse_loc = temp_path();
         let catalog = new_sql_catalog(warehouse_loc).await;
@@ -934,4 +1278,438 @@ mod tests {
             .await
             .unwrap());
     }
+
+    #[tokio::test]
+    async fn test_list_tables_returns_empty_vector() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), 
vec![]);
+    }
+
+    #[tokio::test]
+    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+
+        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
+
+        assert_eq!(
+            catalog
+                .list_tables(&non_existent_namespace_ident)
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!(
+                "Unexpected => No such namespace: {:?}",
+                non_existent_namespace_ident
+            ),
+        );
+    }
+
+    #[tokio::test]
+    async fn test_create_table_with_location() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        let table_name = "abc";
+        let location = warehouse_loc.clone();
+        let table_creation = TableCreation::builder()
+            .name(table_name.into())
+            .location(location.clone())
+            .schema(simple_table_schema())
+            .build();
+
+        let expected_table_ident = TableIdent::new(namespace_ident.clone(), 
table_name.into());
+
+        assert_table_eq(
+            &catalog
+                .create_table(&namespace_ident, table_creation)
+                .await
+                .unwrap(),
+            &expected_table_ident,
+            &simple_table_schema(),
+        );
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+        assert!(table
+            .metadata_location()
+            .unwrap()
+            .to_string()
+            .starts_with(&location))
+    }
+
+    #[tokio::test]
+    async fn 
test_create_table_falls_back_to_namespace_location_if_table_location_is_missing()
 {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+
+        let namespace_ident = NamespaceIdent::new("a".into());
+        let mut namespace_properties = HashMap::new();
+        let namespace_location = temp_path();
+        namespace_properties.insert(
+            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
+            namespace_location.to_string(),
+        );
+        catalog
+            .create_namespace(&namespace_ident, namespace_properties)
+            .await
+            .unwrap();
+
+        let table_name = "tbl1";
+        let expected_table_ident = TableIdent::new(namespace_ident.clone(), 
table_name.into());
+        let expected_table_metadata_location_regex = format!(
+            "^{}/tbl1/metadata/0-{}.metadata.json$",
+            namespace_location, UUID_REGEX_STR,
+        );
+
+        let table = catalog
+            .create_table(
+                &namespace_ident,
+                TableCreation::builder()
+                    .name(table_name.into())
+                    .schema(simple_table_schema())
+                    // no location specified for table
+                    .build(),
+            )
+            .await
+            .unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+    }
+
+    #[tokio::test]
+    async fn 
test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing(
+    ) {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+
+        let namespace_ident = NamespaceIdent::new("a".into());
+        let mut namespace_properties = HashMap::new();
+        let namespace_location = temp_path();
+        namespace_properties.insert(
+            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
+            namespace_location.to_string(),
+        );
+        catalog
+            .create_namespace(&namespace_ident, namespace_properties)
+            .await
+            .unwrap();
+
+        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        let mut nested_namespace_properties = HashMap::new();
+        let nested_namespace_location = temp_path();
+        nested_namespace_properties.insert(
+            NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
+            nested_namespace_location.to_string(),
+        );
+        catalog
+            .create_namespace(&nested_namespace_ident, 
nested_namespace_properties)
+            .await
+            .unwrap();
+
+        let table_name = "tbl1";
+        let expected_table_ident =
+            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
+        let expected_table_metadata_location_regex = format!(
+            "^{}/tbl1/metadata/0-{}.metadata.json$",
+            nested_namespace_location, UUID_REGEX_STR,
+        );
+
+        let table = catalog
+            .create_table(
+                &nested_namespace_ident,
+                TableCreation::builder()
+                    .name(table_name.into())
+                    .schema(simple_table_schema())
+                    // no location specified for table
+                    .build(),
+            )
+            .await
+            .unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+    }
+
+    #[tokio::test]
+    async fn 
test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
+    ) {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+
+        let namespace_ident = NamespaceIdent::new("a".into());
+        // note: no location specified in namespace_properties
+        let namespace_properties = HashMap::new();
+        catalog
+            .create_namespace(&namespace_ident, namespace_properties)
+            .await
+            .unwrap();
+
+        let table_name = "tbl1";
+        let expected_table_ident = TableIdent::new(namespace_ident.clone(), 
table_name.into());
+        let expected_table_metadata_location_regex = format!(
+            "^{}/a/tbl1/metadata/0-{}.metadata.json$",
+            warehouse_loc, UUID_REGEX_STR
+        );
+
+        let table = catalog
+            .create_table(
+                &namespace_ident,
+                TableCreation::builder()
+                    .name(table_name.into())
+                    .schema(simple_table_schema())
+                    // no location specified for table
+                    .build(),
+            )
+            .await
+            .unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+    }
+
+    #[tokio::test]
+    async fn 
test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
+    ) {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        create_namespace(&catalog, &nested_namespace_ident).await;
+
+        let table_name = "tbl1";
+        let expected_table_ident =
+            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
+        let expected_table_metadata_location_regex = format!(
+            "^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
+            warehouse_loc, UUID_REGEX_STR
+        );
+
+        let table = catalog
+            .create_table(
+                &nested_namespace_ident,
+                TableCreation::builder()
+                    .name(table_name.into())
+                    .schema(simple_table_schema())
+                    // no location specified for table
+                    .build(),
+            )
+            .await
+            .unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+        assert_table_metadata_location_matches(&table, 
&expected_table_metadata_location_regex);
+    }
+
+    #[tokio::test] // A second create_table call for a name that is already taken must fail with the "already exists" message.
+    async fn 
test_create_table_throws_error_if_table_with_same_name_already_exists() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+        let table_name = "tbl1";
+        let table_ident = TableIdent::new(namespace_ident.clone(), 
table_name.into());
+        create_table(&catalog, &table_ident).await; // first creation, through the create_table test helper
+
+        let tmp_dir = TempDir::new().unwrap(); // a fresh temp directory serves as the explicit location for the duplicate attempt
+        let location = tmp_dir.path().to_str().unwrap().to_string();
+
+        assert_eq!( // compares the rendered error text of the failed call with the expected message
+            catalog
+                .create_table(
+                    &namespace_ident,
+                    TableCreation::builder()
+                        .name(table_name.into())
+                        .schema(simple_table_schema())
+                        .location(location)
+                        .build()
+                )
+                .await
+                .unwrap_err() // panics (fails the test) if the duplicate creation unexpectedly succeeds
+                .to_string(),
+            format!("Unexpected => Table {:?} already exists.", &table_ident)
+        );
+    }
+
+    #[tokio::test] // Renames n1.tbl1 to n1.tbl2 and checks that listing n1 afterwards yields only the destination ident.
+    async fn test_rename_table_in_same_namespace() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("n1".into());
+        create_namespace(&catalog, &namespace_ident).await;
+        let src_table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl1".into());
+        let dst_table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl2".into());
+        create_table(&catalog, &src_table_ident).await; // only the source table is created before the rename
+
+        catalog
+            .rename_table(&src_table_ident, &dst_table_ident)
+            .await
+            .unwrap();
+
+        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![ // the old name must no longer be listed
+            dst_table_ident
+        ],);
+    }
+
+    #[tokio::test] // Moves a.tbl1 to b.tbl2 (both namespace and table name change) and checks the listings of both namespaces.
+    async fn test_rename_table_across_namespaces() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let src_namespace_ident = NamespaceIdent::new("a".into());
+        let dst_namespace_ident = NamespaceIdent::new("b".into());
+        create_namespaces(&catalog, &vec![&src_namespace_ident, 
&dst_namespace_ident]).await;
+        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), 
"tbl1".into());
+        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), 
"tbl2".into());
+        create_table(&catalog, &src_table_ident).await; // only the source table is created before the rename
+
+        catalog
+            .rename_table(&src_table_ident, &dst_table_ident)
+            .await
+            .unwrap();
+
+        assert_eq!( // the source namespace must be empty after the move
+            catalog.list_tables(&src_namespace_ident).await.unwrap(),
+            vec![],
+        );
+
+        assert_eq!( // the destination namespace must list exactly the renamed table
+            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
+            vec![dst_table_ident],
+        );
+    }
+
+    #[tokio::test] // Renaming a table to its own identifier must succeed and leave the namespace listing unchanged.
+    async fn test_rename_table_src_table_is_same_as_dst_table() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("n1".into());
+        create_namespace(&catalog, &namespace_ident).await;
+        let table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl".into());
+        create_table(&catalog, &table_ident).await;
+
+        catalog
+            .rename_table(&table_ident, &table_ident) // same ident passed as both source and destination
+            .await
+            .unwrap(); // the call is expected to return Ok, not an "already exists" error
+
+        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![ // the table is still the only entry in n1
+            table_ident
+        ],);
+    }
+
+    #[tokio::test] // Moves tbl1 from nested namespace a.b.c up to a.b and checks existence of both identifiers afterwards.
+    async fn test_rename_table_across_nested_namespaces() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident_a = NamespaceIdent::new("a".into());
+        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", 
"b"]).unwrap();
+        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", 
"c"]).unwrap();
+        create_namespaces(&catalog, &vec![ // all three levels are created: a, a.b and a.b.c
+            &namespace_ident_a,
+            &namespace_ident_a_b,
+            &namespace_ident_a_b_c,
+        ])
+        .await;
+
+        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), 
"tbl1".into());
+        create_tables(&catalog, vec![&src_table_ident]).await;
+
+        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), 
"tbl1".into());
+        catalog
+            .rename_table(&src_table_ident, &dst_table_ident) // table name stays "tbl1"; only the namespace changes
+            .await
+            .unwrap();
+
+        assert!(!catalog.table_exists(&src_table_ident).await.unwrap()); // the old location must be gone
+
+        assert!(catalog.table_exists(&dst_table_ident).await.unwrap()); // the new location must exist
+    }
+
+    #[tokio::test] // Renaming into a namespace that was never created must fail with the "No such namespace" message.
+    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let src_namespace_ident = NamespaceIdent::new("n1".into());
+        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), 
"tbl1".into());
+        create_namespace(&catalog, &src_namespace_ident).await;
+        create_table(&catalog, &src_table_ident).await;
+
+        let non_existent_dst_namespace_ident = 
NamespaceIdent::new("n2".into());
+        let dst_table_ident =
+            TableIdent::new(non_existent_dst_namespace_ident.clone(), 
"tbl1".into());
+        assert_eq!( // compares the rendered error text; "n2" is only constructed as an ident, never created in this test
+            catalog
+                .rename_table(&src_table_ident, &dst_table_ident)
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!(
+                "Unexpected => No such namespace: {:?}",
+                non_existent_dst_namespace_ident
+            ),
+        );
+    }
+
+    #[tokio::test] // Renaming a table that was never created must fail with the "No such table" message for the source ident.
+    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("n1".into());
+        create_namespace(&catalog, &namespace_ident).await; // the namespace exists; no table is created in this test
+        let src_table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl1".into());
+        let dst_table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl2".into());
+
+        assert_eq!( // compares the rendered error text of the failed rename
+            catalog
+                .rename_table(&src_table_ident, &dst_table_ident)
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!("Unexpected => No such table: {:?}", src_table_ident),
+        );
+    }
+
+    #[tokio::test] // Renaming onto an identifier that is already taken must fail with the "already exists" message for the destination.
+    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("n1".into());
+        create_namespace(&catalog, &namespace_ident).await;
+        let src_table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl1".into());
+        let dst_table_ident = TableIdent::new(namespace_ident.clone(), 
"tbl2".into());
+        create_tables(&catalog, vec![&src_table_ident, 
&dst_table_ident]).await;
+
+        assert_eq!( // both tables were created above, so the rename target collides
+            catalog
+                .rename_table(&src_table_ident, &dst_table_ident)
+                .await
+                .unwrap_err()
+                .to_string(),
+            format!("Unexpected => Table {:?} already exists.", 
&dst_table_ident),
+        );
+    }
 }
diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs
index cfefcc26..15b56e8e 100644
--- a/crates/catalog/sql/src/error.rs
+++ b/crates/catalog/sql/src/error.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iceberg::{Error, ErrorKind, NamespaceIdent, Result};
+use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 
 /// Format an sqlx error into iceberg error.
 pub fn from_sqlx_error(error: sqlx::Error) -> Error {
@@ -32,3 +32,17 @@ pub fn no_such_namespace_err<T>(namespace: &NamespaceIdent) -> Result<T> {
         format!("No such namespace: {:?}", namespace),
     ))
 }
+
+pub fn no_such_table_err<T>(table_ident: &TableIdent) -> Result<T> { // Always returns Err; generic over T so it can be used as the result of any Result<T> expression.
+    Err(Error::new(
+        ErrorKind::Unexpected, // reported under the Unexpected kind
+        format!("No such table: {:?}", table_ident), // message embeds the Debug rendering of the identifier
+    ))
+}
+
+pub fn table_already_exists_err<T>(table_ident: &TableIdent) -> Result<T> { // Always returns Err; generic over T so it can be used as the result of any Result<T> expression.
+    Err(Error::new(
+        ErrorKind::Unexpected, // reported under the Unexpected kind
+        format!("Table {:?} already exists.", table_ident), // message embeds the Debug rendering of the identifier
+    ))
+}


Reply via email to