This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a947fa  Add hive metastore catalog support (part 1/2) (#237)
3a947fa is described below

commit 3a947faaf33e8c9e31019581ec22b93f2b2437bc
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Sun Mar 10 01:38:20 2024 +0100

    Add hive metastore catalog support (part 1/2) (#237)
    
    * fmt members
    
    * setup basic test-infra for hms-catalog
    
    * add license
    
    * add hms create_namespace
    
    * add hms get_namespace
    
    * fix: typo
    
    * add hms namespace_exists and drop_namespace
    
    * add hms update_namespace
    
    * move fns into HmsCatalog
    
    * use `expose` in docker-compose
    
    * add hms list_tables
    
    * fix: clippy
    
    * fix: cargo sort
    
    * fix: cargo workspace
    
    * move fns into utils + add constants
    
    * include database name in error msg
    
    * add pilota to cargo workspace
    
    * add minio version
    
    * change visibility to pub(crate); return namespace from conversion fn
    
    * add minio version in rest-catalog docker-compose
    
    * fix: hms test docker infrastructure
    
    * add version to minio/mc
    
    * fix: license header
    
    * fix: core-site
    
    ---------
    
    Co-authored-by: mlanhenke <[email protected]>
---
 .cargo/audit.toml                                  |   8 +-
 Cargo.toml                                         |   8 +-
 crates/catalog/hms/Cargo.toml                      |   7 +
 crates/catalog/hms/src/catalog.rs                  | 160 ++++++++++--
 crates/catalog/hms/src/utils.rs                    | 285 ++++++++++++++++++++-
 crates/catalog/hms/testdata/hms_catalog/Dockerfile |  34 +++
 .../catalog/hms/testdata/hms_catalog/core-site.xml |  51 ++++
 .../testdata/hms_catalog}/docker-compose.yaml      |  45 ++--
 crates/catalog/hms/tests/hms_catalog_test.rs       | 223 ++++++++++++++++
 .../rest/testdata/rest_catalog/docker-compose.yaml |  12 +-
 crates/iceberg/src/catalog/mod.rs                  |   2 +-
 deny.toml                                          |  26 +-
 12 files changed, 790 insertions(+), 71 deletions(-)

diff --git a/.cargo/audit.toml b/.cargo/audit.toml
index 1d73f83..5db5a9d 100644
--- a/.cargo/audit.toml
+++ b/.cargo/audit.toml
@@ -17,8 +17,8 @@
 
 [advisories]
 ignore = [
-    # rsa
-    # Marvin Attack: potential key recovery through timing sidechannels
-    # Issues: https://github.com/apache/iceberg-rust/issues/221
-    "RUSTSEC-2023-0071",
+  # rsa
+  # Marvin Attack: potential key recovery through timing sidechannels
+  # Issues: https://github.com/apache/iceberg-rust/issues/221
+  "RUSTSEC-2023-0071",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 3234bd0..c482859 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,12 @@
 
 [workspace]
 resolver = "2"
-members = ["crates/catalog/*", "crates/examples", "crates/iceberg", 
"crates/test_utils"]
+members = [
+  "crates/catalog/*",
+  "crates/examples",
+  "crates/iceberg",
+  "crates/test_utils",
+]
 
 [workspace.package]
 version = "0.2.0"
@@ -55,6 +60,7 @@ once_cell = "1"
 opendal = "0.45"
 ordered-float = "4.0.0"
 parquet = "50"
+pilota = "0.10.0"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
 reqwest = { version = "^0.11", features = ["json"] }
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index f44125c..475da7b 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -33,5 +33,12 @@ anyhow = { workspace = true }
 async-trait = { workspace = true }
 hive_metastore = { workspace = true }
 iceberg = { workspace = true }
+log = { workspace = true }
+pilota = { workspace = true }
 typed-builder = { workspace = true }
 volo-thrift = { workspace = true }
+
+[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+port_scanner = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs 
b/crates/catalog/hms/src/catalog.rs
index a00aaf3..57e3824 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -19,6 +19,7 @@ use super::utils::*;
 use async_trait::async_trait;
 use hive_metastore::ThriftHiveMetastoreClient;
 use hive_metastore::ThriftHiveMetastoreClientBuilder;
+use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
 use iceberg::table::Table;
 use iceberg::{
     Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
@@ -28,6 +29,7 @@ use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::net::ToSocketAddrs;
 use typed_builder::TypedBuilder;
+use volo_thrift::ResponseError;
 
 /// Which variant of the thrift transport to communicate with HMS
 /// See: 
<https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
@@ -97,7 +99,6 @@ impl HmsCatalog {
     }
 }
 
-/// Refer to 
<https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java>
 for implementation details.
 #[async_trait]
 impl Catalog for HmsCatalog {
     /// HMS doesn't support nested namespaces.
@@ -125,36 +126,165 @@ impl Catalog for HmsCatalog {
             .collect())
     }
 
+    /// Creates a new namespace with the given identifier and properties.
+    ///
+    /// Attempts to create a namespace defined by the `namespace`
+    /// parameter and configured with the specified `properties`.
+    ///
+    /// This function can return an error in the following situations:
+    ///
+    /// - If `hive.metastore.database.owner-type` is specified without  
+    /// `hive.metastore.database.owner`,
+    /// - Errors from `validate_namespace` if the namespace identifier does not
+    /// meet validation criteria.
+    /// - Errors from `convert_to_database` if the properties cannot be  
+    /// successfully converted into a database configuration.
+    /// - Errors from the underlying database creation process, converted using
+    /// `from_thrift_error`.
     async fn create_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<Namespace> {
-        todo!()
+        let database = convert_to_database(namespace, &properties)?;
+
+        self.client
+            .0
+            .create_database(database)
+            .await
+            .map_err(from_thrift_error)?;
+
+        Ok(Namespace::new(namespace.clone()))
     }
 
-    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
-        todo!()
+    /// Retrieves a namespace by its identifier.
+    ///
+    /// Validates the given namespace identifier and then queries the
+    /// underlying database client to fetch the corresponding namespace data.
+    /// Constructs a `Namespace` object with the retrieved data and returns it.
+    ///
+    /// This function can return an error in any of the following situations:
+    /// - If the provided namespace identifier fails validation checks
+    /// - If there is an error querying the database, returned by
+    /// `from_thrift_error`.
+    async fn get_namespace(&self, namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        let name = validate_namespace(namespace)?;
+
+        let db = self
+            .client
+            .0
+            .get_database(name.clone().into())
+            .await
+            .map_err(from_thrift_error)?;
+
+        let ns = convert_to_namespace(&db)?;
+
+        Ok(ns)
     }
 
-    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
-        todo!()
+    /// Checks if a namespace exists within the Hive Metastore.
+    ///
+    /// Validates the namespace identifier by querying the Hive Metastore
+    /// to determine if the specified namespace (database) exists.
+    ///
+    /// # Returns
+    /// A `Result<bool>` indicating the outcome of the check:
+    /// - `Ok(true)` if the namespace exists.
+    /// - `Ok(false)` if the namespace does not exist, identified by a specific
+    /// `UserException` variant.
+    /// - `Err(...)` if an error occurs during validation or the Hive Metastore
+    /// query, with the error encapsulating the issue.
+    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> 
Result<bool> {
+        let name = validate_namespace(namespace)?;
+
+        let resp = self.client.0.get_database(name.clone().into()).await;
+
+        match resp {
+            Ok(_) => Ok(true),
+            Err(err) => {
+                if let 
ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
+                    _,
+                )) = &err
+                {
+                    Ok(false)
+                } else {
+                    Err(from_thrift_error(err))
+                }
+            }
+        }
     }
 
+    /// Asynchronously updates properties of an existing namespace.
+    ///
+    /// Converts the given namespace identifier and properties into a database
+    /// representation and then attempts to update the corresponding namespace 
 
+    /// in the Hive Metastore.
+    ///
+    /// # Returns
+    /// Returns `Ok(())` if the namespace update is successful. If the
+    /// namespace cannot be updated due to missing information or an error
+    /// during the update process, an `Err(...)` is returned.
     async fn update_namespace(
         &self,
-        _namespace: &NamespaceIdent,
-        _properties: HashMap<String, String>,
+        namespace: &NamespaceIdent,
+        properties: HashMap<String, String>,
     ) -> Result<()> {
-        todo!()
+        let db = convert_to_database(namespace, &properties)?;
+
+        let name = match &db.name {
+            Some(name) => name,
+            None => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Database name must be specified",
+                ))
+            }
+        };
+
+        self.client
+            .0
+            .alter_database(name.clone(), db)
+            .await
+            .map_err(from_thrift_error)?;
+
+        Ok(())
     }
 
