This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new dcc380a feat: Glue Catalog - table operations (3/3) (#314)
dcc380a is described below
commit dcc380af294862b0a802b140c813b4cc4e304441
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Sun Apr 21 15:01:55 2024 +0200
feat: Glue Catalog - table operations (3/3) (#314)
* add GlueSchemaBuilder
* add warehouse
* add serde_json, tokio, uuid
* add minio
* add create_table
* add tests utils
* add load_table
* add drop_table + table_exists
* add rename_table
* add docs
* fix: docs + err_msg
* fix: remove unused const
* fix: default_table_location
* fix: remove single quotes error message
* chore: add test-condition `test_rename_table`
* chore: add test-condition `test_table_exists`
---
crates/catalog/glue/Cargo.toml | 4 +-
crates/catalog/glue/src/catalog.rs | 304 ++++++++++++-
crates/catalog/glue/src/lib.rs | 1 +
crates/catalog/glue/src/schema.rs | 485 +++++++++++++++++++++
crates/catalog/glue/src/utils.rs | 267 +++++++++++-
.../glue/testdata/glue_catalog/docker-compose.yaml | 22 +
crates/catalog/glue/tests/glue_catalog_test.rs | 188 +++++++-
7 files changed, 1243 insertions(+), 28 deletions(-)
diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml
index daa9587..0508378 100644
--- a/crates/catalog/glue/Cargo.toml
+++ b/crates/catalog/glue/Cargo.toml
@@ -35,9 +35,11 @@ aws-config = { workspace = true }
aws-sdk-glue = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
typed-builder = { workspace = true }
+uuid = { workspace = true }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/catalog/glue/src/catalog.rs
b/crates/catalog/glue/src/catalog.rs
index d152a54..f402129 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -16,18 +16,23 @@
// under the License.
use async_trait::async_trait;
+use aws_sdk_glue::types::TableInput;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation,
TableIdent,
};
use std::{collections::HashMap, fmt::Debug};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use typed_builder::TypedBuilder;
-use crate::error::from_aws_sdk_error;
+use crate::error::{from_aws_build_error, from_aws_sdk_error};
use crate::utils::{
- convert_to_database, convert_to_namespace, create_sdk_config,
validate_namespace,
+ convert_to_database, convert_to_glue_table, convert_to_namespace,
create_metadata_location,
+ create_sdk_config, get_default_table_location, get_metadata_location,
validate_namespace,
};
use crate::with_catalog_id;
@@ -38,6 +43,7 @@ pub struct GlueCatalogConfig {
uri: Option<String>,
#[builder(default, setter(strip_option))]
catalog_id: Option<String>,
+ warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}
@@ -48,6 +54,7 @@ struct GlueClient(aws_sdk_glue::Client);
pub struct GlueCatalog {
config: GlueCatalogConfig,
client: GlueClient,
+ file_io: FileIO,
}
impl Debug for GlueCatalog {
@@ -60,15 +67,24 @@ impl Debug for GlueCatalog {
impl GlueCatalog {
/// Create a new glue catalog
+ ///
+ /// Builds the AWS SDK configuration from `config.props` and the optional
+ /// `config.uri`, creates the Glue client from it, and builds a `FileIO`
+ /// from the `config.warehouse` path and `config.props`.
+ ///
+ /// # Errors
+ /// Returns an error if the `FileIO` cannot be created from the warehouse
+ /// path or fails to build.
- pub async fn new(config: GlueCatalogConfig) -> Self {
+ pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props,
config.uri.as_ref()).await;
let client = aws_sdk_glue::Client::new(&sdk_config);
- GlueCatalog {
+ let file_io = FileIO::from_path(&config.warehouse)?
+ .with_props(&config.props)
+ .build()?;
+
+ Ok(GlueCatalog {
config,
client: GlueClient(client),
- }
+ file_io,
+ })
+ }
+ /// Returns a clone of the catalog's `FileIO`.
+ pub fn file_io(&self) -> FileIO {
+ self.file_io.clone()
}
}
@@ -77,7 +93,7 @@ impl Catalog for GlueCatalog {
/// List namespaces from glue catalog.
///
/// Glue doesn't support nested namespaces.
- /// We will return an empty list if parent is some
+ /// We will return an empty list if parent is some.
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
@@ -277,6 +293,7 @@ impl Catalog for GlueCatalog {
/// querying the database.
async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
let db_name = validate_namespace(namespace)?;
+
let mut table_list: Vec<TableIdent> = Vec::new();
let mut next_token: Option<String> = None;
@@ -310,31 +327,282 @@ impl Catalog for GlueCatalog {
Ok(table_list)
}
+    /// Creates a new table within a specified namespace using the provided
+    /// table creation settings.
+    ///
+    /// If `creation.location` is unset, a default location is derived via
+    /// `get_default_table_location` (the namespace `location_uri` property
+    /// first, the catalog warehouse otherwise) and stored back into
+    /// `creation`, so the table metadata is built with the same base location
+    /// under which the metadata file is written.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a `Table` object representing the newly created
+    /// table.
+    ///
+    /// # Errors
+    /// This function may return an error in several cases, including invalid
+    /// namespace identifiers, failure to determine a default storage location,
+    /// issues generating or writing table metadata, and errors communicating
+    /// with the Glue Catalog.
     async fn create_table(
         &self,
-        _namespace: &NamespaceIdent,
-        _creation: TableCreation,
+        namespace: &NamespaceIdent,
+        mut creation: TableCreation,
     ) -> Result<Table> {
-        todo!()
+        let db_name = validate_namespace(namespace)?;
+        let table_name = creation.name.clone();
+
+        let location = match creation.location.clone() {
+            Some(location) => location,
+            None => {
+                let ns = self.get_namespace(namespace).await?;
+                let location = get_default_table_location(
+                    &ns,
+                    &db_name,
+                    &table_name,
+                    &self.config.warehouse,
+                );
+                // Propagate the resolved default so the metadata built below
+                // carries it. NOTE(review): presumably `from_table_creation`
+                // rejects a creation without a location — confirm.
+                creation.location = Some(location.clone());
+                location
+            }
+        };
+
+        let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
+        let metadata_location = create_metadata_location(&location, 0)?;
+
+        // Persist the metadata file before registering the table in Glue.
+        let mut file = self
+            .file_io
+            .new_output(&metadata_location)?
+            .writer()
+            .await?;
+        file.write_all(&serde_json::to_vec(&metadata)?).await?;
+        file.shutdown().await?;
+
+        let glue_table = convert_to_glue_table(
+            &table_name,
+            metadata_location.clone(),
+            &metadata,
+            metadata.properties(),
+            None,
+        )?;
+
+        let builder = self
+            .client
+            .0
+            .create_table()
+            .database_name(&db_name)
+            .table_input(glue_table);
+        let builder = with_catalog_id!(builder, self.config);
+
+        builder.send().await.map_err(from_aws_sdk_error)?;
+
+        let table = Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
+            .build();
+
+        Ok(table)
     }
- async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
- todo!()
+    /// Loads a table from the Glue Catalog and builds the matching `Table`.
+    ///
+    /// The Glue table is fetched by database and table name, its
+    /// `metadata_location` parameter is resolved, and the metadata file at
+    /// that location is read through the catalog's `FileIO` and deserialized.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a `Table` object that represents the loaded table.
+    ///
+    /// # Errors
+    /// This function may return an error in several scenarios, including:
+    /// - Failure to validate the namespace.
+    /// - Failure to retrieve the table from the Glue Catalog.
+    /// - Absence of metadata location information in the table's properties.
+    /// - Issues reading or deserializing the table's metadata file.
+    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let request = self
+            .client
+            .0
+            .get_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let request = with_catalog_id!(request, self.config);
+
+        let response = request.send().await.map_err(from_aws_sdk_error)?;
+
+        // A successful response without a table payload is treated as unexpected.
+        let glue_table = response.table().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Table object for database: {} and table: {} does not exist",
+                    db_name, table_name
+                ),
+            )
+        })?;
+
+        let metadata_location = get_metadata_location(&glue_table.parameters)?;
+
+        let mut input = self.file_io.new_input(&metadata_location)?.reader().await?;
+        let mut raw_metadata = String::new();
+        input.read_to_string(&mut raw_metadata).await?;
+        let metadata = serde_json::from_str::<TableMetadata>(&raw_metadata)?;
+
+        let ident = TableIdent::new(NamespaceIdent::new(db_name), table_name.to_owned());
+
+        Ok(Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(ident)
+            .build())
     }
