This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fa7fee9  refactor: Upgrade hive_metastore to 0.1 (#409)
fa7fee9 is described below

commit fa7fee9eb207b6179d16107472af28c92b4f9df7
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jun 19 21:28:31 2024 +0800

    refactor: Upgrade hive_metastore to 0.1 (#409)
    
    * refactor: Upgrade hive_metastore to 0.1
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * format toml
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 Cargo.toml                        |  6 ++---
 crates/catalog/hms/src/catalog.rs | 57 ++++++++++++++++++++-------------------
 crates/catalog/hms/src/error.rs   | 20 +++++++++++---
 crates/catalog/hms/src/utils.rs   | 26 ++++++++++--------
 4 files changed, 64 insertions(+), 45 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 8c1871e..6c8cbe5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,7 +69,7 @@ once_cell = "1"
 opendal = "0.47"
 ordered-float = "4.0.0"
 parquet = "52"
-pilota = "0.11.0"
+pilota = "0.11.2"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
 reqwest = { version = "^0.12", features = ["json"] }
@@ -86,6 +86,6 @@ typed-builder = "^0.18"
 url = "2"
 urlencoding = "2"
 uuid = "1.6.1"
-volo-thrift = "0.9.2"
-hive_metastore = "0.0.2"
+volo-thrift = "0.10"
+hive_metastore = "0.1.0"
 tera = "1"
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index 18fcacd..7a292c5 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::from_io_error;
 use crate::error::from_thrift_error;
+use crate::error::{from_io_error, from_thrift_exception};
 
 use super::utils::*;
+use anyhow::anyhow;
 use async_trait::async_trait;
 use hive_metastore::ThriftHiveMetastoreClient;
 use hive_metastore::ThriftHiveMetastoreClientBuilder;
@@ -36,7 +37,7 @@ use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::net::ToSocketAddrs;
 use typed_builder::TypedBuilder;
-use volo_thrift::ResponseError;
+use volo_thrift::MaybeException;
 
 /// Which variant of the thrift transport to communicate with HMS
 /// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
@@ -137,7 +138,8 @@ impl Catalog for HmsCatalog {
                 .0
                 .get_all_databases()
                 .await
-                .map_err(from_thrift_error)?
+                .map(from_thrift_exception)
+                .map_err(from_thrift_error)??
         };
 
         Ok(dbs
@@ -195,7 +197,8 @@ impl Catalog for HmsCatalog {
             .0
             .get_database(name.into())
             .await
-            .map_err(from_thrift_error)?;
+            .map(from_thrift_exception)
+            .map_err(from_thrift_error)??;
 
         let ns = convert_to_namespace(&db)?;
 
@@ -220,17 +223,16 @@ impl Catalog for HmsCatalog {
         let resp = self.client.0.get_database(name.into()).await;
 
         match resp {
-            Ok(_) => Ok(true),
-            Err(err) => {
-                if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
-                    _,
-                )) = &err
-                {
-                    Ok(false)
-                } else {
-                    Err(from_thrift_error(err))
-                }
+            Ok(MaybeException::Ok(_)) => Ok(true),
+            Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
+                Ok(false)
             }
+            Ok(MaybeException::Exception(exception)) => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Operation failed for hitting thrift error".to_string(),
+            )
+            .with_source(anyhow!("thrift error: {:?}", exception))),
+            Err(err) => Err(from_thrift_error(err)),
         }
     }
 
@@ -306,7 +308,8 @@ impl Catalog for HmsCatalog {
             .0
             .get_all_tables(name.into())
             .await
-            .map_err(from_thrift_error)?;
+            .map(from_thrift_exception)
+            .map_err(from_thrift_error)??;
 
         let tables = tables
             .iter()
@@ -397,7 +400,8 @@ impl Catalog for HmsCatalog {
             .0
             .get_table(db_name.clone().into(), table.name.clone().into())
             .await
-            .map_err(from_thrift_error)?;
+            .map(from_thrift_exception)
+            .map_err(from_thrift_error)??;
 
         let metadata_location = get_metadata_location(&hive_table.parameters)?;
 
@@ -457,16 +461,14 @@ impl Catalog for HmsCatalog {
             .await;
 
         match resp {
-            Ok(_) => Ok(true),
-            Err(err) => {
-                if let ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_)) =
-                    &err
-                {
-                    Ok(false)
-                } else {
-                    Err(from_thrift_error(err))
-                }
-            }
+            Ok(MaybeException::Ok(_)) => Ok(true),
+            Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
+            Ok(MaybeException::Exception(exception)) => Err(Error::new(
+                ErrorKind::Unexpected,
+                "Operation failed for hitting thrift error".to_string(),
+            )
+            .with_source(anyhow!("thrift error: {:?}", exception))),
+            Err(err) => Err(from_thrift_error(err)),
         }
     }
 
