This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new dcc380a  feat: Glue Catalog - table operations (3/3) (#314)
dcc380a is described below

commit dcc380af294862b0a802b140c813b4cc4e304441
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Sun Apr 21 15:01:55 2024 +0200

    feat: Glue Catalog - table operations (3/3) (#314)
    
    * add GlueSchemaBuilder
    
    * add warehouse
    
    * add serde_json, tokio, uuid
    
    * add minio
    
    * add create_table
    
    * add tests utils
    
    * add load_table
    
    * add drop_table + table_exists
    
    * add rename_table
    
    * add docs
    
    * fix: docs + err_msg
    
    * fix: remove unused const
    
    * fix: default_table_location
    
    * fix: remove single quotes error message
    
    * chore: add test-condition `test_rename_table`
    
    * chore: add test-condition `test_table_exists`
---
 crates/catalog/glue/Cargo.toml                     |   4 +-
 crates/catalog/glue/src/catalog.rs                 | 304 ++++++++++++-
 crates/catalog/glue/src/lib.rs                     |   1 +
 crates/catalog/glue/src/schema.rs                  | 485 +++++++++++++++++++++
 crates/catalog/glue/src/utils.rs                   | 267 +++++++++++-
 .../glue/testdata/glue_catalog/docker-compose.yaml |  22 +
 crates/catalog/glue/tests/glue_catalog_test.rs     | 188 +++++++-
 7 files changed, 1243 insertions(+), 28 deletions(-)

diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml
index daa9587..0508378 100644
--- a/crates/catalog/glue/Cargo.toml
+++ b/crates/catalog/glue/Cargo.toml
@@ -35,9 +35,11 @@ aws-config = { workspace = true }
 aws-sdk-glue = { workspace = true }
 iceberg = { workspace = true }
 log = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
+uuid = { workspace = true }
 
 [dev-dependencies]
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 port_scanner = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/catalog/glue/src/catalog.rs 
b/crates/catalog/glue/src/catalog.rs
index d152a54..f402129 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -16,18 +16,23 @@
 // under the License.
 
 use async_trait::async_trait;
+use aws_sdk_glue::types::TableInput;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
     TableIdent,
 };
 use std::{collections::HashMap, fmt::Debug};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
 use typed_builder::TypedBuilder;
 
-use crate::error::from_aws_sdk_error;
+use crate::error::{from_aws_build_error, from_aws_sdk_error};
 use crate::utils::{
-    convert_to_database, convert_to_namespace, create_sdk_config, 
validate_namespace,
+    convert_to_database, convert_to_glue_table, convert_to_namespace, 
create_metadata_location,
+    create_sdk_config, get_default_table_location, get_metadata_location, 
validate_namespace,
 };
 use crate::with_catalog_id;
 
@@ -38,6 +43,7 @@ pub struct GlueCatalogConfig {
     uri: Option<String>,
     #[builder(default, setter(strip_option))]
     catalog_id: Option<String>,
+    warehouse: String,
     #[builder(default)]
     props: HashMap<String, String>,
 }
@@ -48,6 +54,7 @@ struct GlueClient(aws_sdk_glue::Client);
 pub struct GlueCatalog {
     config: GlueCatalogConfig,
     client: GlueClient,
+    file_io: FileIO,
 }
 
 impl Debug for GlueCatalog {
@@ -60,15 +67,24 @@ impl Debug for GlueCatalog {
 
 impl GlueCatalog {
     /// Create a new glue catalog
-    pub async fn new(config: GlueCatalogConfig) -> Self {
+    pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
         let sdk_config = create_sdk_config(&config.props, 
config.uri.as_ref()).await;
 
         let client = aws_sdk_glue::Client::new(&sdk_config);
 
-        GlueCatalog {
+        let file_io = FileIO::from_path(&config.warehouse)?
+            .with_props(&config.props)
+            .build()?;
+
+        Ok(GlueCatalog {
             config,
             client: GlueClient(client),
-        }
+            file_io,
+        })
+    }
+    /// Get the catalogs `FileIO`
+    pub fn file_io(&self) -> FileIO {
+        self.file_io.clone()
     }
 }
 
@@ -77,7 +93,7 @@ impl Catalog for GlueCatalog {
     /// List namespaces from glue catalog.
     ///
     /// Glue doesn't support nested namespaces.
-    /// We will return an empty list if parent is some
+    /// We will return an empty list if parent is some.
     async fn list_namespaces(
         &self,
         parent: Option<&NamespaceIdent>,
@@ -277,6 +293,7 @@ impl Catalog for GlueCatalog {
     /// querying the database.
     async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
         let db_name = validate_namespace(namespace)?;
+
         let mut table_list: Vec<TableIdent> = Vec::new();
         let mut next_token: Option<String> = None;
 
@@ -310,31 +327,282 @@ impl Catalog for GlueCatalog {
         Ok(table_list)
     }
 
+    /// Creates a new table within a specified namespace using the provided
+    /// table creation settings.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a `Table` object representing the newly created
+    /// table.
+    ///
+    /// # Errors
+    /// This function may return an error in several cases, including invalid
+    /// namespace identifiers, failure to determine a default storage location,
+    /// issues generating or writing table metadata, and errors communicating
+    /// with the Glue Catalog.
     async fn create_table(
         &self,
-        _namespace: &NamespaceIdent,
-        _creation: TableCreation,
+        namespace: &NamespaceIdent,
+        creation: TableCreation,
     ) -> Result<Table> {
-        todo!()
+        let db_name = validate_namespace(namespace)?;
+        let table_name = creation.name.clone();
+
+        let location = match &creation.location {
+            Some(location) => location.clone(),
+            None => {
+                let ns = self.get_namespace(namespace).await?;
+                get_default_table_location(&ns, &db_name, &table_name, 
&self.config.warehouse)
+            }
+        };
+
+        let metadata = 
TableMetadataBuilder::from_table_creation(creation)?.build()?;
+        let metadata_location = create_metadata_location(&location, 0)?;
+
+        let mut file = self
+            .file_io
+            .new_output(&metadata_location)?
+            .writer()
+            .await?;
+        file.write_all(&serde_json::to_vec(&metadata)?).await?;
+        file.shutdown().await?;
+
+        let glue_table = convert_to_glue_table(
+            &table_name,
+            metadata_location.clone(),
+            &metadata,
+            metadata.properties(),
+            None,
+        )?;
+
+        let builder = self
+            .client
+            .0
+            .create_table()
+            .database_name(&db_name)
+            .table_input(glue_table);
+        let builder = with_catalog_id!(builder, self.config);
+
+        builder.send().await.map_err(from_aws_sdk_error)?;
+
+        let table = Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(TableIdent::new(NamespaceIdent::new(db_name), 
table_name))
+            .build();
+
+        Ok(table)
     }
 
-    async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
-        todo!()
+    /// Loads a table from the Glue Catalog and constructs a `Table` object
+    /// based on its metadata.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a `Table` object that represents the loaded table.
+    ///
+    /// # Errors
+    /// This function may return an error in several scenarios, including:
+    /// - Failure to validate the namespace.
+    /// - Failure to retrieve the table from the Glue Catalog.
+    /// - Absence of metadata location information in the table's properties.
+    /// - Issues reading or deserializing the table's metadata file.
+    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let builder = self
+            .client
+            .0
+            .get_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        let glue_table_output = 
builder.send().await.map_err(from_aws_sdk_error)?;
+
+        match glue_table_output.table() {
+            None => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Table object for database: {} and table: {} does not 
exist",
+                    db_name, table_name
+                ),
+            )),
+            Some(table) => {
+                let metadata_location = 
get_metadata_location(&table.parameters)?;
+
+                let mut reader = 
self.file_io.new_input(&metadata_location)?.reader().await?;
+                let mut metadata_str = String::new();
+                reader.read_to_string(&mut metadata_str).await?;
+                let metadata = 
serde_json::from_str::<TableMetadata>(&metadata_str)?;
+
+                let table = Table::builder()
+                    .file_io(self.file_io())
+                    .metadata_location(metadata_location)
+                    .metadata(metadata)
+                    .identifier(TableIdent::new(
+                        NamespaceIdent::new(db_name),
+                        table_name.to_owned(),
+                    ))
+                    .build();
+
+                Ok(table)
+            }
+        }
     }
 
-    async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
-        todo!()
+    /// Asynchronously drops a table from the database.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The namespace provided in `table` cannot be validated
+    /// or does not exist.
+    /// - The underlying database client encounters an error while
+    /// attempting to drop the table. This includes scenarios where
+    /// the table does not exist.
+    /// - Any network or communication error occurs with the database backend.
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let builder = self
+            .client
+            .0
+            .delete_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        builder.send().await.map_err(from_aws_sdk_error)?;
+
+        Ok(())
     }
 
-    async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
-        todo!()
+    /// Asynchronously checks the existence of a specified table
+    /// in the database.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the table exists in the database.
+    /// - `Ok(false)` if the table does not exist in the database.
+    /// - `Err(...)` if an error occurs during the process
+    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let builder = self
+            .client
+            .0
+            .get_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        let resp = builder.send().await;
+
+        match resp {
+            Ok(_) => Ok(true),
+            Err(err) => {
+                if err
+                    .as_service_error()
+                    .map(|e| e.is_entity_not_found_exception())
+                    == Some(true)
+                {
+                    return Ok(false);
+                }
+                Err(from_aws_sdk_error(err))
+            }
+        }
     }
 
