This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ae6d0958 feat: SQL Catalog - Tables (#610)
ae6d0958 is described below
commit ae6d0958843e6d18be301547de31de1610a72304
Author: Callum Ryan <[email protected]>
AuthorDate: Sat Oct 12 10:17:07 2024 +0100
feat: SQL Catalog - Tables (#610)
* feat: add list/exist table + drop namespace
Signed-off-by: callum-ryan <[email protected]>
* feat: create table
Signed-off-by: callum-ryan <[email protected]>
* feat: add load table
Signed-off-by: callum-ryan <[email protected]>
* feat: add the rest of table ops
Signed-off-by: callum-ryan <[email protected]>
* fix: sort order on test
Signed-off-by: callum-ryan <[email protected]>
* fix: sort Cargo.toml
Signed-off-by: callum-ryan <[email protected]>
* fix: Adjust error message for existence of table
Signed-off-by: callum-ryan <[email protected]>
* fix: update_table throws Unsupported, add catalog filter to drop_nsp
Signed-off-by: callum-ryan <[email protected]>
---------
Signed-off-by: callum-ryan <[email protected]>
---
crates/catalog/sql/Cargo.toml | 2 +
crates/catalog/sql/src/catalog.rs | 836 ++++++++++++++++++++++++++++++++++++--
crates/catalog/sql/src/error.rs | 16 +-
3 files changed, 824 insertions(+), 30 deletions(-)
diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index 4a88e75b..a5167165 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -31,8 +31,10 @@ keywords = ["iceberg", "sql", "catalog"]
[dependencies]
async-trait = { workspace = true }
iceberg = { workspace = true }
+serde_json = { workspace = true }
sqlx = { version = "0.8.1", features = ["any"], default-features = false }
typed-builder = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
diff --git a/crates/catalog/sql/src/catalog.rs
b/crates/catalog/sql/src/catalog.rs
index c6a524ce..b7976d9d 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -20,15 +20,20 @@ use std::time::Duration;
use async_trait::async_trait;
use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
- Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent,
+ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation,
+ TableIdent,
};
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult,
AnyRow};
use sqlx::{Any, AnyPool, Row, Transaction};
use typed_builder::TypedBuilder;
+use uuid::Uuid;
-use crate::error::{from_sqlx_error, no_such_namespace_err};
+use crate::error::{
+ from_sqlx_error, no_such_namespace_err, no_such_table_err,
table_already_exists_err,
+};
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
@@ -37,12 +42,15 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str =
"table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str =
"previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
+/// Value stored in the `iceberg_type` column to mark a catalog row as a table.
+static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";
+/// Namespace property whose value, when set, is used as the base location for
+/// tables created in that namespace without an explicit location.
+static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
+
static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if
not provided
static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per
connection to 10s before it is closed
static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each
connection to enabled prior to returning
@@ -71,8 +79,8 @@ pub struct SqlCatalogConfig {
pub struct SqlCatalog {
name: String,
connection: AnyPool,
- _warehouse_location: String,
- _fileio: FileIO,
+ /// Root location used to derive a table location when neither the table
+ /// creation request nor its namespace specifies one.
+ warehouse_location: String,
+ /// File IO used to read and write table metadata files.
+ fileio: FileIO,
sql_bind_style: SqlBindStyle,
}
@@ -142,8 +150,8 @@ impl SqlCatalog {
Ok(SqlCatalog {
name: config.name.to_owned(),
connection: pool,
- _warehouse_location: config.warehouse_location,
- _fileio: config.file_io,
+ warehouse_location: config.warehouse_location,
+ fileio: config.file_io,
sql_bind_style: config.sql_bind_style,
})
}
@@ -472,40 +480,298 @@ impl Catalog for SqlCatalog {
}
}
- async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
- todo!()
+ /// Deletes the property rows of `namespace` for this catalog.
+ ///
+ /// Fails with "No such namespace" if it does not exist, and refuses to drop a
+ /// namespace that still contains tables. Only rows whose namespace equals
+ /// `namespace` exactly are deleted, so nested namespaces are left in place.
+ async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+ let exists = self.namespace_exists(namespace).await?;
+ if exists {
+ // if there are tables in the namespace, don't allow drop.
+ let tables = self.list_tables(namespace).await?;
+ if !tables.is_empty() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Namespace {:?} is not empty. {} tables exist.",
+ namespace,
+ tables.len()
+ ),
+ ));
+ }
+
+ self.execute(
+ &format!(
+ "DELETE FROM {NAMESPACE_TABLE_NAME}
+ WHERE {NAMESPACE_FIELD_NAME} = ?
+ AND {CATALOG_FIELD_CATALOG_NAME} = ?"
+ ),
+ vec![Some(&namespace.join(".")), Some(&self.name)],
+ None,
+ )
+ .await?;
+
+ Ok(())
+ } else {
+ no_such_namespace_err(namespace)
+ }
}
- async fn list_tables(&self, _namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
- todo!()
+ /// Lists the tables registered directly under `namespace` in this catalog.
+ ///
+ /// Rows count as tables when their record type is `TABLE` or NULL. Fails with
+ /// "No such namespace" if `namespace` does not exist.
+ async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
+ let exists = self.namespace_exists(namespace).await?;
+ if exists {
+ let rows = self
+ .fetch_rows(
+ &format!(
+ "SELECT {CATALOG_FIELD_TABLE_NAME},
+ {CATALOG_FIELD_TABLE_NAMESPACE}
+ FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} =
'{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )",
+ ),
+ vec![Some(&namespace.join(".")), Some(&self.name)],
+ )
+ .await?;
+
+ // Collected through a HashSet, so the returned order is unspecified.
+ let mut tables = HashSet::<TableIdent>::with_capacity(rows.len());
+
+ for row in rows.iter() {
+ let tbl = row
+ .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME)
+ .map_err(from_sqlx_error)?;
+ let ns_strs = row
+ .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE)
+ .map_err(from_sqlx_error)?;
+ // Namespaces are stored dot-joined; split back into their parts.
+ let ns = NamespaceIdent::from_strs(ns_strs.split("."))?;
+ tables.insert(TableIdent::new(ns, tbl));
+ }
+
+ Ok(tables.into_iter().collect::<Vec<TableIdent>>())
+ } else {
+ no_such_namespace_err(namespace)
+ }
}
- async fn table_exists(&self, _identifier: &TableIdent) -> Result<bool> {
- todo!()
+ /// Returns whether this catalog has a row for `identifier` whose record type
+ /// is `TABLE` or NULL.
+ async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
+ let namespace = identifier.namespace().join(".");
+ let table_name = identifier.name();
+ let rows = self
+ .fetch_rows(
+ &format!(
+ "SELECT 1
+ FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ ),
+ vec![Some(&namespace), Some(&self.name), Some(table_name)],
+ )
+ .await?;
+
+ Ok(!rows.is_empty())
}
- async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
- todo!()
+ /// Removes the catalog row for `identifier`, failing with "No such table" if
+ /// it does not exist. Only the catalog entry is deleted; no metadata or data
+ /// files are touched.
+ async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
+ if !self.table_exists(identifier).await? {
+ return no_such_table_err(identifier);
+ }
+
+ self.execute(
+ &format!(
+ "DELETE FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ ),
+ // Positional binds must follow the placeholder order above:
+ // catalog name, then namespace, then table name.
+ vec![
+ Some(&self.name),
+ Some(&identifier.namespace().join(".")),
+ Some(identifier.name()),
+ ],
+ None,
+ )
+ .await?;
+
+ Ok(())
}
- async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
- todo!()
+ /// Loads `identifier` by reading the metadata file referenced by its catalog
+ /// row and deserializing it as `TableMetadata`.
+ ///
+ /// Fails with "No such table" if no matching table row exists.
+ async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
+ // A single query both checks existence and fetches the metadata pointer,
+ // so no separate `table_exists` round-trip is needed.
+ let rows = self
+ .fetch_rows(
+ &format!(
+ "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
+ FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ ),
+ vec![
+ Some(&self.name),
+ Some(identifier.name()),
+ Some(&identifier.namespace().join(".")),
+ ],
+ )
+ .await?;
+
+ if rows.is_empty() {
+ return no_such_table_err(identifier);
+ }
+
+ let row = &rows[0];
+ let tbl_metadata_location = row
+ .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
+ .map_err(from_sqlx_error)?;
+
+ let file = self.fileio.new_input(&tbl_metadata_location)?;
+ let metadata_content = file.read().await?;
+ let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+
+ Ok(Table::builder()
+ .file_io(self.fileio.clone())
+ .identifier(identifier.clone())
+ .metadata_location(tbl_metadata_location)
+ .metadata(metadata)
+ .build()?)
}
+ /// Creates the table described by `creation` inside `namespace`.
+ ///
+ /// It writes an initial `<location>/metadata/0-<uuid>.metadata.json` file and
+ /// records that path in the catalog table. The location is chosen in this
+ /// order: `creation.location`; else the namespace's `location` property
+ /// joined with the table name; else
+ /// `<warehouse>/<namespace parts joined by '/'>/<table name>`.
+ ///
+ /// Fails if the namespace is missing or the table already exists.
async fn create_table(
&self,
- _namespace: &NamespaceIdent,
- _creation: TableCreation,
+ namespace: &NamespaceIdent,
+ creation: TableCreation,
) -> Result<Table> {
- todo!()
+ if !self.namespace_exists(namespace).await? {
+ return no_such_namespace_err(namespace);
+ }
+
+ let tbl_name = creation.name.clone();
+ let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone());
+
+ if self.table_exists(&tbl_ident).await? {
+ return table_already_exists_err(&tbl_ident);
+ }
+
+ let (tbl_creation, location) = match creation.location.clone() {
+ Some(location) => (creation, location),
+ None => {
+ // fall back to namespace-specific location
+ // and then to warehouse location
+ let nsp_properties = self.get_namespace(namespace).await?.properties().clone();
+ let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) {
+ Some(location) => location.clone(),
+ None => format!("{}/{}", self.warehouse_location, namespace.join("/")),
+ };
+
+ let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name());
+
+ (
+ TableCreation {
+ location: Some(tbl_location.clone()),
+ ..creation
+ },
+ tbl_location,
+ )
+ }
+ };
+
+ let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
+ let tbl_metadata_location = format!(
+ "{}/metadata/0-{}.metadata.json",
+ location,
+ Uuid::new_v4()
+ );
+
+ let file = self.fileio.new_output(&tbl_metadata_location)?;
+ file.write(serde_json::to_vec(&tbl_metadata)?.into())
+ .await?;
+
+ self.execute(
+ &format!(
+ "INSERT INTO {CATALOG_TABLE_NAME}
+ ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
+ VALUES (?, ?, ?, ?, ?)"
+ ),
+ vec![
+ Some(&self.name),
+ Some(&namespace.join(".")),
+ Some(&tbl_name),
+ Some(&tbl_metadata_location),
+ Some(CATALOG_FIELD_TABLE_RECORD_TYPE),
+ ],
+ None,
+ )
+ .await?;
+
+ Ok(Table::builder()
+ .file_io(self.fileio.clone())
+ .metadata_location(tbl_metadata_location)
+ .identifier(tbl_ident)
+ .metadata(tbl_metadata)
+ .build()?)
}
- async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) ->
Result<()> {
- todo!()
+ /// Renames or moves table `src` to `dest` by rewriting its catalog row.
+ ///
+ /// This is a no-op when `src == dest`. It fails if `src` is missing, if
+ /// `dest`'s namespace is missing, or if `dest` already exists.
+ async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) ->
Result<()> {
+ if src == dest {
+ return Ok(());
+ }
+
+ if !self.table_exists(src).await? {
+ return no_such_table_err(src);
+ }
+
+ if !self.namespace_exists(dest.namespace()).await? {
+ return no_such_namespace_err(dest.namespace());
+ }
+
+ if self.table_exists(dest).await? {
+ return table_already_exists_err(dest);
+ }
+
+ // Only the name/namespace columns change; the metadata location is kept.
+ self.execute(
+ &format!(
+ "UPDATE {CATALOG_TABLE_NAME}
+ SET {CATALOG_FIELD_TABLE_NAME} = ?,
{CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} =
'{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ ),
+ vec![
+ Some(dest.name()),
+ Some(&dest.namespace().join(".")),
+ Some(&self.name),
+ Some(src.name()),
+ Some(&src.namespace().join(".")),
+ ],
+ None,
+ )
+ .await?;
+
+ Ok(())
}
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
- todo!()
+ // Commits are not implemented by this catalog yet. Callers now get an
+ // explicit FeatureUnsupported error instead of the previous todo!() panic.
+ Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Updating a table is not supported yet",
+ ))
}
}
@@ -515,12 +781,19 @@ mod tests {
use std::hash::Hash;
use iceberg::io::FileIOBuilder;
- use iceberg::{Catalog, Namespace, NamespaceIdent};
+ use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema,
SortOrder, Type};
+ use iceberg::table::Table;
+ use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation,
TableIdent};
+ use itertools::Itertools;
+ use regex::Regex;
use sqlx::migrate::MigrateDatabase;
use tempfile::TempDir;
+ use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
+ // Matches a lowercase hyphenated UUID, as embedded in metadata file names.
+ const UUID_REGEX_STR: &str =
"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
+
fn temp_path() -> String {
let temp_dir = TempDir::new().unwrap();
temp_dir.path().to_str().unwrap().to_string()
@@ -562,6 +835,83 @@ mod tests {
}
}
+ /// Schema with a single required `foo: int` field (id 1), shared by the tests.
+ fn simple_table_schema() -> Schema {
+ Schema::builder()
+ .with_fields(vec![NestedField::required(
+ 1,
+ "foo",
+ Type::Primitive(PrimitiveType::Int),
+ )
+ .into()])
+ .build()
+ .unwrap()
+ }
+
+ /// Creates `table_ident` with the simple schema at a fresh temporary location,
+ /// panicking on failure.
+ async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
+ let _ = catalog
+ .create_table(
+ &table_ident.namespace,
+ TableCreation::builder()
+ .name(table_ident.name().into())
+ .schema(simple_table_schema())
+ .location(temp_path())
+ .build(),
+ )
+ .await
+ .unwrap();
+ }
+
+ /// Creates each of `table_idents` in order via `create_table`.
+ async fn create_tables<C: Catalog>(catalog: &C, table_idents:
Vec<&TableIdent>) {
+ for table_ident in table_idents {
+ create_table(catalog, table_ident).await;
+ }
+ }
+
+ /// Asserts `table` has the expected identifier and schema, an unpartitioned
+ /// spec 0, an empty sort order 0, no properties, and is writable.
+ fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent,
expected_schema: &Schema) {
+ assert_eq!(table.identifier(), expected_table_ident);
+
+ let metadata = table.metadata();
+
+ assert_eq!(metadata.current_schema().as_ref(), expected_schema);
+
+ let expected_partition_spec = PartitionSpec::builder(expected_schema)
+ .with_spec_id(0)
+ .build()
+ .unwrap();
+
+ assert_eq!(
+ metadata
+ .partition_specs_iter()
+ .map(|p| p.as_ref())
+ .collect_vec(),
+ vec![&expected_partition_spec]
+ );
+
+ let expected_sorted_order = SortOrder::builder()
+ .with_order_id(0)
+ .with_fields(vec![])
+ .build(expected_schema)
+ .unwrap();
+
+ assert_eq!(
+ metadata
+ .sort_orders_iter()
+ .map(|s| s.as_ref())
+ .collect_vec(),
+ vec![&expected_sorted_order]
+ );
+
+ assert_eq!(metadata.properties(), &HashMap::new());
+
+ assert!(!table.readonly());
+ }
+
+ /// Asserts the table's metadata location matches `regex_str`. Callers
+ /// interpolate raw paths into the pattern, so regex metacharacters in those
+ /// paths are not escaped.
+ fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
+ let actual = table.metadata_location().unwrap().to_string();
+ let regex = Regex::new(regex_str).unwrap();
+ assert!(regex.is_match(&actual))
+ }
+
#[tokio::test]
async fn test_initialized() {
let warehouse_loc = temp_path();
@@ -810,7 +1160,6 @@ mod tests {
}
#[tokio::test]
- #[ignore = "drop_namespace not implemented"]
async fn test_drop_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
@@ -823,7 +1172,6 @@ mod tests {
}
#[tokio::test]
- #[ignore = "drop_namespace not implemented"]
async fn test_drop_nested_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
@@ -842,7 +1190,6 @@ mod tests {
}
#[tokio::test]
- #[ignore = "drop_namespace not implemented"]
async fn test_drop_deeply_nested_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
@@ -875,7 +1222,6 @@ mod tests {
}
#[tokio::test]
- #[ignore = "drop_namespace not implemented"]
async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
@@ -895,7 +1241,6 @@ mod tests {
}
#[tokio::test]
- #[ignore = "drop_namespace not implemented"]
async fn
test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
@@ -917,7 +1262,6 @@ mod tests {
}
#[tokio::test]
- #[ignore = "drop_namespace not implemented"]
async fn
test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
@@ -934,4 +1278,438 @@ mod tests {
.await
.unwrap());
}
+
+ #[tokio::test]
+ async fn test_list_tables_returns_empty_vector() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ // A freshly created namespace contains no tables.
+ assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(),
vec![]);
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+
+ let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
+
+ // Listing an unknown namespace is an error, not an empty result.
+ assert_eq!(
+ catalog
+ .list_tables(&non_existent_namespace_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ // An explicit table location is used as-is and survives a load round-trip.
+ async fn test_create_table_with_location() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let table_name = "abc";
+ let location = warehouse_loc.clone();
+ let table_creation = TableCreation::builder()
+ .name(table_name.into())
+ .location(location.clone())
+ .schema(simple_table_schema())
+ .build();
+
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(),
table_name.into());
+
+ assert_table_eq(
+ &catalog
+ .create_table(&namespace_ident, table_creation)
+ .await
+ .unwrap(),
+ &expected_table_ident,
+ &simple_table_schema(),
+ );
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+ assert!(table
+ .metadata_location()
+ .unwrap()
+ .to_string()
+ .starts_with(&location))
+ }
+
+ #[tokio::test]
+ // Without a table location, the namespace `location` property is the base.
+ async fn
test_create_table_falls_back_to_namespace_location_if_table_location_is_missing()
{
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ let mut namespace_properties = HashMap::new();
+ let namespace_location = temp_path();
+ namespace_properties.insert(
+ NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
+ namespace_location.to_string(),
+ );
+ catalog
+ .create_namespace(&namespace_ident, namespace_properties)
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(),
table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/tbl1/metadata/0-{}.metadata.json$",
+ namespace_location, UUID_REGEX_STR,
+ );
+
+ let table = catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ // A nested namespace uses its own `location` property, not its parent's.
+ async fn
test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing(
+ ) {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ let mut namespace_properties = HashMap::new();
+ let namespace_location = temp_path();
+ namespace_properties.insert(
+ NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
+ namespace_location.to_string(),
+ );
+ catalog
+ .create_namespace(&namespace_ident, namespace_properties)
+ .await
+ .unwrap();
+
+ let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let mut nested_namespace_properties = HashMap::new();
+ let nested_namespace_location = temp_path();
+ nested_namespace_properties.insert(
+ NAMESPACE_LOCATION_PROPERTY_KEY.to_string(),
+ nested_namespace_location.to_string(),
+ );
+ catalog
+ .create_namespace(&nested_namespace_ident,
nested_namespace_properties)
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident =
+ TableIdent::new(nested_namespace_ident.clone(), table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/tbl1/metadata/0-{}.metadata.json$",
+ nested_namespace_location, UUID_REGEX_STR,
+ );
+
+ let table = catalog
+ .create_table(
+ &nested_namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ // With no table or namespace location, `<warehouse>/<ns>/<table>` is used.
+ async fn
test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
+ ) {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ // note: no location specified in namespace_properties
+ let namespace_properties = HashMap::new();
+ catalog
+ .create_namespace(&namespace_ident, namespace_properties)
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(),
table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/a/tbl1/metadata/0-{}.metadata.json$",
+ warehouse_loc, UUID_REGEX_STR
+ );
+
+ let table = catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ // Nested namespace parts become nested directories under the warehouse.
+ async fn
test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
+ ) {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespace(&catalog, &nested_namespace_ident).await;
+
+ let table_name = "tbl1";
+ let expected_table_ident =
+ TableIdent::new(nested_namespace_ident.clone(), table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
+ warehouse_loc, UUID_REGEX_STR
+ );
+
+ let table = catalog
+ .create_table(
+ &nested_namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table,
&expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ // Creating a duplicate table fails even when a different location is given.
+ async fn
test_create_table_throws_error_if_table_with_same_name_already_exists() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_name = "tbl1";
+ let table_ident = TableIdent::new(namespace_ident.clone(),
table_name.into());
+ create_table(&catalog, &table_ident).await;
+
+ let tmp_dir = TempDir::new().unwrap();
+ let location = tmp_dir.path().to_str().unwrap().to_string();
+
+ assert_eq!(
+ catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ .location(location)
+ .build()
+ )
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!("Unexpected => Table {:?} already exists.", &table_ident)
+ );
+ }
+
+ #[tokio::test]
+ // Renaming within a namespace leaves only the destination listed.
+ async fn test_rename_table_in_same_namespace() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let src_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+ create_table(&catalog, &src_table_ident).await;
+
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap();
+
+ assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
+ dst_table_ident
+ ],);
+ }
+
+ #[tokio::test]
+ // Renaming into another namespace moves the table out of the source one.
+ async fn test_rename_table_across_namespaces() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let src_namespace_ident = NamespaceIdent::new("a".into());
+ let dst_namespace_ident = NamespaceIdent::new("b".into());
+ create_namespaces(&catalog, &vec![&src_namespace_ident,
&dst_namespace_ident]).await;
+ let src_table_ident = TableIdent::new(src_namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(),
"tbl2".into());
+ create_table(&catalog, &src_table_ident).await;
+
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.list_tables(&src_namespace_ident).await.unwrap(),
+ vec![],
+ );
+
+ assert_eq!(
+ catalog.list_tables(&dst_namespace_ident).await.unwrap(),
+ vec![dst_table_ident],
+ );
+ }
+
+ #[tokio::test]
+ // Renaming a table onto itself is a successful no-op.
+ async fn test_rename_table_src_table_is_same_as_dst_table() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_ident = TableIdent::new(namespace_ident.clone(),
"tbl".into());
+ create_table(&catalog, &table_ident).await;
+
+ catalog
+ .rename_table(&table_ident, &table_ident)
+ .await
+ .unwrap();
+
+ assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
+ table_ident
+ ],);
+ }
+
+ #[tokio::test]
+ // Moving between nested namespaces keeps the dot-joined namespace correct.
+ async fn test_rename_table_across_nested_namespaces() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+ create_namespaces(&catalog, &vec![
+ &namespace_ident_a,
+ &namespace_ident_a_b,
+ &namespace_ident_a_b_c,
+ ])
+ .await;
+
+ let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(),
"tbl1".into());
+ create_tables(&catalog, vec![&src_table_ident]).await;
+
+ let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(),
"tbl1".into());
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap();
+
+ assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
+
+ assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
+ }
+
+ #[tokio::test]
+ // The destination namespace must exist before a table can move into it.
+ async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let src_namespace_ident = NamespaceIdent::new("n1".into());
+ let src_table_ident = TableIdent::new(src_namespace_ident.clone(),
"tbl1".into());
+ create_namespace(&catalog, &src_namespace_ident).await;
+ create_table(&catalog, &src_table_ident).await;
+
+ let non_existent_dst_namespace_ident =
NamespaceIdent::new("n2".into());
+ let dst_table_ident =
+ TableIdent::new(non_existent_dst_namespace_ident.clone(),
"tbl1".into());
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_dst_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ // Renaming a missing source table reports "No such table".
+ async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let src_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!("Unexpected => No such table: {:?}", src_table_ident),
+ );
+ }
+
+ #[tokio::test]
+ // Renaming onto an existing table is rejected rather than overwriting it.
+ async fn test_rename_table_throws_error_if_dst_table_already_exists() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let src_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+ create_tables(&catalog, vec![&src_table_ident,
&dst_table_ident]).await;
+
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!("Unexpected => Table {:?} already exists.",
&dst_table_ident),
+ );
+ }
}
diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs
index cfefcc26..15b56e8e 100644
--- a/crates/catalog/sql/src/error.rs
+++ b/crates/catalog/sql/src/error.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use iceberg::{Error, ErrorKind, NamespaceIdent, Result};
+use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent};
/// Format an sqlx error into iceberg error.
pub fn from_sqlx_error(error: sqlx::Error) -> Error {
@@ -32,3 +32,17 @@ pub fn no_such_namespace_err<T>(namespace: &NamespaceIdent)
-> Result<T> {
format!("No such namespace: {:?}", namespace),
))
}
+
+/// Returns an `ErrorKind::Unexpected` error stating that `table_ident` does not
+/// exist in the catalog.
+pub fn no_such_table_err<T>(table_ident: &TableIdent) -> Result<T> {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("No such table: {:?}", table_ident),
+ ))
+}
+
+/// Returns an `ErrorKind::Unexpected` error stating that `table_ident` already
+/// exists in the catalog.
+pub fn table_already_exists_err<T>(table_ident: &TableIdent) -> Result<T> {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Table {:?} already exists.", table_ident),
+ ))
+}