This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0629ad5 Add hive metastore catalog support (part 2/2) (#285)
0629ad5 is described below
commit 0629ad5c732c4873afd1daab9c0e0c20992b159a
Author: Marvin Lanhenke <[email protected]>
AuthorDate: Fri Mar 22 14:05:44 2024 +0100
Add hive metastore catalog support (part 2/2) (#285)
* fmt members
* setup basic test-infra for hms-catalog
* add license
* add hms create_namespace
* add hms get_namespace
* fix: typo
* add hms namespace_exists and drop_namespace
* add hms update_namespace
* move fns into HmsCatalog
* use `expose` in docker-compose
* add hms list_tables
* fix: clippy
* fix: cargo sort
* fix: cargo workspace
* move fns into utils + add constants
* include database name in error msg
* add pilota to cargo workspace
* add minio version
* change visibility to pub(crate); return namespace from conversion fn
* add minio version in rest-catalog docker-compose
* fix: hms test docker infrastructure
* add version to minio/mc
* fix: license header
* fix: core-site
* split utils and errors
* add fn get_default_table_location
* add fn get_metadata_location
* add docs
* add HiveSchemaBuilder
* add schema to HiveSchemaBuilder
* add convert_to_hive_table
* cargo sort
* implement table_ops without TableMetadataBuilder
* refactor: HiveSchema fn from_iceberg
* prepare table creation without metadata
* simplify HiveSchemaBuilder
* refactor: use ok_or_else()
* simplify HiveSchemaBuilder
* fix visibility of consts
* change serde metadata v2
* change default partition_specs and sort_orders
* change test
* add create table with metadata
* use FileIO::from_path
* add test_load_table
* small fixes + docs
* rename
* extract get_metadata_location from hive_table
* add integration tests
* fix: clippy
* remove whitespace
* fix: fixture names
* remove builder-prefix `with`
* capitalize error msg
* remove trait bound `Display`
* add const `OWNER`
* fix: default warehouse location
* add test-case `list_tables`
* add all primitives to test_schema
* exclude `Timestamptz` from hive conversion
* remove Self::T from schema
* remove context
* keep file_io in HmsCatalog
* use json schema repr
---------
Co-authored-by: mlanhenke <[email protected]>
---
crates/catalog/hms/Cargo.toml | 5 +-
crates/catalog/hms/src/catalog.rs | 231 +++++++++++++-
crates/catalog/hms/src/{lib.rs => error.rs} | 29 +-
crates/catalog/hms/src/lib.rs | 2 +
crates/catalog/hms/src/schema.rs | 457 +++++++++++++++++++++++++++
crates/catalog/hms/src/utils.rs | 312 +++++++++++++++---
crates/catalog/hms/tests/hms_catalog_test.rs | 188 ++++++++++-
7 files changed, 1148 insertions(+), 76 deletions(-)
diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml
index 475da7b..5a03221 100644
--- a/crates/catalog/hms/Cargo.toml
+++ b/crates/catalog/hms/Cargo.toml
@@ -31,14 +31,17 @@ keywords = ["iceberg", "hive", "catalog"]
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
+chrono = { workspace = true }
hive_metastore = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
pilota = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
typed-builder = { workspace = true }
+uuid = { workspace = true }
volo-thrift = { workspace = true }
[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/catalog/hms/src/catalog.rs
b/crates/catalog/hms/src/catalog.rs
index aba6a45..4ca66bd 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -15,11 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use crate::error::from_io_error;
+use crate::error::from_thrift_error;
+
use super::utils::*;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
+use hive_metastore::ThriftHiveMetastoreGetTableException;
+use iceberg::io::FileIO;
+use iceberg::spec::TableMetadata;
+use iceberg::spec::TableMetadataBuilder;
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation,
@@ -28,6 +35,8 @@ use iceberg::{
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;
@@ -47,6 +56,9 @@ pub enum HmsThriftTransport {
pub struct HmsCatalogConfig {
address: String,
thrift_transport: HmsThriftTransport,
+ /// Warehouse root: used to build the catalog's `FileIO` and as the base
+ /// location for new tables whose namespace has no `location` property.
+ warehouse: String,
+ /// Extra properties handed to the `FileIO` builder; empty unless set.
+ #[builder(default)]
+ props: HashMap<String, String>,
}
struct HmsClient(ThriftHiveMetastoreClient);
@@ -55,6 +67,7 @@ struct HmsClient(ThriftHiveMetastoreClient);
pub struct HmsCatalog {
config: HmsCatalogConfig,
client: HmsClient,
+ /// `FileIO` built once in `new()` from the configured `warehouse` and
+ /// `props`; a clone of it is attached to every `Table` this catalog returns.
+ file_io: FileIO,
}
impl Debug for HmsCatalog {
@@ -92,11 +105,20 @@ impl HmsCatalog {
.build(),
};
+ let file_io = FileIO::from_path(&config.warehouse)?
+ .with_props(&config.props)
+ .build()?;
+
Ok(Self {
config,
client: HmsClient(client),
+ file_io,
})
}
+    /// Returns the catalog's [`FileIO`].
+    ///
+    /// The instance is created in `new()` from the configured `warehouse`
+    /// and `props`; each caller receives its own clone.
+    pub fn file_io(&self) -> FileIO {
+        FileIO::clone(&self.file_io)
+    }
}
#[async_trait]
@@ -173,7 +195,7 @@ impl Catalog for HmsCatalog {
let db = self
.client
.0
- .get_database(name.clone().into())
+ .get_database(name.into())
.await
.map_err(from_thrift_error)?;
@@ -197,7 +219,7 @@ impl Catalog for HmsCatalog {
async fn namespace_exists(&self, namespace: &NamespaceIdent) ->
Result<bool> {
let name = validate_namespace(namespace)?;
- let resp = self.client.0.get_database(name.clone().into()).await;
+ let resp = self.client.0.get_database(name.into()).await;
match resp {
Ok(_) => Ok(true),
@@ -269,13 +291,22 @@ impl Catalog for HmsCatalog {
Ok(())
}
+ /// Asynchronously lists all tables within a specified namespace.
+ ///
+ /// # Returns
+ ///
+ /// A `Result<Vec<TableIdent>>`, which is:
+ /// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
+ /// representing a table within the specified namespace.
+ /// - `Err(...)` if an error occurs during namespace validation or while
+ /// querying the database.
async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
let name = validate_namespace(namespace)?;
let tables = self
.client
.0
- .get_all_tables(name.clone().into())
+ .get_all_tables(name.into())
.await
.map_err(from_thrift_error)?;
@@ -287,31 +318,201 @@ impl Catalog for HmsCatalog {
Ok(tables)
}
+ /// Creates a new table within a specified namespace using the provided
+ /// table creation settings.
+ ///
+ /// The initial (version 0) metadata file is written through the catalog's
+ /// `FileIO` before the table is registered in the Hive Metastore.
+ ///
+ /// # Returns
+ /// A `Result` wrapping a `Table` object representing the newly created
+ /// table.
+ ///
+ /// # Errors
+ /// This function may return an error in several cases, including invalid
+ /// namespace identifiers, failure to determine a default storage location,
+ /// issues generating or writing table metadata, and errors communicating
+ /// with the Hive Metastore.
async fn create_table(
&self,
- _namespace: &NamespaceIdent,
- _creation: TableCreation,
+ namespace: &NamespaceIdent,
+ creation: TableCreation,
) -> Result<Table> {
- todo!()
+ let db_name = validate_namespace(namespace)?;
+ let table_name = creation.name.clone();
+
+ // Use the explicit location if given; otherwise derive one from the
+ // namespace's `location` property or the configured warehouse.
+ let location = match &creation.location {
+ Some(location) => location.clone(),
+ None => {
+ let ns = self.get_namespace(namespace).await?;
+ get_default_table_location(&ns, &table_name,
&self.config.warehouse)
+ }
+ };
+
+ let metadata =
TableMetadataBuilder::from_table_creation(creation)?.build()?;
+ // Version 0 metadata file under `<location>/metadata/`.
+ let metadata_location = create_metadata_location(&location, 0)?;
+
+ // Serialize the metadata as JSON and write it out; the metastore entry
+ // created below references it through its `metadata_location` parameter.
+ let mut file = self
+ .file_io
+ .new_output(&metadata_location)?
+ .writer()
+ .await?;
+ file.write_all(&serde_json::to_vec(&metadata)?).await?;
+ file.shutdown().await?;
+
+ let hive_table = convert_to_hive_table(
+ db_name.clone(),
+ metadata.current_schema(),
+ table_name.clone(),
+ location,
+ metadata_location.clone(),
+ metadata.properties(),
+ )?;
+
+ // NOTE(review): if this call fails, the metadata file written above is
+ // left in storage; consider deleting it on error.
+ self.client
+ .0
+ .create_table(hive_table)
+ .await
+ .map_err(from_thrift_error)?;
+
+ let table = Table::builder()
+ .file_io(self.file_io())
+ .metadata_location(metadata_location)
+ .metadata(metadata)
+ .identifier(TableIdent::new(NamespaceIdent::new(db_name),
table_name))
+ .build();
+
+ Ok(table)
}
- async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
- todo!()
+ /// Loads a table from the Hive Metastore and constructs a `Table` object
+ /// based on its metadata.
+ ///
+ /// # Returns
+ /// A `Result` wrapping a `Table` object that represents the loaded table.
+ ///
+ /// # Errors
+ /// This function may return an error in several scenarios, including:
+ /// - Failure to validate the namespace.
+ /// - Failure to retrieve the table from the Hive Metastore.
+ /// - Absence of metadata location information in the Hive table's parameters.
+ /// - Issues reading or deserializing the table's metadata file.
+ async fn load_table(&self, table: &TableIdent) -> Result<Table> {
+ let db_name = validate_namespace(table.namespace())?;
+
+ let hive_table = self
+ .client
+ .0
+ .get_table(db_name.clone().into(), table.name.clone().into())
+ .await
+ .map_err(from_thrift_error)?;
+
+ // The path of the current iceberg metadata file is stored in the Hive
+ // table's `metadata_location` parameter.
+ let metadata_location = get_metadata_location(&hive_table.parameters)?;
+
+ // Read the whole metadata file through the catalog's `FileIO` and parse
+ // it as JSON `TableMetadata`.
+ let mut reader =
self.file_io.new_input(&metadata_location)?.reader().await?;
+ let mut metadata_str = String::new();
+ reader.read_to_string(&mut metadata_str).await?;
+ let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
+
+ let table = Table::builder()
+ .file_io(self.file_io())
+ .metadata_location(metadata_location)
+ .metadata(metadata)
+ .identifier(TableIdent::new(
+ NamespaceIdent::new(db_name),
+ table.name.clone(),
+ ))
+ .build();
+
+ Ok(table)
}
- async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
- todo!()
+    /// Removes the table identified by `table` from the Hive Metastore.
+    ///
+    /// `false` is passed as the metastore's `deleteData` flag, so this call
+    /// only drops the metastore entry.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The namespace in `table` cannot be validated or does not exist.
+    /// - The metastore call fails, including when the table does not exist.
+    /// - Any network or communication error occurs with the metastore.
+    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name.clone();
+
+        self.client
+            .0
+            .drop_table(db_name.into(), table_name.into(), false)
+            .await
+            .map(|_| ())
+            .map_err(from_thrift_error)
}
- async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
- todo!()
+    /// Checks whether `table` is registered in the Hive Metastore.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the metastore returns the table.
+    /// - `Ok(false)` if `get_table` fails with its `O2` user exception
+    ///   (the metastore's `NoSuchObjectException`).
+    /// - `Err(...)` for an invalid namespace or any other thrift error.
+    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
+        let db_name = validate_namespace(table.namespace())?;
+
+        match self
+            .client
+            .0
+            .get_table(db_name.into(), table.name.clone().into())
+            .await
+        {
+            Ok(_) => Ok(true),
+            Err(ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_))) => {
+                Ok(false)
+            }
+            Err(other) => Err(from_thrift_error(other)),
+        }
}
- async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
- todo!()
+    /// Renames `src` to `dest`, which may also move the table into another
+    /// namespace (database).
+    ///
+    /// The source table is fetched from the metastore, its `db_name` and
+    /// `table_name` are rewritten to the destination, and it is written back
+    /// with `alter_table` addressed by its original name.
+    ///
+    /// # Returns
+    /// - `Ok(())` on a successful rename or move.
+    /// - `Err(...)` if either namespace is invalid or a thrift call fails.
+    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
+        let src_dbname = validate_namespace(src.namespace())?;
+        let dest_dbname = validate_namespace(dest.namespace())?;
+
+        let src_tbl_name = src.name.clone();
+
+        let mut hive_table = self
+            .client
+            .0
+            .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
+            .await
+            .map_err(from_thrift_error)?;
+
+        hive_table.db_name = Some(dest_dbname.into());
+        hive_table.table_name = Some(dest.name.clone().into());
+
+        self.client
+            .0
+            .alter_table(src_dbname.into(), src_tbl_name.into(), hive_table)
+            .await
+            .map_err(from_thrift_error)?;
+
+        Ok(())
}
+ /// Updating tables is not supported by this catalog yet; always returns an
+ /// `ErrorKind::FeatureUnsupported` error.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
- todo!()
+ Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Updating a table is not supported yet",
+ ))
}
}
diff --git a/crates/catalog/hms/src/lib.rs b/crates/catalog/hms/src/error.rs
similarity index 55%
copy from crates/catalog/hms/src/lib.rs
copy to crates/catalog/hms/src/error.rs
index b75e749..a0f393c 100644
--- a/crates/catalog/hms/src/lib.rs
+++ b/crates/catalog/hms/src/error.rs
@@ -15,11 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-//! Iceberg Hive Metastore Catalog implementation.
+use anyhow::anyhow;
+use iceberg::{Error, ErrorKind};
+use std::fmt::Debug;
+use std::io;
-#![deny(missing_docs)]
+/// Converts a volo-thrift `ResponseError` into an iceberg [`Error`] of kind
+/// `ErrorKind::Unexpected`.
+///
+/// The thrift error is rendered through its `Debug` representation and
+/// attached as the error's source.
+pub fn from_thrift_error<T: Debug>(error: volo_thrift::error::ResponseError<T>) -> Error {
+    let source = anyhow!("thrift error: {:?}", error);
+    let message = "Operation failed for hitting thrift error".to_string();
+    Error::new(ErrorKind::Unexpected, message).with_source(source)
+}
-mod catalog;
-pub use catalog::*;
-
-mod utils;
+/// Converts a [`std::io::Error`] into an iceberg [`Error`] of kind
+/// `ErrorKind::Unexpected`, keeping the io error as its source.
+pub fn from_io_error(error: io::Error) -> Error {
+    let message = "Operation failed for hitting io error".to_string();
+    Error::new(ErrorKind::Unexpected, message).with_source(error)
+}
diff --git a/crates/catalog/hms/src/lib.rs b/crates/catalog/hms/src/lib.rs
index b75e749..db0034d 100644
--- a/crates/catalog/hms/src/lib.rs
+++ b/crates/catalog/hms/src/lib.rs
@@ -22,4 +22,6 @@
mod catalog;
pub use catalog::*;
+mod error;
+mod schema;
mod utils;
diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs
new file mode 100644
index 0000000..77caaf7
--- /dev/null
+++ b/crates/catalog/hms/src/schema.rs
@@ -0,0 +1,457 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use hive_metastore::FieldSchema;
+use iceberg::spec::{visit_schema, PrimitiveType, Schema, SchemaVisitor};
+use iceberg::{Error, ErrorKind, Result};
+
+/// Hive column list produced from an iceberg schema (one `FieldSchema` per
+/// top-level field).
+type HiveSchema = Vec<FieldSchema>;
+
+/// `SchemaVisitor` that converts an iceberg `Schema` into a `HiveSchema`.
+#[derive(Debug, Default)]
+pub(crate) struct HiveSchemaBuilder {
+ /// Columns collected so far.
+ schema: HiveSchema,
+ /// Incremented/decremented around each struct-field visit; `field` treats
+ /// a non-zero value as being inside a nested struct.
+ depth: usize,
+}
+
+impl HiveSchemaBuilder {
+    /// Runs this visitor over the iceberg `schema` and returns the populated
+    /// builder, ready for [`HiveSchemaBuilder::build`].
+    pub fn from_iceberg(schema: &Schema) -> Result<HiveSchemaBuilder> {
+        let mut visitor = HiveSchemaBuilder::default();
+        let _root_type = visit_schema(schema, &mut visitor)?;
+        Ok(visitor)
+    }
+
+    /// Consumes the builder and yields the collected Hive columns.
+    pub fn build(self) -> HiveSchema {
+        let HiveSchemaBuilder { schema, .. } = self;
+        schema
+    }
+
+    /// Whether the visitor is currently below a struct field (non-zero depth).
+    fn is_inside_struct(&self) -> bool {
+        self.depth != 0
+    }
+}
+
+/// Converts iceberg types into Hive type strings.
+///
+/// Every visit returns the Hive type string of the visited node. Fields seen
+/// while `depth == 0` are recorded as top-level Hive columns; fields of nested
+/// structs are rendered as `name:type` members of a `struct<...>`.
+impl SchemaVisitor for HiveSchemaBuilder {
+    type T = String;
+
+    fn schema(&mut self, _schema: &Schema, value: String) -> Result<String> {
+        Ok(value)
+    }
+
+    fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
+        self.depth += 1;
+        Ok(())
+    }
+
+    fn r#struct(
+        &mut self,
+        _struct: &iceberg::spec::StructType,
+        results: Vec<String>,
+    ) -> Result<String> {
+        Ok(format!("struct<{}>", results.join(", ")))
+    }
+
+    fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> {
+        self.depth -= 1;
+        Ok(())
+    }
+
+    fn field(
+        &mut self,
+        field: &iceberg::spec::NestedFieldRef,
+        value: String,
+    ) -> Result<String> {
+        // Nested struct members become `name:type` entries inside `struct<...>`.
+        if self.is_inside_struct() {
+            return Ok(format!("{}:{}", field.name, value));
+        }
+
+        // Top-level field: record it as a Hive column.
+        self.schema.push(FieldSchema {
+            name: Some(field.name.clone().into()),
+            r#type: Some(value.clone().into()),
+            comment: field.doc.clone().map(|doc| doc.into()),
+        });
+
+        Ok(value)
+    }
+
+    fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> Result<String> {
+        Ok(format!("array<{}>", value))
+    }
+
+    fn map(
+        &mut self,
+        _map: &iceberg::spec::MapType,
+        key_value: String,
+        value: String,
+    ) -> Result<String> {
+        Ok(format!("map<{},{}>", key_value, value))
+    }
+
+    fn primitive(&mut self, p: &PrimitiveType) -> Result<String> {
+        let hive_type = match p {
+            PrimitiveType::Boolean => "boolean".to_string(),
+            PrimitiveType::Int => "int".to_string(),
+            PrimitiveType::Long => "bigint".to_string(),
+            PrimitiveType::Float => "float".to_string(),
+            PrimitiveType::Double => "double".to_string(),
+            PrimitiveType::Date => "date".to_string(),
+            PrimitiveType::Timestamp => "timestamp".to_string(),
+            PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => {
+                "string".to_string()
+            }
+            PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(),
+            PrimitiveType::Decimal { precision, scale } => {
+                format!("decimal({},{})", precision, scale)
+            }
+            // Types without a Hive mapping (e.g. `Timestamptz`) are rejected.
+            // The message names the actual variant, so any type that lands
+            // here is reported accurately instead of always as 'Timestamptz'.
+            unsupported => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    format!("Conversion from '{:?}' is not supported", unsupported),
+                ))
+            }
+        };
+
+        Ok(hive_type)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use iceberg::{spec::Schema, Result};
+
+    use super::*;
+
+    /// Builds an expected Hive column with the given name and type and no comment.
+    fn column(name: &str, hive_type: &str) -> FieldSchema {
+        FieldSchema {
+            name: Some(name.to_string().into()),
+            r#type: Some(hive_type.to_string().into()),
+            comment: None,
+        }
+    }
+
+    /// Parses `record` as an iceberg schema and converts it into Hive columns.
+    fn convert(record: &str) -> Result<Vec<FieldSchema>> {
+        let schema = serde_json::from_str::<Schema>(record)?;
+        Ok(HiveSchemaBuilder::from_iceberg(&schema)?.build())
+    }
+
+    #[test]
+    fn test_schema_with_nested_maps() -> Result<()> {
+        let record = r#"
+        {
+            "schema-id": 1,
+            "type": "struct",
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "quux",
+                    "required": true,
+                    "type": {
+                        "type": "map",
+                        "key-id": 2,
+                        "key": "string",
+                        "value-id": 3,
+                        "value-required": true,
+                        "value": {
+                            "type": "map",
+                            "key-id": 4,
+                            "key": "string",
+                            "value-id": 5,
+                            "value-required": true,
+                            "value": "int"
+                        }
+                    }
+                }
+            ]
+        }
+        "#;
+
+        let result = convert(record)?;
+
+        assert_eq!(result, vec![column("quux", "map<string,map<string,int>>")]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_struct_inside_list() -> Result<()> {
+        let record = r#"
+        {
+            "schema-id": 1,
+            "type": "struct",
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "location",
+                    "required": true,
+                    "type": {
+                        "type": "list",
+                        "element-id": 2,
+                        "element-required": true,
+                        "element": {
+                            "type": "struct",
+                            "fields": [
+                                {
+                                    "id": 3,
+                                    "name": "latitude",
+                                    "required": false,
+                                    "type": "float"
+                                },
+                                {
+                                    "id": 4,
+                                    "name": "longitude",
+                                    "required": false,
+                                    "type": "float"
+                                }
+                            ]
+                        }
+                    }
+                }
+            ]
+        }
+        "#;
+
+        let result = convert(record)?;
+
+        assert_eq!(
+            result,
+            vec![column(
+                "location",
+                "array<struct<latitude:float, longitude:float>>"
+            )]
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_structs() -> Result<()> {
+        let record = r#"{
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [
+                {
+                    "id": 1,
+                    "name": "person",
+                    "required": true,
+                    "type": {
+                        "type": "struct",
+                        "fields": [
+                            {
+                                "id": 2,
+                                "name": "name",
+                                "required": true,
+                                "type": "string"
+                            },
+                            {
+                                "id": 3,
+                                "name": "age",
+                                "required": false,
+                                "type": "int"
+                            }
+                        ]
+                    }
+                }
+            ]
+        }"#;
+
+        let result = convert(record)?;
+
+        assert_eq!(
+            result,
+            vec![column("person", "struct<name:string, age:int>")]
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_schema_with_simple_fields() -> Result<()> {
+        let record = r#"{
+            "type": "struct",
+            "schema-id": 1,
+            "fields": [
+                {"id": 1, "name": "c1", "required": true, "type": "boolean"},
+                {"id": 2, "name": "c2", "required": true, "type": "int"},
+                {"id": 3, "name": "c3", "required": true, "type": "long"},
+                {"id": 4, "name": "c4", "required": true, "type": "float"},
+                {"id": 5, "name": "c5", "required": true, "type": "double"},
+                {"id": 6, "name": "c6", "required": true, "type": "decimal(2,2)"},
+                {"id": 7, "name": "c7", "required": true, "type": "date"},
+                {"id": 8, "name": "c8", "required": true, "type": "time"},
+                {"id": 9, "name": "c9", "required": true, "type": "timestamp"},
+                {"id": 10, "name": "c10", "required": true, "type": "string"},
+                {"id": 11, "name": "c11", "required": true, "type": "uuid"},
+                {"id": 12, "name": "c12", "required": true, "type": "fixed[4]"},
+                {"id": 13, "name": "c13", "required": true, "type": "binary"}
+            ]
+        }"#;
+
+        let result = convert(record)?;
+
+        let expected = vec![
+            column("c1", "boolean"),
+            column("c2", "int"),
+            column("c3", "bigint"),
+            column("c4", "float"),
+            column("c5", "double"),
+            column("c6", "decimal(2,2)"),
+            column("c7", "date"),
+            column("c8", "string"),
+            column("c9", "timestamp"),
+            column("c10", "string"),
+            column("c11", "string"),
+            column("c12", "binary"),
+            column("c13", "binary"),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+}
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 02f32c6..04ee5d4 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -15,59 +15,52 @@
// specific language governing permissions and limitations
// under the License.
-use anyhow::anyhow;
-use hive_metastore::{Database, PrincipalType};
-use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result};
+use chrono::Utc;
+use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor};
+use iceberg::{spec::Schema, Error, ErrorKind, Namespace, NamespaceIdent,
Result};
use pilota::{AHashMap, FastStr};
use std::collections::HashMap;
-use std::fmt::Debug;
-use std::io;
+use uuid::Uuid;
+
+use crate::schema::HiveSchemaBuilder;
/// hive.metastore.database.owner setting
-pub const HMS_DB_OWNER: &str = "hive.metastore.database.owner";
+const HMS_DB_OWNER: &str = "hive.metastore.database.owner";
/// hive.metastore.database.owner default setting
-pub const HMS_DEFAULT_DB_OWNER: &str = "user.name";
+const HMS_DEFAULT_DB_OWNER: &str = "user.name";
/// hive.metastore.database.owner-type setting
-pub const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type";
+const HMS_DB_OWNER_TYPE: &str = "hive.metastore.database.owner-type";
+/// hive metastore `owner` property
+const OWNER: &str = "owner";
/// hive metatore `description` property
-pub const COMMENT: &str = "comment";
+const COMMENT: &str = "comment";
/// hive metatore `location` property
-pub const LOCATION: &str = "location";
-
-/// Format a thrift error into iceberg error.
-pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) ->
Error
-where
- T: Debug,
-{
- Error::new(
- ErrorKind::Unexpected,
- "operation failed for hitting thrift error".to_string(),
- )
- .with_source(anyhow!("thrift error: {:?}", error))
-}
-
-/// Format an io error into iceberg error.
-pub fn from_io_error(error: io::Error) -> Error {
- Error::new(
- ErrorKind::Unexpected,
- "operation failed for hitting io error".to_string(),
- )
- .with_source(error)
-}
+const LOCATION: &str = "location";
+/// hive metastore `metadata_location` table parameter (iceberg metadata file path)
+const METADATA_LOCATION: &str = "metadata_location";
+/// hive metastore `EXTERNAL` table parameter
+const EXTERNAL: &str = "EXTERNAL";
+/// hive metastore table type used for iceberg tables
+const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE";
+/// hive metastore `table_type` table parameter
+const TABLE_TYPE: &str = "table_type";
+/// hive metastore `SerDeInfo` serialization_lib value
+const SERIALIZATION_LIB: &str =
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+/// hive metastore input format
+const INPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileInputFormat";
+/// hive metastore output format
+const OUTPUT_FORMAT: &str = "org.apache.hadoop.mapred.FileOutputFormat";
/// Returns a `Namespace` by extracting database name and properties
/// from `hive_metastore::hms::Database`
pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
let mut properties = HashMap::new();
- let name = if let Some(name) = &database.name {
- name.to_string()
- } else {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Database name must be specified",
- ));
- };
+ let name = database
+ .name
+ .as_ref()
+ .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Database name must
be specified"))?
+ .to_string();
if let Some(description) = &database.description {
properties.insert(COMMENT.to_string(), description.to_string());
@@ -157,6 +150,57 @@ pub(crate) fn convert_to_database(
Ok(db)
}
+/// Builds the Hive Metastore `Table` used to register an iceberg table.
+///
+/// The result is an `EXTERNAL_TABLE` whose parameters carry `EXTERNAL=TRUE`,
+/// `table_type=ICEBERG` and `metadata_location`, and whose storage descriptor
+/// points at `location` with columns converted from `schema`. The owner is the
+/// `owner` entry of `properties`, or `HMS_DEFAULT_DB_OWNER` when absent.
+///
+/// # Errors
+/// Fails if `schema` contains a type with no Hive equivalent, or if the
+/// current time does not fit into the `i32` time fields.
+pub(crate) fn convert_to_hive_table(
+    db_name: String,
+    schema: &Schema,
+    table_name: String,
+    location: String,
+    metadata_location: String,
+    properties: &HashMap<String, String>,
+) -> Result<hive_metastore::Table> {
+    let serde_info = SerDeInfo {
+        serialization_lib: Some(SERIALIZATION_LIB.into()),
+        ..Default::default()
+    };
+
+    let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build();
+
+    let storage_descriptor = StorageDescriptor {
+        location: Some(location.into()),
+        cols: Some(hive_schema),
+        input_format: Some(INPUT_FORMAT.into()),
+        output_format: Some(OUTPUT_FORMAT.into()),
+        serde_info: Some(serde_info),
+        ..Default::default()
+    };
+
+    let parameters = AHashMap::from([
+        (FastStr::from(EXTERNAL), FastStr::from("TRUE")),
+        (FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG")),
+        (
+            FastStr::from(METADATA_LOCATION),
+            FastStr::from(metadata_location),
+        ),
+    ]);
+
+    // `get_current_time` returns whole seconds since the epoch (not
+    // milliseconds); the same value fills both time fields.
+    let current_time_secs = get_current_time()?;
+    let owner = properties
+        .get(OWNER)
+        .cloned()
+        .unwrap_or_else(|| HMS_DEFAULT_DB_OWNER.to_string());
+
+    Ok(hive_metastore::Table {
+        table_name: Some(table_name.into()),
+        db_name: Some(db_name.into()),
+        table_type: Some(EXTERNAL_TABLE.into()),
+        owner: Some(owner.into()),
+        create_time: Some(current_time_secs),
+        last_access_time: Some(current_time_secs),
+        sd: Some(storage_descriptor),
+        parameters: Some(parameters),
+        ..Default::default()
+    })
+}
+
/// Checks if provided `NamespaceIdent` is valid.
pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result<String>
{
let name = namespace.as_ref();
@@ -183,6 +227,65 @@ pub(crate) fn validate_namespace(namespace:
&NamespaceIdent) -> Result<String> {
Ok(name)
}
+/// Returns the default location for a new table: `<base>/<table_name>`.
+///
+/// `<base>` is the namespace's `location` property when set, otherwise
+/// `warehouse`. Trailing slashes on `<base>` are stripped so that a base such
+/// as `s3a://bucket/warehouse/` does not yield `//` before the table name.
+pub(crate) fn get_default_table_location(
+    namespace: &Namespace,
+    table_name: impl AsRef<str>,
+    warehouse: impl AsRef<str>,
+) -> String {
+    let warehouse = warehouse.as_ref();
+    let base = namespace
+        .properties()
+        .get(LOCATION)
+        .map(String::as_str)
+        .unwrap_or(warehouse);
+
+    format!("{}/{}", base.trim_end_matches('/'), table_name.as_ref())
+}
+
+/// Builds the path of a new metadata file for the table stored at `location`:
+/// `<location>/metadata/<version>-<uuid>.metadata.json`, with `version`
+/// zero-padded to five digits and a freshly generated v4 UUID.
+///
+/// # Errors
+/// Returns `ErrorKind::DataInvalid` if `version` is negative.
+pub(crate) fn create_metadata_location(
+    location: impl AsRef<str>,
+    version: i32,
+) -> Result<String> {
+    if version >= 0 {
+        let id = Uuid::new_v4();
+        Ok(format!(
+            "{}/metadata/{:0>5}-{}.metadata.json",
+            location.as_ref(),
+            version,
+            id
+        ))
+    } else {
+        Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Table metadata version: '{}' must be a non-negative integer",
+                version
+            ),
+        ))
+    }
+}
+
+/// Reads the iceberg metadata file location from a Hive table's `parameters`.
+///
+/// # Errors
+/// Returns `ErrorKind::DataInvalid` if `parameters` is `None` or lacks a
+/// `metadata_location` entry.
+pub(crate) fn get_metadata_location(
+    parameters: &Option<AHashMap<FastStr, FastStr>>,
+) -> Result<String> {
+    let properties = parameters.as_ref().ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "No 'parameters' set on table. Location of metadata is undefined",
+        )
+    })?;
+
+    properties
+        .get(METADATA_LOCATION)
+        .map(|location| location.to_string())
+        .ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("No '{}' set on table", METADATA_LOCATION),
+            )
+        })
+}
+
/// Formats location_uri by e.g. removing trailing slashes.
fn format_location_uri(location: String) -> String {
let mut location = location;
@@ -217,12 +320,141 @@ fn validate_owner_settings(properties: &HashMap<String,
String>) -> Result<()> {
Ok(())
}
+/// Returns the current UTC time as whole seconds since the Unix epoch.
+///
+/// # Errors
+/// Returns `ErrorKind::Unexpected` if the value does not fit into an `i32`.
+fn get_current_time() -> Result<i32> {
+    let secs = Utc::now().timestamp();
+    i32::try_from(secs).map_err(|_| {
+        Error::new(
+            ErrorKind::Unexpected,
+            "Current time is out of range for i32",
+        )
+    })
+}
+
#[cfg(test)]
mod tests {
- use iceberg::{Namespace, NamespaceIdent};
+ use iceberg::{
+ spec::{NestedField, PrimitiveType, Type},
+ Namespace, NamespaceIdent,
+ };
use super::*;
+ #[test]
+ fn test_get_metadata_location() -> Result<()> {
+ let params_valid = Some(AHashMap::from([(
+ FastStr::new(METADATA_LOCATION),
+ FastStr::new("my_location"),
+ )]));
+ let params_missing_key = Some(AHashMap::from([(
+ FastStr::new("not_here"),
+ FastStr::new("my_location"),
+ )]));
+
+ let result_valid = get_metadata_location(¶ms_valid)?;
+ let result_missing_key = get_metadata_location(¶ms_missing_key);
+ let result_no_params = get_metadata_location(&None);
+
+ assert_eq!(result_valid, "my_location");
+ assert!(result_missing_key.is_err());
+ assert!(result_no_params.is_err());
+
+ Ok(())
+ }
+
+    /// Converting an Iceberg schema yields a Hive table with the expected
+    /// identity fields and storage descriptor.
+    #[test]
+    fn test_convert_to_hive_table() -> Result<()> {
+        let db = "my_db".to_string();
+        let table = "my_table".to_string();
+        let table_location = "s3a://warehouse/hms".to_string();
+        let metadata_location = create_metadata_location(table_location.clone(), 0)?;
+        let table_properties = HashMap::new();
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()?;
+
+        let hive_table = convert_to_hive_table(
+            db.clone(),
+            &schema,
+            table.clone(),
+            table_location.clone(),
+            metadata_location,
+            &table_properties,
+        )?;
+
+        // Storage descriptor that `convert_to_hive_table` is expected to produce.
+        let expected_sd = StorageDescriptor {
+            location: Some(table_location.into()),
+            cols: Some(HiveSchemaBuilder::from_iceberg(&schema)?.build()),
+            input_format: Some(INPUT_FORMAT.into()),
+            output_format: Some(OUTPUT_FORMAT.into()),
+            serde_info: Some(SerDeInfo {
+                serialization_lib: Some(SERIALIZATION_LIB.into()),
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+
+        assert_eq!(hive_table.db_name, Some(db.into()));
+        assert_eq!(hive_table.table_name, Some(table.into()));
+        assert_eq!(hive_table.table_type, Some(EXTERNAL_TABLE.into()));
+        assert_eq!(hive_table.owner, Some(HMS_DEFAULT_DB_OWNER.into()));
+        assert_eq!(hive_table.sd, Some(expected_sd));
+
+        Ok(())
+    }
+
+    /// Metadata locations are zero-padded and versioned; negative versions fail.
+    #[test]
+    fn test_create_metadata_location() -> Result<()> {
+        let base = "my_base_location";
+
+        let path = create_metadata_location(base, 0)?;
+        let rejected = create_metadata_location(base, -1);
+
+        // Version 0 is padded to five digits and followed by a UUID.
+        assert!(path.starts_with("my_base_location/metadata/00000-"));
+        assert!(path.ends_with(".metadata.json"));
+        // Negative versions are rejected.
+        assert!(rejected.is_err());
+
+        Ok(())
+    }
+
+    /// A namespace-level `LOCATION` property takes precedence over the warehouse.
+    #[test]
+    fn test_get_default_table_location() -> Result<()> {
+        let namespace = Namespace::with_properties(
+            NamespaceIdent::new("default".into()),
+            HashMap::from([(LOCATION.to_string(), "db_location".to_string())]),
+        );
+
+        let location = get_default_table_location(&namespace, "my_table", "warehouse_location");
+
+        assert_eq!("db_location/my_table", location);
+
+        Ok(())
+    }
+
+    /// Without a namespace `LOCATION`, the table lives under the warehouse path.
+    #[test]
+    fn test_get_default_table_location_warehouse() -> Result<()> {
+        let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+        let location = get_default_table_location(&namespace, "my_table", "warehouse_location");
+
+        assert_eq!("warehouse_location/my_table", location);
+
+        Ok(())
+    }
+
#[test]
fn test_convert_to_namespace() -> Result<()> {
let properties = HashMap::from([
diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs
b/crates/catalog/hms/tests/hms_catalog_test.rs
index bab83a9..a48d056 100644
--- a/crates/catalog/hms/tests/hms_catalog_test.rs
+++ b/crates/catalog/hms/tests/hms_catalog_test.rs
@@ -19,7 +19,9 @@
use std::collections::HashMap;
-use iceberg::{Catalog, Namespace, NamespaceIdent};
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
@@ -27,6 +29,7 @@ use port_scanner::scan_port_addr;
use tokio::time::sleep;
const HMS_CATALOG_PORT: u16 = 9083;
+const MINIO_PORT: u16 = 9000;
type Result<T> = std::result::Result<T, iceberg::Error>;
struct TestFixture {
@@ -45,6 +48,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
docker_compose.run();
let hms_catalog_ip = docker_compose.get_container_ip("hive-metastore");
+ let minio_ip = docker_compose.get_container_ip("minio");
let read_port = format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT);
loop {
@@ -56,9 +60,21 @@ async fn set_test_fixture(func: &str) -> TestFixture {
}
}
+ let props = HashMap::from([
+ (
+ S3_ENDPOINT.to_string(),
+ format!("http://{}:{}", minio_ip, MINIO_PORT),
+ ),
+ (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+ (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+ (S3_REGION.to_string(), "us-east-1".to_string()),
+ ]);
+
let config = HmsCatalogConfig::builder()
.address(format!("{}:{}", hms_catalog_ip, HMS_CATALOG_PORT))
.thrift_transport(HmsThriftTransport::Buffered)
+ .warehouse("s3a://warehouse/hive".to_string())
+ .props(props)
.build();
let hms_catalog = HmsCatalog::new(config).unwrap();
@@ -69,6 +85,163 @@ async fn set_test_fixture(func: &str) -> TestFixture {
}
}
+/// Build a `TableCreation` named `name` at `location` with no properties. Its
+/// schema (id 0) has two required columns: `foo: int` and `bar: string`.
+fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
+    let table_location = location.to_string();
+    let table_name = name.to_string();
+
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    Ok(TableCreation::builder()
+        .location(table_location)
+        .name(table_name)
+        .properties(HashMap::new())
+        .schema(schema)
+        .build())
+}
+
+/// Renaming a table makes it reachable under the new identifier and removes
+/// the old one.
+#[tokio::test]
+async fn test_rename_table() -> Result<()> {
+    let fixture = set_test_fixture("test_rename_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let table = fixture
+        .hms_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    let dest = TableIdent::new(namespace.name().clone(), "my_table_rename".to_string());
+
+    fixture
+        .hms_catalog
+        .rename_table(table.identifier(), &dest)
+        .await?;
+
+    // The table must be visible under its new name...
+    assert!(fixture.hms_catalog.table_exists(&dest).await?);
+    // ...and gone under its old one; otherwise the "rename" merely copied it.
+    assert!(!fixture.hms_catalog.table_exists(table.identifier()).await?);
+
+    Ok(())
+}
+
+/// A freshly created table is reported as existing.
+#[tokio::test]
+async fn test_table_exists() -> Result<()> {
+    let fixture = set_test_fixture("test_table_exists").await;
+    let catalog = &fixture.hms_catalog;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let table = catalog.create_table(namespace.name(), creation).await?;
+
+    assert!(catalog.table_exists(table.identifier()).await?);
+
+    Ok(())
+}
+
+/// After dropping a table, the catalog no longer reports it as existing.
+#[tokio::test]
+async fn test_drop_table() -> Result<()> {
+    let fixture = set_test_fixture("test_drop_table").await;
+    let catalog = &fixture.hms_catalog;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let table = catalog.create_table(namespace.name(), creation).await?;
+    let ident = table.identifier();
+
+    catalog.drop_table(ident).await?;
+
+    assert!(!catalog.table_exists(ident).await?);
+
+    Ok(())
+}
+
+/// Loading a table returns the same identifier, metadata location and metadata
+/// that `create_table` returned.
+#[tokio::test]
+async fn test_load_table() -> Result<()> {
+    let fixture = set_test_fixture("test_load_table").await;
+    let catalog = &fixture.hms_catalog;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let created = catalog.create_table(namespace.name(), creation).await?;
+
+    let ident = TableIdent::new(namespace.name().clone(), "my_table".to_string());
+    let loaded = catalog.load_table(&ident).await?;
+
+    assert_eq!(loaded.identifier(), created.identifier());
+    assert_eq!(loaded.metadata_location(), created.metadata_location());
+    assert_eq!(loaded.metadata(), created.metadata());
+
+    Ok(())
+}
+
+/// Creating a table registers it under the requested name. It must also write
+/// a version-0 metadata file at the returned metadata location.
+#[tokio::test]
+async fn test_create_table() -> Result<()> {
+    let fixture = set_test_fixture("test_create_table").await;
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    let namespace = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let result = fixture
+        .hms_catalog
+        .create_table(namespace.name(), creation)
+        .await?;
+
+    assert_eq!(result.identifier().name(), "my_table");
+
+    let metadata_location = result
+        .metadata_location()
+        .expect("a newly created table must have a metadata location");
+    assert!(metadata_location.starts_with("s3a://warehouse/hive/metadata/00000-"));
+    assert!(metadata_location.ends_with(".metadata.json"));
+    // Check the concrete metadata file reported by `create_table`, rather than
+    // only the metadata directory prefix.
+    assert!(
+        fixture
+            .hms_catalog
+            .file_io()
+            .is_exist(metadata_location)
+            .await?
+    );
+
+    Ok(())
+}
+
+/// `list_tables` reports nothing for an empty namespace and exactly the created
+/// table afterwards.
+#[tokio::test]
+async fn test_list_tables() -> Result<()> {
+    let fixture = set_test_fixture("test_list_tables").await;
+    let catalog = &fixture.hms_catalog;
+    let ns = Namespace::new(NamespaceIdent::new("default".into()));
+
+    let before = catalog.list_tables(ns.name()).await?;
+    assert_eq!(before, vec![]);
+
+    let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
+    catalog.create_table(ns.name(), creation).await?;
+
+    let after = catalog.list_tables(ns.name()).await?;
+    assert_eq!(
+        after,
+        vec![TableIdent::new(ns.name().clone(), "my_table".to_string())]
+    );
+
+    Ok(())
+}
+
#[tokio::test]
async fn test_list_namespace() -> Result<()> {
let fixture = set_test_fixture("test_list_namespace").await;
@@ -208,16 +381,3 @@ async fn test_drop_namespace() -> Result<()> {
Ok(())
}
-
-#[tokio::test]
-async fn test_list_tables() -> Result<()> {
- let fixture = set_test_fixture("test_list_tables").await;
-
- let ns = Namespace::new(NamespaceIdent::new("default".into()));
-
- let result = fixture.hms_catalog.list_tables(ns.name()).await?;
-
- assert_eq!(result, vec![]);
-
- Ok(())
-}