-    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
-        todo!()
+    /// Asynchronously drops a namespace from the Hive Metastore.
+    ///
+    /// # Returns
+    /// A `Result<()>` indicating the outcome:
+    /// - `Ok(())` signifies successful namespace deletion.
+    /// - `Err(...)` signifies failure to drop the namespace due to validation 
 
+    /// errors, connectivity issues, or Hive Metastore constraints.
+    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+        let name = validate_namespace(namespace)?;
+
+        self.client
+            .0
+            .drop_database(name.into(), false, false)
+            .await
+            .map_err(from_thrift_error)?;
+
+        Ok(())
     }
 
-    async fn list_tables(&self, _namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
-        todo!()
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let name = validate_namespace(namespace)?;
+
+        let tables = self
+            .client
+            .0
+            .get_all_tables(name.clone().into())
+            .await
+            .map_err(from_thrift_error)?;
+
+        let tables = tables
+            .iter()
+            .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
+            .collect();
+
+        Ok(tables)
     }
 
     async fn create_table(
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 1b0ff33..02f32c6 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -16,10 +16,24 @@
 // under the License.
 
 use anyhow::anyhow;
-use iceberg::{Error, ErrorKind};
+use hive_metastore::{Database, PrincipalType};
+use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
+use pilota::{AHashMap, FastStr};
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
 
+/// hive.metastore.database.owner setting
+pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner";
+/// hive.metastore.database.owner default setting
+pub const HMS_DEFAULT_DB_OWNER: &str = "user.name";
+/// hive.metastore.database.owner-type setting
+pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type";
+/// hive metastore `description` property
+pub const COMMENT: &str = "comment";
+/// hive metastore `location` property
+pub const LOCATION: &str = "location";
+
 /// Format a thrift error into iceberg error.
 pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> 
Error
 where
@@ -40,3 +54,272 @@ pub fn from_io_error(error: io::Error) -> Error {
     )
     .with_source(error)
 }