-    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> 
Result<()> {
-        todo!()
+    /// Asynchronously renames a table within the database
+    /// or moves it between namespaces (databases).
+    ///
+    /// # Returns
+    /// - `Ok(())` on successful rename or move of the table.
+    /// - `Err(...)` if an error occurs during the process.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> 
Result<()> {
+        let src_db_name = validate_namespace(src.namespace())?;
+        let dest_db_name = validate_namespace(dest.namespace())?;
+
+        let src_table_name = src.name();
+        let dest_table_name = dest.name();
+
+        let builder = self
+            .client
+            .0
+            .get_table()
+            .database_name(&src_db_name)
+            .name(src_table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        let glue_table_output = 
builder.send().await.map_err(from_aws_sdk_error)?;
+
+        match glue_table_output.table() {
+            None => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "'Table' object for database: {} and table: {} does not 
exist",
+                    src_db_name, src_table_name
+                ),
+            )),
+            Some(table) => {
+                let rename_table_input = TableInput::builder()
+                    .name(dest_table_name)
+                    .set_parameters(table.parameters.clone())
+                    .set_storage_descriptor(table.storage_descriptor.clone())
+                    .set_table_type(table.table_type.clone())
+                    .set_description(table.description.clone())
+                    .build()
+                    .map_err(from_aws_build_error)?;
+
+                let builder = self
+                    .client
+                    .0
+                    .create_table()
+                    .database_name(&dest_db_name)
+                    .table_input(rename_table_input);
+                let builder = with_catalog_id!(builder, self.config);
+
+                builder.send().await.map_err(from_aws_sdk_error)?;
+
+                let drop_src_table_result = self.drop_table(src).await;
+
+                match drop_src_table_result {
+                    Ok(_) => Ok(()),
+                    Err(_) => {
+                        let err_msg_src_table = format!(
+                            "Failed to drop old table {}.{}.",
+                            src_db_name, src_table_name
+                        );
+
+                        let drop_dest_table_result = 
self.drop_table(dest).await;
+
+                        match drop_dest_table_result {
+                            Ok(_) => Err(Error::new(
+                                ErrorKind::Unexpected,
+                                format!(
+                                    "{} Rolled back table creation for {}.{}.",
+                                    err_msg_src_table, dest_db_name, 
dest_table_name
+                                ),
+                            )),
+                            Err(_) => Err(Error::new(
+                                ErrorKind::Unexpected,
+                                format!(
+                                    "{} Failed to roll back table creation for 
{}.{}. Please clean up manually.",
+                                    err_msg_src_table, dest_db_name, 
dest_table_name
+                                ),
+                            )),
+                        }
+                    }
+                }
+            }
+        }
     }
 
     async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        todo!()
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Updating a table is not supported yet",
+        ))
     }
 }