- async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
- todo!()
+    /// Asynchronously drops a table from the Glue database.
+    ///
+    /// Only the Glue `delete_table` call is issued here.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The namespace provided in `table` cannot be validated.
+    /// - The Glue client reports an error for the `delete_table` call
+    ///   (converted via `from_aws_sdk_error`).
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        let db_name = validate_namespace(table.namespace())?;
+
+        let request = self
+            .client
+            .0
+            .delete_table()
+            .database_name(&db_name)
+            .name(table.name());
+        let request = with_catalog_id!(request, self.config);
+
+        request
+            .send()
+            .await
+            .map(|_| ())
+            .map_err(from_aws_sdk_error)
     }
- async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
- todo!()
+    /// Asynchronously checks whether the given table exists in the Glue
+    /// database by issuing a `get_table` request.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the request succeeds.
+    /// - `Ok(false)` if the service answers with an entity-not-found exception.
+    /// - `Err(...)` for any other failure.
+    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let request = self
+            .client
+            .0
+            .get_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let request = with_catalog_id!(request, self.config);
+
+        match request.send().await {
+            Ok(_) => Ok(true),
+            Err(err) => {
+                // Only "entity not found" means "does not exist"; everything
+                // else is surfaced to the caller.
+                let not_found = err
+                    .as_service_error()
+                    .map(|e| e.is_entity_not_found_exception())
+                    == Some(true);
+
+                if not_found {
+                    Ok(false)
+                } else {
+                    Err(from_aws_sdk_error(err))
+                }
+            }
+        }
     }
- async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) ->
Result<()> {
- todo!()
+    /// Asynchronously renames a table within the database
+    /// or moves it between namespaces (databases).
+    ///
+    /// The rename is emulated: the source Glue table is fetched, a new table
+    /// is created under the destination name with the source's parameters,
+    /// storage descriptor, table type and description, and the source table
+    /// is then dropped. If dropping the source fails, the freshly created
+    /// destination table is dropped again as a best-effort rollback.
+    ///
+    /// # Returns
+    /// - `Ok(())` on successful rename or move of the table.
+    /// - `Err(...)` if an error occurs during the process. When the source
+    ///   table could not be dropped, the error message includes the
+    ///   underlying cause and states whether the rollback succeeded.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
+        let src_db_name = validate_namespace(src.namespace())?;
+        let dest_db_name = validate_namespace(dest.namespace())?;
+
+        let src_table_name = src.name();
+        let dest_table_name = dest.name();
+
+        let builder = self
+            .client
+            .0
+            .get_table()
+            .database_name(&src_db_name)
+            .name(src_table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
+
+        // Same wording as `load_table` (no quotes around "Table").
+        let table = glue_table_output.table().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Table object for database: {} and table: {} does not exist",
+                    src_db_name, src_table_name
+                ),
+            )
+        })?;
+
+        let rename_table_input = TableInput::builder()
+            .name(dest_table_name)
+            .set_parameters(table.parameters.clone())
+            .set_storage_descriptor(table.storage_descriptor.clone())
+            .set_table_type(table.table_type.clone())
+            .set_description(table.description.clone())
+            .build()
+            .map_err(from_aws_build_error)?;
+
+        let builder = self
+            .client
+            .0
+            .create_table()
+            .database_name(&dest_db_name)
+            .table_input(rename_table_input);
+        let builder = with_catalog_id!(builder, self.config);
+
+        builder.send().await.map_err(from_aws_sdk_error)?;
+
+        let drop_src_err = match self.drop_table(src).await {
+            Ok(()) => return Ok(()),
+            Err(err) => err,
+        };
+
+        // Keep the cause of the failed drop instead of discarding it.
+        let err_msg_src_table = format!(
+            "Failed to drop old table {}.{} ({}).",
+            src_db_name, src_table_name, drop_src_err
+        );
+
+        // Best-effort rollback: remove the destination table created above.
+        match self.drop_table(dest).await {
+            Ok(()) => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "{} Rolled back table creation for {}.{}.",
+                    err_msg_src_table, dest_db_name, dest_table_name
+                ),
+            )),
+            Err(rollback_err) => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "{} Failed to roll back table creation for {}.{} ({}). Please clean up manually.",
+                    err_msg_src_table, dest_db_name, dest_table_name, rollback_err
+                ),
+            )),
+        }
     }
