This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f286a07 feat: add s3tables catalog (#807)
9f286a07 is described below

commit 9f286a071e1bef9456e4b152f5d879fd4a2c6047
Author: flaneur <[email protected]>
AuthorDate: Fri Dec 27 19:13:06 2024 +0800

    feat: add s3tables catalog (#807)
---
 crates/catalog/s3tables/Cargo.toml     |  43 +++
 crates/catalog/s3tables/src/catalog.rs | 633 +++++++++++++++++++++++++++++++++
 crates/catalog/s3tables/src/lib.rs     |  23 ++
 crates/catalog/s3tables/src/utils.rs   | 100 ++++++
 4 files changed, 799 insertions(+)

diff --git a/crates/catalog/s3tables/Cargo.toml 
b/crates/catalog/s3tables/Cargo.toml
new file mode 100644
index 00000000..64b33261
--- /dev/null
+++ b/crates/catalog/s3tables/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-s3tables"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust S3Tables Catalog"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "sql", "catalog"]
+
+[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
+aws-config = { workspace = true }
+aws-sdk-s3tables = "1.0.0"
+iceberg = { workspace = true }
+serde_json = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
+
+[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+itertools = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/catalog/s3tables/src/catalog.rs 
b/crates/catalog/s3tables/src/catalog.rs
new file mode 100644
index 00000000..b3fab91f
--- /dev/null
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
+use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
+use aws_sdk_s3tables::operation::get_table::GetTableOutput;
+use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
+use aws_sdk_s3tables::types::OpenTableFormat;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use iceberg::table::Table;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+
+use crate::utils::{create_metadata_location, create_sdk_config};
+
+/// S3Tables catalog configuration.
+///
+/// Consumed by `S3TablesCatalog::new`, which uses it to build both the AWS SDK
+/// client and the `FileIO` of the catalog.
+#[derive(Debug)]
+pub struct S3TablesCatalogConfig {
+    /// ARN of the S3Tables table bucket.
+    ///
+    /// Unlike other buckets, an S3Tables bucket is not a physical bucket, but a
+    /// virtual bucket that is managed by s3tables. We can't directly access the
+    /// bucket with a path like `s3://{bucket_name}/{file_path}`; all the operations
+    /// are done with respect to the bucket ARN.
+    table_bucket_arn: String,
+    /// Properties for the catalog. The available properties are:
+    /// - `profile_name`: The name of the AWS profile to use.
+    /// - `region_name`: The AWS region to use.
+    /// - `aws_access_key_id`: The AWS access key ID to use.
+    /// - `aws_secret_access_key`: The AWS secret access key to use.
+    /// - `aws_session_token`: The AWS session token to use.
+    ///
+    /// The same map is also handed to the `FileIO` builder as its properties.
+    properties: HashMap<String, String>,
+    /// Endpoint URL for the catalog. `None` leaves the SDK default endpoint in place.
+    endpoint_url: Option<String>,
+}
+
+/// S3Tables catalog implementation.
+///
+/// Implements the iceberg `Catalog` trait by issuing requests to the AWS
+/// S3Tables service for the table bucket named in the configuration.
+#[derive(Debug)]
+pub struct S3TablesCatalog {
+    /// Configuration the catalog was created with; supplies the table bucket ARN
+    /// sent with every request.
+    config: S3TablesCatalogConfig,
+    /// AWS SDK client used for all namespace and table requests.
+    s3tables_client: aws_sdk_s3tables::Client,
+    /// File IO used to read and write table metadata files.
+    file_io: FileIO,
+}
+
+impl S3TablesCatalog {
+    /// Creates a new S3Tables catalog.
+    ///
+    /// Builds the AWS SDK client from `config.properties` and `config.endpoint_url`,
+    /// and a [`FileIO`] rooted at `s3://{bucket_name}`, where `bucket_name` is the
+    /// part of `config.table_bucket_arn` that follows the last `:bucket/`.
+    ///
+    /// # Errors
+    ///
+    /// - `DataInvalid` if the ARN does not contain `:bucket/` or if nothing follows it.
+    /// - Any error raised while building the [`FileIO`].
+    pub async fn new(config: S3TablesCatalogConfig) -> Result<Self> {
+        // Parse the bucket name from an ARN like:
+        // arn:aws:s3tables:<region>:<account>:bucket/<bucket_name>
+        //
+        // `rsplit_once` returns `None` when the separator is missing. The previous
+        // `rsplit(..).next()` always returned `Some` (the whole ARN in that case), so
+        // a malformed ARN was never rejected.
+        let bucket_name = config
+            .table_bucket_arn
+            .rsplit_once(":bucket/")
+            .map(|(_, name)| name)
+            .filter(|name| !name.is_empty())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Invalid bucket ARN format: {}", config.table_bucket_arn),
+                )
+            })?;
+
+        let file_io = FileIO::from_path(&format!("s3://{}", bucket_name))?
+            .with_props(&config.properties)
+            .build()?;
+
+        let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()).await;
+        let s3tables_client = aws_sdk_s3tables::Client::new(&aws_config);
+
+        Ok(Self {
+            config,
+            s3tables_client,
+            file_io,
+        })
+    }
+}
+
+#[async_trait]
+impl Catalog for S3TablesCatalog {
+    /// Lists the namespaces of the table bucket.
+    ///
+    /// S3Tables doesn't support nested namespaces, so an empty list is returned
+    /// whenever `parent` is provided. Otherwise the listing is fetched page by
+    /// page, following the continuation token until the service stops returning one.
+    ///
+    /// Errors from the SDK are converted using `from_aws_sdk_error`.
+    async fn list_namespaces(
+        &self,
+        parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        if parent.is_some() {
+            return Ok(vec![]);
+        }
+
+        let mut namespaces = Vec::new();
+        let mut next_token: Option<String> = None;
+        loop {
+            let request = self
+                .s3tables_client
+                .list_namespaces()
+                .table_bucket_arn(self.config.table_bucket_arn.clone());
+            let request = match next_token.take() {
+                Some(token) => request.continuation_token(token),
+                None => request,
+            };
+
+            let page = request.send().await.map_err(from_aws_sdk_error)?;
+            for summary in page.namespaces() {
+                namespaces.push(NamespaceIdent::from_vec(summary.namespace().to_vec())?);
+            }
+
+            match page.continuation_token() {
+                Some(token) => next_token = Some(token.to_string()),
+                None => break,
+            }
+        }
+        Ok(namespaces)
+    }
+
+    /// Creates a new namespace with the given identifier and properties.
+    ///
+    /// Attempts to create a namespace defined by the `namespace`. The 
`properties`
+    /// parameter is ignored.
+    ///
+    /// The following naming rules apply to namespaces:
+    ///
+    /// - Names must be between 3 (min) and 63 (max) characters long.
+    /// - Names can consist only of lowercase letters, numbers, and 
underscores (_).
+    /// - Names must begin and end with a letter or number.
+    /// - Names must not contain hyphens (-) or periods (.).
+    ///
+    /// This function can return an error in the following situations:
+    ///
+    /// - Errors from the underlying database creation process, converted using
+    /// `from_aws_sdk_error`.
+    async fn create_namespace(
+        &self,
+        namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        let req = self
+            .s3tables_client
+            .create_namespace()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(namespace.to_url_string());
+        req.send().await.map_err(from_aws_sdk_error)?;
+        Ok(Namespace::with_properties(
+            namespace.clone(),
+            HashMap::new(),
+        ))
+    }
+
+    /// Retrieves a namespace by its identifier.
+    ///
+    /// Sends a get-namespace request for `namespace` and builds the returned
+    /// `Namespace` from the identifier echoed back by the service. The
+    /// properties of the result are always empty.
+    ///
+    /// Errors from the request are converted using `from_aws_sdk_error`.
+    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
+        let resp: GetNamespaceOutput = self
+            .s3tables_client
+            .get_namespace()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(namespace.to_url_string())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)?;
+
+        let ident = NamespaceIdent::from_vec(resp.namespace().to_vec())?;
+        Ok(Namespace::with_properties(ident, HashMap::new()))
+    }
+
+    /// Checks if a namespace exists within the s3tables catalog.
+    ///
+    /// Issues a get-namespace request and interprets the outcome.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the request succeeds.
+    /// - `Ok(false)` if the service answers with a not-found exception.
+    /// - `Err(...)` for any other failure, converted using `from_aws_sdk_error`.
+    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
+        let outcome = self
+            .s3tables_client
+            .get_namespace()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(namespace.to_url_string())
+            .send()
+            .await;
+
+        match outcome {
+            Ok(_) => Ok(true),
+            Err(err)
+                if err
+                    .as_service_error()
+                    .is_some_and(|e| e.is_not_found_exception()) =>
+            {
+                Ok(false)
+            }
+            Err(err) => Err(from_aws_sdk_error(err)),
+        }
+    }
+
+    /// Updates the properties of an existing namespace.
+    ///
+    /// S3Tables doesn't support updating namespace properties, so this function
+    /// ignores its arguments and always returns a `FeatureUnsupported` error.
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        let unsupported = Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Update namespace is not supported for s3tables catalog",
+        );
+        Err(unsupported)
+    }
+
+    /// Drops an existing namespace from the s3tables catalog.
+    ///
+    /// Sends a delete-namespace request for `namespace`.
+    ///
+    /// Errors from the request are converted using `from_aws_sdk_error`.
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        self.s3tables_client
+            .delete_namespace()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(namespace.to_url_string())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)
+            .map(|_| ())
+    }
+
+    /// Lists all tables within a given namespace.
+    ///
+    /// Fetches the table listing page by page, following the continuation token
+    /// until the service stops returning one, and returns the identifier of every
+    /// table reported.
+    ///
+    /// Errors from the list-tables request are converted using
+    /// `from_aws_sdk_error`.
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
+        let mut tables = Vec::new();
+        let mut next_token: Option<String> = None;
+        loop {
+            let request = self
+                .s3tables_client
+                .list_tables()
+                .table_bucket_arn(self.config.table_bucket_arn.clone())
+                .namespace(namespace.to_url_string());
+            let request = match next_token.take() {
+                Some(token) => request.continuation_token(token),
+                None => request,
+            };
+
+            let page: ListTablesOutput = request.send().await.map_err(from_aws_sdk_error)?;
+            for summary in page.tables() {
+                let table_namespace = NamespaceIdent::from_vec(summary.namespace().to_vec())?;
+                tables.push(TableIdent::new(table_namespace, summary.name().to_string()));
+            }
+
+            match page.continuation_token() {
+                Some(token) => next_token = Some(token.to_string()),
+                None => break,
+            }
+        }
+        Ok(tables)
+    }
+
+    /// Creates a new table within a specified namespace.
+    ///
+    /// Attempts to create a table defined by the `creation` parameter. The 
metadata
+    /// location is generated by the s3tables catalog, looks like:
+    ///
+    /// s3://{RANDOM WAREHOUSE 
LOCATION}/metadata/{VERSION}-{UUID}.metadata.json
+    ///
+    /// We have to get this random warehouse location after the table is 
created.
+    ///
+    /// This function can return an error in the following situations:
+    /// - If the location of the table is set by user, identified by a specific
+    /// `DataInvalid` variant.
+    /// - Errors from the underlying database creation process, converted using
+    /// `from_aws_sdk_error`.
+    async fn create_table(
+        &self,
+        namespace: &NamespaceIdent,
+        mut creation: TableCreation,
+    ) -> Result<Table> {
+        let table_ident = TableIdent::new(namespace.clone(), 
creation.name.clone());
+
+        // create table
+        let create_resp: CreateTableOutput = self
+            .s3tables_client
+            .create_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(namespace.to_url_string())
+            .format(OpenTableFormat::Iceberg)
+            .name(table_ident.name())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)?;
+
+        // prepare metadata location. the warehouse location is generated by 
s3tables catalog,
+        // which looks like: 
s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
+        let metadata_location = match &creation.location {
+            Some(_) => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "The location of the table is generated by s3tables 
catalog, can't be set by user.",
+                ));
+            }
+            None => {
+                let get_resp: GetTableOutput = self
+                    .s3tables_client
+                    .get_table()
+                    .table_bucket_arn(self.config.table_bucket_arn.clone())
+                    .namespace(namespace.to_url_string())
+                    .name(table_ident.name())
+                    .send()
+                    .await
+                    .map_err(from_aws_sdk_error)?;
+                let warehouse_location = 
get_resp.warehouse_location().to_string();
+                create_metadata_location(warehouse_location, 0)?
+            }
+        };
+
+        // write metadata to file
+        creation.location = Some(metadata_location.clone());
+        let metadata = TableMetadataBuilder::from_table_creation(creation)?
+            .build()?
+            .metadata;
+        self.file_io
+            .new_output(&metadata_location)?
+            .write(serde_json::to_vec(&metadata)?.into())
+            .await?;
+
+        // update metadata location
+        self.s3tables_client
+            .update_table_metadata_location()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(namespace.to_url_string())
+            .name(table_ident.name())
+            .metadata_location(metadata_location.clone())
+            .version_token(create_resp.version_token())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)?;
+
+        let table = Table::builder()
+            .identifier(table_ident)
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .file_io(self.file_io.clone())
+            .build()?;
+        Ok(table)
+    }
+
+    /// Loads an existing table from the s3tables catalog.
+    ///
+    /// Looks the table up with a get-table request, reads the metadata file at
+    /// the reported metadata location through the catalog's `FileIO`, and builds
+    /// a `Table` from the parsed metadata.
+    ///
+    /// This function can return an error in the following situations:
+    /// - If the table does not have a metadata location (`Unexpected`).
+    /// - Errors from the get-table request, converted using `from_aws_sdk_error`.
+    /// - Errors from reading or deserializing the metadata file.
+    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
+        let resp: GetTableOutput = self
+            .s3tables_client
+            .get_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(table_ident.namespace().to_url_string())
+            .name(table_ident.name())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)?;
+
+        // when a table is created, it's possible that the metadata location is not set.
+        let Some(metadata_location) = resp.metadata_location() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Table {} does not have metadata location",
+                    table_ident.name()
+                ),
+            ));
+        };
+
+        let metadata_bytes = self.file_io.new_input(metadata_location)?.read().await?;
+        let metadata: TableMetadata = serde_json::from_slice(&metadata_bytes)?;
+
+        Ok(Table::builder()
+            .identifier(table_ident.clone())
+            .metadata(metadata)
+            .metadata_location(metadata_location)
+            .file_io(self.file_io.clone())
+            .build()?)
+    }
+
+    /// Drops an existing table from the s3tables catalog.
+    ///
+    /// Sends a delete-table request for the namespace and name of `table`.
+    ///
+    /// Errors from the request are converted using `from_aws_sdk_error`.
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        self.s3tables_client
+            .delete_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(table.namespace().to_url_string())
+            .name(table.name())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)
+            .map(|_| ())
+    }
+
+    /// Checks if a table exists within the s3tables catalog.
+    ///
+    /// Issues a get-table request and interprets the outcome.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the request succeeds.
+    /// - `Ok(false)` if the service answers with a not-found exception.
+    /// - `Err(...)` for any other failure, converted using `from_aws_sdk_error`.
+    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
+        let outcome = self
+            .s3tables_client
+            .get_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(table_ident.namespace().to_url_string())
+            .name(table_ident.name())
+            .send()
+            .await;
+
+        match outcome {
+            Ok(_) => Ok(true),
+            Err(err)
+                if err
+                    .as_service_error()
+                    .is_some_and(|e| e.is_not_found_exception()) =>
+            {
+                Ok(false)
+            }
+            Err(err) => Err(from_aws_sdk_error(err)),
+        }
+    }
+
+    /// Renames an existing table within the s3tables catalog.
+    ///
+    /// Sends a rename-table request that moves the table identified by `src` to
+    /// the namespace and name given by `dest`.
+    ///
+    /// Errors from the request are converted using `from_aws_sdk_error`.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
+        self.s3tables_client
+            .rename_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(src.namespace().to_url_string())
+            .name(src.name())
+            .new_namespace_name(dest.namespace().to_url_string())
+            .new_name(dest.name())
+            .send()
+            .await
+            .map_err(from_aws_sdk_error)
+            .map(|_| ())
+    }
+
+    /// Updates an existing table within the s3tables catalog.
+    ///
+    /// Not implemented yet: the commit is ignored and a `FeatureUnsupported`
+    /// error is always returned.
+    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
+        let unsupported = Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating a table is not supported yet",
+        );
+        Err(unsupported)
+    }
+}
+
+/// Format AWS SDK error into iceberg error
+pub(crate) fn from_aws_sdk_error<T>(error: 
aws_sdk_s3tables::error::SdkError<T>) -> Error
+where T: std::fmt::Debug {
+    Error::new(
+        ErrorKind::Unexpected,
+        "Operation failed for hitting aws skd error".to_string(),
+    )
+    .with_source(anyhow!("aws sdk error: {:?}", error))
+}
+
+#[cfg(test)]
+mod tests {
+    use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+
+    use super::*;
+
+    /// Builds a catalog for the table bucket named by the `TABLE_BUCKET_ARN`
+    /// environment variable.
+    ///
+    /// Returns `Ok(None)` when the variable is not set; every test below treats
+    /// that as "skip" and returns early.
+    async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
+        let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
+            Some(table_bucket_arn) => table_bucket_arn,
+            None => return Ok(None),
+        };
+
+        let properties = HashMap::new();
+        let config = S3TablesCatalogConfig {
+            table_bucket_arn,
+            properties,
+            endpoint_url: None,
+        };
+
+        Ok(Some(S3TablesCatalog::new(config).await?))
+    }
+
+    #[tokio::test]
+    async fn test_s3tables_list_namespace() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        let namespaces = catalog.list_namespaces(None).await.unwrap();
+        assert!(!namespaces.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_s3tables_list_tables() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        let tables = catalog
+            .list_tables(&NamespaceIdent::new("aws_s3_metadata".to_string()))
+            .await
+            .unwrap();
+        assert!(!tables.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_s3tables_load_table() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        let table = catalog
+            .load_table(&TableIdent::new(
+                NamespaceIdent::new("aws_s3_metadata".to_string()),
+                "query_storage_metadata".to_string(),
+            ))
+            .await
+            .unwrap();
+        println!("{:?}", table);
+    }
+
+    #[tokio::test]
+    async fn test_s3tables_create_delete_namespace() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        let namespace = NamespaceIdent::new("test_s3tables_create_delete_namespace".to_string());
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+        assert!(catalog.namespace_exists(&namespace).await.unwrap());
+        catalog.drop_namespace(&namespace).await.unwrap();
+        assert!(!catalog.namespace_exists(&namespace).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_s3tables_create_delete_table() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        let creation = {
+            let schema = Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap();
+            TableCreation::builder()
+                .name("test_s3tables_create_delete_table".to_string())
+                .properties(HashMap::new())
+                .schema(schema)
+                .build()
+        };
+
+        let namespace = NamespaceIdent::new("test_s3tables_create_delete_table".to_string());
+        let table_ident = TableIdent::new(
+            namespace.clone(),
+            "test_s3tables_create_delete_table".to_string(),
+        );
+        // Best-effort cleanup of leftovers from an earlier, aborted run. The table
+        // has to go first: dropping the namespace while it still holds the table
+        // fails, which would make the `create_namespace` below panic.
+        catalog.drop_table(&table_ident).await.ok();
+        catalog.drop_namespace(&namespace).await.ok();
+
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+        catalog.create_table(&namespace, creation).await.unwrap();
+        assert!(catalog.table_exists(&table_ident).await.unwrap());
+        catalog.drop_table(&table_ident).await.unwrap();
+        assert!(!catalog.table_exists(&table_ident).await.unwrap());
+        catalog.drop_namespace(&namespace).await.unwrap();
+    }
+}
diff --git a/crates/catalog/s3tables/src/lib.rs 
b/crates/catalog/s3tables/src/lib.rs
new file mode 100644
index 00000000..b5e8b0c3
--- /dev/null
+++ b/crates/catalog/s3tables/src/lib.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg s3tables catalog implementation.
+
+#![deny(missing_docs)]
+
+mod catalog;
+mod utils;
+
+// Both modules are private, so without this re-export the crate would expose no
+// public API at all.
+pub use catalog::*;
diff --git a/crates/catalog/s3tables/src/utils.rs 
b/crates/catalog/s3tables/src/utils.rs
new file mode 100644
index 00000000..d0195dcc
--- /dev/null
+++ b/crates/catalog/s3tables/src/utils.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use aws_config::{BehaviorVersion, Region, SdkConfig};
+use aws_sdk_s3tables::config::Credentials;
+use iceberg::{Error, ErrorKind, Result};
+use uuid::Uuid;
+
+/// Property key for the name of the AWS profile to use.
+pub const AWS_PROFILE_NAME: &str = "profile_name";
+/// Property key for the AWS region to use.
+pub const AWS_REGION_NAME: &str = "region_name";
+/// Property key for the AWS access key ID. Only honoured together with
+/// [`AWS_SECRET_ACCESS_KEY`].
+pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
+/// Property key for the AWS secret access key. Only honoured together with
+/// [`AWS_ACCESS_KEY_ID`].
+pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
+/// Property key for the optional AWS session token that accompanies the
+/// access key pair.
+pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
+
+/// Creates an aws sdk configuration based on
+/// provided properties and an optional endpoint URL.
+///
+/// Starts from the SDK defaults and then applies, when present:
+/// - `endpoint_url` as the endpoint override,
+/// - `aws_access_key_id` + `aws_secret_access_key` (and the optional
+///   `aws_session_token`) as static credentials,
+/// - `profile_name` as the profile to load,
+/// - `region_name` as the region.
+///
+/// Properties that are absent leave the corresponding SDK default untouched.
+pub(crate) async fn create_sdk_config(
+    properties: &HashMap<String, String>,
+    endpoint_url: Option<String>,
+) -> SdkConfig {
+    let mut config = aws_config::defaults(BehaviorVersion::latest());
+
+    // The endpoint override is independent of `properties`. There is deliberately
+    // no early return for an empty map: it used to discard a caller-supplied
+    // endpoint, and every lookup below is already a no-op in that case.
+    if let Some(endpoint_url) = endpoint_url {
+        config = config.endpoint_url(endpoint_url);
+    }
+
+    // Static credentials are only used when both halves of the key pair are given.
+    if let (Some(access_key), Some(secret_key)) = (
+        properties.get(AWS_ACCESS_KEY_ID),
+        properties.get(AWS_SECRET_ACCESS_KEY),
+    ) {
+        let session_token = properties.get(AWS_SESSION_TOKEN).cloned();
+        let credentials_provider =
+            Credentials::new(access_key, secret_key, session_token, None, "properties");
+
+        config = config.credentials_provider(credentials_provider);
+    }
+
+    if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) {
+        config = config.profile_name(profile_name);
+    }
+
+    if let Some(region_name) = properties.get(AWS_REGION_NAME) {
+        let region = Region::new(region_name.clone());
+        config = config.region(region);
+    }
+
+    config.load().await
+}
+
+/// Builds the location of a table metadata file.
+///
+/// The result has the form
+/// `{warehouse_location}/metadata/{version}-{uuid}.metadata.json`, where
+/// `version` is left-padded with zeros to five digits and `uuid` is a freshly
+/// generated v4 UUID.
+///
+/// Returns a `DataInvalid` error when `version` is negative.
+pub(crate) fn create_metadata_location(
+    warehouse_location: impl AsRef<str>,
+    version: i32,
+) -> Result<String> {
+    if version.is_negative() {
+        let message = format!(
+            "Table metadata version: '{}' must be a non-negative integer",
+            version
+        );
+        return Err(Error::new(ErrorKind::DataInvalid, message));
+    }
+
+    let file_id = Uuid::new_v4();
+    Ok(format!(
+        "{}/metadata/{:0>5}-{}.metadata.json",
+        warehouse_location.as_ref(),
+        version,
+        file_id
+    ))
+}

Reply via email to