@@ -488,7 +490,8 @@ impl Catalog for HmsCatalog {
             .0
             .get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
             .await
-            .map_err(from_thrift_error)?;
+            .map(from_thrift_exception)
+            .map_err(from_thrift_error)??;
 
         tbl.db_name = Some(dest_dbname.into());
         tbl.table_name = Some(dest_tbl_name.into());
diff --git a/crates/catalog/hms/src/error.rs b/crates/catalog/hms/src/error.rs
index a0f393c..cee5e46 100644
--- a/crates/catalog/hms/src/error.rs
+++ b/crates/catalog/hms/src/error.rs
@@ -19,12 +19,12 @@ use anyhow::anyhow;
 use iceberg::{Error, ErrorKind};
 use std::fmt::Debug;
 use std::io;
+use volo_thrift::MaybeException;
 
 /// Format a thrift error into iceberg error.
-pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
-where
-    T: Debug,
-{
+///
+/// Please only throw this error when you are sure that the error is caused by thrift.
+pub fn from_thrift_error(error: impl std::error::Error) -> Error {
     Error::new(
         ErrorKind::Unexpected,
         "Operation failed for hitting thrift error".to_string(),
@@ -32,6 +32,18 @@ where
     .with_source(anyhow!("thrift error: {:?}", error))
 }
 
+/// Format a thrift exception into iceberg error.
+pub fn from_thrift_exception<T, E: Debug>(value: MaybeException<T, E>) -> Result<T, Error> {
+    match value {
+        MaybeException::Ok(v) => Ok(v),
+        MaybeException::Exception(err) => Err(Error::new(
+            ErrorKind::Unexpected,
+            "Operation failed for hitting thrift error".to_string(),
+        )
+        .with_source(anyhow!("thrift error: {:?}", err))),
+    }
+}
+
 /// Format an io error into iceberg error.
 pub fn from_io_error(error: io::Error) -> Error {
     Error::new(
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 04ee5d4..baaa004 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -74,11 +74,15 @@ pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
         properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
     };
 
-    if let Some(owner_type) = &database.owner_type {
-        let value = match owner_type {
-            PrincipalType::User => "User",
-            PrincipalType::Group => "Group",
-            PrincipalType::Role => "Role",
+    if let Some(owner_type) = database.owner_type {
+        let value = if owner_type == PrincipalType::USER {
+            "User"
+        } else if owner_type == PrincipalType::GROUP {
+            "Group"
+        } else if owner_type == PrincipalType::ROLE {
+            "Role"
+        } else {
+            unreachable!("Invalid owner type")
         };
 
         properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
@@ -117,9 +121,9 @@ pub(crate) fn convert_to_database(
             HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
             HMS_DB_OWNER_TYPE => {
                 let owner_type = match v.to_lowercase().as_str() {
-                    "user" => PrincipalType::User,
-                    "group" => PrincipalType::Group,
-                    "role" => PrincipalType::Role,
+                    "user" => PrincipalType::USER,
+                    "group" => PrincipalType::GROUP,
+                    "role" => PrincipalType::ROLE,
                     _ => {
                         return Err(Error::new(
                             ErrorKind::DataInvalid,
@@ -144,7 +148,7 @@ pub(crate) fn convert_to_database(
     // https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44
     if db.owner_name.is_none() {
         db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
-        db.owner_type = Some(PrincipalType::User);
+        db.owner_type = Some(PrincipalType::USER);
     }
 
     Ok(db)
@@ -504,7 +508,7 @@ mod tests {
         assert_eq!(db.name, Some(FastStr::from("my_namespace")));
         assert_eq!(db.description, Some(FastStr::from("my_description")));
         assert_eq!(db.owner_name, Some(FastStr::from("apache")));
-        assert_eq!(db.owner_type, Some(PrincipalType::User));
+        assert_eq!(db.owner_type, Some(PrincipalType::USER));
 
         if let Some(params) = db.parameters {
             assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));
@@ -522,7 +526,7 @@ mod tests {
 
         assert_eq!(db.name, Some(FastStr::from("my_namespace")));
         assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
-        assert_eq!(db.owner_type, Some(PrincipalType::User));
+        assert_eq!(db.owner_type, Some(PrincipalType::USER));
 
         Ok(())
     }

Reply via email to