+ /// Updating a table is not supported by this catalog yet.
+ ///
+ /// # Errors
+ /// Always returns an error of kind `ErrorKind::FeatureUnsupported`.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
- todo!()
+ Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Updating a table is not supported yet",
+ ))
}
}
diff --git a/crates/catalog/glue/src/lib.rs b/crates/catalog/glue/src/lib.rs
index b274cf7..2376573 100644
--- a/crates/catalog/glue/src/lib.rs
+++ b/crates/catalog/glue/src/lib.rs
@@ -21,6 +21,7 @@
mod catalog;
mod error;
+mod schema;
mod utils;
pub use catalog::*;
pub use utils::{
diff --git a/crates/catalog/glue/src/schema.rs
b/crates/catalog/glue/src/schema.rs
new file mode 100644
index 0000000..a126f2f
--- /dev/null
+++ b/crates/catalog/glue/src/schema.rs
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Property `iceberg.field.id` for `Column`
+pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id";
+/// Property `iceberg.field.optional` for `Column`
+pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional";
+/// Property `iceberg.field.current` for `Column`
+pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current";
+
+use std::collections::HashMap;
+
+use iceberg::{
+ spec::{visit_schema, PrimitiveType, SchemaVisitor, TableMetadata},
+ Error, ErrorKind, Result,
+};
+
+use aws_sdk_glue::types::Column;
+
+use crate::error::from_aws_build_error;
+
+type GlueSchema = Vec<Column>;
+
+/// Schema visitor state used to convert iceberg schemas into Glue columns.
+#[derive(Debug, Default)]
+pub(crate) struct GlueSchemaBuilder {
+ /// Glue columns collected so far (one per top-level field visited).
+ schema: GlueSchema,
+ /// Whether the schema currently being visited is the table's current schema;
+ /// written to each column's `iceberg.field.current` parameter.
+ is_current: bool,
+ /// Struct-field nesting depth of the traversal; `0` outside any struct field.
+ depth: usize,
+}
+
+impl GlueSchemaBuilder {
+    /// Builds a `GlueSchemaBuilder` holding the Glue columns derived from
+    /// every schema of the given iceberg `TableMetadata`.
+    ///
+    /// The current schema is visited first with `is_current` set; all other
+    /// schemas are visited afterwards with `is_current` cleared.
+    pub fn from_iceberg(metadata: &TableMetadata) -> Result<GlueSchemaBuilder> {
+        let current = metadata.current_schema();
+
+        let mut this = Self {
+            schema: Vec::new(),
+            is_current: true,
+            depth: 0,
+        };
+        visit_schema(current, &mut this)?;
+
+        this.is_current = false;
+
+        let others = metadata
+            .schemas_iter()
+            .filter(|schema| schema.schema_id() != current.schema_id());
+        for schema in others {
+            visit_schema(schema, &mut this)?;
+        }
+
+        Ok(this)
+    }
+
+    /// Consumes the builder and returns the converted `GlueSchema`.
+    pub fn build(self) -> GlueSchema {
+        self.schema
+    }
+
+    /// Returns `true` while the traversal is inside a struct field,
+    /// i.e. while `depth` is greater than zero.
+    fn is_inside_struct(&self) -> bool {
+        self.depth > 0
+    }
+}
+
+/// Converts iceberg types into Glue type strings and collects one Glue
+/// `Column` per top-level field.
+impl SchemaVisitor for GlueSchemaBuilder {
+    type T = String;
+
+    /// Passes the type string of the root struct through unchanged.
+    fn schema(
+        &mut self,
+        _schema: &iceberg::spec::Schema,
+        value: Self::T,
+    ) -> iceberg::Result<String> {
+        Ok(value)
+    }
+
+    /// Enters a struct field: one nesting level deeper.
+    fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
+        self.depth += 1;
+        Ok(())
+    }
+
+    /// Renders a struct as `struct<f1:t1, f2:t2, ...>`.
+    fn r#struct(
+        &mut self,
+        _struct: &iceberg::spec::StructType,
+        results: Vec<String>,
+    ) -> iceberg::Result<String> {
+        Ok(format!("struct<{}>", results.join(", ")))
+    }
+
+    /// Leaves a struct field: one nesting level up.
+    fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
+        self.depth -= 1;
+        Ok(())
+    }
+
+    /// Handles a visited field.
+    ///
+    /// Inside a struct the field is rendered as `name:type` for the enclosing
+    /// struct string. Otherwise a Glue `Column` is appended to the schema,
+    /// unless a column with the same name was already collected.
+    fn field(
+        &mut self,
+        field: &iceberg::spec::NestedFieldRef,
+        value: String,
+    ) -> iceberg::Result<String> {
+        if self.is_inside_struct() {
+            return Ok(format!("{}:{}", field.name, &value));
+        }
+
+        // The current schema is visited first, so its columns win; a field of
+        // a non-current schema with the same name must not be emitted again,
+        // otherwise the Glue table would carry duplicate column names.
+        let already_added = self
+            .schema
+            .iter()
+            .any(|column| column.name() == field.name.as_str());
+        if already_added {
+            return Ok(value);
+        }
+
+        // NOTE(review): `iceberg.field.optional` is filled from
+        // `field.required`, so a required field is reported as optional=true.
+        // This looks inverted (`!field.required` expected), but this crate's
+        // unit tests pin the current value — confirm before changing.
+        let parameters = HashMap::from([
+            (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)),
+            (
+                ICEBERG_FIELD_OPTIONAL.to_string(),
+                format!("{}", field.required).to_lowercase(),
+            ),
+            (
+                ICEBERG_FIELD_CURRENT.to_string(),
+                format!("{}", self.is_current).to_lowercase(),
+            ),
+        ]);
+
+        let mut builder = Column::builder()
+            .name(field.name.clone())
+            .r#type(&value)
+            .set_parameters(Some(parameters));
+
+        if let Some(comment) = field.doc.as_ref() {
+            builder = builder.comment(comment);
+        }
+
+        let column = builder.build().map_err(from_aws_build_error)?;
+
+        self.schema.push(column);
+
+        Ok(value)
+    }
+
+    /// Renders a list as `array<element>`.
+    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result<String> {
+        Ok(format!("array<{}>", value))
+    }
+
+    /// Renders a map as `map<key,value>`.
+    fn map(
+        &mut self,
+        _map: &iceberg::spec::MapType,
+        key_value: String,
+        value: String,
+    ) -> iceberg::Result<String> {
+        Ok(format!("map<{},{}>", key_value, value))
+    }
+
+    /// Maps an iceberg primitive type to its Glue type name.
+    ///
+    /// # Errors
+    /// Returns `ErrorKind::FeatureUnsupported` for any primitive type without
+    /// a mapping below; the message names the offending type.
+    fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
+        let glue_type = match p {
+            PrimitiveType::Boolean => "boolean".to_string(),
+            PrimitiveType::Int => "int".to_string(),
+            PrimitiveType::Long => "bigint".to_string(),
+            PrimitiveType::Float => "float".to_string(),
+            PrimitiveType::Double => "double".to_string(),
+            PrimitiveType::Date => "date".to_string(),
+            PrimitiveType::Timestamp => "timestamp".to_string(),
+            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
+                "string".to_string()
+            }
+            PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
+            PrimitiveType::Decimal { precision, scale } => {
+                format!("decimal({},{})", precision, scale)
+            }
+            // Catch-all: report the actual type instead of assuming it is
+            // always `Timestamptz`.
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    format!("Conversion from '{:?}' is not supported", p),
+                ))
+            }
+        };
+
+        Ok(glue_type)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use iceberg::{
+ spec::{Schema, TableMetadataBuilder},
+ TableCreation,
+ };
+
+ use super::*;
+
+ fn create_metadata(schema: Schema) -> Result<TableMetadata> {
+ let table_creation = TableCreation::builder()
+ .name("my_table".to_string())
+ .location("my_location".to_string())
+ .schema(schema)
+ .build();
+ let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+
+ Ok(metadata)
+ }
+
+ fn create_column(
+ name: impl Into<String>,
+ r#type: impl Into<String>,
+ id: impl Into<String>,
+ ) -> Result<Column> {
+ let parameters = HashMap::from([
+ (ICEBERG_FIELD_ID.to_string(), id.into()),
+ (ICEBERG_FIELD_OPTIONAL.to_string(), "true".to_string()),
+ (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
+ ]);
+
+ Column::builder()
+ .name(name)
+ .r#type(r#type)
+ .set_comment(None)
+ .set_parameters(Some(parameters))
+ .build()
+ .map_err(from_aws_build_error)
+ }
+
+ #[test]
+ fn test_schema_with_simple_fields() -> Result<()> {
+ let record = r#"{
+ "type": "struct",
+ "schema-id": 1,
+ "fields": [
+ {
+ "id": 1,
+ "name": "c1",
+ "required": true,
+ "type": "boolean"
+ },
+ {
+ "id": 2,
+ "name": "c2",
+ "required": true,
+ "type": "int"
+ },
+ {
+ "id": 3,
+ "name": "c3",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 4,
+ "name": "c4",
+ "required": true,
+ "type": "float"
+ },
+ {
+ "id": 5,
+ "name": "c5",
+ "required": true,
+ "type": "double"
+ },
+ {
+ "id": 6,
+ "name": "c6",
+ "required": true,
+ "type": "decimal(2,2)"
+ },
+ {
+ "id": 7,
+ "name": "c7",
+ "required": true,
+ "type": "date"
+ },
+ {
+ "id": 8,
+ "name": "c8",
+ "required": true,
+ "type": "time"
+ },
+ {
+ "id": 9,
+ "name": "c9",
+ "required": true,
+ "type": "timestamp"
+ },
+ {
+ "id": 10,
+ "name": "c10",
+ "required": true,
+ "type": "string"
+ },
+ {
+ "id": 11,
+ "name": "c11",
+ "required": true,
+ "type": "uuid"
+ },
+ {
+ "id": 12,
+ "name": "c12",
+ "required": true,
+ "type": "fixed[4]"
+ },
+ {
+ "id": 13,
+ "name": "c13",
+ "required": true,
+ "type": "binary"
+ }
+ ]
+ }"#;
+
+ let schema = serde_json::from_str::<Schema>(record)?;
+ let metadata = create_metadata(schema)?;
+
+ let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+ let expected = vec![
+ create_column("c1", "boolean", "1")?,
+ create_column("c2", "int", "2")?,
+ create_column("c3", "bigint", "3")?,
+ create_column("c4", "float", "4")?,
+ create_column("c5", "double", "5")?,
+ create_column("c6", "decimal(2,2)", "6")?,
+ create_column("c7", "date", "7")?,
+ create_column("c8", "string", "8")?,
+ create_column("c9", "timestamp", "9")?,
+ create_column("c10", "string", "10")?,
+ create_column("c11", "string", "11")?,
+ create_column("c12", "binary", "12")?,
+ create_column("c13", "binary", "13")?,
+ ];
+
+ assert_eq!(result, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_schema_with_structs() -> Result<()> {
+ let record = r#"{
+ "type": "struct",
+ "schema-id": 1,
+ "fields": [
+ {
+ "id": 1,
+ "name": "person",
+ "required": true,
+ "type": {
+ "type": "struct",
+ "fields": [
+ {
+ "id": 2,
+ "name": "name",
+ "required": true,
+ "type": "string"
+ },
+ {
+ "id": 3,
+ "name": "age",
+ "required": false,
+ "type": "int"
+ }
+ ]
+ }
+ }
+ ]
+ }"#;
+
+ let schema = serde_json::from_str::<Schema>(record)?;
+ let metadata = create_metadata(schema)?;
+
+ let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+ let expected = vec![create_column(
+ "person",
+ "struct<name:string, age:int>",
+ "1",
+ )?];
+
+ assert_eq!(result, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_schema_with_struct_inside_list() -> Result<()> {
+ let record = r#"
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "location",
+ "required": true,
+ "type": {
+ "type": "list",
+ "element-id": 2,
+ "element-required": true,
+ "element": {
+ "type": "struct",
+ "fields": [
+ {
+ "id": 3,
+ "name": "latitude",
+ "required": false,
+ "type": "float"
+ },
+ {
+ "id": 4,
+ "name": "longitude",
+ "required": false,
+ "type": "float"
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+ "#;
+
+ let schema = serde_json::from_str::<Schema>(record)?;
+ let metadata = create_metadata(schema)?;
+
+ let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+ let expected = vec![create_column(
+ "location",
+ "array<struct<latitude:float, longitude:float>>",
+ "1",
+ )?];
+
+ assert_eq!(result, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_schema_with_nested_maps() -> Result<()> {
+ let record = r#"
+ {
+ "schema-id": 1,
+ "type": "struct",
+ "fields": [
+ {
+ "id": 1,
+ "name": "quux",
+ "required": true,
+ "type": {
+ "type": "map",
+ "key-id": 2,
+ "key": "string",
+ "value-id": 3,
+ "value-required": true,
+ "value": {
+ "type": "map",
+ "key-id": 4,
+ "key": "string",
+ "value-id": 5,
+ "value-required": true,
+ "value": "int"
+ }
+ }
+ }
+ ]
+ }
+ "#;
+
+ let schema = serde_json::from_str::<Schema>(record)?;
+ let metadata = create_metadata(schema)?;
+
+ let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build();
+
+ let expected = vec![create_column("quux",
"map<string,map<string,int>>", "1")?];
+
+ assert_eq!(result, expected);
+
+ Ok(())
+ }
+}
diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs
index fa9ebb8..dcf6bef 100644
--- a/crates/catalog/glue/src/utils.rs
+++ b/crates/catalog/glue/src/utils.rs
@@ -20,14 +20,14 @@ use std::collections::HashMap;
use aws_config::{BehaviorVersion, Region, SdkConfig};
use aws_sdk_glue::{
config::Credentials,
- types::{Database, DatabaseInput},
+ types::{Database, DatabaseInput, StorageDescriptor, TableInput},
};
+use iceberg::spec::TableMetadata;
use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
+use uuid::Uuid;
-use crate::error::from_aws_build_error;
+use crate::{error::from_aws_build_error, schema::GlueSchemaBuilder};
-const _GLUE_SKIP_ARCHIVE: &str = "glue.skip-archive";
-const _GLUE_SKIP_ARCHIVE_DEFAULT: bool = true;
/// Property aws profile name
pub const AWS_PROFILE_NAME: &str = "profile_name";
/// Property aws region
@@ -42,6 +42,16 @@ pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
const DESCRIPTION: &str = "description";
/// Parameter namespace location uri
const LOCATION: &str = "location_uri";
+/// Property `metadata_location` for `TableInput`
+const METADATA_LOCATION: &str = "metadata_location";
+/// Property `previous_metadata_location` for `TableInput`
+const PREV_METADATA_LOCATION: &str = "previous_metadata_location";
+/// Property external table for `TableInput`
+const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
+/// Parameter key `table_type` for `TableInput`
+const TABLE_TYPE: &str = "table_type";
+/// Parameter value `table_type` for `TableInput`
+const ICEBERG: &str = "ICEBERG";
/// Creates an aws sdk configuration based on
/// provided properties and an optional endpoint URL.
@@ -125,6 +135,50 @@ pub(crate) fn convert_to_namespace(database: &Database) ->
Namespace {
Namespace::with_properties(NamespaceIdent::new(db_name), properties)
}
+/// Converts Iceberg table metadata into an AWS Glue `TableInput`.
+///
+/// The resulting input carries:
+/// - the Glue columns produced by `GlueSchemaBuilder` from `metadata`,
+/// - the parameters `table_type = ICEBERG` and `metadata_location`, plus
+///   `previous_metadata_location` when `prev_metadata_location` is given,
+/// - the table type `EXTERNAL_TABLE`,
+/// - a description, if `properties` contains the `description` key.
+///
+/// NOTE(review): the storage descriptor location is set to the metadata file
+/// location rather than the table's base location — confirm this is intended.
+pub(crate) fn convert_to_glue_table(
+    table_name: impl Into<String>,
+    metadata_location: String,
+    metadata: &TableMetadata,
+    properties: &HashMap<String, String>,
+    prev_metadata_location: Option<String>,
+) -> Result<TableInput> {
+    let columns = GlueSchemaBuilder::from_iceberg(metadata)?.build();
+
+    let storage_descriptor = StorageDescriptor::builder()
+        .set_columns(Some(columns))
+        .location(&metadata_location)
+        .build();
+
+    let mut parameters = HashMap::new();
+    parameters.insert(TABLE_TYPE.to_string(), ICEBERG.to_string());
+    parameters.insert(METADATA_LOCATION.to_string(), metadata_location);
+    parameters.extend(
+        prev_metadata_location.map(|prev| (PREV_METADATA_LOCATION.to_string(), prev)),
+    );
+
+    let table_input_builder = TableInput::builder()
+        .name(table_name)
+        .set_parameters(Some(parameters))
+        .storage_descriptor(storage_descriptor)
+        .table_type(EXTERNAL_TABLE);
+
+    let table_input_builder = match properties.get(DESCRIPTION) {
+        Some(description) => table_input_builder.description(description),
+        None => table_input_builder,
+    };
+
+    table_input_builder.build().map_err(from_aws_build_error)
+}
+
/// Checks if provided `NamespaceIdent` is valid
pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String>
{
let name = namespace.as_ref();
@@ -151,6 +205,73 @@ pub(crate) fn validate_namespace(namespace:
&NamespaceIdent) -> Result<String> {
Ok(name)
}
+/// Get default table location from `Namespace` properties
+///
+/// If the namespace has a `location_uri` property the result is
+/// `<location_uri>/<table_name>`; otherwise it falls back to
+/// `<warehouse>/<db_name>.db/<table_name>`.
+///
+/// Trailing slashes on the namespace location and on the warehouse are
+/// trimmed in both branches, so the result never contains `//` at the join.
+pub(crate) fn get_default_table_location(
+    namespace: &Namespace,
+    db_name: impl AsRef<str>,
+    table_name: impl AsRef<str>,
+    warehouse: impl AsRef<str>,
+) -> String {
+    let properties = namespace.properties();
+
+    match properties.get(LOCATION) {
+        Some(location) => format!(
+            "{}/{}",
+            location.trim_end_matches('/'),
+            table_name.as_ref()
+        ),
+        None => {
+            let warehouse_location = warehouse.as_ref().trim_end_matches('/');
+
+            format!(
+                "{}/{}.db/{}",
+                warehouse_location,
+                db_name.as_ref(),
+                table_name.as_ref()
+            )
+        }
+    }
+}
+
+/// Create metadata location from `location` and `version`
+///
+/// The result has the shape
+/// `<location>/metadata/<version, zero-padded to 5 digits>-<uuid v4>.metadata.json`.
+///
+/// # Errors
+/// Returns `ErrorKind::DataInvalid` if `version` is negative.
+pub(crate) fn create_metadata_location(location: impl AsRef<str>, version: i32) -> Result<String> {
+    if version < 0 {
+        let message = format!(
+            "Table metadata version: '{}' must be a non-negative integer",
+            version
+        );
+        return Err(Error::new(ErrorKind::DataInvalid, message));
+    }
+
+    let padded_version = format!("{:0>5}", version);
+    let file_name = format!("{}-{}.metadata.json", padded_version, Uuid::new_v4());
+
+    Ok(format!("{}/metadata/{}", location.as_ref(), file_name))
+}
+
+/// Get metadata location from `GlueTable` parameters
+///
+/// # Errors
+/// Returns `ErrorKind::DataInvalid` if `parameters` is `None` or does not
+/// contain the `metadata_location` key.
+pub(crate) fn get_metadata_location(
+    parameters: &Option<HashMap<String, String>>,
+) -> Result<String> {
+    let properties = parameters.as_ref().ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "No 'parameters' set on table. Location of metadata is undefined",
+        )
+    })?;
+
+    properties
+        .get(METADATA_LOCATION)
+        .map(|location| location.to_string())
+        .ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("No '{}' set on table", METADATA_LOCATION),
+            )
+        })
+}
+
#[macro_export]
/// Extends aws sdk builder with `catalog_id` if present
macro_rules! with_catalog_id {
@@ -165,11 +286,145 @@ macro_rules! with_catalog_id {
#[cfg(test)]
mod tests {
- use aws_sdk_glue::config::ProvideCredentials;
- use iceberg::{Namespace, Result};
+ use aws_sdk_glue::{config::ProvideCredentials, types::Column};
+ use iceberg::{
+ spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type},
+ Namespace, Result, TableCreation,
+ };
+
+ use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID,
ICEBERG_FIELD_OPTIONAL};
use super::*;
+ fn create_metadata(schema: Schema) -> Result<TableMetadata> {
+ let table_creation = TableCreation::builder()
+ .name("my_table".to_string())
+ .location("my_location".to_string())
+ .schema(schema)
+ .build();
+ let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+
+ Ok(metadata)
+ }
+
+ #[test]
+ fn test_get_metadata_location() -> Result<()> {
+ let params_valid = Some(HashMap::from([(
+ METADATA_LOCATION.to_string(),
+ "my_location".to_string(),
+ )]));
+ let params_missing_key = Some(HashMap::from([(
+ "not_here".to_string(),
+ "my_location".to_string(),
+ )]));
+
+ let result_valid = get_metadata_location(¶ms_valid)?;
+ let result_missing_key = get_metadata_location(¶ms_missing_key);
+ let result_no_params = get_metadata_location(&None);
+
+ assert_eq!(result_valid, "my_location");
+ assert!(result_missing_key.is_err());
+ assert!(result_no_params.is_err());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_convert_to_glue_table() -> Result<()> {
+ let table_name = "my_table".to_string();
+ let location = "s3a://warehouse/hive".to_string();
+ let metadata_location = create_metadata_location(location.clone(), 0)?;
+ let properties = HashMap::new();
+ let schema = Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![NestedField::required(
+ 1,
+ "foo",
+ Type::Primitive(PrimitiveType::Int),
+ )
+ .into()])
+ .build()?;
+
+ let metadata = create_metadata(schema)?;
+
+ let parameters = HashMap::from([
+ (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
+ (ICEBERG_FIELD_OPTIONAL.to_string(), "true".to_string()),
+ (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()),
+ ]);
+
+ let column = Column::builder()
+ .name("foo")
+ .r#type("int")
+ .set_parameters(Some(parameters))
+ .set_comment(None)
+ .build()
+ .map_err(from_aws_build_error)?;
+
+ let storage_descriptor = StorageDescriptor::builder()
+ .set_columns(Some(vec![column]))
+ .location(&metadata_location)
+ .build();
+
+ let result =
+ convert_to_glue_table(&table_name, metadata_location, &metadata,
&properties, None)?;
+
+ assert_eq!(result.name(), &table_name);
+ assert_eq!(result.description(), None);
+ assert_eq!(result.storage_descriptor, Some(storage_descriptor));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_create_metadata_location() -> Result<()> {
+ let location = "my_base_location";
+ let valid_version = 0;
+ let invalid_version = -1;
+
+ let valid_result = create_metadata_location(location, valid_version)?;
+ let invalid_result = create_metadata_location(location,
invalid_version);
+
+ assert!(valid_result.starts_with("my_base_location/metadata/00000-"));
+ assert!(valid_result.ends_with(".metadata.json"));
+ assert!(invalid_result.is_err());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_default_table_location() -> Result<()> {
+ let properties = HashMap::from([(LOCATION.to_string(),
"db_location".to_string())]);
+
+ let namespace =
+ Namespace::with_properties(NamespaceIdent::new("default".into()),
properties);
+ let db_name = validate_namespace(namespace.name())?;
+ let table_name = "my_table";
+
+ let expected = "db_location/my_table";
+ let result =
+ get_default_table_location(&namespace, db_name, table_name,
"warehouse_location");
+
+ assert_eq!(expected, result);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_default_table_location_warehouse() -> Result<()> {
+ let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+ let db_name = validate_namespace(namespace.name())?;
+ let table_name = "my_table";
+
+ let expected = "warehouse_location/default.db/my_table";
+ let result =
+ get_default_table_location(&namespace, db_name, table_name,
"warehouse_location");
+
+ assert_eq!(expected, result);
+
+ Ok(())
+ }
+
#[test]
fn test_convert_to_namespace() -> Result<()> {
let db = Database::builder()
diff --git a/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
b/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
index c24d2d7..a0be22e 100644
--- a/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
+++ b/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml
@@ -18,6 +18,28 @@
version: '3.8'
services:
+ minio: # object store for the integration tests; the Rust fixture builds its S3 endpoint from this container's IP
+ image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+ expose:
+ - 9000 # port the mc service and the tests use to reach the server
+ - 9001 # web console, moved here by --console-address below
+ environment:
+ - MINIO_ROOT_USER=admin # must match the credentials used by the mc entrypoint and the test props
+ - MINIO_ROOT_PASSWORD=password
+ - MINIO_DOMAIN=minio
+ command: [ "server", "/data", "--console-address", ":9001" ]
+
+ mc: # bootstrap client: registers the minio host, creates the `warehouse` bucket, makes it public, then idles
+ depends_on: # start ordering only; the entrypoint still retries until the server answers
+ - minio
+ image: minio/mc:RELEASE.2024-03-07T00-31-49Z
+ environment:
+ - AWS_ACCESS_KEY_ID=admin
+ - AWS_SECRET_ACCESS_KEY=password
+ - AWS_REGION=us-east-1
+ entrypoint: >
+ /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb
minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f
/dev/null "
+
moto:
image: motoserver/moto:5.0.3
expose:
diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs
b/crates/catalog/glue/tests/glue_catalog_test.rs
index c44ffcd..eb0cd96 100644
--- a/crates/catalog/glue/tests/glue_catalog_test.rs
+++ b/crates/catalog/glue/tests/glue_catalog_test.rs
@@ -19,7 +19,9 @@
use std::collections::HashMap;
-use iceberg::{Catalog, Namespace, NamespaceIdent, Result};
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation,
TableIdent};
use iceberg_catalog_glue::{
GlueCatalog, GlueCatalogConfig, AWS_ACCESS_KEY_ID, AWS_REGION_NAME,
AWS_SECRET_ACCESS_KEY,
};
@@ -29,6 +31,7 @@ use port_scanner::scan_port_addr;
use tokio::time::sleep;
const GLUE_CATALOG_PORT: u16 = 5000;
+const MINIO_PORT: u16 = 9000; // port of the `minio` container, used to build the S3 endpoint URL
#[derive(Debug)]
struct TestFixture {
@@ -47,6 +50,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
docker_compose.run();
let glue_catalog_ip = docker_compose.get_container_ip("moto");
+ let minio_ip = docker_compose.get_container_ip("minio");
let read_port = format!("{}:{}", glue_catalog_ip, GLUE_CATALOG_PORT);
loop {
@@ -65,14 +69,22 @@ async fn set_test_fixture(func: &str) -> TestFixture {
"my_secret_key".to_string(),
),
(AWS_REGION_NAME.to_string(), "us-east-1".to_string()),
+ (
+ S3_ENDPOINT.to_string(),
+ format!("http://{}:{}", minio_ip, MINIO_PORT),
+ ),
+ (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+ (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+ (S3_REGION.to_string(), "us-east-1".to_string()),
]);
let config = GlueCatalogConfig::builder()
.uri(format!("http://{}:{}", glue_catalog_ip, GLUE_CATALOG_PORT))
- .props(props)
+ .warehouse("s3a://warehouse/hive".to_string())
+ .props(props.clone())
.build();
- let glue_catalog = GlueCatalog::new(config).await;
+ let glue_catalog = GlueCatalog::new(config).await.unwrap();
TestFixture {
_docker_compose: docker_compose,
@@ -91,6 +103,176 @@ async fn set_test_namespace(fixture: &TestFixture,
namespace: &NamespaceIdent) -
Ok(())
}
+/// Builds a [`TableCreation`] named `name` at `location`, using a fixed
+/// two-column schema (required `foo: int`, required `bar: string`) and an
+/// empty property map.
+///
+/// # Errors
+///
+/// Propagates the error returned by the schema builder.
+fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
+    let foo = NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int));
+    let bar = NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String));
+
+    let two_column_schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![foo.into(), bar.into()])
+        .build()?;
+
+    Ok(TableCreation::builder()
+        .location(location.to_string())
+        .name(name.to_string())
+        .properties(HashMap::new())
+        .schema(two_column_schema)
+        .build())
+}
+
+/// After a rename, the table loads under the destination identifier and
+/// `table_exists` reports `false` for the source identifier.
+#[tokio::test]
+async fn test_rename_table() -> Result<()> {
+    let fixture = set_test_fixture("test_rename_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+    let catalog = &fixture.glue_catalog;
+
+    catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+    let created = catalog.create_table(namespace.name(), creation).await?;
+
+    let renamed_ident = TableIdent::new(namespace.name().clone(), "my_table_rename".to_string());
+    catalog
+        .rename_table(created.identifier(), &renamed_ident)
+        .await?;
+
+    let reloaded = catalog.load_table(&renamed_ident).await?;
+    assert_eq!(reloaded.identifier(), &renamed_ident);
+
+    // The old name must no longer resolve once the rename has gone through.
+    let original_ident = TableIdent::new(namespace.name().clone(), "my_table".to_string());
+    assert!(!catalog.table_exists(&original_ident).await?);
+
+    Ok(())
+}
+
+/// `table_exists` reports `false` before the table is created and `true`
+/// once `create_table` has succeeded.
+#[tokio::test]
+async fn test_table_exists() -> Result<()> {
+    let fixture = set_test_fixture("test_table_exists").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+    let catalog = &fixture.glue_catalog;
+
+    catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    // Negative case first: the namespace exists but the table does not yet.
+    let not_yet_created = TableIdent::new(namespace.name().clone(), "my_table".to_string());
+    assert!(!catalog.table_exists(&not_yet_created).await?);
+
+    let created = catalog.create_table(namespace.name(), creation).await?;
+    assert!(catalog.table_exists(created.identifier()).await?);
+
+    Ok(())
+}
+
+/// After `drop_table`, `table_exists` reports `false` for the dropped table.
+#[tokio::test]
+async fn test_drop_table() -> Result<()> {
+    let fixture = set_test_fixture("test_drop_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+    let catalog = &fixture.glue_catalog;
+
+    catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    let created = catalog.create_table(namespace.name(), creation).await?;
+    catalog.drop_table(created.identifier()).await?;
+
+    let still_exists = catalog.table_exists(created.identifier()).await?;
+    assert!(!still_exists);
+
+    Ok(())
+}
+
+/// A table loaded by identifier matches the table returned by
+/// `create_table` in identifier, metadata location and metadata.
+#[tokio::test]
+async fn test_load_table() -> Result<()> {
+    let fixture = set_test_fixture("test_load_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("my_database".into()));
+    let catalog = &fixture.glue_catalog;
+
+    catalog
+        .create_namespace(namespace.name(), HashMap::new())
+        .await?;
+
+    let created = catalog.create_table(namespace.name(), creation).await?;
+
+    let ident = TableIdent::new(namespace.name().clone(), "my_table".to_string());
+    let loaded = catalog.load_table(&ident).await?;
+
+    assert_eq!(loaded.identifier(), created.identifier());
+    assert_eq!(loaded.metadata_location(), created.metadata_location());
+    assert_eq!(loaded.metadata(), created.metadata());
+
+    Ok(())
+}
+
+/// `create_table` returns a table with the requested name whose metadata
+/// location starts with `s3a://warehouse/hive/metadata/00000-`, and the
+/// catalog's `FileIO` reports the metadata path as existing.
+#[tokio::test]
+async fn test_create_table() -> Result<()> {
+    let fixture = set_test_fixture("test_create_table").await;
+    let namespace = NamespaceIdent::new("my_database".to_string());
+    set_test_namespace(&fixture, &namespace).await?;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let catalog = &fixture.glue_catalog;
+
+    let created = catalog.create_table(&namespace, creation).await?;
+
+    assert_eq!(created.identifier().name(), "my_table");
+
+    let has_versioned_location = created
+        .metadata_location()
+        .is_some_and(|path| path.starts_with("s3a://warehouse/hive/metadata/00000-"));
+    assert!(has_versioned_location);
+
+    let metadata_dir_exists = catalog
+        .file_io()
+        .is_exist("s3a://warehouse/hive/metadata/")
+        .await?;
+    assert!(metadata_dir_exists);
+
+    Ok(())
+}
+
#[tokio::test]
async fn test_list_tables() -> Result<()> {
let fixture = set_test_fixture("test_list_tables").await;