This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fa7fee9 refactor: Upgrade hive_metastore to 0.1 (#409)
fa7fee9 is described below
commit fa7fee9eb207b6179d16107472af28c92b4f9df7
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jun 19 21:28:31 2024 +0800
refactor: Upgrade hive_metastore to 0.1 (#409)
* refactor: Upgrade hive_metastore to 0.1
Signed-off-by: Xuanwo <[email protected]>
* format toml
Signed-off-by: Xuanwo <[email protected]>
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
Cargo.toml | 6 ++---
crates/catalog/hms/src/catalog.rs | 57 ++++++++++++++++++++-------------------
crates/catalog/hms/src/error.rs | 20 +++++++++++---
crates/catalog/hms/src/utils.rs | 26 ++++++++++--------
4 files changed, 64 insertions(+), 45 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 8c1871e..6c8cbe5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,7 +69,7 @@ once_cell = "1"
opendal = "0.47"
ordered-float = "4.0.0"
parquet = "52"
-pilota = "0.11.0"
+pilota = "0.11.2"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.12", features = ["json"] }
@@ -86,6 +86,6 @@ typed-builder = "^0.18"
url = "2"
urlencoding = "2"
uuid = "1.6.1"
-volo-thrift = "0.9.2"
-hive_metastore = "0.0.2"
+volo-thrift = "0.10"
+hive_metastore = "0.1.0"
tera = "1"
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index 18fcacd..7a292c5 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use crate::error::from_io_error;
use crate::error::from_thrift_error;
+use crate::error::{from_io_error, from_thrift_exception};
use super::utils::*;
+use anyhow::anyhow;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
@@ -36,7 +37,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;
-use volo_thrift::ResponseError;
+use volo_thrift::MaybeException;
/// Which variant of the thrift transport to communicate with HMS
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
@@ -137,7 +138,8 @@ impl Catalog for HmsCatalog {
.0
.get_all_databases()
.await
- .map_err(from_thrift_error)?
+ .map(from_thrift_exception)
+ .map_err(from_thrift_error)??
};
Ok(dbs
@@ -195,7 +197,8 @@ impl Catalog for HmsCatalog {
.0
.get_database(name.into())
.await
- .map_err(from_thrift_error)?;
+ .map(from_thrift_exception)
+ .map_err(from_thrift_error)??;
let ns = convert_to_namespace(&db)?;
@@ -220,17 +223,16 @@ impl Catalog for HmsCatalog {
let resp = self.client.0.get_database(name.into()).await;
match resp {
- Ok(_) => Ok(true),
- Err(err) => {
- if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
- _,
- )) = &err
- {
- Ok(false)
- } else {
- Err(from_thrift_error(err))
- }
+ Ok(MaybeException::Ok(_)) => Ok(true),
+ Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
+ Ok(false)
}
+ Ok(MaybeException::Exception(exception)) => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Operation failed for hitting thrift error".to_string(),
+ )
+ .with_source(anyhow!("thrift error: {:?}", exception))),
+ Err(err) => Err(from_thrift_error(err)),
}
}
@@ -306,7 +308,8 @@ impl Catalog for HmsCatalog {
.0
.get_all_tables(name.into())
.await
- .map_err(from_thrift_error)?;
+ .map(from_thrift_exception)
+ .map_err(from_thrift_error)??;
let tables = tables
.iter()
@@ -397,7 +400,8 @@ impl Catalog for HmsCatalog {
.0
.get_table(db_name.clone().into(), table.name.clone().into())
.await
- .map_err(from_thrift_error)?;
+ .map(from_thrift_exception)
+ .map_err(from_thrift_error)??;
let metadata_location = get_metadata_location(&hive_table.parameters)?;
@@ -457,16 +461,14 @@ impl Catalog for HmsCatalog {
.await;
match resp {
- Ok(_) => Ok(true),
- Err(err) => {
- if let ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_)) =
- &err
- {
- Ok(false)
- } else {
- Err(from_thrift_error(err))
- }
- }
+ Ok(MaybeException::Ok(_)) => Ok(true),
+ Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
+ Ok(MaybeException::Exception(exception)) => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Operation failed for hitting thrift error".to_string(),
+ )
+ .with_source(anyhow!("thrift error: {:?}", exception))),
+ Err(err) => Err(from_thrift_error(err)),
}
}
@@ -488,7 +490,8 @@ impl Catalog for HmsCatalog {
.0
.get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
.await
- .map_err(from_thrift_error)?;
+ .map(from_thrift_exception)
+ .map_err(from_thrift_error)??;
tbl.db_name = Some(dest_dbname.into());
tbl.table_name = Some(dest_tbl_name.into());
diff --git a/crates/catalog/hms/src/error.rs b/crates/catalog/hms/src/error.rs
index a0f393c..cee5e46 100644
--- a/crates/catalog/hms/src/error.rs
+++ b/crates/catalog/hms/src/error.rs
@@ -19,12 +19,12 @@ use anyhow::anyhow;
use iceberg::{Error, ErrorKind};
use std::fmt::Debug;
use std::io;
+use volo_thrift::MaybeException;
/// Format a thrift error into iceberg error.
-pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
-where
- T: Debug,
-{
+///
+/// Please only throw this error when you are sure that the error is caused by thrift.
+pub fn from_thrift_error(error: impl std::error::Error) -> Error {
Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting thrift error".to_string(),
@@ -32,6 +32,18 @@ where
.with_source(anyhow!("thrift error: {:?}", error))
}
+/// Format a thrift exception into iceberg error.
+pub fn from_thrift_exception<T, E: Debug>(value: MaybeException<T, E>) -> Result<T, Error> {
+ match value {
+ MaybeException::Ok(v) => Ok(v),
+ MaybeException::Exception(err) => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Operation failed for hitting thrift error".to_string(),
+ )
+ .with_source(anyhow!("thrift error: {:?}", err))),
+ }
+}
+
/// Format an io error into iceberg error.
pub fn from_io_error(error: io::Error) -> Error {
Error::new(
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 04ee5d4..baaa004 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -74,11 +74,15 @@ pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
};
- if let Some(owner_type) = &database.owner_type {
- let value = match owner_type {
- PrincipalType::User => "User",
- PrincipalType::Group => "Group",
- PrincipalType::Role => "Role",
+ if let Some(owner_type) = database.owner_type {
+ let value = if owner_type == PrincipalType::USER {
+ "User"
+ } else if owner_type == PrincipalType::GROUP {
+ "Group"
+ } else if owner_type == PrincipalType::ROLE {
+ "Role"
+ } else {
+ unreachable!("Invalid owner type")
};
properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
@@ -117,9 +121,9 @@ pub(crate) fn convert_to_database(
HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
HMS_DB_OWNER_TYPE => {
let owner_type = match v.to_lowercase().as_str() {
- "user" => PrincipalType::User,
- "group" => PrincipalType::Group,
- "role" => PrincipalType::Role,
+ "user" => PrincipalType::USER,
+ "group" => PrincipalType::GROUP,
+ "role" => PrincipalType::ROLE,
_ => {
return Err(Error::new(
ErrorKind::DataInvalid,
@@ -144,7 +148,7 @@ pub(crate) fn convert_to_database(
// https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44
if db.owner_name.is_none() {
db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
- db.owner_type = Some(PrincipalType::User);
+ db.owner_type = Some(PrincipalType::USER);
}
Ok(db)
@@ -504,7 +508,7 @@ mod tests {
assert_eq!(db.name, Some(FastStr::from("my_namespace")));
assert_eq!(db.description, Some(FastStr::from("my_description")));
assert_eq!(db.owner_name, Some(FastStr::from("apache")));
- assert_eq!(db.owner_type, Some(PrincipalType::User));
+ assert_eq!(db.owner_type, Some(PrincipalType::USER));
if let Some(params) = db.parameters {
assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));
@@ -522,7 +526,7 @@ mod tests {
assert_eq!(db.name, Some(FastStr::from("my_namespace")));
assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
- assert_eq!(db.owner_type, Some(PrincipalType::User));
+ assert_eq!(db.owner_type, Some(PrincipalType::USER));
Ok(())
}