diff --git a/crates/catalog/glue/src/lib.rs b/crates/catalog/glue/src/lib.rs
index b274cf7..2376573 100644
--- a/crates/catalog/glue/src/lib.rs
+++ b/crates/catalog/glue/src/lib.rs
@@ -21,6 +21,7 @@
 
 mod catalog;
 mod error;
+mod schema;
 mod utils;
 pub use catalog::*;
 pub use utils::{
diff --git a/crates/catalog/glue/src/schema.rs 
b/crates/catalog/glue/src/schema.rs
new file mode 100644
index 0000000..a126f2f
--- /dev/null
+++ b/crates/catalog/glue/src/schema.rs
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Property `iceberg.field.id` for `Column`
+pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id";
+/// Property `iceberg.field.optional` for `Column`
+pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional";
+/// Property `iceberg.field.current` for `Column`
+pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current";
+
+use std::collections::HashMap;
+
+use iceberg::{
+    spec::{visit_schema, PrimitiveType, SchemaVisitor, TableMetadata},
+    Error, ErrorKind, Result,
+};
+
+use aws_sdk_glue::types::Column;
+
+use crate::error::from_aws_build_error;
+
+type GlueSchema = Vec<Column>;
+
+#[derive(Debug, Default)]
+pub(crate) struct GlueSchemaBuilder {
+    schema: GlueSchema,
+    is_current: bool,
+    depth: usize,
+}
+
+impl GlueSchemaBuilder {
+    /// Creates a new `GlueSchemaBuilder` from iceberg `Schema`
+    pub fn from_iceberg(metadata: &TableMetadata) -> Result<GlueSchemaBuilder> 
{
+        let current_schema = metadata.current_schema();
+
+        let mut builder = Self {
+            schema: Vec::new(),
+            is_current: true,
+            depth: 0,
+        };
+
+        visit_schema(current_schema, &mut builder)?;
+
+        builder.is_current = false;
+
+        for schema in metadata.schemas_iter() {
+            if schema.schema_id() == current_schema.schema_id() {
+                continue;
+            }
+
+            visit_schema(schema, &mut builder)?;
+        }
+
+        Ok(builder)
+    }
+
+    /// Returns the newly converted `GlueSchema`
+    pub fn build(self) -> GlueSchema {
+        self.schema
+    }
+
+    /// Check if is in `StructType` while traversing schema
+    fn is_inside_struct(&self) -> bool {
+        self.depth > 0
+    }
+}
+
+impl SchemaVisitor for GlueSchemaBuilder {
+    type T = String;
+
+    fn schema(
+        &mut self,
+        _schema: &iceberg::spec::Schema,
+        value: Self::T,
+    ) -> iceberg::Result<String> {
+        Ok(value)
+    }
+
+    fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) 
-> Result<()> {
+        self.depth += 1;
+        Ok(())
+    }
+
+    fn r#struct(
+        &mut self,
+        r#_struct: &iceberg::spec::StructType,
+        results: Vec<String>,
+    ) -> iceberg::Result<String> {
+        Ok(format!("struct<{}>", results.join(", ")))
+    }
+
+    fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) 
-> Result<()> {
+        self.depth -= 1;
+        Ok(())
+    }
+
+    fn field(
+        &mut self,
+        field: &iceberg::spec::NestedFieldRef,
+        value: String,
+    ) -> iceberg::Result<String> {
+        if self.is_inside_struct() {
+            return Ok(format!("{}:{}", field.name, &value));
+        }
+
+        let parameters = HashMap::from([
+            (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)),
+            (
+                ICEBERG_FIELD_OPTIONAL.to_string(),
+                format!("{}", field.required).to_lowercase(),
+            ),
+            (
+                ICEBERG_FIELD_CURRENT.to_string(),
+                format!("{}", self.is_current).to_lowercase(),
+            ),
+        ]);
+
+        let mut builder = Column::builder()
+            .name(field.name.clone())
+            .r#type(&value)
+            .set_parameters(Some(parameters));
+
+        if let Some(comment) = field.doc.as_ref() {
+            builder = builder.comment(comment);
+        }
+
+        let column = builder.build().map_err(from_aws_build_error)?;
+
+        self.schema.push(column);
+
+        Ok(value)
+    }
+
+    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> 
iceberg::Result<String> {
+        Ok(format!("array<{}>", value))
+    }
+
+    fn map(
+        &mut self,
+        _map: &iceberg::spec::MapType,
+        key_value: String,
+        value: String,
+    ) -> iceberg::Result<String> {
+        Ok(format!("map<{},{}>", key_value, value))
+    }
+
+    fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> 
iceberg::Result<Self::T> {
+        let glue_type = match p {
+            PrimitiveType::Boolean => "boolean".to_string(),
+            PrimitiveType::Int => "int".to_string(),
+            PrimitiveType::Long => "bigint".to_string(),
+            PrimitiveType::Float => "float".to_string(),
+            PrimitiveType::Double => "double".to_string(),
+            PrimitiveType::Date => "date".to_string(),
+            PrimitiveType::Timestamp => "timestamp".to_string(),
+            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid 
=> {
+                "string".to_string()
+            }
+            PrimitiveType::Binary | PrimitiveType::Fixed(_) => 
"binary".to_string(),
+            PrimitiveType::Decimal { precision, scale } => {
+                format!("decimal({},{})", precision, scale)
+            }
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "Conversion from 'Timestamptz' is not supported",
+                ))
+            }
+        };
+
+        Ok(glue_type)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use iceberg::{
+        spec::{Schema, TableMetadataBuilder},
+        TableCreation,
+    };
+
+    use super::*;
+
+    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
+        let table_creation = TableCreation::builder()
+            .name("my_table".to_string())
+            .location("my_location".to_string())
+            .schema(schema)
+            .build();
+        let metadata = 
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+
+        Ok(metadata)
+    }
+
+    fn create_column(
+        name: impl Into<String>,
+        r#type: impl Into<String>,
+        id: impl Into<String>,
+    ) -> Result<Column> {
+        let parameters = HashMap::from([
+            (ICEBERG_FIELD_ID.to_string(), id.into()),
+            (ICEBERG_FIELD_OPTIONAL.to_string(), "true".to_string()),
+            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
+        ]);
+
+        Column::builder()
+            .name(name)
+            .r#type(r#type)
+            .set_comment(None)
+            .set_parameters(Some(parameters))
+            .build()
+            .map_err(from_aws_build_error)
+    }
+
+    #[test]
+    fn test_schema_with_simple_fields() -> Result<()> {
+        let record = r#"{
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "c1",
+                    "required": true,
+                    "type": "boolean"
+                },
+                {
+                    "id": 2,
+                    "name": "c2",
+                    "required": true,
+                    "type": "int"
+                },
+                {
+                    "id": 3,
+                    "name": "c3",
+                    "required": true,
+                    "type": "long"
+                },
+                {
+                    "id": 4,
+                    "name": "c4",
+                    "required": true,
+                    "type": "float"
+                },
+                {
+                    "id": 5,
+                    "name": "c5",
+                    "required": true,
+                    "type": "double"
+                },
+                {
+                    "id": 6,
+                    "name": "c6",
+                    "required": true,
+                    "type": "decimal(2,2)"
+                },
+                {
+                    "id": 7,
+                    "name": "c7",
+                    "required": true,
+                    "type": "date"
+                },
+                {
+                    "id": 8,
+                    "name": "c8",
+                    "required": true,
+                    "type": "time"
+                },
+                {
+                    "id": 9,
+                    "name": "c9",
+                    "required": true,
+                    "type": "timestamp"
+                },
+                {
+                    "id": 10,
+                    "name": "c10",
+                    "required": true,
+                    "type": "string"
+                },
+                {
+                    "id": 11,
+                    "name": "c11",
+                    "required": true,
+                    "type": "uuid"
+                },
+                {
+                    "id": 12,
+                    "name": "c12",
+                    "required": true,
+                    "type": "fixed[4]"
+                },
+                {
+                    "id": 13,
+                    "name": "c13",
+                    "required": true,
+                    "type": "binary"
+                }
+            ]
+        }"#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+        let metadata = create_metadata(schema)?;
+
+        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+        let expected = vec![
+            create_column("c1", "boolean", "1")?,
+            create_column("c2", "int", "2")?,
+            create_column("c3", "bigint", "3")?,
+            create_column("c4", "float", "4")?,
+            create_column("c5", "double", "5")?,
+            create_column("c6", "decimal(2,2)", "6")?,
+            create_column("c7", "date", "7")?,
+            create_column("c8", "string", "8")?,
+            create_column("c9", "timestamp", "9")?,
+            create_column("c10", "string", "10")?,
+            create_column("c11", "string", "11")?,
+            create_column("c12", "binary", "12")?,
+            create_column("c13", "binary", "13")?,
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_structs() -> Result<()> {
+        let record = r#"{
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "person",
+                    "required": true,
+                    "type": {
+                        "type": "struct",
+                        "fields": [
+                            {
+                                "id": 2,
+                                "name": "name",
+                                "required": true,
+                                "type": "string"
+                            },
+                            {
+                                "id": 3,
+                                "name": "age",
+                                "required": false,
+                                "type": "int"
+                            }
+                        ]
+                    }
+                }
+            ]
+        }"#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+        let metadata = create_metadata(schema)?;
+
+        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+        let expected = vec![create_column(
+            "person",
+            "struct<name:string, age:int>",
+            "1",
+        )?];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_struct_inside_list() -> Result<()> {
+        let record = r#"
+        {
+            "schema-id": 1,
+            "type": "struct",
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "location",
+                    "required": true,
+                    "type": {
+                        "type": "list",
+                        "element-id": 2,
+                        "element-required": true,
+                        "element": {
+                            "type": "struct",
+                            "fields": [
+                                {
+                                    "id": 3,
+                                    "name": "latitude",
+                                    "required": false,
+                                    "type": "float"
+                                },
+                                {
+                                    "id": 4,
+                                    "name": "longitude",
+                                    "required": false,
+                                    "type": "float"
+                                }
+                            ]
+                        }
+                    }
+                }
+            ]
+        }
+        "#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+        let metadata = create_metadata(schema)?;
+
+        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+        let expected = vec![create_column(
+            "location",
+            "array<struct<latitude:float, longitude:float>>",
+            "1",
+        )?];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_nested_maps() -> Result<()> {
+        let record = r#"
+            {
+                "schema-id": 1,
+                "type": "struct",
+                "fields": [
+                    {
+                        "id": 1,
+                        "name": "quux",
+                        "required": true,
+                        "type": {
+                            "type": "map",
+                            "key-id": 2,
+                            "key": "string",
+                            "value-id": 3,
+                            "value-required": true,
+                            "value": {
+                                "type": "map",
+                                "key-id": 4,
+                                "key": "string",
+                                "value-id": 5,
+                                "value-required": true,
+                                "value": "int"
+                            }
+                        }
+                    }
+                ]
+            }
+        "#;
+
+        let schema = serde_json::from_str::<Schema>(record)?;
+        let metadata = create_metadata(schema)?;
+
+        let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+        let expected = vec![create_column("quux", 
"map<string,map<string,int>>", "1")?];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+}
diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs
index fa9ebb8..dcf6bef 100644
--- a/crates/catalog/glue/src/utils.rs
+++ b/crates/catalog/glue/src/utils.rs
@@ -20,14 +20,14 @@ use std::collections::HashMap;
 use aws_config::{BehaviorVersion, Region, SdkConfig};
 use aws_sdk_glue::{
     config::Credentials,
-    types::{Database, DatabaseInput},
+    types::{Database, DatabaseInput, StorageDescriptor, TableInput},
 };
