This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2018ffc feat: Glue Catalog - namespace operations (2/3) (#304)
2018ffc is described below
commit 2018ffc87625bdff939aac791784d8eabc4eda38
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Wed Mar 27 04:17:57 2024 +0100
feat: Glue Catalog - namespace operations (2/3) (#304)
* add from_build_error
* impl create_namespace
* impl get_namespace
* add macro with_catalog_id
* impl namespace_exists
* impl update_namespace
* impl list_tables
* impl drop_namespace
* fix: clippy
* update docs
* update docs
* fix: naming and visibility of error conversions
---
crates/catalog/glue/src/catalog.rs | 218 ++++++++++++++++++++++---
crates/catalog/glue/src/error.rs | 13 +-
crates/catalog/glue/src/utils.rs | 156 +++++++++++++++++-
crates/catalog/glue/tests/glue_catalog_test.rs | 131 ++++++++++++++-
4 files changed, 486 insertions(+), 32 deletions(-)
diff --git a/crates/catalog/glue/src/catalog.rs
b/crates/catalog/glue/src/catalog.rs
index 7c5e73a..d152a54 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -15,23 +15,29 @@
// specific language governing permissions and limitations
// under the License.
-//! Iceberg Glue Catalog implementation.
-
use async_trait::async_trait;
use iceberg::table::Table;
-use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent};
+use iceberg::{
+ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation,
+ TableIdent,
+};
use std::{collections::HashMap, fmt::Debug};
use typed_builder::TypedBuilder;
-use crate::error::from_aws_error;
-use crate::utils::create_sdk_config;
+use crate::error::from_aws_sdk_error;
+use crate::utils::{
+ convert_to_database, convert_to_namespace, create_sdk_config,
validate_namespace,
+};
+use crate::with_catalog_id;
#[derive(Debug, TypedBuilder)]
/// Glue Catalog configuration
pub struct GlueCatalogConfig {
#[builder(default, setter(strip_option))]
uri: Option<String>,
+ #[builder(default, setter(strip_option))]
+ catalog_id: Option<String>,
#[builder(default)]
props: HashMap<String, String>,
}
@@ -68,6 +74,10 @@ impl GlueCatalog {
#[async_trait]
impl Catalog for GlueCatalog {
+ /// List namespaces from glue catalog.
+ ///
+ /// Glue doesn't support nested namespaces.
+ /// Returns an empty list if `parent` is `Some`.
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
@@ -80,17 +90,19 @@ impl Catalog for GlueCatalog {
let mut next_token: Option<String> = None;
loop {
- let resp = match &next_token {
+ let builder = match &next_token {
Some(token) => self.client.0.get_databases().next_token(token),
None => self.client.0.get_databases(),
};
- let resp = resp.send().await.map_err(from_aws_error)?;
+ let builder = with_catalog_id!(builder, self.config);
+ let resp = builder.send().await.map_err(from_aws_sdk_error)?;
let dbs: Vec<NamespaceIdent> = resp
.database_list()
.iter()
.map(|db| NamespaceIdent::new(db.name().to_string()))
.collect();
+
database_list.extend(dbs);
next_token = resp.next_token().map(ToOwned::to_owned);
@@ -102,36 +114,200 @@ impl Catalog for GlueCatalog {
Ok(database_list)
}
+ /// Creates a new namespace with the given identifier and properties.
+ ///
+ /// Attempts to create a namespace defined by the `namespace`
+ /// parameter and configured with the specified `properties`.
+ ///
+ /// This function can return an error in the following situations:
+ ///
+ /// - Errors from `validate_namespace` if the namespace identifier does not
+ /// meet validation criteria.
+ /// - Errors from `convert_to_database` if the properties cannot be
+ /// successfully converted into a database configuration.
+ /// - Errors from the underlying database creation process, converted using
+ /// `from_aws_sdk_error`.
async fn create_namespace(
&self,
- _namespace: &NamespaceIdent,
- _properties: HashMap<String, String>,
+ namespace: &NamespaceIdent,
+ properties: HashMap<String, String>,
) -> Result<Namespace> {
- todo!()
+ let db_input = convert_to_database(namespace, &properties)?;
+
+ let builder = self.client.0.create_database().database_input(db_input);
+ let builder = with_catalog_id!(builder, self.config);
+
+ builder.send().await.map_err(from_aws_sdk_error)?;
+
+ Ok(Namespace::with_properties(namespace.clone(), properties))
}
- async fn get_namespace(&self, _namespace: &NamespaceIdent) ->
Result<Namespace> {
- todo!()
+ /// Retrieves a namespace by its identifier.
+ ///
+ /// Validates the given namespace identifier and then queries the
+ /// underlying database client to fetch the corresponding namespace data.
+ /// Constructs a `Namespace` object with the retrieved data and returns it.
+ ///
+ /// This function can return an error in any of the following situations:
+ /// - If the provided namespace identifier fails validation checks
+ /// - If there is an error querying the database, returned by
+ /// `from_aws_sdk_error`.
+ async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace> {
+ let db_name = validate_namespace(namespace)?;
+
+ let builder = self.client.0.get_database().name(&db_name);
+ let builder = with_catalog_id!(builder, self.config);
+
+ let resp = builder.send().await.map_err(from_aws_sdk_error)?;
+
+ match resp.database() {
+ Some(db) => {
+ let namespace = convert_to_namespace(db);
+ Ok(namespace)
+ }
+ None => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Database with name: {} does not exist", db_name),
+ )),
+ }
}
- async fn namespace_exists(&self, _namespace: &NamespaceIdent) ->
Result<bool> {
- todo!()
+ /// Checks if a namespace exists within the Glue Catalog.
+ ///
+ /// Validates the namespace identifier and then queries the Glue Catalog
+ /// to determine if the specified namespace (database) exists.
+ ///
+ /// # Returns
+ /// A `Result<bool>` indicating the outcome of the check:
+ /// - `Ok(true)` if the namespace exists.
+ /// - `Ok(false)` if the namespace does not exist, identified by a specific
+ /// `EntityNotFoundException` variant.
+ /// - `Err(...)` if an error occurs during validation or the Glue Catalog
+ /// query, with the error encapsulating the issue.
+ async fn namespace_exists(&self, namespace: &NamespaceIdent) ->
Result<bool> {
+ let db_name = validate_namespace(namespace)?;
+
+ let builder = self.client.0.get_database().name(&db_name);
+ let builder = with_catalog_id!(builder, self.config);
+
+ let resp = builder.send().await;
+
+ match resp {
+ Ok(_) => Ok(true),
+ Err(err) => {
+ if err
+ .as_service_error()
+ .map(|e| e.is_entity_not_found_exception())
+ == Some(true)
+ {
+ return Ok(false);
+ }
+ Err(from_aws_sdk_error(err))
+ }
+ }
}
+ /// Asynchronously updates properties of an existing namespace.
+ ///
+ /// Converts the given namespace identifier and properties into a database
+ /// representation and then attempts to update the corresponding namespace
+ /// in the Glue Catalog.
+ ///
+ /// # Returns
+ /// Returns `Ok(())` if the namespace update is successful. If the
+ /// namespace cannot be updated due to missing information or an error
+ /// during the update process, an `Err(...)` is returned.
async fn update_namespace(
&self,
- _namespace: &NamespaceIdent,
- _properties: HashMap<String, String>,
+ namespace: &NamespaceIdent,
+ properties: HashMap<String, String>,
) -> Result<()> {
- todo!()
+ let db_name = validate_namespace(namespace)?;
+ let db_input = convert_to_database(namespace, &properties)?;
+
+ let builder = self
+ .client
+ .0
+ .update_database()
+ .name(&db_name)
+ .database_input(db_input);
+ let builder = with_catalog_id!(builder, self.config);
+
+ builder.send().await.map_err(from_aws_sdk_error)?;
+
+ Ok(())
}
- async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
- todo!()
+ /// Asynchronously drops a namespace from the Glue Catalog.
+ ///
+ /// Checks if the namespace is empty. If it still contains tables the
+ /// namespace will not be dropped, but an error is returned instead.
+ ///
+ /// # Returns
+ /// A `Result<()>` indicating the outcome:
+ /// - `Ok(())` signifies successful namespace deletion.
+ /// - `Err(...)` signifies failure to drop the namespace due to validation
+ /// errors, connectivity issues, or Glue Catalog constraints.
+ async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+ let db_name = validate_namespace(namespace)?;
+ let table_list = self.list_tables(namespace).await?;
+
+ if !table_list.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Database with name: {} is not empty", &db_name),
+ ));
+ }
+
+ let builder = self.client.0.delete_database().name(db_name);
+ let builder = with_catalog_id!(builder, self.config);
+
+ builder.send().await.map_err(from_aws_sdk_error)?;
+
+ Ok(())
}
- async fn list_tables(&self, _namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
- todo!()
+ /// Asynchronously lists all tables within a specified namespace.
+ ///
+ /// # Returns
+ /// A `Result<Vec<TableIdent>>`, which is:
+ /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
+ /// representing a table within the specified namespace.
+ /// - `Err(...)` if an error occurs during namespace validation or while
+ /// querying the database.
+ async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
+ let db_name = validate_namespace(namespace)?;
+ let mut table_list: Vec<TableIdent> = Vec::new();
+ let mut next_token: Option<String> = None;
+
+ loop {
+ let builder = match &next_token {
+ Some(token) => self
+ .client
+ .0
+ .get_tables()
+ .database_name(&db_name)
+ .next_token(token),
+ None => self.client.0.get_tables().database_name(&db_name),
+ };
+ let builder = with_catalog_id!(builder, self.config);
+ let resp = builder.send().await.map_err(from_aws_sdk_error)?;
+
+ let tables: Vec<_> = resp
+ .table_list()
+ .iter()
+ .map(|tbl| TableIdent::new(namespace.clone(),
tbl.name().to_string()))
+ .collect();
+
+ table_list.extend(tables);
+
+ next_token = resp.next_token().map(ToOwned::to_owned);
+ if next_token.is_none() {
+ break;
+ }
+ }
+
+ Ok(table_list)
}
async fn create_table(
diff --git a/crates/catalog/glue/src/error.rs b/crates/catalog/glue/src/error.rs
index c9a2559..64a8fe9 100644
--- a/crates/catalog/glue/src/error.rs
+++ b/crates/catalog/glue/src/error.rs
@@ -15,15 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-//! Iceberg Glue Catalog implementation.
-
use anyhow::anyhow;
use std::fmt::Debug;
use iceberg::{Error, ErrorKind};
/// Format AWS SDK error into iceberg error
-pub fn from_aws_error<T>(error: aws_sdk_glue::error::SdkError<T>) -> Error
+pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_glue::error::SdkError<T>)
-> Error
where
T: Debug,
{
@@ -33,3 +31,12 @@ where
)
.with_source(anyhow!("aws sdk error: {:?}", error))
}
+
+/// Format AWS Build error into iceberg error
+pub(crate) fn from_aws_build_error(error: aws_sdk_glue::error::BuildError) ->
Error {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Operation failed for hitting aws build error".to_string(),
+ )
+ .with_source(anyhow!("aws build error: {:?}", error))
+}
diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs
index e824da8..fa9ebb8 100644
--- a/crates/catalog/glue/src/utils.rs
+++ b/crates/catalog/glue/src/utils.rs
@@ -15,14 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-//! Iceberg Glue Catalog implementation.
-
use std::collections::HashMap;
use aws_config::{BehaviorVersion, Region, SdkConfig};
-use aws_sdk_glue::config::Credentials;
+use aws_sdk_glue::{
+ config::Credentials,
+ types::{Database, DatabaseInput},
+};
+use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
+
+use crate::error::from_aws_build_error;
-const _GLUE_ID: &str = "glue.id";
const _GLUE_SKIP_ARCHIVE: &str = "glue.skip-archive";
const _GLUE_SKIP_ARCHIVE_DEFAULT: bool = true;
/// Property aws profile name
@@ -35,16 +38,20 @@ pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
/// Property aws session token
pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
+/// Parameter namespace description
+const DESCRIPTION: &str = "description";
+/// Parameter namespace location uri
+const LOCATION: &str = "location_uri";
-/// Creates an AWS SDK configuration (SdkConfig) based on
+/// Creates an aws sdk configuration based on
/// provided properties and an optional endpoint URL.
pub(crate) async fn create_sdk_config(
properties: &HashMap<String, String>,
- endpoint_url: Option<&String>,
+ endpoint_uri: Option<&String>,
) -> SdkConfig {
let mut config = aws_config::defaults(BehaviorVersion::latest());
- if let Some(endpoint) = endpoint_url {
+ if let Some(endpoint) = endpoint_uri {
config = config.endpoint_url(endpoint)
};
@@ -75,12 +82,147 @@ pub(crate) async fn create_sdk_config(
config.load().await
}
+/// Create `DatabaseInput` from `NamespaceIdent` and properties
+pub(crate) fn convert_to_database(
+ namespace: &NamespaceIdent,
+ properties: &HashMap<String, String>,
+) -> Result<DatabaseInput> {
+ let db_name = validate_namespace(namespace)?;
+ let mut builder = DatabaseInput::builder().name(db_name);
+
+ for (k, v) in properties.iter() {
+ match k.as_ref() {
+ DESCRIPTION => {
+ builder = builder.description(v);
+ }
+ LOCATION => {
+ builder = builder.location_uri(v);
+ }
+ _ => {
+ builder = builder.parameters(k, v);
+ }
+ }
+ }
+
+ builder.build().map_err(from_aws_build_error)
+}
+
+/// Create `Namespace` from aws sdk glue `Database`
+pub(crate) fn convert_to_namespace(database: &Database) -> Namespace {
+ let db_name = database.name().to_string();
+ let mut properties = database
+ .parameters()
+ .map_or_else(HashMap::new, |p| p.clone());
+
+ if let Some(location_uri) = database.location_uri() {
+ properties.insert(LOCATION.to_string(), location_uri.to_string());
+ };
+
+ if let Some(description) = database.description() {
+ properties.insert(DESCRIPTION.to_string(), description.to_string());
+ }
+
+ Namespace::with_properties(NamespaceIdent::new(db_name), properties)
+}
+
+/// Validates a `NamespaceIdent` (exactly one non-empty level) and returns its database name
+pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String>
{
+ let name = namespace.as_ref();
+
+ if name.len() != 1 {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid database name: {:?}, hierarchical namespaces are not
supported",
+ namespace
+ ),
+ ));
+ }
+
+ let name = name[0].clone();
+
+ if name.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Invalid database, provided namespace is empty.",
+ ));
+ }
+
+ Ok(name)
+}
+
+#[macro_export]
+/// Extends aws sdk builder with `catalog_id` if present
+macro_rules! with_catalog_id {
+ ($builder:expr, $config:expr) => {{
+ if let Some(catalog_id) = &$config.catalog_id {
+ $builder.catalog_id(catalog_id)
+ } else {
+ $builder
+ }
+ }};
+}
+
#[cfg(test)]
mod tests {
use aws_sdk_glue::config::ProvideCredentials;
+ use iceberg::{Namespace, Result};
use super::*;
+ #[test]
+ fn test_convert_to_namespace() -> Result<()> {
+ let db = Database::builder()
+ .name("my_db")
+ .location_uri("my_location")
+ .description("my_description")
+ .build()
+ .map_err(from_aws_build_error)?;
+
+ let properties = HashMap::from([
+ (DESCRIPTION.to_string(), "my_description".to_string()),
+ (LOCATION.to_string(), "my_location".to_string()),
+ ]);
+
+ let expected =
+
Namespace::with_properties(NamespaceIdent::new("my_db".to_string()),
properties);
+ let result = convert_to_namespace(&db);
+
+ assert_eq!(result, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_convert_to_database() -> Result<()> {
+ let namespace = NamespaceIdent::new("my_database".to_string());
+ let properties = HashMap::from([(LOCATION.to_string(),
"my_location".to_string())]);
+
+ let result = convert_to_database(&namespace, &properties)?;
+
+ assert_eq!("my_database", result.name());
+ assert_eq!(Some("my_location".to_string()), result.location_uri);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_validate_namespace() {
+ let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
+ let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
+ let hierarchical_ns = Namespace::new(
+ NamespaceIdent::from_vec(vec!["level1".to_string(),
"level2".to_string()]).unwrap(),
+ );
+
+ let valid = validate_namespace(valid_ns.name());
+ let empty = validate_namespace(empty_ns.name());
+ let hierarchical = validate_namespace(hierarchical_ns.name());
+
+ assert!(valid.is_ok());
+ assert!(empty.is_err());
+ assert!(hierarchical.is_err());
+ }
+
#[tokio::test]
async fn test_config_with_custom_endpoint() {
let properties = HashMap::new();
diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs
b/crates/catalog/glue/tests/glue_catalog_test.rs
index 24a9aff..c44ffcd 100644
--- a/crates/catalog/glue/tests/glue_catalog_test.rs
+++ b/crates/catalog/glue/tests/glue_catalog_test.rs
@@ -19,7 +19,7 @@
use std::collections::HashMap;
-use iceberg::{Catalog, Result};
+use iceberg::{Catalog, Namespace, NamespaceIdent, Result};
use iceberg_catalog_glue::{
GlueCatalog, GlueCatalogConfig, AWS_ACCESS_KEY_ID, AWS_REGION_NAME,
AWS_SECRET_ACCESS_KEY,
};
@@ -80,13 +80,142 @@ async fn set_test_fixture(func: &str) -> TestFixture {
}
}
+async fn set_test_namespace(fixture: &TestFixture, namespace: &NamespaceIdent)
-> Result<()> {
+ let properties = HashMap::new();
+
+ fixture
+ .glue_catalog
+ .create_namespace(namespace, properties)
+ .await?;
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_list_tables() -> Result<()> {
+ let fixture = set_test_fixture("test_list_tables").await;
+ let namespace = NamespaceIdent::new("my_database".to_string());
+ set_test_namespace(&fixture, &namespace).await?;
+
+ let expected = vec![];
+ let result = fixture.glue_catalog.list_tables(&namespace).await?;
+
+ assert_eq!(result, expected);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_drop_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_drop_namespace").await;
+ let namespace = NamespaceIdent::new("my_database".to_string());
+ set_test_namespace(&fixture, &namespace).await?;
+
+ let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ assert!(exists);
+
+ fixture.glue_catalog.drop_namespace(&namespace).await?;
+
+ let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ assert!(!exists);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_update_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_update_namespace").await;
+ let namespace = NamespaceIdent::new("my_database".into());
+ set_test_namespace(&fixture, &namespace).await?;
+
+ let before_update = fixture.glue_catalog.get_namespace(&namespace).await?;
+ let before_update = before_update.properties().get("description");
+
+ assert_eq!(before_update, None);
+
+ let properties = HashMap::from([("description".to_string(),
"my_update".to_string())]);
+
+ fixture
+ .glue_catalog
+ .update_namespace(&namespace, properties)
+ .await?;
+
+ let after_update = fixture.glue_catalog.get_namespace(&namespace).await?;
+ let after_update = after_update.properties().get("description");
+
+ assert_eq!(after_update, Some("my_update".to_string()).as_ref());
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_namespace_exists() -> Result<()> {
+ let fixture = set_test_fixture("test_namespace_exists").await;
+
+ let namespace = NamespaceIdent::new("my_database".into());
+
+ let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ assert!(!exists);
+
+ set_test_namespace(&fixture, &namespace).await?;
+
+ let exists = fixture.glue_catalog.namespace_exists(&namespace).await?;
+ assert!(exists);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_get_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_get_namespace").await;
+
+ let namespace = NamespaceIdent::new("my_database".into());
+
+ let does_not_exist = fixture.glue_catalog.get_namespace(&namespace).await;
+ assert!(does_not_exist.is_err());
+
+ set_test_namespace(&fixture, &namespace).await?;
+
+ let result = fixture.glue_catalog.get_namespace(&namespace).await?;
+ let expected = Namespace::new(namespace);
+
+ assert_eq!(result, expected);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_create_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_create_namespace").await;
+
+ let properties = HashMap::new();
+ let namespace = NamespaceIdent::new("my_database".into());
+
+ let expected = Namespace::new(namespace.clone());
+
+ let result = fixture
+ .glue_catalog
+ .create_namespace(&namespace, properties)
+ .await?;
+
+ assert_eq!(result, expected);
+
+ Ok(())
+}
+
#[tokio::test]
async fn test_list_namespace() -> Result<()> {
let fixture = set_test_fixture("test_list_namespace").await;
let expected = vec![];
let result = fixture.glue_catalog.list_namespaces(None).await?;
+ assert_eq!(result, expected);
+ let namespace = NamespaceIdent::new("my_database".to_string());
+ set_test_namespace(&fixture, &namespace).await?;
+
+ let expected = vec![namespace];
+ let result = fixture.glue_catalog.list_namespaces(None).await?;
assert_eq!(result, expected);
Ok(())