+
+/// Returns a `Namespace` by extracting database name and properties
+/// from `hive_metastore::hms::Database`
+pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
+    let mut properties = HashMap::new();
+
+    let name = if let Some(name) = &database.name {
+        name.to_string()
+    } else {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Database name must be specified",
+        ));
+    };
+
+    if let Some(description) = &database.description {
+        properties.insert(COMMENT.to_string(), description.to_string());
+    };
+
+    if let Some(location) = &database.location_uri {
+        properties.insert(LOCATION.to_string(), location.to_string());
+    };
+
+    if let Some(owner) = &database.owner_name {
+        properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
+    };
+
+    if let Some(owner_type) = &database.owner_type {
+        let value = match owner_type {
+            PrincipalType::User => "User",
+            PrincipalType::Group => "Group",
+            PrincipalType::Role => "Role",
+        };
+
+        properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
+    };
+
+    if let Some(params) = &database.parameters {
+        params.iter().for_each(|(k, v)| {
+            properties.insert(k.clone().into(), v.clone().into());
+        });
+    };
+
+    Ok(Namespace::with_properties(
+        NamespaceIdent::new(name),
+        properties,
+    ))
+}
+
+/// Converts name and properties into `hive_metastore::hms::Database`
+/// after validating the `namespace` and `owner-settings`.
+pub(crate) fn convert_to_database(
+    namespace: &NamespaceIdent,
+    properties: &HashMap<String, String>,
+) -> Result<Database> {
+    let name = validate_namespace(namespace)?;
+    validate_owner_settings(properties)?;
+
+    let mut db = Database::default();
+    let mut parameters = AHashMap::new();
+
+    db.name = Some(name.into());
+
+    for (k, v) in properties {
+        match k.as_str() {
+            COMMENT => db.description = Some(v.clone().into()),
+            LOCATION => db.location_uri = 
Some(format_location_uri(v.clone()).into()),
+            HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
+            HMS_DB_OWNER_TYPE => {
+                let owner_type = match v.to_lowercase().as_str() {
+                    "user" => PrincipalType::User,
+                    "group" => PrincipalType::Group,
+                    "role" => PrincipalType::Role,
+                    _ => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("Invalid value for setting 'owner_type': 
{}", v),
+                        ))
+                    }
+                };
+                db.owner_type = Some(owner_type);
+            }
+            _ => {
+                parameters.insert(
+                    FastStr::from_string(k.clone()),
+                    FastStr::from_string(v.clone()),
+                );
+            }
+        }
+    }
+
+    db.parameters = Some(parameters);
+
+    // Set default owner, if none provided
+    // 
https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44
+    if db.owner_name.is_none() {
+        db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
+        db.owner_type = Some(PrincipalType::User);
+    }
+
+    Ok(db)
+}
+
+/// Checks if provided `NamespaceIdent` is valid.
+pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String> 
{
+    let name = namespace.as_ref();
+
+    if name.len() != 1 {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Invalid database name: {:?}, hierarchical namespaces are not 
supported",
+                namespace
+            ),
+        ));
+    }
+
+    let name = name[0].clone();
+
+    if name.is_empty() {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Invalid database, provided namespace is empty.",
+        ));
+    }
+
+    Ok(name)
+}
+
+/// Formats location_uri by e.g. removing trailing slashes.
+fn format_location_uri(location: String) -> String {
+    let mut location = location;
+
+    if !location.starts_with('/') {
+        location = format!("/{}", location);
+    }
+
+    if location.ends_with('/') && location.len() > 1 {
+        location.pop();
+    }
+
+    location
+}
+
+/// Checks if `owner-settings` are valid.
+/// If `owner_type` is set, then `owner` must also be set.
+fn validate_owner_settings(properties: &HashMap<String, String>) -> Result<()> 
{
+    let owner_is_set = properties.get(HMS_DB_OWNER).is_some();
+    let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some();
+
+    if owner_type_is_set && !owner_is_set {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Setting '{}' without setting '{}' is not allowed",
+                HMS_DB_OWNER_TYPE, HMS_DB_OWNER
+            ),
+        ));
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use iceberg::{Namespace, NamespaceIdent};
+
+    use super::*;
+
+    #[test]
+    fn test_convert_to_namespace() -> Result<()> {
+        let properties = HashMap::from([
+            (COMMENT.to_string(), "my_description".to_string()),
+            (LOCATION.to_string(), "/my_location".to_string()),
+            (HMS_DB_OWNER.to_string(), "apache".to_string()),
+            (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()),
+            ("key1".to_string(), "value1".to_string()),
+        ]);
+
+        let ident = NamespaceIdent::new("my_namespace".into());
+        let db = convert_to_database(&ident, &properties)?;
+
+        let expected_ns = Namespace::with_properties(ident, properties);
+        let result_ns = convert_to_namespace(&db)?;
+
+        assert_eq!(expected_ns, result_ns);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_validate_owner_settings() {
+        let valid = HashMap::from([
+            (HMS_DB_OWNER.to_string(), "apache".to_string()),
+            (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
+        ]);
+        let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(), 
"user".to_string())]);
+
+        assert!(validate_owner_settings(&valid).is_ok());
+        assert!(validate_owner_settings(&invalid).is_err());
+    }
+
+    #[test]
+    fn test_convert_to_database() -> Result<()> {
+        let ns = NamespaceIdent::new("my_namespace".into());
+        let properties = HashMap::from([
+            (COMMENT.to_string(), "my_description".to_string()),
+            (LOCATION.to_string(), "my_location".to_string()),
+            (HMS_DB_OWNER.to_string(), "apache".to_string()),
+            (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
+            ("key1".to_string(), "value1".to_string()),
+        ]);
+
+        let db = convert_to_database(&ns, &properties)?;
+
+        assert_eq!(db.name, Some(FastStr::from("my_namespace")));
+        assert_eq!(db.description, Some(FastStr::from("my_description")));
+        assert_eq!(db.owner_name, Some(FastStr::from("apache")));
+        assert_eq!(db.owner_type, Some(PrincipalType::User));
+
+        if let Some(params) = db.parameters {
+            assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_convert_to_database_with_default_user() -> Result<()> {
+        let ns = NamespaceIdent::new("my_namespace".into());
+        let properties = HashMap::new();
+
+        let db = convert_to_database(&ns, &properties)?;
+
+        assert_eq!(db.name, Some(FastStr::from("my_namespace")));
+        assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
+        assert_eq!(db.owner_type, Some(PrincipalType::User));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_validate_namespace() {
+        let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
+        let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
+        let hierarchical_ns = Namespace::new(
+            NamespaceIdent::from_vec(vec!["level1".to_string(), 
"level2".to_string()]).unwrap(),
+        );
+
+        let valid = validate_namespace(valid_ns.name());
+        let empty = validate_namespace(empty_ns.name());
+        let hierarchical = validate_namespace(hierarchical_ns.name());
+
+        assert!(valid.is_ok());
+        assert!(empty.is_err());
+        assert!(hierarchical.is_err());
+    }
+
+    #[test]
+    fn test_format_location_uri() {
+        let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"];
+        let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"];
+
+        inputs.into_iter().zip(outputs).for_each(|(inp, out)| {
+            let location = format_location_uri(inp.to_string());
+            assert_eq!(location, out);
+        })
+    }
+}
diff --git a/crates/catalog/hms/testdata/hms_catalog/Dockerfile 
b/crates/catalog/hms/testdata/hms_catalog/Dockerfile
new file mode 100644
index 0000000..ff8c9fa
--- /dev/null
+++ b/crates/catalog/hms/testdata/hms_catalog/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl 
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
 -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl 
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar
 -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar 
/opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar 
/opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
\ No newline at end of file
diff --git a/crates/catalog/hms/testdata/hms_catalog/core-site.xml 
b/crates/catalog/hms/testdata/hms_catalog/core-site.xml
new file mode 100644
index 0000000..f0583a0
--- /dev/null
+++ b/crates/catalog/hms/testdata/hms_catalog/core-site.xml
@@ -0,0 +1,51 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>s3a://warehouse/hive</value>
+    </property>
+    <property>
+        <name>fs.s3a.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+    </property>
+    <property>
+        <name>fs.s3a.fast.upload</name>
+        <value>true</value>
+    </property>
+    <property>
+      <name>fs.s3a.endpoint</name>
+      <value>http://minio:9000</value>
+    </property>
+    <property>
+      <name>fs.s3a.access.key</name>
+      <value>admin</value>
+    </property>
+    <property>
+      <name>fs.s3a.secret.key</name>
+      <value>password</value>
+    </property>
+    <property>
+      <name>fs.s3a.connection.ssl.enabled</name>
+      <value>false</value>
+    </property>
+    <property>
+      <name>fs.s3a.path.style.access</name>
+      <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml 
b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml
similarity index 58%
copy from crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
copy to crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml
index 5c10146..c960586 100644
--- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
+++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml
@@ -18,48 +18,33 @@
 version: '3.8'
 
 services:
-  rest:
-    image: tabulario/iceberg-rest:0.10.0
-    environment:
-      - AWS_ACCESS_KEY_ID=admin
-      - AWS_SECRET_ACCESS_KEY=password
-      - AWS_REGION=us-east-1
-      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
-      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
-      - CATALOG_WAREHOUSE=s3://icebergdata/demo
-      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
-      - CATALOG_S3_ENDPOINT=http://minio:9000
-    depends_on:
-      - minio
-    links:
-      - minio:icebergdata.minio
-    expose:
-      - 8181
-
   minio:
-    image: minio/minio
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    expose:
+      - 9000
+      - 9001
     environment:
       - MINIO_ROOT_USER=admin
       - MINIO_ROOT_PASSWORD=password
       - MINIO_DOMAIN=minio
-    expose:
-      - 9001
-      - 9000
     command: [ "server", "/data", "--console-address", ":9001" ]
 
   mc:
     depends_on:
       - minio
-    image: minio/mc
+    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
     environment:
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
     entrypoint: >
-      /bin/sh -c "
-      until (/usr/bin/mc config host add minio http://minio:9000 admin 
password) do echo '...waiting...' && sleep 1; done;
-      /usr/bin/mc rm -r --force minio/icebergdata;
-      /usr/bin/mc mb minio/icebergdata;
-      /usr/bin/mc policy set public minio/icebergdata;
-      tail -f /dev/null
-      "
\ No newline at end of file
+      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb 
minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f 
/dev/null "
+
+  hive-metastore:
+    image: iceberg-hive-metastore
+    build: ./
+    expose:
+      - 9083
+    environment:
+      SERVICE_NAME: "metastore"
+      SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs 
b/crates/catalog/hms/tests/hms_catalog_test.rs
new file mode 100644
index 0000000..bab83a9
--- /dev/null
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for hms catalog.
+
+use std::collections::HashMap;
+
+use iceberg::{Catalog, Namespace, NamespaceIdent};
+use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const HMS_CATALOG_PORT: u16 = 9083;
+type Result<T> = std::result::Result<T, iceberg::Error>;
+
/// Test harness bundling the docker-compose environment with the catalog
/// under test.
struct TestFixture {
    // Held but never read: keeps the compose stack (HMS + minio) alive for
    // the lifetime of the fixture; dropped (and torn down) at test end.
    _docker_compose: DockerCompose,
    // Catalog client connected to the containerized Hive metastore.
    hms_catalog: HmsCatalog,
}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata/hms_catalog", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    docker_compose.run();
+
+    let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+
+    let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s hms catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let config = HmsCatalogConfig::builder()
+        .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
+        .thrift_transport(HmsThriftTransport::Buffered)
+        .build();
+
+    let hms_catalog = HmsCatalog::new(config).unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        hms_catalog,
+    }
+}
+
+#[tokio::test]
+async fn test_list_namespace() -> Result<()> {
+    let fixture = set_test_fixture("test_list_namespace").await;
+
+    let expected_no_parent = vec![NamespaceIdent::new("default".into())];
+    let result_no_parent = fixture.hms_catalog.list_namespaces(None).await?;
+
+    let result_with_parent = fixture
+        .hms_catalog
+        .list_namespaces(Some(&NamespaceIdent::new("parent".into())))
+        .await?;
+
+    assert_eq!(expected_no_parent, result_no_parent);
+    assert!(result_with_parent.is_empty());
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_create_namespace() -> Result<()> {
+    let fixture = set_test_fixture("test_create_namespace").await;
+
+    let ns = Namespace::new(NamespaceIdent::new("my_namespace".into()));
+    let properties = HashMap::from([
+        ("comment".to_string(), "my_description".to_string()),
+        ("location".to_string(), "my_location".to_string()),
+        (
+            "hive.metastore.database.owner".to_string(),
+            "apache".to_string(),
+        ),
+        (
+            "hive.metastore.database.owner-type".to_string(),
+            "user".to_string(),
+        ),
+        ("key1".to_string(), "value1".to_string()),
+    ]);
+
+    let result = fixture
+        .hms_catalog
+        .create_namespace(ns.name(), properties)
+        .await?;
+
+    assert_eq!(result, ns);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_get_namespace() -> Result<()> {
+    let fixture = set_test_fixture("test_get_namespace").await;
+
+    let ns = Namespace::new(NamespaceIdent::new("default".into()));
+    let properties = HashMap::from([
+        ("location".to_string(), "s3a://warehouse/hive".to_string()),
+        (
+            "hive.metastore.database.owner-type".to_string(),
+            "Role".to_string(),
+        ),
+        ("comment".to_string(), "Default Hive database".to_string()),
+        (
+            "hive.metastore.database.owner".to_string(),
+            "public".to_string(),
+        ),
+    ]);
+
+    let expected = 
Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
+
+    let result = fixture.hms_catalog.get_namespace(ns.name()).await?;
+
+    assert_eq!(expected, result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_namespace_exists() -> Result<()> {
+    let fixture = set_test_fixture("test_namespace_exists").await;
+
+    let ns_exists = Namespace::new(NamespaceIdent::new("default".into()));
+    let ns_not_exists = Namespace::new(NamespaceIdent::new("not_here".into()));
+
+    let result_exists = fixture
+        .hms_catalog
+        .namespace_exists(ns_exists.name())
+        .await?;
+    let result_not_exists = fixture
+        .hms_catalog
+        .namespace_exists(ns_not_exists.name())
+        .await?;
+
+    assert!(result_exists);
+    assert!(!result_not_exists);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_update_namespace() -> Result<()> {
+    let fixture = set_test_fixture("test_update_namespace").await;
+
+    let ns = Namespace::new(NamespaceIdent::new("default".into()));
+    let properties = HashMap::from([("comment".to_string(), 
"my_update".to_string())]);
+
+    fixture
+        .hms_catalog
+        .update_namespace(ns.name(), properties)
+        .await?;
+
+    let db = fixture.hms_catalog.get_namespace(ns.name()).await?;
+
+    assert_eq!(
+        db.properties().get("comment"),
+        Some(&"my_update".to_string())
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_drop_namespace() -> Result<()> {
+    let fixture = set_test_fixture("test_drop_namespace").await;
+
+    let ns = Namespace::new(NamespaceIdent::new("delete_me".into()));
+
+    fixture
+        .hms_catalog
+        .create_namespace(ns.name(), HashMap::new())
+        .await?;
+
+    let result = fixture.hms_catalog.namespace_exists(ns.name()).await?;
+    assert!(result);
+
+    fixture.hms_catalog.drop_namespace(ns.name()).await?;
+
+    let result = fixture.hms_catalog.namespace_exists(ns.name()).await?;
+    assert!(!result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_list_tables() -> Result<()> {
+    let fixture = set_test_fixture("test_list_tables").await;
+
+    let ns = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let result = fixture.hms_catalog.list_tables(ns.name()).await?;
+
+    assert_eq!(result, vec![]);
+
+    Ok(())
+}
diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml 
b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
index 5c10146..0152a22 100644
--- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
+++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
@@ -37,7 +37,7 @@ services:
       - 8181
 
   minio:
-    image: minio/minio
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
     environment:
       - MINIO_ROOT_USER=admin
       - MINIO_ROOT_PASSWORD=password
@@ -50,16 +50,10 @@ services:
   mc:
     depends_on:
       - minio
-    image: minio/mc
+    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
     environment:
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
     entrypoint: >
-      /bin/sh -c "
-      until (/usr/bin/mc config host add minio http://minio:9000 admin 
password) do echo '...waiting...' && sleep 1; done;
-      /usr/bin/mc rm -r --force minio/icebergdata;
-      /usr/bin/mc mb minio/icebergdata;
-      /usr/bin/mc policy set public minio/icebergdata;
-      tail -f /dev/null
-      "
\ No newline at end of file
+      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r 
--force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy 
set public minio/icebergdata; tail -f /dev/null "
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index bd579d5..708e6bf 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -50,7 +50,7 @@ pub trait Catalog: Debug + Sync + Send {
     async fn get_namespace(&self, namespace: &NamespaceIdent) -> 
Result<Namespace>;
 
     /// Check if namespace exists in catalog.
-    async fn namespace_exists(&self, namesace: &NamespaceIdent) -> 
Result<bool>;
+    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> 
Result<bool>;
 
     /// Update a namespace inside the catalog.
     ///
diff --git a/deny.toml b/deny.toml
index e323679..1d30b5f 100644
--- a/deny.toml
+++ b/deny.toml
@@ -19,17 +19,23 @@
 unlicensed = "deny"
 copyleft = "deny"
 allow = [
-    "Apache-2.0",
-    "Apache-2.0 WITH LLVM-exception",
-    "MIT",
-    "BSD-3-Clause",
-    "ISC",
-    "CC0-1.0",
+  "Apache-2.0",
+  "Apache-2.0 WITH LLVM-exception",
+  "MIT",
+  "BSD-3-Clause",
+  "ISC",
+  "CC0-1.0",
 ]
 exceptions = [
-    { allow = ["OpenSSL"], name = "ring" },
-    { allow = ["Unicode-DFS-2016"], name = "unicode-ident" },
-    { allow = ["Zlib"], name = "adler32" }
+  { allow = [
+    "OpenSSL",
+  ], name = "ring" },
+  { allow = [
+    "Unicode-DFS-2016",
+  ], name = "unicode-ident" },
+  { allow = [
+    "Zlib",
+  ], name = "adler32" },
 ]
 
 [[licenses.clarify]]
@@ -42,4 +48,4 @@ name = "ring"
 # compiled into non-test libraries, is included below."
 # OpenSSL - Obviously
 expression = "ISC AND MIT AND OpenSSL"
-license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]
\ No newline at end of file
+license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]


Reply via email to