+use iceberg::spec::TableMetadata;
 use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
+use uuid::Uuid;
 
-use crate::error::from_aws_build_error;
+use crate::{error::from_aws_build_error, schema::GlueSchemaBuilder};
 
-const _GLUE_SKIP_ARCHIVE: &str = "glue.skip-archive";
-const _GLUE_SKIP_ARCHIVE_DEFAULT: bool = true;
 /// Property aws profile name
 pub const AWS_PROFILE_NAME: &str = "profile_name";
 /// Property aws region
@@ -42,6 +42,16 @@ pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
 const DESCRIPTION: &str = "description";
 /// Parameter namespace location uri
 const LOCATION: &str = "location_uri";
+/// Property `metadata_location` for `TableInput`
+const METADATA_LOCATION: &str = "metadata_location";
+/// Property `previous_metadata_location` for `TableInput`
+const PREV_METADATA_LOCATION: &str = "previous_metadata_location";
+/// Property external table for `TableInput`
+const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
+/// Parameter key `table_type` for `TableInput`
+const TABLE_TYPE: &str = "table_type";
+/// Parameter value `table_type` for `TableInput`
+const ICEBERG: &str = "ICEBERG";
 
 /// Creates an aws sdk configuration based on
 /// provided properties and an optional endpoint URL.
