This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3a947fa Add hive metastore catalog support (part 1/2) (#237)
3a947fa is described below
commit 3a947faaf33e8c9e31019581ec22b93f2b2437bc
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Sun Mar 10 01:38:20 2024 +0100
Add hive metastore catalog support (part 1/2) (#237)
* fmt members
* setup basic test-infra for hms-catalog
* add license
* add hms create_namespace
* add hms get_namespace
* fix: typo
* add hms namespace_exists and drop_namespace
* add hms update_namespace
* move fns into HmsCatalog
* use `expose` in docker-compose
* add hms list_tables
* fix: clippy
* fix: cargo sort
* fix: cargo workspace
* move fns into utils + add constants
* include database name in error msg
* add pilota to cargo workspace
* add minio version
* change visibility to pub(crate); return namespace from conversion fn
* add minio version in rest-catalog docker-compose
* fix: hms test docker infrastructure
* add version to minio/mc
* fix: license header
* fix: core-site
---------
Co-authored-by: mlanhenke <[email protected]>
---
.cargo/audit.toml | 8 +-
Cargo.toml | 8 +-
crates/catalog/hms/Cargo.toml | 7 +
crates/catalog/hms/src/catalog.rs | 160 ++++++++++--
crates/catalog/hms/src/utils.rs | 285 ++++++++++++++++++++-
crates/catalog/hms/testdata/hms_catalog/Dockerfile | 34 +++
.../catalog/hms/testdata/hms_catalog/core-site.xml | 51 ++++
.../testdata/hms_catalog}/docker-compose.yaml | 45 ++--
crates/catalog/hms/tests/hms_catalog_test.rs | 223 ++++++++++++++++
.../rest/testdata/rest_catalog/docker-compose.yaml | 12 +-
crates/iceberg/src/catalog/mod.rs | 2 +-
deny.toml | 26 +-
12 files changed, 790 insertions(+), 71 deletions(-)
diff --git a/.cargo/audit.toml b/.cargo/audit.toml
index 1d73f83..5db5a9d 100644
--- a/.cargo/audit.toml
+++ b/.cargo/audit.toml
@@ -17,8 +17,8 @@
[advisories]
ignore = [
- # rsa
- # Marvin Attack: potential key recovery through timing sidechannels
- # Issues: https://github.com/apache/iceberg-rust/issues/221
- "RUSTSEC-2023-0071",
+ # rsa
+ # Marvin Attack: potential key recovery through timing sidechannels
+ # Issues: https://github.com/apache/iceberg-rust/issues/221
+ "RUSTSEC-2023-0071",
]
diff --git a/Cargo.toml b/Cargo.toml
index 3234bd0..c482859 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,12 @@
[workspace]
resolver = "2"
-members = ["crates/catalog/*", "crates/examples", "crates/iceberg",
"crates/test_utils"]
+members = [
+ "crates/catalog/*",
+ "crates/examples",
+ "crates/iceberg",
+ "crates/test_utils",
+]
[workspace.package]
version = "0.2.0"
@@ -55,6 +60,7 @@ once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
parquet = "50"
+pilota = "0.10.0"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index f44125c..475da7b 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -33,5 +33,12 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
hive_metastore = { workspace = true }
iceberg = { workspace = true }
+log = { workspace = true }
+pilota = { workspace = true }
typed-builder = { workspace = true }
volo-thrift = { workspace = true }
+
+[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+port_scanner = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs
b/crates/catalog/hms/src/catalog.rs
index a00aaf3..57e3824 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -19,6 +19,7 @@ use super::utils::*;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
+use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation,
@@ -28,6 +29,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;
+use volo_thrift::ResponseError;
/// Which variant of the thrift transport to communicate with HMS
/// See:
<https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
@@ -97,7 +99,6 @@ impl HmsCatalog {
}
}
-/// Refer to
<https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java>
for implementation details.
#[async_trait]
impl Catalog for HmsCatalog {
/// HMS doesn't support nested namespaces.
@@ -125,36 +126,165 @@ impl Catalog for HmsCatalog {
.collect())
}
+ /// Creates a new namespace with the given identifier and properties.
+ ///
+ /// Attempts to create a namespace defined by the `namespace`
+ /// parameter and configured with the specified `properties`.
+ ///
+ /// This function can return an error in the following situations:
+ ///
+ /// - If `hive.metastore.database.owner-type` is specified without
+ /// `hive.metastore.database.owner`,
+ /// - Errors from `validate_namespace` if the namespace identifier does not
+ /// meet validation criteria.
+ /// - Errors from `convert_to_database` if the properties cannot be
+ /// successfully converted into a database configuration.
+ /// - Errors from the underlying database creation process, converted using
+ /// `from_thrift_error`.
async fn create_namespace(
&self,
- _namespace: &NamespaceIdent,
- _properties: HashMap<String, String>,
+ namespace: &NamespaceIdent,
+ properties: HashMap<String, String>,
) -> Result<Namespace> {
- todo!()
+ let database = convert_to_database(namespace, &properties)?;
+
+ self.client
+ .0
+ .create_database(database)
+ .await
+ .map_err(from_thrift_error)?;
+
+ Ok(Namespace::new(namespace.clone()))
}
- async fn get_namespace(&self, _namespace: &NamespaceIdent) ->
Result<Namespace> {
- todo!()
+ /// Retrieves a namespace by its identifier.
+ ///
+ /// Validates the given namespace identifier and then queries the
+ /// underlying database client to fetch the corresponding namespace data.
+ /// Constructs a `Namespace` object with the retrieved data and returns it.
+ ///
+ /// This function can return an error in any of the following situations:
+ /// - If the provided namespace identifier fails validation checks
+ /// - If there is an error querying the database, returned by
+ /// `from_thrift_error`.
+ async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace> {
+ let name = validate_namespace(namespace)?;
+
+ let db = self
+ .client
+ .0
+ .get_database(name.clone().into())
+ .await
+ .map_err(from_thrift_error)?;
+
+ let ns = convert_to_namespace(&db)?;
+
+ Ok(ns)
}
- async fn namespace_exists(&self, _namespace: &NamespaceIdent) ->
Result<bool> {
- todo!()
+ /// Checks if a namespace exists within the Hive Metastore.
+ ///
+ /// Validates the namespace identifier by querying the Hive Metastore
+ /// to determine if the specified namespace (database) exists.
+ ///
+ /// # Returns
+ /// A `Result<bool>` indicating the outcome of the check:
+ /// - `Ok(true)` if the namespace exists.
+ /// - `Ok(false)` if the namespace does not exist, identified by a specific
+ /// `UserException` variant.
+ /// - `Err(...)` if an error occurs during validation or the Hive Metastore
+ /// query, with the error encapsulating the issue.
+ async fn namespace_exists(&self, namespace: &NamespaceIdent) ->
Result<bool> {
+ let name = validate_namespace(namespace)?;
+
+ let resp = self.client.0.get_database(name.clone().into()).await;
+
+ match resp {
+ Ok(_) => Ok(true),
+ Err(err) => {
+ if let
ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
+ _,
+ )) = &err
+ {
+ Ok(false)
+ } else {
+ Err(from_thrift_error(err))
+ }
+ }
+ }
}
+ /// Asynchronously updates properties of an existing namespace.
+ ///
+ /// Converts the given namespace identifier and properties into a database
+ /// representation and then attempts to update the corresponding namespace
+ /// in the Hive Metastore.
+ ///
+ /// # Returns
+ /// Returns `Ok(())` if the namespace update is successful. If the
+ /// namespace cannot be updated due to missing information or an error
+ /// during the update process, an `Err(...)` is returned.
async fn update_namespace(
&self,
- _namespace: &NamespaceIdent,
- _properties: HashMap<String, String>,
+ namespace: &NamespaceIdent,
+ properties: HashMap<String, String>,
) -> Result<()> {
- todo!()
+ let db = convert_to_database(namespace, &properties)?;
+
+ let name = match &db.name {
+ Some(name) => name,
+ None => {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Database name must be specified",
+ ))
+ }
+ };
+
+ self.client
+ .0
+ .alter_database(name.clone(), db)
+ .await
+ .map_err(from_thrift_error)?;
+
+ Ok(())
}
- async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
- todo!()
+ /// Asynchronously drops a namespace from the Hive Metastore.
+ ///
+ /// # Returns
+ /// A `Result<()>` indicating the outcome:
+ /// - `Ok(())` signifies successful namespace deletion.
+ /// - `Err(...)` signifies failure to drop the namespace due to validation
+ /// errors, connectivity issues, or Hive Metastore constraints.
+ async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
+ let name = validate_namespace(namespace)?;
+
+ self.client
+ .0
+ .drop_database(name.into(), false, false)
+ .await
+ .map_err(from_thrift_error)?;
+
+ Ok(())
}
- async fn list_tables(&self, _namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
- todo!()
+ async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
+ let name = validate_namespace(namespace)?;
+
+ let tables = self
+ .client
+ .0
+ .get_all_tables(name.clone().into())
+ .await
+ .map_err(from_thrift_error)?;
+
+ let tables = tables
+ .iter()
+ .map(|table| TableIdent::new(namespace.clone(), table.to_string()))
+ .collect();
+
+ Ok(tables)
}
async fn create_table(
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 1b0ff33..02f32c6 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -16,10 +16,24 @@
// under the License.
use anyhow::anyhow;
-use iceberg::{Error, ErrorKind};
+use hive_metastore::{Database, PrincipalType};
+use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
+use pilota::{AHashMap, FastStr};
+use std::collections::HashMap;
use std::fmt::Debug;
use std::io;
+/// hive.metastore.database.owner setting
+pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner";
+/// hive.metastore.database.owner default setting
+pub const HMS_DEFAULT_DB_OWNER: &str = "user.name";
+/// hive.metastore.database.owner-type setting
+pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type";
+/// hive metastore `description` property
+pub const COMMENT: &str = "comment";
+/// hive metastore `location` property
+pub const LOCATION: &str = "location";
+
/// Format a thrift error into iceberg error.
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) ->
Error
where
@@ -40,3 +54,272 @@ pub fn from_io_error(error: io::Error) -> Error {
)
.with_source(error)
}
+
+/// Returns a `Namespace` by extracting database name and properties
+/// from `hive_metastore::hms::Database`
+pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
+ let mut properties = HashMap::new();
+
+ let name = if let Some(name) = &database.name {
+ name.to_string()
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Database name must be specified",
+ ));
+ };
+
+ if let Some(description) = &database.description {
+ properties.insert(COMMENT.to_string(), description.to_string());
+ };
+
+ if let Some(location) = &database.location_uri {
+ properties.insert(LOCATION.to_string(), location.to_string());
+ };
+
+ if let Some(owner) = &database.owner_name {
+ properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
+ };
+
+ if let Some(owner_type) = &database.owner_type {
+ let value = match owner_type {
+ PrincipalType::User => "User",
+ PrincipalType::Group => "Group",
+ PrincipalType::Role => "Role",
+ };
+
+ properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
+ };
+
+ if let Some(params) = &database.parameters {
+ params.iter().for_each(|(k, v)| {
+ properties.insert(k.clone().into(), v.clone().into());
+ });
+ };
+
+ Ok(Namespace::with_properties(
+ NamespaceIdent::new(name),
+ properties,
+ ))
+}
+
+/// Converts name and properties into `hive_metastore::hms::Database`
+/// after validating the `namespace` and `owner-settings`.
+pub(crate) fn convert_to_database(
+ namespace: &NamespaceIdent,
+ properties: &HashMap<String, String>,
+) -> Result<Database> {
+ let name = validate_namespace(namespace)?;
+ validate_owner_settings(properties)?;
+
+ let mut db = Database::default();
+ let mut parameters = AHashMap::new();
+
+ db.name = Some(name.into());
+
+ for (k, v) in properties {
+ match k.as_str() {
+ COMMENT => db.description = Some(v.clone().into()),
+ LOCATION => db.location_uri =
Some(format_location_uri(v.clone()).into()),
+ HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
+ HMS_DB_OWNER_TYPE => {
+ let owner_type = match v.to_lowercase().as_str() {
+ "user" => PrincipalType::User,
+ "group" => PrincipalType::Group,
+ "role" => PrincipalType::Role,
+ _ => {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid value for setting 'owner_type':
{}", v),
+ ))
+ }
+ };
+ db.owner_type = Some(owner_type);
+ }
+ _ => {
+ parameters.insert(
+ FastStr::from_string(k.clone()),
+ FastStr::from_string(v.clone()),
+ );
+ }
+ }
+ }
+
+ db.parameters = Some(parameters);
+
+ // Set default owner, if none provided
+ //
https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44
+ if db.owner_name.is_none() {
+ db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
+ db.owner_type = Some(PrincipalType::User);
+ }
+
+ Ok(db)
+}
+
+/// Checks if provided `NamespaceIdent` is valid.
+pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String>
{
+ let name = namespace.as_ref();
+
+ if name.len() != 1 {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid database name: {:?}, hierarchical namespaces are not
supported",
+ namespace
+ ),
+ ));
+ }
+
+ let name = name[0].clone();
+
+ if name.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Invalid database, provided namespace is empty.",
+ ));
+ }
+
+ Ok(name)
+}
+
+/// Formats location_uri by e.g. removing trailing slashes.
+fn format_location_uri(location: String) -> String {
+ let mut location = location;
+
+ if !location.starts_with('/') {
+ location = format!("/{}", location);
+ }
+
+ if location.ends_with('/') && location.len() > 1 {
+ location.pop();
+ }
+
+ location
+}
+
+/// Checks if `owner-settings` are valid.
+/// If `owner_type` is set, then `owner` must also be set.
+fn validate_owner_settings(properties: &HashMap<String, String>) -> Result<()>
{
+ let owner_is_set = properties.get(HMS_DB_OWNER).is_some();
+ let owner_type_is_set = properties.get(HMS_DB_OWNER_TYPE).is_some();
+
+ if owner_type_is_set && !owner_is_set {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Setting '{}' without setting '{}' is not allowed",
+ HMS_DB_OWNER_TYPE, HMS_DB_OWNER
+ ),
+ ));
+ }
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use iceberg::{Namespace, NamespaceIdent};
+
+ use super::*;
+
+ #[test]
+ fn test_convert_to_namespace() -> Result<()> {
+ let properties = HashMap::from([
+ (COMMENT.to_string(), "my_description".to_string()),
+ (LOCATION.to_string(), "/my_location".to_string()),
+ (HMS_DB_OWNER.to_string(), "apache".to_string()),
+ (HMS_DB_OWNER_TYPE.to_string(), "User".to_string()),
+ ("key1".to_string(), "value1".to_string()),
+ ]);
+
+ let ident = NamespaceIdent::new("my_namespace".into());
+ let db = convert_to_database(&ident, &properties)?;
+
+ let expected_ns = Namespace::with_properties(ident, properties);
+ let result_ns = convert_to_namespace(&db)?;
+
+ assert_eq!(expected_ns, result_ns);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_validate_owner_settings() {
+ let valid = HashMap::from([
+ (HMS_DB_OWNER.to_string(), "apache".to_string()),
+ (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
+ ]);
+ let invalid = HashMap::from([(HMS_DB_OWNER_TYPE.to_string(),
"user".to_string())]);
+
+ assert!(validate_owner_settings(&valid).is_ok());
+ assert!(validate_owner_settings(&invalid).is_err());
+ }
+
+ #[test]
+ fn test_convert_to_database() -> Result<()> {
+ let ns = NamespaceIdent::new("my_namespace".into());
+ let properties = HashMap::from([
+ (COMMENT.to_string(), "my_description".to_string()),
+ (LOCATION.to_string(), "my_location".to_string()),
+ (HMS_DB_OWNER.to_string(), "apache".to_string()),
+ (HMS_DB_OWNER_TYPE.to_string(), "user".to_string()),
+ ("key1".to_string(), "value1".to_string()),
+ ]);
+
+ let db = convert_to_database(&ns, &properties)?;
+
+ assert_eq!(db.name, Some(FastStr::from("my_namespace")));
+ assert_eq!(db.description, Some(FastStr::from("my_description")));
+ assert_eq!(db.owner_name, Some(FastStr::from("apache")));
+ assert_eq!(db.owner_type, Some(PrincipalType::User));
+
+ if let Some(params) = db.parameters {
+ assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_convert_to_database_with_default_user() -> Result<()> {
+ let ns = NamespaceIdent::new("my_namespace".into());
+ let properties = HashMap::new();
+
+ let db = convert_to_database(&ns, &properties)?;
+
+ assert_eq!(db.name, Some(FastStr::from("my_namespace")));
+ assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
+ assert_eq!(db.owner_type, Some(PrincipalType::User));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_validate_namespace() {
+ let valid_ns = Namespace::new(NamespaceIdent::new("ns".to_string()));
+ let empty_ns = Namespace::new(NamespaceIdent::new("".to_string()));
+ let hierarchical_ns = Namespace::new(
+ NamespaceIdent::from_vec(vec!["level1".to_string(),
"level2".to_string()]).unwrap(),
+ );
+
+ let valid = validate_namespace(valid_ns.name());
+ let empty = validate_namespace(empty_ns.name());
+ let hierarchical = validate_namespace(hierarchical_ns.name());
+
+ assert!(valid.is_ok());
+ assert!(empty.is_err());
+ assert!(hierarchical.is_err());
+ }
+
+ #[test]
+ fn test_format_location_uri() {
+ let inputs = vec!["iceberg", "is/", "/nice/", "really/nice/", "/"];
+ let outputs = vec!["/iceberg", "/is", "/nice", "/really/nice", "/"];
+
+ inputs.into_iter().zip(outputs).for_each(|(inp, out)| {
+ let location = format_location_uri(inp.to_string());
+ assert_eq!(location, out);
+ })
+ }
+}
diff --git a/crates/catalog/hms/testdata/hms_catalog/Dockerfile
b/crates/catalog/hms/testdata/hms_catalog/Dockerfile
new file mode 100644
index 0000000..ff8c9fa
--- /dev/null
+++ b/crates/catalog/hms/testdata/hms_catalog/Dockerfile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM openjdk:8-jre-slim AS build
+
+RUN apt-get update -qq && apt-get -qq -y install curl
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+RUN curl
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
-Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
+RUN curl
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar
-Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
+
+
+FROM apache/hive:3.1.3
+
+ENV AWSSDK_VERSION=2.20.18
+ENV HADOOP_VERSION=3.1.0
+
+COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar
/opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
+COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar
/opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
+COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
\ No newline at end of file
diff --git a/crates/catalog/hms/testdata/hms_catalog/core-site.xml
b/crates/catalog/hms/testdata/hms_catalog/core-site.xml
new file mode 100644
index 0000000..f0583a0
--- /dev/null
+++ b/crates/catalog/hms/testdata/hms_catalog/core-site.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>s3a://warehouse/hive</value>
+ </property>
+ <property>
+ <name>fs.s3a.impl</name>
+ <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+ </property>
+ <property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>fs.s3a.endpoint</name>
+ <value>http://minio:9000</value>
+ </property>
+ <property>
+ <name>fs.s3a.access.key</name>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <value>password</value>
+ </property>
+ <property>
+ <name>fs.s3a.connection.ssl.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>fs.s3a.path.style.access</name>
+ <value>true</value>
+ </property>
+</configuration>
\ No newline at end of file
diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml
similarity index 58%
copy from crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
copy to crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml
index 5c10146..c960586 100644
--- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
+++ b/crates/catalog/hms/testdata/hms_catalog/docker-compose.yaml
@@ -18,48 +18,33 @@
version: '3.8'
services:
- rest:
- image: tabulario/iceberg-rest:0.10.0
- environment:
- - AWS_ACCESS_KEY_ID=admin
- - AWS_SECRET_ACCESS_KEY=password
- - AWS_REGION=us-east-1
- - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
- - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
- - CATALOG_WAREHOUSE=s3://icebergdata/demo
- - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- - CATALOG_S3_ENDPOINT=http://minio:9000
- depends_on:
- - minio
- links:
- - minio:icebergdata.minio
- expose:
- - 8181
-
minio:
- image: minio/minio
+ image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+ expose:
+ - 9000
+ - 9001
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
- expose:
- - 9001
- - 9000
command: [ "server", "/data", "--console-address", ":9001" ]
mc:
depends_on:
- minio
- image: minio/mc
+ image: minio/mc:RELEASE.2024-03-07T00-31-49Z
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
- /bin/sh -c "
- until (/usr/bin/mc config host add minio http://minio:9000 admin
password) do echo '...waiting...' && sleep 1; done;
- /usr/bin/mc rm -r --force minio/icebergdata;
- /usr/bin/mc mb minio/icebergdata;
- /usr/bin/mc policy set public minio/icebergdata;
- tail -f /dev/null
- "
\ No newline at end of file
+ /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb
minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f
/dev/null "
+
+ hive-metastore:
+ image: iceberg-hive-metastore
+ build: ./
+ expose:
+ - 9083
+ environment:
+ SERVICE_NAME: "metastore"
+ SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs
b/crates/catalog/hms/tests/hms_catalog_test.rs
new file mode 100644
index 0000000..bab83a9
--- /dev/null
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for hms catalog.
+
+use std::collections::HashMap;
+
+use iceberg::{Catalog, Namespace, NamespaceIdent};
+use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use tokio::time::sleep;
+
+const HMS_CATALOG_PORT: u16 = 9083;
+type Result<T> = std::result::Result<T, iceberg::Error>;
+
+struct TestFixture {
+ _docker_compose: DockerCompose,
+ hms_catalog: HmsCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+ set_up();
+
+ let docker_compose = DockerCompose::new(
+ normalize_test_name(format!("{}_{func}", module_path!())),
+ format!("{}/testdata/hms_catalog", env!("CARGO_MANIFEST_DIR")),
+ );
+
+ docker_compose.run();
+
+ let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+
+ let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
+ loop {
+ if !scan_port_addr(&read_port) {
+ log::info!("Waiting for 1s hms catalog to ready...");
+ sleep(std::time::Duration::from_millis(1000)).await;
+ } else {
+ break;
+ }
+ }
+
+ let config = HmsCatalogConfig::builder()
+ .address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
+ .thrift_transport(HmsThriftTransport::Buffered)
+ .build();
+
+ let hms_catalog = HmsCatalog::new(config).unwrap();
+
+ TestFixture {
+ _docker_compose: docker_compose,
+ hms_catalog,
+ }
+}
+
+#[tokio::test]
+async fn test_list_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_list_namespace").await;
+
+ let expected_no_parent = vec![NamespaceIdent::new("default".into())];
+ let result_no_parent = fixture.hms_catalog.list_namespaces(None).await?;
+
+ let result_with_parent = fixture
+ .hms_catalog
+ .list_namespaces(Some(&NamespaceIdent::new("parent".into())))
+ .await?;
+
+ assert_eq!(expected_no_parent, result_no_parent);
+ assert!(result_with_parent.is_empty());
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_create_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_create_namespace").await;
+
+ let ns = Namespace::new(NamespaceIdent::new("my_namespace".into()));
+ let properties = HashMap::from([
+ ("comment".to_string(), "my_description".to_string()),
+ ("location".to_string(), "my_location".to_string()),
+ (
+ "hive.metastore.database.owner".to_string(),
+ "apache".to_string(),
+ ),
+ (
+ "hive.metastore.database.owner-type".to_string(),
+ "user".to_string(),
+ ),
+ ("key1".to_string(), "value1".to_string()),
+ ]);
+
+ let result = fixture
+ .hms_catalog
+ .create_namespace(ns.name(), properties)
+ .await?;
+
+ assert_eq!(result, ns);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_get_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_get_namespace").await;
+
+ let ns = Namespace::new(NamespaceIdent::new("default".into()));
+ let properties = HashMap::from([
+ ("location".to_string(), "s3a://warehouse/hive".to_string()),
+ (
+ "hive.metastore.database.owner-type".to_string(),
+ "Role".to_string(),
+ ),
+ ("comment".to_string(), "Default Hive database".to_string()),
+ (
+ "hive.metastore.database.owner".to_string(),
+ "public".to_string(),
+ ),
+ ]);
+
+ let expected =
Namespace::with_properties(NamespaceIdent::new("default".into()), properties);
+
+ let result = fixture.hms_catalog.get_namespace(ns.name()).await?;
+
+ assert_eq!(expected, result);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_namespace_exists() -> Result<()> {
+ let fixture = set_test_fixture("test_namespace_exists").await;
+
+ let ns_exists = Namespace::new(NamespaceIdent::new("default".into()));
+ let ns_not_exists = Namespace::new(NamespaceIdent::new("not_here".into()));
+
+ let result_exists = fixture
+ .hms_catalog
+ .namespace_exists(ns_exists.name())
+ .await?;
+ let result_not_exists = fixture
+ .hms_catalog
+ .namespace_exists(ns_not_exists.name())
+ .await?;
+
+ assert!(result_exists);
+ assert!(!result_not_exists);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_update_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_update_namespace").await;
+
+ let ns = Namespace::new(NamespaceIdent::new("default".into()));
+ let properties = HashMap::from([("comment".to_string(),
"my_update".to_string())]);
+
+ fixture
+ .hms_catalog
+ .update_namespace(ns.name(), properties)
+ .await?;
+
+ let db = fixture.hms_catalog.get_namespace(ns.name()).await?;
+
+ assert_eq!(
+ db.properties().get("comment"),
+ Some(&"my_update".to_string())
+ );
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_drop_namespace() -> Result<()> {
+ let fixture = set_test_fixture("test_drop_namespace").await;
+
+ let ns = Namespace::new(NamespaceIdent::new("delete_me".into()));
+
+ fixture
+ .hms_catalog
+ .create_namespace(ns.name(), HashMap::new())
+ .await?;
+
+ let result = fixture.hms_catalog.namespace_exists(ns.name()).await?;
+ assert!(result);
+
+ fixture.hms_catalog.drop_namespace(ns.name()).await?;
+
+ let result = fixture.hms_catalog.namespace_exists(ns.name()).await?;
+ assert!(!result);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_list_tables() -> Result<()> {
+ let fixture = set_test_fixture("test_list_tables").await;
+
+ let ns = Namespace::new(NamespaceIdent::new("default".into()));
+
+ let result = fixture.hms_catalog.list_tables(ns.name()).await?;
+
+ assert_eq!(result, vec![]);
+
+ Ok(())
+}
diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
index 5c10146..0152a22 100644
--- a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
+++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
@@ -37,7 +37,7 @@ services:
- 8181
minio:
- image: minio/minio
+ image: minio/minio:RELEASE.2024-03-07T00-43-48Z
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
@@ -50,16 +50,10 @@ services:
mc:
depends_on:
- minio
- image: minio/mc
+ image: minio/mc:RELEASE.2024-03-07T00-31-49Z
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
- /bin/sh -c "
- until (/usr/bin/mc config host add minio http://minio:9000 admin
password) do echo '...waiting...' && sleep 1; done;
- /usr/bin/mc rm -r --force minio/icebergdata;
- /usr/bin/mc mb minio/icebergdata;
- /usr/bin/mc policy set public minio/icebergdata;
- tail -f /dev/null
- "
\ No newline at end of file
+ /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000
admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r
--force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy
set public minio/icebergdata; tail -f /dev/null "
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index bd579d5..708e6bf 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -50,7 +50,7 @@ pub trait Catalog: Debug + Sync + Send {
async fn get_namespace(&self, namespace: &NamespaceIdent) ->
Result<Namespace>;
/// Check if namespace exists in catalog.
- async fn namespace_exists(&self, namesace: &NamespaceIdent) ->
Result<bool>;
+ async fn namespace_exists(&self, namespace: &NamespaceIdent) ->
Result<bool>;
/// Update a namespace inside the catalog.
///
diff --git a/deny.toml b/deny.toml
index e323679..1d30b5f 100644
--- a/deny.toml
+++ b/deny.toml
@@ -19,17 +19,23 @@
unlicensed = "deny"
copyleft = "deny"
allow = [
- "Apache-2.0",
- "Apache-2.0 WITH LLVM-exception",
- "MIT",
- "BSD-3-Clause",
- "ISC",
- "CC0-1.0",
+ "Apache-2.0",
+ "Apache-2.0 WITH LLVM-exception",
+ "MIT",
+ "BSD-3-Clause",
+ "ISC",
+ "CC0-1.0",
]
exceptions = [
- { allow = ["OpenSSL"], name = "ring" },
- { allow = ["Unicode-DFS-2016"], name = "unicode-ident" },
- { allow = ["Zlib"], name = "adler32" }
+ { allow = [
+ "OpenSSL",
+ ], name = "ring" },
+ { allow = [
+ "Unicode-DFS-2016",
+ ], name = "unicode-ident" },
+ { allow = [
+ "Zlib",
+ ], name = "adler32" },
]
[[licenses.clarify]]
@@ -42,4 +48,4 @@ name = "ring"
# compiled into non-test libraries, is included below."
# OpenSSL - Obviously
expression = "ISC AND MIT AND OpenSSL"
-license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]
\ No newline at end of file
+license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]