@@ -125,6 +135,50 @@ pub(crate) fn convert_to_namespace(database: &Database) -> 
Namespace {
     Namespace::with_properties(NamespaceIdent::new(db_name), properties)
 }
 
+/// Converts Iceberg table metadata into an
+/// AWS Glue `TableInput` representation.
+///
+/// This function facilitates the integration of Iceberg tables with AWS Glue
+/// by converting Iceberg table metadata into a Glue-compatible `TableInput`
+/// structure.
+pub(crate) fn convert_to_glue_table(
+    table_name: impl Into<String>,
+    metadata_location: String,
+    metadata: &TableMetadata,
+    properties: &HashMap<String, String>,
+    prev_metadata_location: Option<String>,
+) -> Result<TableInput> {
+    let glue_schema = GlueSchemaBuilder::from_iceberg(metadata)?.build();
+
+    let storage_descriptor = StorageDescriptor::builder()
+        .set_columns(Some(glue_schema))
+        .location(&metadata_location)
+        .build();
+
+    let mut parameters = HashMap::from([
+        (TABLE_TYPE.to_string(), ICEBERG.to_string()),
+        (METADATA_LOCATION.to_string(), metadata_location),
+    ]);
+
+    if let Some(prev) = prev_metadata_location {
+        parameters.insert(PREV_METADATA_LOCATION.to_string(), prev);
+    }
+
+    let mut table_input_builder = TableInput::builder()
+        .name(table_name)
+        .set_parameters(Some(parameters))
+        .storage_descriptor(storage_descriptor)
+        .table_type(EXTERNAL_TABLE);
+
+    if let Some(description) = properties.get(DESCRIPTION) {
+        table_input_builder = table_input_builder.description(description);
+    }
+
+    let table_input = 
table_input_builder.build().map_err(from_aws_build_error)?;
+
+    Ok(table_input)
+}
+
 /// Checks if provided `NamespaceIdent` is valid
 pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> 
{
     let name = namespace.as_ref();
@@ -151,6 +205,73 @@ pub(crate) fn validate_namespace(namespace: 
&NamespaceIdent) -> Result<String> {
     Ok(name)
 }
 
+/// Get default table location from `Namespace` properties
+pub(crate) fn get_default_table_location(
+    namespace: &Namespace,
+    db_name: impl AsRef<str>,
+    table_name: impl AsRef<str>,
+    warehouse: impl AsRef<str>,
+) -> String {
+    let properties = namespace.properties();
+
+    match properties.get(LOCATION) {
+        Some(location) => format!("{}/{}", location, table_name.as_ref()),
+        None => {
+            let warehouse_location = warehouse.as_ref().trim_end_matches('/');
+
+            format!(
+                "{}/{}.db/{}",
+                warehouse_location,
+                db_name.as_ref(),
+                table_name.as_ref()
+            )
+        }
+    }
+}
+
+/// Create metadata location from `location` and `version`
+pub(crate) fn create_metadata_location(location: impl AsRef<str>, version: 
i32) -> Result<String> {
+    if version < 0 {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Table metadata version: '{}' must be a non-negative integer",
+                version
+            ),
+        ));
+    };
+
+    let version = format!("{:0>5}", version);
+    let id = Uuid::new_v4();
+    let metadata_location = format!(
+        "{}/metadata/{}-{}.metadata.json",
+        location.as_ref(),
+        version,
+        id
+    );
+
+    Ok(metadata_location)
+}
+
+/// Get metadata location from `GlueTable` parameters
+pub(crate) fn get_metadata_location(
+    parameters: &Option<HashMap<String, String>>,
+) -> Result<String> {
+    match parameters {
+        Some(properties) => match properties.get(METADATA_LOCATION) {
+            Some(location) => Ok(location.to_string()),
+            None => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("No '{}' set on table", METADATA_LOCATION),
+            )),
+        },
+        None => Err(Error::new(
+            ErrorKind::DataInvalid,
+            "No 'parameters' set on table. Location of metadata is undefined",
+        )),
+    }
+}
+
 #[macro_export]
 /// Extends aws sdk builder with `catalog_id` if present
 macro_rules! with_catalog_id {
@@ -165,11 +286,145 @@ macro_rules! with_catalog_id {
 
 #[cfg(test)]
 mod tests {
-    use aws_sdk_glue::config::ProvideCredentials;
-    use iceberg::{Namespace, Result};
+    use aws_sdk_glue::{config::ProvideCredentials, types::Column};
+    use iceberg::{
+        spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type},
+        Namespace, Result, TableCreation,
+    };
+
+    use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, 
ICEBERG_FIELD_OPTIONAL};
 
     use super::*;
 
+    fn create_metadata(schema: Schema) -> Result<TableMetadata> {
+        let table_creation = TableCreation::builder()
+            .name("my_table".to_string())
+            .location("my_location".to_string())
+            .schema(schema)
+            .build();
+        let metadata = 
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+
+        Ok(metadata)
+    }
+
+    #[test]
+    fn test_get_metadata_location() -> Result<()> {
+        let params_valid = Some(HashMap::from([(
+            METADATA_LOCATION.to_string(),
+            "my_location".to_string(),
+        )]));
+        let params_missing_key = Some(HashMap::from([(
+            "not_here".to_string(),
+            "my_location".to_string(),
+        )]));
+
+        let result_valid = get_metadata_location(&params_valid)?;
+        let result_missing_key = get_metadata_location(&params_missing_key);
+        let result_no_params = get_metadata_location(&None);
+
+        assert_eq!(result_valid, "my_location");
+        assert!(result_missing_key.is_err());
+        assert!(result_no_params.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_convert_to_glue_table() -> Result<()> {
+        let table_name = "my_table".to_string();
+        let location = "s3a://warehouse/hive".to_string();
+        let metadata_location = create_metadata_location(location.clone(), 0)?;
+        let properties = HashMap::new();
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![NestedField::required(
+                1,
+                "foo",
+                Type::Primitive(PrimitiveType::Int),
+            )
+            .into()])
+            .build()?;
+
+        let metadata = create_metadata(schema)?;
+
+        let parameters = HashMap::from([
+            (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
+            (ICEBERG_FIELD_OPTIONAL.to_string(), "true".to_string()),
+            (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
+        ]);
+
+        let column = Column::builder()
+            .name("foo")
+            .r#type("int")
+            .set_parameters(Some(parameters))
+            .set_comment(None)
+            .build()
+            .map_err(from_aws_build_error)?;
+
+        let storage_descriptor = StorageDescriptor::builder()
+            .set_columns(Some(vec![column]))
+            .location(&metadata_location)
+            .build();
+
+        let result =
+            convert_to_glue_table(&table_name, metadata_location, &metadata, 
&properties, None)?;
+
+        assert_eq!(result.name(), &table_name);
+        assert_eq!(result.description(), None);
+        assert_eq!(result.storage_descriptor, Some(storage_descriptor));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_create_metadata_location() -> Result<()> {
+        let location = "my_base_location";
+        let valid_version = 0;
+        let invalid_version = -1;
+
+        let valid_result = create_metadata_location(location, valid_version)?;
+        let invalid_result = create_metadata_location(location, 
invalid_version);
+
+        assert!(valid_result.starts_with("my_base_location/metadata/00000-"));
+        assert!(valid_result.ends_with(".metadata.json"));
+        assert!(invalid_result.is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_default_table_location() -> Result<()> {
+        let properties = HashMap::from([(LOCATION.to_string(), 
"db_location".to_string())]);
+
+        let namespace =
+            Namespace::with_properties(NamespaceIdent::new("default".into()), 
properties);
+        let db_name = validate_namespace(namespace.name())?;
+        let table_name = "my_table";
+
+        let expected = "db_location/my_table";
+        let result =
+            get_default_table_location(&namespace, db_name, table_name, 
"warehouse_location");
+
+        assert_eq!(expected, result);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_default_table_location_warehouse() -> Result<()> {
+        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+        let db_name = validate_namespace(namespace.name())?;
+        let table_name = "my_table";
+
+        let expected = "warehouse_location/default.db/my_table";
+        let result =
+            get_default_table_location(&namespace, db_name, table_name, 
"warehouse_location");
+
+        assert_eq!(expected, result);
+
+        Ok(())
+    }
+
     #[test]
     fn test_convert_to_namespace() -> Result<()> {
         let db = Database::builder()
diff --git a/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml 
b/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
index c24d2d7..a0be22e 100644
--- a/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
+++ b/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
@@ -18,6 +18,28 @@
 version: '3.8'
 
 services:
+  minio:
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    expose:
+      - 9000
+      - 9001
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb 
minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f 
/dev/null "
+
   moto:
     image: motoserver/moto:5.0.3
     expose:
diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs 
b/crates/catalog/glue/tests/glue_catalog_test.rs
index c44ffcd..eb0cd96 100644
--- a/crates/catalog/glue/tests/glue_catalog_test.rs
+++ b/crates/catalog/glue/tests/glue_catalog_test.rs
@@ -19,7 +19,9 @@
 
 use std::collections::HashMap;
 
-use iceberg::{Catalog, Namespace, NamespaceIdent, Result};
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, 
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, 
TableIdent};
 use iceberg_catalog_glue::{
     GlueCatalog, GlueCatalogConfig, AWS_ACCESS_KEY_ID, AWS_REGION_NAME, 
AWS_SECRET_ACCESS_KEY,
 };
@@ -29,6 +31,7 @@ use port_scanner::scan_port_addr;
 use tokio::time::sleep;
 
 const GLUE_CATALOG_PORT: u16 = 5000;
+const MINIO_PORT: u16 = 9000;
 
 #[derive(Debug)]
 struct TestFixture {
@@ -47,6 +50,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
     docker_compose.run();
 
     let glue_catalog_ip = docker_compose.get_container_ip("moto");
+    let minio_ip = docker_compose.get_container_ip("minio");
 
     let read_port = format!("{}:{}", glue_catalog_ip, GLUE_CATALOG_PORT);
     loop {
@@ -65,14 +69,22 @@ async fn set_test_fixture(func: &str) -> TestFixture {
             "my_secret_key".to_string(),
         ),
         (AWS_REGION_NAME.to_string(), "us-east-1".to_string()),
+        (
+            S3_ENDPOINT.to_string(),
+            format!("http://{}:{}";, minio_ip, MINIO_PORT),
+        ),
+        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+        (S3_REGION.to_string(), "us-east-1".to_string()),
     ]);
 
     let config = GlueCatalogConfig::builder()
         .uri(format!("http://{}:{}";, glue_catalog_ip, GLUE_CATALOG_PORT))
-        .props(props)
+        .warehouse("s3a://warehouse/hive".to_string())
+        .props(props.clone())
         .build();
 
-    let glue_catalog = GlueCatalog::new(config).await;
+    let glue_catalog = GlueCatalog::new(config).await.unwrap();
 
     TestFixture {
         _docker_compose: docker_compose,
@@ -91,6 +103,176 @@ async fn set_test_namespace(fixture: &TestFixture, 
namespace: &NamespaceIdent) -
     Ok(())
 }
 
+fn set_table_creation(location: impl ToString, name: impl ToString) -> 
Result<TableCreation> {
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", 
Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    let creation = TableCreation::builder()
+        .location(location.to_string())
+        .name(name.to_string())
+        .properties(HashMap::new())
+        .schema(schema)
+        .build();
+
+    Ok(creation)
+}
+
+#[tokio::test]
+async fn test_rename_table() -> Result<()> {
+    let fixture = set_test_fixture("test_rename_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+
+    fixture
+        .glue_catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    let table = fixture
+        .glue_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let dest = TableIdent::new(namespace.name().clone(), 
"my_table_rename".to_string());
+
+    fixture
+        .glue_catalog
+        .rename_table(table.identifier(), &dest)
+        .await?;
+
+    let table = fixture.glue_catalog.load_table(&dest).await?;
+    assert_eq!(table.identifier(), &dest);
+
+    let src = TableIdent::new(namespace.name().clone(), 
"my_table".to_string());
+
+    let src_table_exists = fixture.glue_catalog.table_exists(&src).await?;
+    assert!(!src_table_exists);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_table_exists() -> Result<()> {
+    let fixture = set_test_fixture("test_table_exists").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+
+    fixture
+        .glue_catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    let ident = TableIdent::new(namespace.name().clone(), 
"my_table".to_string());
+
+    let exists = fixture.glue_catalog.table_exists(&ident).await?;
+    assert!(!exists);
+
+    let table = fixture
+        .glue_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let exists = fixture
+        .glue_catalog
+        .table_exists(table.identifier())
+        .await?;
+
+    assert!(exists);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_drop_table() -> Result<()> {
+    let fixture = set_test_fixture("test_drop_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+
+    fixture
+        .glue_catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    let table = fixture
+        .glue_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    fixture.glue_catalog.drop_table(table.identifier()).await?;
+
+    let result = fixture
+        .glue_catalog
+        .table_exists(table.identifier())
+        .await?;
+
+    assert!(!result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_load_table() -> Result<()> {
+    let fixture = set_test_fixture("test_load_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+
+    fixture
+        .glue_catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    let expected = fixture
+        .glue_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let result = fixture
+        .glue_catalog
+        .load_table(&TableIdent::new(
+            namespace.name().clone(),
+            "my_table".to_string(),
+        ))
+        .await?;
+
+    assert_eq!(result.identifier(), expected.identifier());
+    assert_eq!(result.metadata_location(), expected.metadata_location());
+    assert_eq!(result.metadata(), expected.metadata());
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_create_table() -> Result<()> {
+    let fixture = set_test_fixture("test_create_table").await;
+    let namespace = NamespaceIdent::new("my_database".to_string());
+    set_test_namespace(&fixture, &namespace).await?;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+
+    let result = fixture
+        .glue_catalog
+        .create_table(&namespace, creation)
+        .await?;
+
+    assert_eq!(result.identifier().name(), "my_table");
+    assert!(result
+        .metadata_location()
+        .is_some_and(|location| 
location.starts_with("s3a://warehouse/hive/metadata/00000-")));
+    assert!(
+        fixture
+            .glue_catalog
+            .file_io()
+            .is_exist("s3a://warehouse/hive/metadata/")
+            .await?
+    );
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_list_tables() -> Result<()> {
     let fixture = set_test_fixture("test_list_tables").await;

Reply via email to